diff options
Diffstat (limited to 'src/common/network.c')
-rw-r--r-- | src/common/network.c | 1062 |
1 files changed, 1062 insertions, 0 deletions
diff --git a/src/common/network.c b/src/common/network.c new file mode 100644 index 000000000..2574f3cc1 --- /dev/null +++ b/src/common/network.c @@ -0,0 +1,1062 @@ + // +// Network Subsystem (previously known as socket system) +// +// Author: Florian Wilkemeyer <fw@f-ws.de> +// +// Copyright (c) rAthena Project (www.rathena.org) - Licensed under GNU GPL +// For more information, see LICENCE in the main folder +// +// +//#ifdef HAVE_ACCETP4 +#define _GNU_SOURCE +//#endif + +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <errno.h> + +#include <sys/types.h> +#include <sys/fcntl.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <arpa/inet.h> + + +#include "../common/cbasetypes.h" +#include "../common/showmsg.h" +#include "../common/timer.h" +#include "../common/evdp.h" +#include "../common/netbuffer.h" + +#include "../common/network.h" + +#define ENABLE_IPV6 +#define HAVE_ACCEPT4 +#define EVENTS_PER_CYCLE 10 +#define PARANOID_CHECKS + +// Local Vars (settings..) +static int l_ListenBacklog = 64; + +// +// Global Session Array (previously exported as session[] +// +SESSION g_Session[MAXCONN]; + + +// +static bool onSend(int32 fd); + + +#define _network_free_netbuf_async( buf ) add_timer( 0, _network_async_free_netbuf_proc, 0, (intptr_t) buf) +static int _network_async_free_netbuf_proc(int tid, unsigned int tick, int id, intptr_t data){ + // netbuf is in data + netbuffer_put( (netbuf)data ); + + return 0; +}//end: _network_async_free_netbuf_proc() + + + +void network_init(){ + SESSION *s; + int32 i; + + memset(g_Session, 0x00, (sizeof(SESSION) * MAXCONN) ); + + for(i = 0; i < MAXCONN; i++){ + s = &g_Session[i]; + + s->type = NST_FREE; + s->disconnect_in_progress = false; + + } + + // Initialize the correspondig event dispatcher + evdp_init(); + + // + add_timer_func_list(_network_async_free_netbuf_proc, "_network_async_free_netbuf_proc"); + +}//end: network_init() + + +void network_final(){ + + // @TODO: + // .. disconnect and cleanup everything! + + evdp_final(); + +}//end: network_final() + + +void network_do(){ + struct EVDP_EVENT l_events[EVENTS_PER_CYCLE]; + register struct EVDP_EVENT *ev; + register int n, nfds; + register SESSION *s; + + nfds = evdp_wait( l_events, EVENTS_PER_CYCLE, 1000); // @TODO: timer_getnext() + + for(n = 0; n < nfds; n++){ + ev = &l_events[n]; + s = &g_Session[ ev->fd ]; + + if(ev->events & EVDP_EVENT_HUP){ + network_disconnect( ev->fd ); + continue; // no further event processing. + }// endif vent is HUP (disconnect) + + + if(ev->events & EVDP_EVENT_IN){ + + if(s->onRecv != NULL){ + if( false == s->onRecv(ev->fd) ){ + network_disconnect(ev->fd); + continue; // .. + } + }else{ + ShowError("network_do: fd #%u has no onRecv proc set. - disconnecting\n", ev->fd); + network_disconnect(ev->fd); + continue; + } + + }// endif event is IN (recv) + + + if(ev->events & EVDP_EVENT_OUT){ + if(s->onSend != NULL){ + if( false == s->onSend(ev->fd) ){ + network_disconnect(ev->fd); + continue; + } + }else{ + ShowError("network_do: fd #%u has no onSend proc set. - disconnecting\n", ev->fd); + network_disconnect(ev->fd); + continue; + } + }// endif event is OUT (send) + + }//endfor + +}//end: network_do() + + +static bool _setnonblock(int32 fd){ + int flags = fcntl(fd, F_GETFL, 0); + if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) + return false; + + return true; +}//end: _setnonblock() + + +static bool _network_accept(int32 fd){ + SESSION *listener = &g_Session[fd]; + SESSION *s; + union{ + struct sockaddr_in v4; +#ifdef ENABLE_IPV6 + struct sockaddr_in6 v6; +#endif + } _addr; + int newfd; + socklen_t addrlen; + struct sockaddr *addr; + + // Accept until OS returns - nothing to accept anymore + // - this is required due to our EVDP abstraction. (which handles on listening sockets similar to epoll's EPOLLET flag.) + while(1){ +#ifdef ENABLE_IPV6 + if(listener->v6 == true){ + addrlen = sizeof(_addr.v6); + addr = (struct sockaddr*)&_addr.v6; + }else{ +#endif + addrlen = sizeof(_addr.v4); + addr = (struct sockaddr*)&_addr.v4; +#ifdef ENABLE_IPV6 + } +#endif + +#ifdef HAVE_ACCEPT4 + newfd = accept4(fd, addr, &addrlen, SOCK_NONBLOCK); +#else + newfd = accept(fd, addr, &addrlen); +#endif + + if(newfd == -1){ + if(errno == EAGAIN || errno == EWOULDBLOCK) + break; // this is fully valid & whished., se explaination on top of while(1) + + // Otherwis .. we have serious problems :( seems tahat our listner has gone away.. + // @TODO handle this .. + ShowError("_network_accept: accept() returned error. closing listener. (errno: %u / %s)\n", errno, strerror(errno)); + + return false; // will call disconnect after return. + //break; + } + +#ifndef HAVE_ACCEPT4 // no accept4 means, we have to set nonblock by ourself. .. + if(_setnonblock(newfd) == false){ + ShowError("_network_accept: failed to set newly accepted connection nonblocking (errno: %u / %s). - disconnecting.\n", errno, strerror(errno)); + close(newfd); + continue; + } +#endif + + // Check connection limits. + if(newfd >= MAXCONN){ + ShowError("_network_accept: failed to accept connection - MAXCONN (%u) exceeded.\n", MAXCONN); + close(newfd); + continue; // we have to loop over the events (and disconnect them too ..) but otherwise we would leak event notifications. + } + + + // Create new Session. + s = &g_Session[newfd]; + s->type = NST_CLIENT; + + // The new connection inherits listenr's handlers. + s->onDisconnect = listener->onDisconnect; + s->onConnect = listener->onConnect; // maybe useless but .. fear the future .. :~ + + // Register the new connection @ EVDP + if( evdp_addclient(newfd, &s->evdp_data) == false){ + ShowError("_network_accept: failed to accept connection - event subsystem returned an error.\n"); + close(newfd); + s->type = NST_FREE; + } + + // Call the onConnect handler on the listener. + if( listener->onConnect(newfd) == false ){ + // Resfused by onConnect handler.. + evdp_remove(newfd, &s->evdp_data); + + close(newfd); + s->type = NST_FREE; + + s->data = NULL; // be on the safe side ~ ! + continue; + } + + + } + + return true; +}//end: _network_accept() + + +void network_disconnect(int32 fd){ + SESSION *s = &g_Session[fd]; + netbuf b, bn; + + // Prevent recursive calls + // by wrong implemented on disconnect handlers.. and such.. + if(s->disconnect_in_progress == true) + return; + + s->disconnect_in_progress = true; + + + // Disconnect Todo: + // - Call onDisconnect Handler + // - Release all Assigned buffers. + // - remove from event system (notifications) + // - cleanup session structure + // - close connection. + // + + if(s->onDisconnect != NULL && + s->type != NST_LISTENER){ + + s->onDisconnect( fd ); + } + + // Read Buffer + if(s->read.buf != NULL){ + netbuffer_put(s->read.buf); + s->read.buf = NULL; + } + + // Write Buffer(s) + b = s->write.buf; + while(1){ + if(b == NULL) break; + + bn = b->next; + + netbuffer_put(b); + + b = bn; + } + s->write.buf = NULL; + s->write.buf_last = NULL; + + s->write.n_outstanding = 0; + s->write.max_outstanding = 0; + + + // Remove from event system. + evdp_remove(fd, &s->evdp_data); + + // Cleanup Session Structure. + s->type = NST_FREE; + s->data = NULL; // no application level data assigned + s->disconnect_in_progress = false; + + + // Close connection + close(fd); + +}//end: network_disconnect() + + +int32 network_addlistener(bool v6, const char *addr, uint16 port){ + SESSION *s; + int optval, fd; + +#if !defined(ENABLE_IPV6) + if(v6 == true){ + ShowError("network_addlistener(%c, '%s', %u): this release has no IPV6 support.\n", (v6==true?'t':'f'), addr, port); + return -1; + } +#endif + + +#ifdef ENABLE_IPV6 + if(v6 == true) + fd = socket(AF_INET6, SOCK_STREAM, 0); + else +#endif + fd = socket(AF_INET, SOCK_STREAM, 0); + + // Error? + if(fd == -1){ + ShowError("network_addlistener(%c, '%s', %u): socket() failed (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); + return -1; + } + + // Too many connections? + if(fd >= MAXCONN){ + ShowError("network_addlistener(%c, '%s', %u): cannot create listener, exceeds more than supported connections (%u).\n", (v6==true?'t':'f'), addr, port, MAXCONN); + close(fd); + return -1; + } + + + s = &g_Session[fd]; + if(s->type != NST_FREE){ // additional checks.. :) + ShowError("network_addlistener(%c, '%s', %u): failed, got fd #%u which is already in use in local session table?!\n", (v6==true?'t':'f'), addr, port, fd); + close(fd); + return -1; + } + + + // Fill ip addr structs +#ifdef ENABLE_IPV6 + if(v6 == true){ + memset(&s->addr.v6, 0x00, sizeof(s->addr.v6)); + s->addr.v6.sin6_family = AF_INET6; + s->addr.v6.sin6_port = htons(port); + if(inet_pton(AF_INET6, addr, &s->addr.v6.sin6_addr) != 1){ + ShowError("network_addlistener(%c, '%s', %u): failed to parse the given IPV6 address.\n", (v6==true?'t':'f'), addr, port); + close(fd); + return -1; + } + + }else{ +#endif + memset(&s->addr.v4, 0x00, sizeof(s->addr.v4)); + s->addr.v4.sin_family = AF_INET; + s->addr.v4.sin_port = htons(port); + s->addr.v4.sin_addr.s_addr = inet_addr(addr); +#ifdef ENABLE_IPV6 + } +#endif + + + // if OS has support for SO_REUSEADDR, apply the flag + // so the address could be used when there're still time_wait sockets outstanding from previous application run. +#ifdef SO_REUSEADDR + optval=1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); +#endif + + // Bind +#ifdef ENABLE_IPV6 + if(v6 == true){ + if( bind(fd, (struct sockaddr*)&s->addr.v6, sizeof(s->addr.v6)) == -1) { + ShowError("network_addlistener(%c, '%s', %u): bind failed (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); + close(fd); + return -1; + } + }else{ +#endif + if( bind(fd, (struct sockaddr*)&s->addr.v4, sizeof(s->addr.v4)) == -1) { + ShowError("network_addlistener(%c, '%s', %u): bind failed (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); + close(fd); + return -1; + } +#ifdef ENABLE_IPV6 + } +#endif + + if( listen(fd, l_ListenBacklog) == -1){ + ShowError("network_addlistener(%c, '%s', %u): listen failed (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); + close(fd); + return -1; + } + + + // Set to nonblock! + if(_setnonblock(fd) == false){ + ShowError("network_addlistener(%c, '%s', %u): cannot set to nonblock (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); + close(fd); + return -1; + } + + + // Rgister @ evdp. + if( evdp_addlistener(fd, &s->evdp_data) != true){ + ShowError("network_addlistener(%c, '%s', %u): eventdispatcher subsystem returned an error.\n", (v6==true?'t':'f'), addr, port); + close(fd); + return -1; + } + + + // Apply flags on Session array for this conneciton. + if(v6 == true) s->v6 = true; + else s->v6 = false; + + s->type = NST_LISTENER; + s->onRecv = _network_accept; + + ShowStatus("Added Listener on '%s':%u\n", addr, port, (v6==true ? "(ipv6)":"(ipv4)") ); + + return fd; +}//end: network_addlistener() + + +static bool _network_connect_establishedHandler(int32 fd){ + register SESSION *s = &g_Session[fd]; + int val; + socklen_t val_len; + + if(s->type == NST_FREE) + return true; // due to multiple non coalesced event notifications + // this can happen .. when a previous handled event has already disconnected the connection + // within the same cycle.. + + val = -1; + val_len = sizeof(val); + getsockopt(fd, SOL_SOCKET, SO_ERROR, &val, &val_len); + + if(val != 0){ + // :( .. cleanup session.. + s->type = NST_FREE; + s->onSend = NULL; + s->onConnect = NULL; + s->onDisconnect = NULL; + + evdp_remove(fd, &s->evdp_data); + close(fd); + + return true; // we CANT return false, + // becuase the normal disconnect procedure would execute the ondisconnect handler, which we dont want .. in this case. + }else{ + // ok + if(s->onConnect(fd) == false) { + // onConnect handler has refused the connection .. + // cleanup .. and ok + s->type = NST_FREE; + s->onSend = NULL; + s->onConnect = NULL; + s->onDisconnect = NULL; + + evdp_remove(fd, &s->evdp_data); + close(fd); + + return true; // we dnot want the ondisconnect handler to be executed, so its okay to handle this by ourself. + } + + // connection established ! + // + if( evdp_outgoingconnection_established(fd, &s->evdp_data) == false ){ + return false; // we want the normal disconnect procedure.. with call to ondisconnect handler. + } + + s->onSend = NULL; + + ShowStatus("#%u connection successfull!\n", fd); + } + + return true; +}//end: _network_connect_establishedHandler() + + +int32 network_connect(bool v6, + const char *addr, + uint16 port, + const char *from_addr, + uint16 from_port, + bool (*onConnectionEstablishedHandler)(int32 fd), + void (*onConnectionLooseHandler)(int32 fd) +){ + register SESSION *s; + int32 fd, optval, ret; + struct sockaddr_in ip4; +#ifdef ENABLE_IPV6 + struct sockaddr_in6 ip6; +#endif + +#ifdef ENABLE_IPV6 + if(v6 == true) + fd = socket(AF_INET6, SOCK_STREAM, 0); + else +#endif + fd = socket(AF_INET, SOCK_STREAM, 0); + +#ifndef ENABLE_IPV6 + // check.. + if(v6 == true){ + ShowError("network_connect(%c, '%s', %u...): tried to create an ipv6 connection, IPV6 is not supported in this release.\n", (v6==true?'t':'f'), addr, port); + return -1; + } +#endif + + // check connection limits. + if(fd >= MAXCONN){ + ShowError("network_connect(%c, '%s', %u...): cannot create new connection, exceeeds more than supported connections (%u)\n", (v6==true?'t':'f'), addr, port ); + close(fd); + return -1; + } + + + // Originating IP/Port pair given ? + if(from_addr != NULL && *from_addr != 0){ + //.. + #ifdef SO_REUSEADDR + optval=1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); + #endif + + #ifdef ENABLE_IPV6 + if(v6 == true){ + memset(&ip6, 0x00, sizeof(ip6)); + ip6.sin6_family = AF_INET6; + ip6.sin6_port = htons(from_port); + + if(inet_pton(AF_INET6, from_addr, &ip6.sin6_addr) != 1){ + ShowError("network_connect(%c, '%s', %u...): cannot parse originating (from) IPV6 address (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); + close(fd); + return -1; + } + + ret = bind(fd, (struct sockaddr*)&ip6, sizeof(ip6)); + }else{ + #endif + memset(&ip4, 0x00, sizeof(ip4)); + + ip4.sin_family = AF_INET; + ip4.sin_port = htons(from_port); + ip4.sin_addr.s_addr = inet_addr(from_addr); + ret = bind(fd, (struct sockaddr*)&ip4, sizeof(ip4)); + #ifdef ENABLE_IPV6 + } + #endif + + } + + + // Set non block + if(_setnonblock(fd) == false){ + ShowError("network_connect(%c, '%s', %u...): cannot set socket to nonblocking (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); + close(fd); + return -1; + } + + + // Create ip addr block to connect to .. +#ifdef ENABLE_IPV6 + if(v6 == true){ + memset(&ip6, 0x00, sizeof(ip6)); + ip6.sin6_family = AF_INET6; + ip6.sin6_port = htons(port); + + if(inet_pton(AF_INET6, addr, &ip6.sin6_addr) != 1){ + ShowError("network_connect(%c, '%s', %u...): cannot parse destination IPV6 address (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); + close(fd); + return -1; + } + + }else{ +#endif + memset(&ip4, 0x00, sizeof(ip4)); + + ip4.sin_family = AF_INET; + ip4.sin_port = htons(port); + ip4.sin_addr.s_addr = inet_addr(addr); +#ifdef ENABLE_IPV6 + } +#endif + + + // Assign Session.. + s = &g_Session[fd]; + s->type = NST_OUTGOING; + s->v6 = v6; + s->onConnect = onConnectionEstablishedHandler; + s->onDisconnect = onConnectionLooseHandler; + s->onRecv = NULL; + s->onSend = _network_connect_establishedHandler; +#ifdef ENABLE_IPV6 + if(v6 == true) + memcpy(&s->addr.v6, &ip6, sizeof(ip6)); + else +#endif + memcpy(&s->addr.v4, &ip4, sizeof(ip4)); + + // Register @ EVDP. as outgoing (see doc of the function) + if(evdp_addconnecting(fd, &s->evdp_data) == false){ + ShowError("network_connect(%c, '%s', %u...): eventdispatcher subsystem returned an error.\n", (v6==true?'t':'f'), addr, port); + + // cleanup session x.x.. + s->type = NST_FREE; + s->onConnect = NULL; + s->onDisconnect = NULL; + s->onSend = NULL; + + // close, return error code. + close(fd); + return -1; + } + + +#ifdef ENABLE_IPV6 + if(v6 == true) + ret = connect(fd, (struct sockaddr*)&ip6, sizeof(ip6)); + else +#endif + ret = connect(fd, (struct sockaddr*)&ip4, sizeof(ip4)); + + + // + if(ret != 0 && errno != EINPROGRESS){ + ShowWarning("network_connect(%c, '%s', %u...): connection failed (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); + + // Cleanup session .. + s->type = NST_FREE; + s->onConnect = NULL; + s->onDisconnect = NULL; + s->onSend = NULL; + + // .. remove from evdp and close fd. + evdp_remove(fd, &s->evdp_data); + close(fd); + return -1; + } + + + // ! The Info Message :~D + ShowStatus("network_connect fd#%u (%s:%u) in progress.. \n", fd, addr, port); + +return fd; +}//end: network_connect() + + +static bool _onSend(int32 fd){ + register SESSION *s = &g_Session[fd]; + register netbuf buf, buf_next; + register uint32 szNeeded; + register int wLen; + + if(s->type == NST_FREE) + return true; // Possible due to multipl non coalsced event notifications + // so onSend gets called after disconnect caused by an previous vent. + // we can ignore the call to onSend, then. + + buf = s->write.buf; + while(1){ + if(buf == NULL) + break; + + buf_next = buf->next; + + + szNeeded = (buf->dataLen - s->write.dataPos); // using th session-local .dataPos member, due to shared write buffer support. + + // try to write. + wLen = write(fd, &buf->buf[s->write.dataPos], szNeeded); + if(wLen == 0){ + return false; // eof. + }else if(wLen == -1){ + if(errno == EAGAIN || errno == EWOULDBLOCK) + return true; // dont disconnect / try again later. + + // all other errors. . + return false; + } + + // Wrote data.. => + szNeeded -= wLen; + if(szNeeded > 0){ + // still data left .. + // + s->write.dataPos += wLen; // fix offset. + return true; + }else{ + // this buffer has been written successfully + // could be returned to pool. + netbuffer_put(buf); + s->write.n_outstanding--; // When threadsafe -> Interlocked here. + s->write.dataPos = 0; + } + + + buf = buf_next; + } + + // okay, + // reaching this part means: + // while interrupted by break - + // which means all buffers are written, nothing left + // + + s->write.buf_last = NULL; + s->write.buf = NULL; + s->write.n_outstanding = 0; + s->write.dataPos = 0; + + // Remove from event dispatcher (write notification) + // + evdp_writable_remove(fd, &s->evdp_data); + + return true; +}//end: _onSend() + + +static bool _onRORecv(int32 fd){ + register SESSION *s = &g_Session[fd]; + register uint32 szNeeded; + register char *p; + register int rLen; + + if(s->type == NST_FREE) + return true; // Possible due to multiple non coalesced events by evdp. + // simply ignore this call returning positive result. + + // Initialize p and szNeeded depending on change + // + switch(s->read.state){ + case NRS_WAITOP: + szNeeded = s->read.head_left; + p = ((char*)&s->read.head[0]) + (2-szNeeded); + break; + + case NRS_WAITLEN: + szNeeded = s->read.head_left; + p = ((char*)&s->read.head[1]) + (2-szNeeded); + break; + + case NRS_WAITDATA:{ + register netbuf buf = s->read.buf; + + szNeeded = (buf->dataLen - buf->dataPos); + p = (char*)&buf->buf[ buf->dataPos ]; + } + break; + + default: + // .. the impossible gets possible .. + ShowError("_onRORecv: fd #%u has unknown read.state (%d) - disconnecting\n", fd, s->read.state); + return false; + break; + } + + + // + + rLen = read(fd, p, szNeeded); + if(rLen == 0){ + // eof.. + return false; + }else if(rLen == -1){ + + if(errno == EAGAIN || errno == EWOULDBLOCK){ + // try again later .. (this case shouldnt happen, because we're event trigered.. but .. sometimes it happens :) + return true; + } + + // an additional interesting case would be + // EINTR, this 'could' be handled .. but: + // posix says that its possible that data gets currupted during irq + // or data gor read and not reported.., so we'd have a data loss.. + // (which shouldnt happen with stream based protocols such as tcp) + // its better to disonnect the client in that case. + + return false; + } + + // + // Got Data: + // next action also depends on current state .. + // + szNeeded -= rLen; + switch(s->read.state){ + case NRS_WAITOP: + + if(szNeeded > 0){ + // still data missing .. + s->read.head_left = szNeeded; + return true; // wait for completion. + }else{ + // complete .. + // next state depends on packet type. + + s->read.head[1] = ((uint16*)s->netparser_data)[ s->read.head[0] ]; // store lenght of packet by opcode head[0] to head[1] + + if(s->read.head[1] == ROPACKET_UNKNOWN){ + // unknown packet - disconnect + ShowWarning("_onRORecv: fd #%u got unlnown packet 0x%04x - disconnecting.\n", fd, s->read.head[0]); + return false; + } + else if(s->read.head[1] == ROPACKET_DYNLEN){ + // dynamic length + // next state: requrie len. + s->read.state = NRS_WAITLEN; + s->read.head_left = 2; + return true; // + } + else if(s->read.head[1] == 2){ + // packet has no data (only opcode) + register netbuf buf = netbuffer_get(2); // :D whoohoo its giant! + + NBUFW(buf, 0) = s->read.head[0]; // store opcode @ packet begin. + buf->dataPos = 2; + buf->dataLen = 2; + buf->next = NULL; + + // Back to initial state -> Need opcode. + s->read.state = NRS_WAITOP; + s->read.head_left = 2; + s->read.buf = NULL; + + // Call completion routine here. + s->onPacketComplete(fd, s->read.head[0], 2, buf); + + return true; // done :) + } + else{ + // paket needs .. data .. + register netbuf buf = netbuffer_get( s->read.head[1] ); + + NBUFW(buf, 0) = s->read.head[0]; // store opcode @ packet begin. + buf->dataPos = 2; + buf->dataLen = s->read.head[1]; + buf->next = NULL; + + // attach buffer. + s->read.buf = buf; + + // set state: + s->read.state = NRS_WAITDATA; + + return true; + } + + }//endif: szNeeded > 0 (opcode read completed?) + + break; + + + case NRS_WAITLEN: + + if(szNeeded > 0){ + // incomplete .. + s->read.head_left = szNeeded; + return true; + }else{ + + if(s->read.head[1] == 4){ + // packet has no data (only opcode + length) + register netbuf buf = netbuffer_get( 4 ); + + NBUFL(buf, 0) = *((uint32*)&s->read.head[0]); // copy Opcode + length to netbuffer using MOVL + buf->dataPos = 4; + buf->dataLen = 4; + buf->next = NULL; + + // set initial state (need opcode) + s->read.state = NRS_WAITOP; + s->read.head_left = 2; + s->read.buf = NULL; + + // call completion routine. + s->onPacketComplete(fd, s->read.head[0], 4, buf); + + return true; + } + else if(s->read.head[1] < 4){ + // invalid header. + ShowWarning("_onRORecv: fd #%u invalid header - got packet 0x%04x, reported length < 4 - INVALID - disconnecting\n", fd, s->read.head[0]); + return false; + } + else{ + // Data needed + // next state -> waitdata! + register netbuf buf = netbuffer_get( s->read.head[1] ); + + NBUFL(buf, 0) = *((uint32*)&s->read.head[0]); // copy Opcode + length to netbuffer using MOVL + buf->dataPos = 4; + buf->dataLen = s->read.head[1]; + buf->next = NULL; + + // attach to session: + s->read.buf = buf; + s->read.state = NRS_WAITDATA; + + return true; + } + + }//endif: szNeeded > 0 (length read complete?) + + break; + + + case NRS_WAITDATA: + + if(szNeeded == 0){ + // Packet finished! + // compltion. + register netbuf buf = s->read.buf; + + // set initial state. + s->read.state = NRS_WAITOP; + s->read.head_left = 2; + s->read.buf = NULL; + + // Call completion routine. + s->onPacketComplete(fd, NBUFW(buf, 0), buf->dataLen, buf); + + return true; + }else{ + // still data needed + s->read.buf->dataPos += rLen; + + return true; + } + break; + + + // + default: + ShowError("_onRORecv: fd #%u has unknown read.state (%d) [2] - disconnecting\n", fd, s->read.state); + return false; + break; + } + + + return false; +}//end: _onRORecv() + + +void network_send(int32 fd, netbuf buf){ + register SESSION *s = &g_Session[fd]; + +#ifdef PARANOID_CHECKS + if(fd >= MAXCONN){ + ShowError("network_send: tried to attach buffer to connection idientifer #%u which is out of bounds.\n", fd); + _network_free_netbuf_async(buf); + return; + } +#endif + + + if(s->type == NST_FREE) + return; + + // Check Max Outstanding buffers limit. + if( (s->write.max_outstanding > 0) && + (s->write.n_outstanding >= s->write.max_outstanding) ){ + + ShowWarning("network_send: fd #%u max Outstanding buffers exceeded. - disconnecting.\n", fd); + network_disconnect(fd); + // + _network_free_netbuf_async(buf); + return; + } + + + // Attach to the end: + buf->next = NULL; + if(s->write.buf_last != NULL){ + s->write.buf_last->next = buf; + s->write.buf_last = buf; + + }else{ + // currently no buffer attached. + s->write.buf = s->write.buf_last = buf; + + // register @ evdp for writable notification. + evdp_writable_add(fd, &s->evdp_data); // + } + + + // + s->write.n_outstanding++; + +}//end: network_send() + + +void network_parser_set_ro(int32 fd, + int16 *packetlentable, + void (*onPacketCompleteProc)(int32 fd, uint16 op, uint16 len, netbuf buf) + ){ + register SESSION *s = &g_Session[fd]; + register netbuf b, nb; // used for potential free attached buffers. + + if(s->type == NST_FREE) + return; + + s->onPacketComplete = onPacketCompleteProc; + + s->onRecv = _onRORecv; // .. + s->onSend = _onSend; // Using the normal generic netbuf based send function. + + s->netparser_data = packetlentable; + + // Initial State -> Need Packet OPCode. + s->read.state = NRS_WAITOP; + s->read.head_left = 2; + + + // Detach (if..) all buffers. + if(s->read.buf != NULL){ + _network_free_netbuf_async(s->read.buf); // + s->read.buf = NULL; + } + + if(s->write.buf != NULL){ + b = s->write.buf; + while(1){ + nb = b->next; + + _network_free_netbuf_async(b); + + b = nb; + } + + s->write.buf = NULL; + s->write.buf_last = NULL; + s->write.n_outstanding = 0; + } + + // not changing any limits on outstanding .. + // + +}//end: network_parser_set_ro() + |