diff options
Diffstat (limited to 'src/mmo/socket.cpp')
-rw-r--r-- | src/mmo/socket.cpp | 474 |
1 files changed, 474 insertions, 0 deletions
diff --git a/src/mmo/socket.cpp b/src/mmo/socket.cpp new file mode 100644 index 0000000..73e32a4 --- /dev/null +++ b/src/mmo/socket.cpp @@ -0,0 +1,474 @@ +#include "socket.hpp" + +#include <arpa/inet.h> +#include <netinet/tcp.h> +#include <sys/socket.h> +//#include <sys/types.h> + +#include <fcntl.h> +#include <unistd.h> + +#include <cstdlib> +#include <cstring> +#include <ctime> + +#include "../io/cxxstdio.hpp" +#include "core.hpp" +#include "timer.hpp" +#include "utils.hpp" + +#include "../poison.hpp" + +static +io::FD_Set readfds; +static +int fd_max; + +static +const uint32_t RFIFO_SIZE = 65536; +static +const uint32_t WFIFO_SIZE = 65536; + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wold-style-cast" +static +std::array<std::unique_ptr<Session>, FD_SETSIZE> session; +#pragma GCC diagnostic pop + +void set_session(io::FD fd, std::unique_ptr<Session> sess) +{ + int f = fd.uncast_dammit(); + assert (0 <= f && f < FD_SETSIZE); + session[f] = std::move(sess); +} +Session *get_session(io::FD fd) +{ + int f = fd.uncast_dammit(); + if (0 <= f && f < FD_SETSIZE) + return session[f].get(); + return nullptr; +} +void reset_session(io::FD fd) +{ + int f = fd.uncast_dammit(); + assert (0 <= f && f < FD_SETSIZE); + session[f] = nullptr; +} +int get_fd_max() { return fd_max; } +IteratorPair<ValueIterator<io::FD, IncrFD>> iter_fds() +{ + return {io::FD::cast_dammit(0), io::FD::cast_dammit(fd_max)}; +} + +/// clean up by discarding handled bytes +inline +void RFIFOFLUSH(Session *s) +{ + really_memmove(&s->rdata[0], &s->rdata[s->rdata_pos], RFIFOREST(s)); + s->rdata_size = RFIFOREST(s); + s->rdata_pos = 0; +} + +/// how much room there is to read more data +inline +size_t RFIFOSPACE(Session *s) +{ + return s->max_rdata - s->rdata_size; +} + + +/// Discard all input +static +void null_parse(Session *s); +/// Default parser for new connections +static +void (*default_func_parse)(Session *) = null_parse; + +void set_defaultparse(void (*defaultparse)(Session *)) +{ + default_func_parse = defaultparse; +} + +/// Read from socket to the queue +static +void recv_to_fifo(Session *s) +{ + if (s->eof) + return; + + ssize_t len = s->fd.read(&s->rdata[s->rdata_size], + RFIFOSPACE(s)); + + if (len > 0) + { + s->rdata_size += len; + s->connected = 1; + } + else + { + s->eof = 1; + } +} + +static +void send_from_fifo(Session *s) +{ + if (s->eof) + return; + + ssize_t len = s->fd.write(&s->wdata[0], s->wdata_size); + + if (len > 0) + { + s->wdata_size -= len; + if (s->wdata_size) + { + really_memmove(&s->wdata[0], &s->wdata[len], + s->wdata_size); + } + s->connected = 1; + } + else + { + s->eof = 1; + } +} + +static +void null_parse(Session *s) +{ + PRINTF("null_parse : %d\n", s); + RFIFOSKIP(s, RFIFOREST(s)); +} + + +static +void connect_client(Session *ls) +{ + struct sockaddr_in client_address; + socklen_t len = sizeof(client_address); + + io::FD fd = ls->fd.accept(reinterpret_cast<struct sockaddr *>(&client_address), &len); + if (fd == io::FD()) + { + perror("accept"); + return; + } + if (fd.uncast_dammit() >= SOFT_LIMIT) + { + FPRINTF(stderr, "softlimit reached, disconnecting : %d\n", fd.uncast_dammit()); + fd.shutdown(SHUT_RDWR); + fd.close(); + return; + } + if (fd_max <= fd.uncast_dammit()) + { + fd_max = fd.uncast_dammit() + 1; + } + + const int yes = 1; + /// Allow to bind() again after the server restarts. + // Since the socket is still in the TIME_WAIT, there's a possibility + // that formerly lost packets might be delivered and confuse the server. + fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes); + /// Send packets as soon as possible + /// even if the kernel thinks there is too little for it to be worth it! + /// Testing shows this is indeed a good idea. + fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes); + + // Linux-ism: Set socket options to optimize for thin streams + // See http://lwn.net/Articles/308919/ and + // Documentation/networking/tcp-thin.txt .. Kernel 3.2+ +#ifdef TCP_THIN_LINEAR_TIMEOUTS + fd.setsockopt(IPPROTO_TCP, TCP_THIN_LINEAR_TIMEOUTS, &yes, sizeof yes); +#endif +#ifdef TCP_THIN_DUPACK + fd.setsockopt(IPPROTO_TCP, TCP_THIN_DUPACK, &yes, sizeof yes); +#endif + + readfds.set(fd); + + fd.fcntl(F_SETFL, O_NONBLOCK); + + set_session(fd, make_unique<Session>()); + Session *s = get_session(fd); + s->fd = fd; + s->rdata.new_(RFIFO_SIZE); + s->wdata.new_(WFIFO_SIZE); + + s->max_rdata = RFIFO_SIZE; + s->max_wdata = WFIFO_SIZE; + s->func_recv = recv_to_fifo; + s->func_send = send_from_fifo; + s->func_parse = default_func_parse; + s->client_ip = IP4Address(client_address.sin_addr); + s->created = TimeT::now(); + s->connected = 0; +} + +Session *make_listen_port(uint16_t port) +{ + struct sockaddr_in server_address; + io::FD fd = io::FD::socket(AF_INET, SOCK_STREAM, 0); + if (fd == io::FD()) + { + perror("socket"); + return nullptr; + } + if (fd_max <= fd.uncast_dammit()) + fd_max = fd.uncast_dammit() + 1; + + fd.fcntl(F_SETFL, O_NONBLOCK); + + const int yes = 1; + /// Allow to bind() again after the server restarts. + // Since the socket is still in the TIME_WAIT, there's a possibility + // that formerly lost packets might be delivered and confuse the server. + fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes); + /// Send packets as soon as possible + /// even if the kernel thinks there is too little for it to be worth it! + // I'm not convinced this is a good idea; although in minimizes the + // latency for an individual write, it increases traffic in general. + fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes); + + server_address.sin_family = AF_INET; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wold-style-cast" +#if __GNUC__ > 4 || __GNUC_MINOR__ >= 8 +# pragma GCC diagnostic ignored "-Wuseless-cast" +#endif + server_address.sin_addr.s_addr = htonl(INADDR_ANY); + server_address.sin_port = htons(port); +#pragma GCC diagnostic pop + + if (fd.bind(reinterpret_cast<struct sockaddr *>(&server_address), + sizeof(server_address)) == -1) + { + perror("bind"); + exit(1); + } + if (fd.listen(5) == -1) + { /* error */ + perror("listen"); + exit(1); + } + + readfds.set(fd); + + set_session(fd, make_unique<Session>()); + Session *s = get_session(fd); + s->fd = fd; + + s->func_recv = connect_client; + s->created = TimeT::now(); + s->connected = 1; + + return s; +} + +Session *make_connection(IP4Address ip, uint16_t port) +{ + struct sockaddr_in server_address; + io::FD fd = io::FD::socket(AF_INET, SOCK_STREAM, 0); + if (fd == io::FD()) + { + perror("socket"); + return nullptr; + } + if (fd_max <= fd.uncast_dammit()) + fd_max = fd.uncast_dammit() + 1; + + const int yes = 1; + /// Allow to bind() again after the server restarts. + // Since the socket is still in the TIME_WAIT, there's a possibility + // that formerly lost packets might be delivered and confuse the server. + fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes); + /// Send packets as soon as possible + /// even if the kernel thinks there is too little for it to be worth it! + // I'm not convinced this is a good idea; although in minimizes the + // latency for an individual write, it increases traffic in general. + fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes); + + server_address.sin_family = AF_INET; + server_address.sin_addr = in_addr(ip); +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wold-style-cast" +#if __GNUC__ > 4 || __GNUC_MINOR__ >= 8 +# pragma GCC diagnostic ignored "-Wuseless-cast" +#endif + server_address.sin_port = htons(port); +#pragma GCC diagnostic pop + + fd.fcntl(F_SETFL, O_NONBLOCK); + + /// Errors not caught - we must not block + /// Let the main select() loop detect when we know the state + fd.connect(reinterpret_cast<struct sockaddr *>(&server_address), + sizeof(struct sockaddr_in)); + + readfds.set(fd); + + set_session(fd, make_unique<Session>()); + Session *s = get_session(fd); + s->fd = fd; + s->rdata.new_(RFIFO_SIZE); + s->wdata.new_(WFIFO_SIZE); + + s->max_rdata = RFIFO_SIZE; + s->max_wdata = WFIFO_SIZE; + s->func_recv = recv_to_fifo; + s->func_send = send_from_fifo; + s->func_parse = default_func_parse; + s->created = TimeT::now(); + s->connected = 1; + + return s; +} + +void delete_session(Session *s) +{ + if (!s) + return; + + io::FD fd = s->fd; + // If this was the highest fd, decrease it + // We could add a loop to decrement fd_max further for every null session, + // but this is cheap and good enough for the typical case + if (fd.uncast_dammit() == fd_max - 1) + fd_max--; + readfds.clr(fd); + { + s->rdata.delete_(); + s->wdata.delete_(); + s->session_data.reset(); + reset_session(fd); + } + + // just close() would try to keep sending buffers + fd.shutdown(SHUT_RDWR); + fd.close(); +} + +void realloc_fifo(Session *s, size_t rfifo_size, size_t wfifo_size) +{ + if (s->max_rdata != rfifo_size && s->rdata_size < rfifo_size) + { + s->rdata.resize(rfifo_size); + s->max_rdata = rfifo_size; + } + if (s->max_wdata != wfifo_size && s->wdata_size < wfifo_size) + { + s->wdata.resize(wfifo_size); + s->max_wdata = wfifo_size; + } +} + +void WFIFOSET(Session *s, size_t len) +{ + if (s->wdata_size + len + 16384 > s->max_wdata) + { + realloc_fifo(s, s->max_rdata, s->max_wdata << 1); + PRINTF("socket: %d wdata expanded to %zu bytes.\n", s, s->max_wdata); + } + if (s->wdata_size + len + 2048 < s->max_wdata) + s->wdata_size += len; + else + FPRINTF(stderr, "socket: %d wdata lost !!\n", s), abort(); +} + +void do_sendrecv(interval_t next_ms) +{ + bool any = false; + io::FD_Set rfd = readfds, wfd; + for (io::FD i : iter_fds()) + { + Session *s = get_session(i); + if (s) + { + any = true; + if (s->wdata_size) + wfd.set(i); + } + } + if (!any) + { + if (!has_timers()) + { + PRINTF("Shutting down - nothing to do\n"); + runflag = false; + } + return; + } + struct timeval timeout; + { + std::chrono::seconds next_s = std::chrono::duration_cast<std::chrono::seconds>(next_ms); + std::chrono::microseconds next_us = next_ms - next_s; + timeout.tv_sec = next_s.count(); + timeout.tv_usec = next_us.count(); + } + if (io::FD_Set::select(fd_max, &rfd, &wfd, NULL, &timeout) <= 0) + return; + for (io::FD i : iter_fds()) + { + Session *s = get_session(i); + if (!s) + continue; + if (wfd.isset(i)) + { + if (s->func_send) + //send_from_fifo(i); + s->func_send(s); + } + if (rfd.isset(i)) + { + if (s->func_recv) + //recv_to_fifo(i); + //or connect_client(i); + s->func_recv(s); + } + } +} + +void do_parsepacket(void) +{ + for (io::FD i : iter_fds()) + { + Session *s = get_session(i); + if (!s) + continue; + if (!s->connected + && static_cast<time_t>(TimeT::now()) - static_cast<time_t>(s->created) > CONNECT_TIMEOUT) + { + PRINTF("Session #%d timed out\n", s); + s->eof = 1; + } + if (!s->rdata_size && !s->eof) + continue; + if (s->func_parse) + { + s->func_parse(s); + /// some func_parse may call delete_session + s = get_session(i); + if (s && s->eof) + { + delete_session(s); + s = nullptr; + } + if (!s) + continue; + } + /// Reclaim buffer space for what was read + RFIFOFLUSH(s); + } +} + +void RFIFOSKIP(Session *s, size_t len) +{ + s->rdata_pos += len; + + if (s->rdata_size < s->rdata_pos) + { + FPRINTF(stderr, "too many skip\n"); + abort(); + } +} |