summaryrefslogtreecommitdiff
path: root/src/net/socket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net/socket.cpp')
-rw-r--r--src/net/socket.cpp511
1 files changed, 511 insertions, 0 deletions
diff --git a/src/net/socket.cpp b/src/net/socket.cpp
new file mode 100644
index 0000000..6880bfa
--- /dev/null
+++ b/src/net/socket.cpp
@@ -0,0 +1,511 @@
+#include "socket.hpp"
+// socket.cpp - Network event system.
+//
+// Copyright © ????-2004 Athena Dev Teams
+// Copyright © 2004-2011 The Mana World Development Team
+// Copyright © 2011-2014 Ben Longbons <b.r.longbons@gmail.com>
+// Copyright © 2013 MadCamel
+//
+// This file is part of The Mana World (Athena server)
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+#include <netinet/tcp.h>
+#include <sys/socket.h>
+
+#include <fcntl.h>
+
+#include <cstdlib>
+
+#include <array>
+
+#include "../compat/memory.hpp"
+
+#include "../io/cxxstdio.hpp"
+
+// TODO get rid of ordering violations
+#include "../mmo/utils.hpp"
+#include "../mmo/core.hpp"
+
+#include "timer.hpp"
+
+#include "../poison.hpp"
+
+static
+io::FD_Set readfds;
+static
+int fd_max;
+
+static
+const uint32_t RFIFO_SIZE = 65536;
+static
+const uint32_t WFIFO_SIZE = 65536;
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wold-style-cast"
+static
+std::array<std::unique_ptr<Session>, FD_SETSIZE> session;
+#pragma GCC diagnostic pop
+
+Session::Session(SessionIO io, SessionParsers p)
+: created()
+, connected()
+, eof()
+, timed_close()
+, rdata(), wdata()
+, max_rdata(), max_wdata()
+, rdata_size(), wdata_size()
+, rdata_pos()
+, client_ip()
+, func_recv()
+, func_send()
+, func_parse()
+, func_delete()
+, for_inferior()
+, session_data()
+, fd()
+{
+ set_io(io);
+ set_parsers(p);
+}
+void Session::set_io(SessionIO io)
+{
+ func_send = io.func_send;
+ func_recv = io.func_recv;
+}
+void Session::set_parsers(SessionParsers p)
+{
+ func_parse = p.func_parse;
+ func_delete = p.func_delete;
+}
+
+
+void set_session(io::FD fd, std::unique_ptr<Session> sess)
+{
+ int f = fd.uncast_dammit();
+ assert (0 <= f && f < FD_SETSIZE);
+ session[f] = std::move(sess);
+}
+Session *get_session(io::FD fd)
+{
+ int f = fd.uncast_dammit();
+ if (0 <= f && f < FD_SETSIZE)
+ return session[f].get();
+ return nullptr;
+}
+void reset_session(io::FD fd)
+{
+ int f = fd.uncast_dammit();
+ assert (0 <= f && f < FD_SETSIZE);
+ session[f] = nullptr;
+}
+int get_fd_max() { return fd_max; }
+IteratorPair<ValueIterator<io::FD, IncrFD>> iter_fds()
+{
+ return {io::FD::cast_dammit(0), io::FD::cast_dammit(fd_max)};
+}
+
+/// clean up by discarding handled bytes
+inline
+void RFIFOFLUSH(Session *s)
+{
+ really_memmove(&s->rdata[0], &s->rdata[s->rdata_pos], RFIFOREST(s));
+ s->rdata_size = RFIFOREST(s);
+ s->rdata_pos = 0;
+}
+
+/// how much room there is to read more data
+inline
+size_t RFIFOSPACE(Session *s)
+{
+ return s->max_rdata - s->rdata_size;
+}
+
+
+/// Read from socket to the queue
+static
+void recv_to_fifo(Session *s)
+{
+ ssize_t len = s->fd.read(&s->rdata[s->rdata_size],
+ RFIFOSPACE(s));
+
+ if (len > 0)
+ {
+ s->rdata_size += len;
+ s->connected = 1;
+ }
+ else
+ {
+ s->set_eof();
+ }
+}
+
+static
+void send_from_fifo(Session *s)
+{
+ ssize_t len = s->fd.write(&s->wdata[0], s->wdata_size);
+
+ if (len > 0)
+ {
+ s->wdata_size -= len;
+ if (s->wdata_size)
+ {
+ really_memmove(&s->wdata[0], &s->wdata[len],
+ s->wdata_size);
+ }
+ s->connected = 1;
+ }
+ else
+ {
+ s->set_eof();
+ }
+}
+
+static
+void nothing_delete(Session *s)
+{
+ (void)s;
+}
+
+static
+void connect_client(Session *ls)
+{
+ struct sockaddr_in client_address;
+ socklen_t len = sizeof(client_address);
+
+ io::FD fd = ls->fd.accept(reinterpret_cast<struct sockaddr *>(&client_address), &len);
+ if (fd == io::FD())
+ {
+ perror("accept");
+ return;
+ }
+ if (fd.uncast_dammit() >= SOFT_LIMIT)
+ {
+ FPRINTF(stderr, "softlimit reached, disconnecting : %d\n"_fmt, fd.uncast_dammit());
+ fd.shutdown(SHUT_RDWR);
+ fd.close();
+ return;
+ }
+ if (fd_max <= fd.uncast_dammit())
+ {
+ fd_max = fd.uncast_dammit() + 1;
+ }
+
+ const int yes = 1;
+ /// Allow to bind() again after the server restarts.
+ // Since the socket is still in the TIME_WAIT, there's a possibility
+ // that formerly lost packets might be delivered and confuse the server.
+ fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
+ /// Send packets as soon as possible
+ /// even if the kernel thinks there is too little for it to be worth it!
+ /// Testing shows this is indeed a good idea.
+ fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes);
+
+ // Linux-ism: Set socket options to optimize for thin streams
+ // See http://lwn.net/Articles/308919/ and
+ // Documentation/networking/tcp-thin.txt .. Kernel 3.2+
+#ifdef TCP_THIN_LINEAR_TIMEOUTS
+ fd.setsockopt(IPPROTO_TCP, TCP_THIN_LINEAR_TIMEOUTS, &yes, sizeof yes);
+#endif
+#ifdef TCP_THIN_DUPACK
+ fd.setsockopt(IPPROTO_TCP, TCP_THIN_DUPACK, &yes, sizeof yes);
+#endif
+
+ readfds.set(fd);
+
+ fd.fcntl(F_SETFL, O_NONBLOCK);
+
+ set_session(fd, make_unique<Session>(
+ SessionIO{.func_recv= recv_to_fifo, .func_send= send_from_fifo},
+ ls->for_inferior));
+ Session *s = get_session(fd);
+ s->fd = fd;
+ s->rdata.new_(RFIFO_SIZE);
+ s->wdata.new_(WFIFO_SIZE);
+ s->max_rdata = RFIFO_SIZE;
+ s->max_wdata = WFIFO_SIZE;
+ s->client_ip = IP4Address(client_address.sin_addr);
+ s->created = TimeT::now();
+ s->connected = 0;
+}
+
+Session *make_listen_port(uint16_t port, SessionParsers inferior)
+{
+ struct sockaddr_in server_address;
+ io::FD fd = io::FD::socket(AF_INET, SOCK_STREAM, 0);
+ if (fd == io::FD())
+ {
+ perror("socket");
+ return nullptr;
+ }
+ if (fd_max <= fd.uncast_dammit())
+ fd_max = fd.uncast_dammit() + 1;
+
+ fd.fcntl(F_SETFL, O_NONBLOCK);
+
+ const int yes = 1;
+ /// Allow to bind() again after the server restarts.
+ // Since the socket is still in the TIME_WAIT, there's a possibility
+ // that formerly lost packets might be delivered and confuse the server.
+ fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
+ /// Send packets as soon as possible
+ /// even if the kernel thinks there is too little for it to be worth it!
+ // I'm not convinced this is a good idea; although in minimizes the
+ // latency for an individual write, it increases traffic in general.
+ fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes);
+
+ server_address.sin_family = AF_INET;
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wold-style-cast"
+#if __GNUC__ > 4 || __GNUC_MINOR__ >= 8
+# pragma GCC diagnostic ignored "-Wuseless-cast"
+#endif
+ server_address.sin_addr.s_addr = htonl(INADDR_ANY);
+ server_address.sin_port = htons(port);
+#pragma GCC diagnostic pop
+
+ if (fd.bind(reinterpret_cast<struct sockaddr *>(&server_address),
+ sizeof(server_address)) == -1)
+ {
+ perror("bind");
+ exit(1);
+ }
+ if (fd.listen(5) == -1)
+ { /* error */
+ perror("listen");
+ exit(1);
+ }
+
+ readfds.set(fd);
+
+ set_session(fd, make_unique<Session>(
+ SessionIO{.func_recv= connect_client, .func_send= nullptr},
+ SessionParsers{.func_parse= nullptr, .func_delete= nothing_delete}));
+ Session *s = get_session(fd);
+ s->for_inferior = inferior;
+ s->fd = fd;
+
+ s->created = TimeT::now();
+ s->connected = 1;
+
+ return s;
+}
+
+Session *make_connection(IP4Address ip, uint16_t port, SessionParsers parsers)
+{
+ struct sockaddr_in server_address;
+ io::FD fd = io::FD::socket(AF_INET, SOCK_STREAM, 0);
+ if (fd == io::FD())
+ {
+ perror("socket");
+ return nullptr;
+ }
+ if (fd_max <= fd.uncast_dammit())
+ fd_max = fd.uncast_dammit() + 1;
+
+ const int yes = 1;
+ /// Allow to bind() again after the server restarts.
+ // Since the socket is still in the TIME_WAIT, there's a possibility
+ // that formerly lost packets might be delivered and confuse the server.
+ fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
+ /// Send packets as soon as possible
+ /// even if the kernel thinks there is too little for it to be worth it!
+ // I'm not convinced this is a good idea; although in minimizes the
+ // latency for an individual write, it increases traffic in general.
+ fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes);
+
+ server_address.sin_family = AF_INET;
+ server_address.sin_addr = in_addr(ip);
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wold-style-cast"
+#if __GNUC__ > 4 || __GNUC_MINOR__ >= 8
+# pragma GCC diagnostic ignored "-Wuseless-cast"
+#endif
+ server_address.sin_port = htons(port);
+#pragma GCC diagnostic pop
+
+ fd.fcntl(F_SETFL, O_NONBLOCK);
+
+ /// Errors not caught - we must not block
+ /// Let the main select() loop detect when we know the state
+ fd.connect(reinterpret_cast<struct sockaddr *>(&server_address),
+ sizeof(struct sockaddr_in));
+
+ readfds.set(fd);
+
+ set_session(fd, make_unique<Session>(
+ SessionIO{.func_recv= recv_to_fifo, .func_send= send_from_fifo},
+ parsers));
+ Session *s = get_session(fd);
+ s->fd = fd;
+ s->rdata.new_(RFIFO_SIZE);
+ s->wdata.new_(WFIFO_SIZE);
+
+ s->max_rdata = RFIFO_SIZE;
+ s->max_wdata = WFIFO_SIZE;
+ s->created = TimeT::now();
+ s->connected = 1;
+
+ return s;
+}
+
+void delete_session(Session *s)
+{
+ if (!s)
+ return;
+ // this needs to be before the fd_max--
+ s->func_delete(s);
+
+ io::FD fd = s->fd;
+ // If this was the highest fd, decrease it
+ // We could add a loop to decrement fd_max further for every null session,
+ // but this is cheap and good enough for the typical case
+ if (fd.uncast_dammit() == fd_max - 1)
+ fd_max--;
+ readfds.clr(fd);
+ {
+ s->rdata.delete_();
+ s->wdata.delete_();
+ s->session_data.reset();
+ reset_session(fd);
+ }
+
+ // just close() would try to keep sending buffers
+ fd.shutdown(SHUT_RDWR);
+ fd.close();
+}
+
+void realloc_fifo(Session *s, size_t rfifo_size, size_t wfifo_size)
+{
+ if (s->max_rdata != rfifo_size && s->rdata_size < rfifo_size)
+ {
+ s->rdata.resize(rfifo_size);
+ s->max_rdata = rfifo_size;
+ }
+ if (s->max_wdata != wfifo_size && s->wdata_size < wfifo_size)
+ {
+ s->wdata.resize(wfifo_size);
+ s->max_wdata = wfifo_size;
+ }
+}
+
+void WFIFOSET(Session *s, size_t len)
+{
+ if (s->wdata_size + len + 16384 > s->max_wdata)
+ {
+ realloc_fifo(s, s->max_rdata, s->max_wdata << 1);
+ PRINTF("socket: %d wdata expanded to %zu bytes.\n"_fmt, s, s->max_wdata);
+ }
+ if (s->wdata_size + len + 2048 < s->max_wdata)
+ s->wdata_size += len;
+ else
+ FPRINTF(stderr, "socket: %d wdata lost !!\n"_fmt, s), abort();
+}
+
+void do_sendrecv(interval_t next_ms)
+{
+ bool any = false;
+ io::FD_Set rfd = readfds, wfd;
+ for (io::FD i : iter_fds())
+ {
+ Session *s = get_session(i);
+ if (s)
+ {
+ any = true;
+ if (s->wdata_size)
+ wfd.set(i);
+ }
+ }
+ if (!any)
+ {
+ if (!has_timers())
+ {
+ PRINTF("Shutting down - nothing to do\n"_fmt);
+ // TODO hoist this
+ runflag = false;
+ }
+ return;
+ }
+ struct timeval timeout;
+ {
+ std::chrono::seconds next_s = std::chrono::duration_cast<std::chrono::seconds>(next_ms);
+ std::chrono::microseconds next_us = next_ms - next_s;
+ timeout.tv_sec = next_s.count();
+ timeout.tv_usec = next_us.count();
+ }
+ if (io::FD_Set::select(fd_max, &rfd, &wfd, NULL, &timeout) <= 0)
+ return;
+ for (io::FD i : iter_fds())
+ {
+ Session *s = get_session(i);
+ if (!s)
+ continue;
+ if (wfd.isset(i) && !s->eof)
+ {
+ if (s->func_send)
+ //send_from_fifo(i);
+ s->func_send(s);
+ }
+ if (rfd.isset(i) && !s->eof)
+ {
+ if (s->func_recv)
+ //recv_to_fifo(i);
+ //or connect_client(i);
+ s->func_recv(s);
+ }
+ }
+}
+
+void do_parsepacket(void)
+{
+ for (io::FD i : iter_fds())
+ {
+ Session *s = get_session(i);
+ if (!s)
+ continue;
+ if (!s->connected
+ && static_cast<time_t>(TimeT::now()) - static_cast<time_t>(s->created) > CONNECT_TIMEOUT)
+ {
+ PRINTF("Session #%d timed out\n"_fmt, s);
+ s->set_eof();
+ }
+ if (s->rdata_size && !s->eof && s->func_parse)
+ {
+ s->func_parse(s);
+ /// some func_parse may call delete_session
+ // (that's kind of evil)
+ s = get_session(i);
+ if (!s)
+ continue;
+ }
+ if (s->eof)
+ {
+ delete_session(s);
+ continue;
+ }
+ /// Reclaim buffer space for what was read
+ RFIFOFLUSH(s);
+ }
+}
+
+void RFIFOSKIP(Session *s, size_t len)
+{
+ s->rdata_pos += len;
+
+ if (s->rdata_size < s->rdata_pos)
+ {
+ FPRINTF(stderr, "too many skip\n"_fmt);
+ abort();
+ }
+}