Diffstat (limited to 'src/mmo/socket.cpp')
 src/mmo/socket.cpp | 474 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 474 insertions(+), 0 deletions(-)
diff --git a/src/mmo/socket.cpp b/src/mmo/socket.cpp
new file mode 100644
index 0000000..73e32a4
--- /dev/null
+++ b/src/mmo/socket.cpp
@@ -0,0 +1,474 @@
+#include "socket.hpp"
+
+#include <arpa/inet.h>
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+//#include <sys/types.h>
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include <cstdlib>
+#include <cstring>
+#include <ctime>
+
+#include "../io/cxxstdio.hpp"
+#include "core.hpp"
+#include "timer.hpp"
+#include "utils.hpp"
+
+#include "../poison.hpp"
+
+static
+io::FD_Set readfds;
+static
+int fd_max;
+
+static
+const uint32_t RFIFO_SIZE = 65536;
+static
+const uint32_t WFIFO_SIZE = 65536;
+
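+// one Session slot per possible fd; select() cannot handle
+// descriptors >= FD_SETSIZE, so that is also the hard connection limit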
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wold-style-cast"
+static
+std::array<std::unique_ptr<Session>, FD_SETSIZE> session;
+#pragma GCC diagnostic pop
+
+void set_session(io::FD fd, std::unique_ptr<Session> sess)
+{
+ int f = fd.uncast_dammit();
+ assert (0 <= f && f < FD_SETSIZE);
+ session[f] = std::move(sess);
+}
+Session *get_session(io::FD fd)
+{
+ int f = fd.uncast_dammit();
+ if (0 <= f && f < FD_SETSIZE)
+ return session[f].get();
+ return nullptr;
+}
+void reset_session(io::FD fd)
+{
+ int f = fd.uncast_dammit();
+ assert (0 <= f && f < FD_SETSIZE);
+ session[f] = nullptr;
+}
+int get_fd_max() { return fd_max; }
+IteratorPair<ValueIterator<io::FD, IncrFD>> iter_fds()
+{
+ return {io::FD::cast_dammit(0), io::FD::cast_dammit(fd_max)};
+}
+
+/// clean up by discarding handled bytes
+inline
+void RFIFOFLUSH(Session *s)
+{
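+ // shift the unhandled bytes down to the start of the buffer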
+ really_memmove(&s->rdata[0], &s->rdata[s->rdata_pos], RFIFOREST(s));
+ s->rdata_size = RFIFOREST(s);
+ s->rdata_pos = 0;
+}
+
+/// how much room there is to read more data
+inline
+size_t RFIFOSPACE(Session *s)
+{
+ return s->max_rdata - s->rdata_size;
+}
+
+
+/// Discard all input
+static
+void null_parse(Session *s);
+/// Default parser for new connections
+static
+void (*default_func_parse)(Session *) = null_parse;
+
+void set_defaultparse(void (*defaultparse)(Session *))
+{
+ default_func_parse = defaultparse;
+}
+
+/// Read from socket to the queue
+static
+void recv_to_fifo(Session *s)
+{
+ if (s->eof)
+ return;
+
+ ssize_t len = s->fd.read(&s->rdata[s->rdata_size],
+ RFIFOSPACE(s));
+
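+ // read() is only attempted when select() reported readability, so
+ // 0 means the peer closed the connection; errors also count as eof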
+ if (len > 0)
+ {
+ s->rdata_size += len;
+ s->connected = 1;
+ }
+ else
+ {
+ s->eof = 1;
+ }
+}
+
+static
+void send_from_fifo(Session *s)
+{
+ if (s->eof)
+ return;
+
+ ssize_t len = s->fd.write(&s->wdata[0], s->wdata_size);
+
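+ // a short write is not an error: the unsent tail is shifted down
+ // and retried on the next pass through do_sendrecv()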
+ if (len > 0)
+ {
+ s->wdata_size -= len;
+ if (s->wdata_size)
+ {
+ really_memmove(&s->wdata[0], &s->wdata[len],
+ s->wdata_size);
+ }
+ s->connected = 1;
+ }
+ else
+ {
+ s->eof = 1;
+ }
+}
+
+static
+void null_parse(Session *s)
+{
+ PRINTF("null_parse : %d\n", s);
+ RFIFOSKIP(s, RFIFOREST(s));
+}
+
+
+static
+void connect_client(Session *ls)
+{
+ struct sockaddr_in client_address;
+ socklen_t len = sizeof(client_address);
+
+ io::FD fd = ls->fd.accept(reinterpret_cast<struct sockaddr *>(&client_address), &len);
+ if (fd == io::FD())
+ {
+ perror("accept");
+ return;
+ }
+ if (fd.uncast_dammit() >= SOFT_LIMIT)
+ {
+ FPRINTF(stderr, "softlimit reached, disconnecting : %d\n", fd.uncast_dammit());
+ fd.shutdown(SHUT_RDWR);
+ fd.close();
+ return;
+ }
+ if (fd_max <= fd.uncast_dammit())
+ {
+ fd_max = fd.uncast_dammit() + 1;
+ }
+
+ const int yes = 1;
+ /// Allow bind() to succeed again immediately after the server restarts.
+ // Since the old socket may still be in the TIME_WAIT state, there's a
+ // possibility that formerly lost packets might be delivered and confuse
+ // the server.
+ fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
+ /// Send packets as soon as possible
+ /// even if the kernel thinks there is too little for it to be worth it!
+ /// Testing shows this is indeed a good idea.
+ fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes);
+
+ // Linux-ism: Set socket options to optimize for thin streams
+ // See http://lwn.net/Articles/308919/ and
+ // Documentation/networking/tcp-thin.txt .. Kernel 3.2+
+#ifdef TCP_THIN_LINEAR_TIMEOUTS
+ fd.setsockopt(IPPROTO_TCP, TCP_THIN_LINEAR_TIMEOUTS, &yes, sizeof yes);
+#endif
+#ifdef TCP_THIN_DUPACK
+ fd.setsockopt(IPPROTO_TCP, TCP_THIN_DUPACK, &yes, sizeof yes);
+#endif
+
+ readfds.set(fd);
+
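+ // the main loop must never block on a single client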
+ fd.fcntl(F_SETFL, O_NONBLOCK);
+
+ set_session(fd, make_unique<Session>());
+ Session *s = get_session(fd);
+ s->fd = fd;
+ s->rdata.new_(RFIFO_SIZE);
+ s->wdata.new_(WFIFO_SIZE);
+
+ s->max_rdata = RFIFO_SIZE;
+ s->max_wdata = WFIFO_SIZE;
+ s->func_recv = recv_to_fifo;
+ s->func_send = send_from_fifo;
+ s->func_parse = default_func_parse;
+ s->client_ip = IP4Address(client_address.sin_addr);
+ s->created = TimeT::now();
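+ // a fresh client is not yet "connected": recv_to_fifo() flips this on
+ // first data, and do_parsepacket() times out sessions that stay silent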
+ s->connected = 0;
+}
+
+Session *make_listen_port(uint16_t port)
+{
+ struct sockaddr_in server_address;
+ io::FD fd = io::FD::socket(AF_INET, SOCK_STREAM, 0);
+ if (fd == io::FD())
+ {
+ perror("socket");
+ return nullptr;
+ }
+ if (fd_max <= fd.uncast_dammit())
+ fd_max = fd.uncast_dammit() + 1;
+
+ fd.fcntl(F_SETFL, O_NONBLOCK);
+
+ const int yes = 1;
+ /// Allow bind() to succeed again immediately after the server restarts.
+ // Since the old socket may still be in the TIME_WAIT state, there's a
+ // possibility that formerly lost packets might be delivered and confuse
+ // the server.
+ fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
+ /// Send packets as soon as possible
+ /// even if the kernel thinks there is too little for it to be worth it!
+ // I'm not convinced this is a good idea; although it minimizes the
+ // latency for an individual write, it increases traffic in general.
+ fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes);
+
+ server_address.sin_family = AF_INET;
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wold-style-cast"
+#if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
+# pragma GCC diagnostic ignored "-Wuseless-cast"
+#endif
+ server_address.sin_addr.s_addr = htonl(INADDR_ANY);
+ server_address.sin_port = htons(port);
+#pragma GCC diagnostic pop
+
+ if (fd.bind(reinterpret_cast<struct sockaddr *>(&server_address),
+ sizeof(server_address)) == -1)
+ {
+ perror("bind");
+ exit(1);
+ }
+ if (fd.listen(5) == -1)
+ { /* error */
+ perror("listen");
+ exit(1);
+ }
+
+ readfds.set(fd);
+
+ set_session(fd, make_unique<Session>());
+ Session *s = get_session(fd);
+ s->fd = fd;
+
+ s->func_recv = connect_client;
+ s->created = TimeT::now();
+ s->connected = 1;
+
+ return s;
+}
+
+Session *make_connection(IP4Address ip, uint16_t port)
+{
+ struct sockaddr_in server_address;
+ io::FD fd = io::FD::socket(AF_INET, SOCK_STREAM, 0);
+ if (fd == io::FD())
+ {
+ perror("socket");
+ return nullptr;
+ }
+ if (fd_max <= fd.uncast_dammit())
+ fd_max = fd.uncast_dammit() + 1;
+
+ const int yes = 1;
+ /// Allow bind() to succeed again immediately after the server restarts.
+ // Since the old socket may still be in the TIME_WAIT state, there's a
+ // possibility that formerly lost packets might be delivered and confuse
+ // the server.
+ fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
+ /// Send packets as soon as possible
+ /// even if the kernel thinks there is too little for it to be worth it!
+ // I'm not convinced this is a good idea; although it minimizes the
+ // latency for an individual write, it increases traffic in general.
+ fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes);
+
+ server_address.sin_family = AF_INET;
+ server_address.sin_addr = in_addr(ip);
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wold-style-cast"
+#if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
+# pragma GCC diagnostic ignored "-Wuseless-cast"
+#endif
+ server_address.sin_port = htons(port);
+#pragma GCC diagnostic pop
+
+ fd.fcntl(F_SETFL, O_NONBLOCK);
+
+ /// Errors are not caught here - we must not block.
+ /// The main select() loop will detect the connection state later.
+ fd.connect(reinterpret_cast<struct sockaddr *>(&server_address),
+ sizeof(struct sockaddr_in));
+
+ readfds.set(fd);
+
+ set_session(fd, make_unique<Session>());
+ Session *s = get_session(fd);
+ s->fd = fd;
+ s->rdata.new_(RFIFO_SIZE);
+ s->wdata.new_(WFIFO_SIZE);
+
+ s->max_rdata = RFIFO_SIZE;
+ s->max_wdata = WFIFO_SIZE;
+ s->func_recv = recv_to_fifo;
+ s->func_send = send_from_fifo;
+ s->func_parse = default_func_parse;
+ s->created = TimeT::now();
+ s->connected = 1;
+
+ return s;
+}
+
+void delete_session(Session *s)
+{
+ if (!s)
+ return;
+
+ io::FD fd = s->fd;
+ // If this was the highest fd, decrease it
+ // We could add a loop to decrement fd_max further for every null session,
+ // but this is cheap and good enough for the typical case
+ if (fd.uncast_dammit() == fd_max - 1)
+ fd_max--;
+ readfds.clr(fd);
+ {
+ s->rdata.delete_();
+ s->wdata.delete_();
+ s->session_data.reset();
+ reset_session(fd);
+ }
+
+ // just close() would let the kernel keep trying to send buffered data
+ fd.shutdown(SHUT_RDWR);
+ fd.close();
+}
+
+void realloc_fifo(Session *s, size_t rfifo_size, size_t wfifo_size)
+{
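+ // only resize a buffer when its current contents still fit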
+ if (s->max_rdata != rfifo_size && s->rdata_size < rfifo_size)
+ {
+ s->rdata.resize(rfifo_size);
+ s->max_rdata = rfifo_size;
+ }
+ if (s->max_wdata != wfifo_size && s->wdata_size < wfifo_size)
+ {
+ s->wdata.resize(wfifo_size);
+ s->max_wdata = wfifo_size;
+ }
+}
+
+void WFIFOSET(Session *s, size_t len)
+{
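+ // keep ~16KiB of headroom by doubling the buffer; if fewer than
+ // ~2KiB would remain even after that, give up and abort below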
+ if (s->wdata_size + len + 16384 > s->max_wdata)
+ {
+ realloc_fifo(s, s->max_rdata, s->max_wdata << 1);
+ PRINTF("socket: %d wdata expanded to %zu bytes.\n", s, s->max_wdata);
+ }
+ if (s->wdata_size + len + 2048 < s->max_wdata)
+ s->wdata_size += len;
+ else
+ {
+ FPRINTF(stderr, "socket: %d wdata lost !!\n", s->fd.uncast_dammit());
+ abort();
+ }
+}
+
+void do_sendrecv(interval_t next_ms)
+{
+ bool any = false;
+ io::FD_Set rfd = readfds, wfd;
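+ // rfd copies the persistent read set; wfd is rebuilt each pass from
+ // the sessions that actually have output queued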
+ for (io::FD i : iter_fds())
+ {
+ Session *s = get_session(i);
+ if (s)
+ {
+ any = true;
+ if (s->wdata_size)
+ wfd.set(i);
+ }
+ }
+ if (!any)
+ {
+ if (!has_timers())
+ {
+ PRINTF("Shutting down - nothing to do\n");
+ runflag = false;
+ }
+ return;
+ }
+ struct timeval timeout;
+ {
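+ // split the interval into whole seconds plus leftover microseconds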
+ std::chrono::seconds next_s = std::chrono::duration_cast<std::chrono::seconds>(next_ms);
+ std::chrono::microseconds next_us = next_ms - next_s;
+ timeout.tv_sec = next_s.count();
+ timeout.tv_usec = next_us.count();
+ }
+ if (io::FD_Set::select(fd_max, &rfd, &wfd, nullptr, &timeout) <= 0)
+ return;
+ for (io::FD i : iter_fds())
+ {
+ Session *s = get_session(i);
+ if (!s)
+ continue;
+ if (wfd.isset(i))
+ {
+ if (s->func_send)
+ // usually send_from_fifo()
+ s->func_send(s);
+ }
+ if (rfd.isset(i))
+ {
+ if (s->func_recv)
+ // usually recv_to_fifo(), or connect_client() for listeners
+ s->func_recv(s);
+ }
+ }
+}
+
+void do_parsepacket(void)
+{
+ for (io::FD i : iter_fds())
+ {
+ Session *s = get_session(i);
+ if (!s)
+ continue;
+ if (!s->connected
+ && static_cast<time_t>(TimeT::now()) - static_cast<time_t>(s->created) > CONNECT_TIMEOUT)
+ {
+ PRINTF("Session #%d timed out\n", s);
+ s->eof = 1;
+ }
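+ // nothing new arrived and the session isn't closing - skip it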
+ if (!s->rdata_size && !s->eof)
+ continue;
+ if (s->func_parse)
+ {
+ s->func_parse(s);
+ /// some func_parse may call delete_session
+ s = get_session(i);
+ if (s && s->eof)
+ {
+ delete_session(s);
+ s = nullptr;
+ }
+ if (!s)
+ continue;
+ }
+ /// Reclaim buffer space for what was read
+ RFIFOFLUSH(s);
+ }
+}
+
+void RFIFOSKIP(Session *s, size_t len)
+{
+ s->rdata_pos += len;
+
+ if (s->rdata_size < s->rdata_pos)
+ {
+ FPRINTF(stderr, "too many skip\n");
+ abort();
+ }
+}