From c812c92d1a1835f0bda783e709481188c8d92225 Mon Sep 17 00:00:00 2001 From: Ben Longbons Date: Sat, 15 Mar 2014 19:34:59 -0700 Subject: Clean up header organization --- src/common/socket.cpp | 474 -------------------------------------------------- 1 file changed, 474 deletions(-) delete mode 100644 src/common/socket.cpp (limited to 'src/common/socket.cpp') diff --git a/src/common/socket.cpp b/src/common/socket.cpp deleted file mode 100644 index 73e32a4..0000000 --- a/src/common/socket.cpp +++ /dev/null @@ -1,474 +0,0 @@ -#include "socket.hpp" - -#include -#include -#include -//#include - -#include -#include - -#include -#include -#include - -#include "../io/cxxstdio.hpp" -#include "core.hpp" -#include "timer.hpp" -#include "utils.hpp" - -#include "../poison.hpp" - -static -io::FD_Set readfds; -static -int fd_max; - -static -const uint32_t RFIFO_SIZE = 65536; -static -const uint32_t WFIFO_SIZE = 65536; - -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wold-style-cast" -static -std::array, FD_SETSIZE> session; -#pragma GCC diagnostic pop - -void set_session(io::FD fd, std::unique_ptr sess) -{ - int f = fd.uncast_dammit(); - assert (0 <= f && f < FD_SETSIZE); - session[f] = std::move(sess); -} -Session *get_session(io::FD fd) -{ - int f = fd.uncast_dammit(); - if (0 <= f && f < FD_SETSIZE) - return session[f].get(); - return nullptr; -} -void reset_session(io::FD fd) -{ - int f = fd.uncast_dammit(); - assert (0 <= f && f < FD_SETSIZE); - session[f] = nullptr; -} -int get_fd_max() { return fd_max; } -IteratorPair> iter_fds() -{ - return {io::FD::cast_dammit(0), io::FD::cast_dammit(fd_max)}; -} - -/// clean up by discarding handled bytes -inline -void RFIFOFLUSH(Session *s) -{ - really_memmove(&s->rdata[0], &s->rdata[s->rdata_pos], RFIFOREST(s)); - s->rdata_size = RFIFOREST(s); - s->rdata_pos = 0; -} - -/// how much room there is to read more data -inline -size_t RFIFOSPACE(Session *s) -{ - return s->max_rdata - s->rdata_size; -} - - -/// Discard all input -static -void null_parse(Session *s); -/// Default parser for new connections -static -void (*default_func_parse)(Session *) = null_parse; - -void set_defaultparse(void (*defaultparse)(Session *)) -{ - default_func_parse = defaultparse; -} - -/// Read from socket to the queue -static -void recv_to_fifo(Session *s) -{ - if (s->eof) - return; - - ssize_t len = s->fd.read(&s->rdata[s->rdata_size], - RFIFOSPACE(s)); - - if (len > 0) - { - s->rdata_size += len; - s->connected = 1; - } - else - { - s->eof = 1; - } -} - -static -void send_from_fifo(Session *s) -{ - if (s->eof) - return; - - ssize_t len = s->fd.write(&s->wdata[0], s->wdata_size); - - if (len > 0) - { - s->wdata_size -= len; - if (s->wdata_size) - { - really_memmove(&s->wdata[0], &s->wdata[len], - s->wdata_size); - } - s->connected = 1; - } - else - { - s->eof = 1; - } -} - -static -void null_parse(Session *s) -{ - PRINTF("null_parse : %d\n", s); - RFIFOSKIP(s, RFIFOREST(s)); -} - - -static -void connect_client(Session *ls) -{ - struct sockaddr_in client_address; - socklen_t len = sizeof(client_address); - - io::FD fd = ls->fd.accept(reinterpret_cast(&client_address), &len); - if (fd == io::FD()) - { - perror("accept"); - return; - } - if (fd.uncast_dammit() >= SOFT_LIMIT) - { - FPRINTF(stderr, "softlimit reached, disconnecting : %d\n", fd.uncast_dammit()); - fd.shutdown(SHUT_RDWR); - fd.close(); - return; - } - if (fd_max <= fd.uncast_dammit()) - { - fd_max = fd.uncast_dammit() + 1; - } - - const int yes = 1; - /// Allow to bind() again after the server restarts. - // Since the socket is still in the TIME_WAIT, there's a possibility - // that formerly lost packets might be delivered and confuse the server. - fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes); - /// Send packets as soon as possible - /// even if the kernel thinks there is too little for it to be worth it! - /// Testing shows this is indeed a good idea. - fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes); - - // Linux-ism: Set socket options to optimize for thin streams - // See http://lwn.net/Articles/308919/ and - // Documentation/networking/tcp-thin.txt .. Kernel 3.2+ -#ifdef TCP_THIN_LINEAR_TIMEOUTS - fd.setsockopt(IPPROTO_TCP, TCP_THIN_LINEAR_TIMEOUTS, &yes, sizeof yes); -#endif -#ifdef TCP_THIN_DUPACK - fd.setsockopt(IPPROTO_TCP, TCP_THIN_DUPACK, &yes, sizeof yes); -#endif - - readfds.set(fd); - - fd.fcntl(F_SETFL, O_NONBLOCK); - - set_session(fd, make_unique()); - Session *s = get_session(fd); - s->fd = fd; - s->rdata.new_(RFIFO_SIZE); - s->wdata.new_(WFIFO_SIZE); - - s->max_rdata = RFIFO_SIZE; - s->max_wdata = WFIFO_SIZE; - s->func_recv = recv_to_fifo; - s->func_send = send_from_fifo; - s->func_parse = default_func_parse; - s->client_ip = IP4Address(client_address.sin_addr); - s->created = TimeT::now(); - s->connected = 0; -} - -Session *make_listen_port(uint16_t port) -{ - struct sockaddr_in server_address; - io::FD fd = io::FD::socket(AF_INET, SOCK_STREAM, 0); - if (fd == io::FD()) - { - perror("socket"); - return nullptr; - } - if (fd_max <= fd.uncast_dammit()) - fd_max = fd.uncast_dammit() + 1; - - fd.fcntl(F_SETFL, O_NONBLOCK); - - const int yes = 1; - /// Allow to bind() again after the server restarts. - // Since the socket is still in the TIME_WAIT, there's a possibility - // that formerly lost packets might be delivered and confuse the server. - fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes); - /// Send packets as soon as possible - /// even if the kernel thinks there is too little for it to be worth it! - // I'm not convinced this is a good idea; although in minimizes the - // latency for an individual write, it increases traffic in general. - fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes); - - server_address.sin_family = AF_INET; -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wold-style-cast" -#if __GNUC__ > 4 || __GNUC_MINOR__ >= 8 -# pragma GCC diagnostic ignored "-Wuseless-cast" -#endif - server_address.sin_addr.s_addr = htonl(INADDR_ANY); - server_address.sin_port = htons(port); -#pragma GCC diagnostic pop - - if (fd.bind(reinterpret_cast(&server_address), - sizeof(server_address)) == -1) - { - perror("bind"); - exit(1); - } - if (fd.listen(5) == -1) - { /* error */ - perror("listen"); - exit(1); - } - - readfds.set(fd); - - set_session(fd, make_unique()); - Session *s = get_session(fd); - s->fd = fd; - - s->func_recv = connect_client; - s->created = TimeT::now(); - s->connected = 1; - - return s; -} - -Session *make_connection(IP4Address ip, uint16_t port) -{ - struct sockaddr_in server_address; - io::FD fd = io::FD::socket(AF_INET, SOCK_STREAM, 0); - if (fd == io::FD()) - { - perror("socket"); - return nullptr; - } - if (fd_max <= fd.uncast_dammit()) - fd_max = fd.uncast_dammit() + 1; - - const int yes = 1; - /// Allow to bind() again after the server restarts. - // Since the socket is still in the TIME_WAIT, there's a possibility - // that formerly lost packets might be delivered and confuse the server. - fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes); - /// Send packets as soon as possible - /// even if the kernel thinks there is too little for it to be worth it! - // I'm not convinced this is a good idea; although in minimizes the - // latency for an individual write, it increases traffic in general. - fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes); - - server_address.sin_family = AF_INET; - server_address.sin_addr = in_addr(ip); -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wold-style-cast" -#if __GNUC__ > 4 || __GNUC_MINOR__ >= 8 -# pragma GCC diagnostic ignored "-Wuseless-cast" -#endif - server_address.sin_port = htons(port); -#pragma GCC diagnostic pop - - fd.fcntl(F_SETFL, O_NONBLOCK); - - /// Errors not caught - we must not block - /// Let the main select() loop detect when we know the state - fd.connect(reinterpret_cast(&server_address), - sizeof(struct sockaddr_in)); - - readfds.set(fd); - - set_session(fd, make_unique()); - Session *s = get_session(fd); - s->fd = fd; - s->rdata.new_(RFIFO_SIZE); - s->wdata.new_(WFIFO_SIZE); - - s->max_rdata = RFIFO_SIZE; - s->max_wdata = WFIFO_SIZE; - s->func_recv = recv_to_fifo; - s->func_send = send_from_fifo; - s->func_parse = default_func_parse; - s->created = TimeT::now(); - s->connected = 1; - - return s; -} - -void delete_session(Session *s) -{ - if (!s) - return; - - io::FD fd = s->fd; - // If this was the highest fd, decrease it - // We could add a loop to decrement fd_max further for every null session, - // but this is cheap and good enough for the typical case - if (fd.uncast_dammit() == fd_max - 1) - fd_max--; - readfds.clr(fd); - { - s->rdata.delete_(); - s->wdata.delete_(); - s->session_data.reset(); - reset_session(fd); - } - - // just close() would try to keep sending buffers - fd.shutdown(SHUT_RDWR); - fd.close(); -} - -void realloc_fifo(Session *s, size_t rfifo_size, size_t wfifo_size) -{ - if (s->max_rdata != rfifo_size && s->rdata_size < rfifo_size) - { - s->rdata.resize(rfifo_size); - s->max_rdata = rfifo_size; - } - if (s->max_wdata != wfifo_size && s->wdata_size < wfifo_size) - { - s->wdata.resize(wfifo_size); - s->max_wdata = wfifo_size; - } -} - -void WFIFOSET(Session *s, size_t len) -{ - if (s->wdata_size + len + 16384 > s->max_wdata) - { - realloc_fifo(s, s->max_rdata, s->max_wdata << 1); - PRINTF("socket: %d wdata expanded to %zu bytes.\n", s, s->max_wdata); - } - if (s->wdata_size + len + 2048 < s->max_wdata) - s->wdata_size += len; - else - FPRINTF(stderr, "socket: %d wdata lost !!\n", s), abort(); -} - -void do_sendrecv(interval_t next_ms) -{ - bool any = false; - io::FD_Set rfd = readfds, wfd; - for (io::FD i : iter_fds()) - { - Session *s = get_session(i); - if (s) - { - any = true; - if (s->wdata_size) - wfd.set(i); - } - } - if (!any) - { - if (!has_timers()) - { - PRINTF("Shutting down - nothing to do\n"); - runflag = false; - } - return; - } - struct timeval timeout; - { - std::chrono::seconds next_s = std::chrono::duration_cast(next_ms); - std::chrono::microseconds next_us = next_ms - next_s; - timeout.tv_sec = next_s.count(); - timeout.tv_usec = next_us.count(); - } - if (io::FD_Set::select(fd_max, &rfd, &wfd, NULL, &timeout) <= 0) - return; - for (io::FD i : iter_fds()) - { - Session *s = get_session(i); - if (!s) - continue; - if (wfd.isset(i)) - { - if (s->func_send) - //send_from_fifo(i); - s->func_send(s); - } - if (rfd.isset(i)) - { - if (s->func_recv) - //recv_to_fifo(i); - //or connect_client(i); - s->func_recv(s); - } - } -} - -void do_parsepacket(void) -{ - for (io::FD i : iter_fds()) - { - Session *s = get_session(i); - if (!s) - continue; - if (!s->connected - && static_cast(TimeT::now()) - static_cast(s->created) > CONNECT_TIMEOUT) - { - PRINTF("Session #%d timed out\n", s); - s->eof = 1; - } - if (!s->rdata_size && !s->eof) - continue; - if (s->func_parse) - { - s->func_parse(s); - /// some func_parse may call delete_session - s = get_session(i); - if (s && s->eof) - { - delete_session(s); - s = nullptr; - } - if (!s) - continue; - } - /// Reclaim buffer space for what was read - RFIFOFLUSH(s); - } -} - -void RFIFOSKIP(Session *s, size_t len) -{ - s->rdata_pos += len; - - if (s->rdata_size < s->rdata_pos) - { - FPRINTF(stderr, "too many skip\n"); - abort(); - } -} -- cgit v1.2.3-70-g09d2