summaryrefslogblamecommitdiff
path: root/src/mmo/socket.cpp
blob: 79857b183875dcb83afe190a300561aa8e519259 (plain) (tree)
1
                     




















                                                                           
 
                      
                        
                       
                        

                  
                   
 

                  
                
 

                               
                             
 

                    
                    
 

                        
      

                   
           
 
      
                                  
      
                                  
 

                                                 
      
                                                         
                          
 
                                                















                            















                                           
























                                                                 

                                        
                           
 


                                                                        



                                            
                             
 
                                        


 
                                 
      
                             
 
                                                      
                                       
 

                

                             
     
        
     
                     
     

 
      
                               
 
                                                           
 

                

                             
         

                                                        
         
                         


        
                     
     

 
      
                               
 
            

 
      
                                
 
                                      
                                           
 

                                                                                          
     
                         
               
     
                                         
     
                                                                                           

                               
               
     
                                     
     
                                        
     
 



                                                                            
                                                              

                                                                            
                                                 
                                                              
 



                                                                 
                                                                           

                      
                                                                  

      
                    
 
                                  
 


                                                                              
                                 


                              

                              


                                                       

 
                                                                 
 
                                      

                                                        
     
                         
                       
     

                                        
 
                                  




                                                                            
                                                              



                                                                            
                                                              

                                        

                                                 
                                       
                                                
      

                                                       
                          
 
                                                                     
                                            
     

                       
     
                           
                                           

                         

     
                    
 

                                                                         
                                                                                   
                                 
                               
               
 

                              
 
             

 
                                                                              
 
                                      

                                                        
     
                         
                       
     

                                        
 



                                                                            
                                                              



                                                                            
                                                              
 
                                        
                                          

                                                 
                                       
                                                
      
                                          
                          
 
                                  
 

                                                                
                                                                    
                                         
 
                    
 


                                                                              
                                 





                              



                              

 
                               
 
           
               

                                           
 
                      


                                                                              
                                         
                 
                    
     


                                
                          
     

                                                     

                           

 
                                                                   
 

                                                                 
                                    



                                                                 
                                    

                                  

 
                                     
 

                                                   
                                                         
                                                                                 
     


                                                  
                                                                      

 
                                    
 
                     

                                  
     
                                    










                              
                                                          


                            
     
                           





                                                                                                
                                                                    
               
                               
     
                                    
               
                     
                                    
         
                             
                                    
                                
         
                                    
         
                             
                                  
                                       
                                

         

 
                         
 
                               
     
                                    
               
                     

                                                                                                     
         
                                                     
                         
         
                                                      
         
                             
                                                       
                                    
                               
                   

                         
                   



                              
                                                  
                      
     

 
                                      
 
                        
 
                                     
     
                                               
                
     
 
#include "socket.hpp"
//    socket.cpp - Network event system.
//
//    Copyright © ????-2004 Athena Dev Teams
//    Copyright © 2004-2011 The Mana World Development Team
//    Copyright © 2011-2014 Ben Longbons <b.r.longbons@gmail.com>
//    Copyright © 2013 MadCamel
//
//    This file is part of The Mana World (Athena server)
//
//    This program is free software: you can redistribute it and/or modify
//    it under the terms of the GNU General Public License as published by
//    the Free Software Foundation, either version 3 of the License, or
//    (at your option) any later version.
//
//    This program is distributed in the hope that it will be useful,
//    but WITHOUT ANY WARRANTY; without even the implied warranty of
//    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
//    GNU General Public License for more details.
//
//    You should have received a copy of the GNU General Public License
//    along with this program.  If not, see <http://www.gnu.org/licenses/>.

#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
//#include <sys/types.h>

#include <fcntl.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <ctime>

#include "../compat/memory.hpp"

#include "../io/cxxstdio.hpp"

#include "core.hpp"
#include "timer.hpp"
#include "utils.hpp"

#include "../poison.hpp"

static
io::FD_Set readfds;
static
int fd_max;

static
const uint32_t RFIFO_SIZE = 65536;
static
const uint32_t WFIFO_SIZE = 65536;

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
static
std::array<std::unique_ptr<Session>, FD_SETSIZE> session;
#pragma GCC diagnostic pop

Session::Session(SessionIO io, SessionParsers p)
: created()
, connected()
, eof()
, timed_close()
, rdata(), wdata()
, max_rdata(), max_wdata()
, rdata_size(), wdata_size()
, rdata_pos()
, client_ip()
, func_recv()
, func_send()
, func_parse()
, func_delete()
, for_inferior()
, session_data()
, fd()
{
    set_io(io);
    set_parsers(p);
}
void Session::set_io(SessionIO io)
{
    func_send = io.func_send;
    func_recv = io.func_recv;
}
void Session::set_parsers(SessionParsers p)
{
    func_parse = p.func_parse;
    func_delete = p.func_delete;
}


void set_session(io::FD fd, std::unique_ptr<Session> sess)
{
    int f = fd.uncast_dammit();
    assert (0 <= f && f < FD_SETSIZE);
    session[f] = std::move(sess);
}
Session *get_session(io::FD fd)
{
    int f = fd.uncast_dammit();
    if (0 <= f && f < FD_SETSIZE)
        return session[f].get();
    return nullptr;
}
void reset_session(io::FD fd)
{
    int f = fd.uncast_dammit();
    assert (0 <= f && f < FD_SETSIZE);
    session[f] = nullptr;
}
int get_fd_max() { return fd_max; }
IteratorPair<ValueIterator<io::FD, IncrFD>> iter_fds()
{
    return {io::FD::cast_dammit(0), io::FD::cast_dammit(fd_max)};
}

/// clean up by discarding handled bytes
inline
void RFIFOFLUSH(Session *s)
{
    really_memmove(&s->rdata[0], &s->rdata[s->rdata_pos], RFIFOREST(s));
    s->rdata_size = RFIFOREST(s);
    s->rdata_pos = 0;
}

/// how much room there is to read more data
inline
size_t RFIFOSPACE(Session *s)
{
    return s->max_rdata - s->rdata_size;
}


/// Read from socket to the queue
static
void recv_to_fifo(Session *s)
{
    ssize_t len = s->fd.read(&s->rdata[s->rdata_size],
                        RFIFOSPACE(s));

    if (len > 0)
    {
        s->rdata_size += len;
        s->connected = 1;
    }
    else
    {
        s->set_eof();
    }
}

static
void send_from_fifo(Session *s)
{
    ssize_t len = s->fd.write(&s->wdata[0], s->wdata_size);

    if (len > 0)
    {
        s->wdata_size -= len;
        if (s->wdata_size)
        {
            really_memmove(&s->wdata[0], &s->wdata[len],
                     s->wdata_size);
        }
        s->connected = 1;
    }
    else
    {
        s->set_eof();
    }
}

static
void nothing_delete(Session *s)
{
    (void)s;
}

static
void connect_client(Session *ls)
{
    struct sockaddr_in client_address;
    socklen_t len = sizeof(client_address);

    io::FD fd = ls->fd.accept(reinterpret_cast<struct sockaddr *>(&client_address), &len);
    if (fd == io::FD())
    {
        perror("accept");
        return;
    }
    if (fd.uncast_dammit() >= SOFT_LIMIT)
    {
        FPRINTF(stderr, "softlimit reached, disconnecting : %d\n"_fmt, fd.uncast_dammit());
        fd.shutdown(SHUT_RDWR);
        fd.close();
        return;
    }
    if (fd_max <= fd.uncast_dammit())
    {
        fd_max = fd.uncast_dammit() + 1;
    }

    const int yes = 1;
    /// Allow to bind() again after the server restarts.
    // Since the socket is still in the TIME_WAIT, there's a possibility
    // that formerly lost packets might be delivered and confuse the server.
    fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
    /// Send packets as soon as possible
    /// even if the kernel thinks there is too little for it to be worth it!
    /// Testing shows this is indeed a good idea.
    fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes);

    // Linux-ism: Set socket options to optimize for thin streams
    // See http://lwn.net/Articles/308919/ and
    // Documentation/networking/tcp-thin.txt .. Kernel 3.2+
#ifdef TCP_THIN_LINEAR_TIMEOUTS
    fd.setsockopt(IPPROTO_TCP, TCP_THIN_LINEAR_TIMEOUTS, &yes, sizeof yes);
#endif
#ifdef TCP_THIN_DUPACK
    fd.setsockopt(IPPROTO_TCP, TCP_THIN_DUPACK, &yes, sizeof yes);
#endif

    readfds.set(fd);

    fd.fcntl(F_SETFL, O_NONBLOCK);

    set_session(fd, make_unique<Session>(
                SessionIO{func_recv: recv_to_fifo, func_send: send_from_fifo},
                ls->for_inferior));
    Session *s = get_session(fd);
    s->fd = fd;
    s->rdata.new_(RFIFO_SIZE);
    s->wdata.new_(WFIFO_SIZE);
    s->max_rdata = RFIFO_SIZE;
    s->max_wdata = WFIFO_SIZE;
    s->client_ip = IP4Address(client_address.sin_addr);
    s->created = TimeT::now();
    s->connected = 0;
}

Session *make_listen_port(uint16_t port, SessionParsers inferior)
{
    struct sockaddr_in server_address;
    io::FD fd = io::FD::socket(AF_INET, SOCK_STREAM, 0);
    if (fd == io::FD())
    {
        perror("socket");
        return nullptr;
    }
    if (fd_max <= fd.uncast_dammit())
        fd_max = fd.uncast_dammit() + 1;

    fd.fcntl(F_SETFL, O_NONBLOCK);

    const int yes = 1;
    /// Allow to bind() again after the server restarts.
    // Since the socket is still in the TIME_WAIT, there's a possibility
    // that formerly lost packets might be delivered and confuse the server.
    fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
    /// Send packets as soon as possible
    /// even if the kernel thinks there is too little for it to be worth it!
    // I'm not convinced this is a good idea; although in minimizes the
    // latency for an individual write, it increases traffic in general.
    fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes);

    server_address.sin_family = AF_INET;
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#if __GNUC__ > 4 || __GNUC_MINOR__ >= 8
# pragma GCC diagnostic ignored "-Wuseless-cast"
#endif
    server_address.sin_addr.s_addr = htonl(INADDR_ANY);
    server_address.sin_port = htons(port);
#pragma GCC diagnostic pop

    if (fd.bind(reinterpret_cast<struct sockaddr *>(&server_address),
              sizeof(server_address)) == -1)
    {
        perror("bind");
        exit(1);
    }
    if (fd.listen(5) == -1)
    {                           /* error */
        perror("listen");
        exit(1);
    }

    readfds.set(fd);

    set_session(fd, make_unique<Session>(
                SessionIO{func_recv: connect_client, func_send: nullptr},
                SessionParsers{func_parse: nullptr, func_delete: nothing_delete}));
    Session *s = get_session(fd);
    s->for_inferior = inferior;
    s->fd = fd;

    s->created = TimeT::now();
    s->connected = 1;

    return s;
}

Session *make_connection(IP4Address ip, uint16_t port, SessionParsers parsers)
{
    struct sockaddr_in server_address;
    io::FD fd = io::FD::socket(AF_INET, SOCK_STREAM, 0);
    if (fd == io::FD())
    {
        perror("socket");
        return nullptr;
    }
    if (fd_max <= fd.uncast_dammit())
        fd_max = fd.uncast_dammit() + 1;

    const int yes = 1;
    /// Allow to bind() again after the server restarts.
    // Since the socket is still in the TIME_WAIT, there's a possibility
    // that formerly lost packets might be delivered and confuse the server.
    fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes);
    /// Send packets as soon as possible
    /// even if the kernel thinks there is too little for it to be worth it!
    // I'm not convinced this is a good idea; although in minimizes the
    // latency for an individual write, it increases traffic in general.
    fd.setsockopt(IPPROTO_TCP, TCP_NODELAY, &yes, sizeof yes);

    server_address.sin_family = AF_INET;
    server_address.sin_addr = in_addr(ip);
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#if __GNUC__ > 4 || __GNUC_MINOR__ >= 8
# pragma GCC diagnostic ignored "-Wuseless-cast"
#endif
    server_address.sin_port = htons(port);
#pragma GCC diagnostic pop

    fd.fcntl(F_SETFL, O_NONBLOCK);

    /// Errors not caught - we must not block
    /// Let the main select() loop detect when we know the state
    fd.connect(reinterpret_cast<struct sockaddr *>(&server_address),
             sizeof(struct sockaddr_in));

    readfds.set(fd);

    set_session(fd, make_unique<Session>(
                SessionIO{func_recv: recv_to_fifo, func_send: send_from_fifo},
                parsers));
    Session *s = get_session(fd);
    s->fd = fd;
    s->rdata.new_(RFIFO_SIZE);
    s->wdata.new_(WFIFO_SIZE);

    s->max_rdata = RFIFO_SIZE;
    s->max_wdata = WFIFO_SIZE;
    s->created = TimeT::now();
    s->connected = 1;

    return s;
}

void delete_session(Session *s)
{
    if (!s)
        return;
    // this needs to be before the fd_max--
    s->func_delete(s);

    io::FD fd = s->fd;
    // If this was the highest fd, decrease it
    // We could add a loop to decrement fd_max further for every null session,
    // but this is cheap and good enough for the typical case
    if (fd.uncast_dammit() == fd_max - 1)
        fd_max--;
    readfds.clr(fd);
    {
        s->rdata.delete_();
        s->wdata.delete_();
        s->session_data.reset();
        reset_session(fd);
    }

    // just close() would try to keep sending buffers
    fd.shutdown(SHUT_RDWR);
    fd.close();
}

void realloc_fifo(Session *s, size_t rfifo_size, size_t wfifo_size)
{
    if (s->max_rdata != rfifo_size && s->rdata_size < rfifo_size)
    {
        s->rdata.resize(rfifo_size);
        s->max_rdata = rfifo_size;
    }
    if (s->max_wdata != wfifo_size && s->wdata_size < wfifo_size)
    {
        s->wdata.resize(wfifo_size);
        s->max_wdata = wfifo_size;
    }
}

void WFIFOSET(Session *s, size_t len)
{
    if (s->wdata_size + len + 16384 > s->max_wdata)
    {
        realloc_fifo(s, s->max_rdata, s->max_wdata << 1);
        PRINTF("socket: %d wdata expanded to %zu bytes.\n"_fmt, s, s->max_wdata);
    }
    if (s->wdata_size + len + 2048 < s->max_wdata)
        s->wdata_size += len;
    else
        FPRINTF(stderr, "socket: %d wdata lost !!\n"_fmt, s), abort();
}

void do_sendrecv(interval_t next_ms)
{
    bool any = false;
    io::FD_Set rfd = readfds, wfd;
    for (io::FD i : iter_fds())
    {
        Session *s = get_session(i);
        if (s)
        {
            any = true;
            if (s->wdata_size)
                wfd.set(i);
        }
    }
    if (!any)
    {
        if (!has_timers())
        {
            PRINTF("Shutting down - nothing to do\n"_fmt);
            runflag = false;
        }
        return;
    }
    struct timeval timeout;
    {
        std::chrono::seconds next_s = std::chrono::duration_cast<std::chrono::seconds>(next_ms);
        std::chrono::microseconds next_us = next_ms - next_s;
        timeout.tv_sec = next_s.count();
        timeout.tv_usec = next_us.count();
    }
    if (io::FD_Set::select(fd_max, &rfd, &wfd, NULL, &timeout) <= 0)
        return;
    for (io::FD i : iter_fds())
    {
        Session *s = get_session(i);
        if (!s)
            continue;
        if (wfd.isset(i) && !s->eof)
        {
            if (s->func_send)
                //send_from_fifo(i);
                s->func_send(s);
        }
        if (rfd.isset(i) && !s->eof)
        {
            if (s->func_recv)
                //recv_to_fifo(i);
                //or connect_client(i);
                s->func_recv(s);
        }
    }
}

void do_parsepacket(void)
{
    for (io::FD i : iter_fds())
    {
        Session *s = get_session(i);
        if (!s)
            continue;
        if (!s->connected
            && static_cast<time_t>(TimeT::now()) - static_cast<time_t>(s->created) > CONNECT_TIMEOUT)
        {
            PRINTF("Session #%d timed out\n"_fmt, s);
            s->set_eof();
        }
        if (s->rdata_size && !s->eof && s->func_parse)
        {
            s->func_parse(s);
            /// some func_parse may call delete_session
            // (that's kind of evil)
            s = get_session(i);
            if (!s)
                continue;
        }
        if (s->eof)
        {
            delete_session(s);
            continue;
        }
        /// Reclaim buffer space for what was read
        RFIFOFLUSH(s);
    }
}

void RFIFOSKIP(Session *s, size_t len)
{
    s->rdata_pos += len;

    if (s->rdata_size < s->rdata_pos)
    {
        FPRINTF(stderr, "too many skip\n"_fmt);
        abort();
    }
}