// // Network Subsystem (previously known as socket system) // // Author: Florian Wilkemeyer // // Copyright (c) rAthena Project (www.rathena.org) - Licensed under GNU GPL // For more information, see LICENCE in the main folder // // //#ifdef HAVE_ACCETP4 #define _GNU_SOURCE //#endif #include #include #include #include #include #include #include #include #include #include #include "../common/cbasetypes.h" #include "../common/showmsg.h" #include "../common/timer.h" #include "../common/evdp.h" #include "../common/netbuffer.h" #include "../common/network.h" #define ENABLE_IPV6 #define HAVE_ACCEPT4 #define EVENTS_PER_CYCLE 10 #define PARANOID_CHECKS // Local Vars (settings..) static int l_ListenBacklog = 64; // // Global Session Array (previously exported as session[] // SESSION g_Session[MAXCONN]; // static bool onSend(int32 fd); #define _network_free_netbuf_async( buf ) add_timer( 0, _network_async_free_netbuf_proc, 0, (intptr_t) buf) static int _network_async_free_netbuf_proc(int tid, unsigned int tick, int id, intptr_t data){ // netbuf is in data netbuffer_put( (netbuf)data ); return 0; }//end: _network_async_free_netbuf_proc() void network_init(){ SESSION *s; int32 i; memset(g_Session, 0x00, (sizeof(SESSION) * MAXCONN) ); for(i = 0; i < MAXCONN; i++){ s = &g_Session[i]; s->type = NST_FREE; s->disconnect_in_progress = false; } // Initialize the correspondig event dispatcher evdp_init(); // add_timer_func_list(_network_async_free_netbuf_proc, "_network_async_free_netbuf_proc"); }//end: network_init() void network_final(){ // @TODO: // .. disconnect and cleanup everything! evdp_final(); }//end: network_final() void network_do(){ struct EVDP_EVENT l_events[EVENTS_PER_CYCLE]; register struct EVDP_EVENT *ev; register int n, nfds; register SESSION *s; nfds = evdp_wait( l_events, EVENTS_PER_CYCLE, 1000); // @TODO: timer_getnext() for(n = 0; n < nfds; n++){ ev = &l_events[n]; s = &g_Session[ ev->fd ]; if(ev->events & EVDP_EVENT_HUP){ network_disconnect( ev->fd ); continue; // no further event processing. }// endif vent is HUP (disconnect) if(ev->events & EVDP_EVENT_IN){ if(s->onRecv != NULL){ if( false == s->onRecv(ev->fd) ){ network_disconnect(ev->fd); continue; // .. } }else{ ShowError("network_do: fd #%u has no onRecv proc set. - disconnecting\n", ev->fd); network_disconnect(ev->fd); continue; } }// endif event is IN (recv) if(ev->events & EVDP_EVENT_OUT){ if(s->onSend != NULL){ if( false == s->onSend(ev->fd) ){ network_disconnect(ev->fd); continue; } }else{ ShowError("network_do: fd #%u has no onSend proc set. - disconnecting\n", ev->fd); network_disconnect(ev->fd); continue; } }// endif event is OUT (send) }//endfor }//end: network_do() static bool _setnonblock(int32 fd){ int flags = fcntl(fd, F_GETFL, 0); if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) return false; return true; }//end: _setnonblock() static bool _network_accept(int32 fd){ SESSION *listener = &g_Session[fd]; SESSION *s; union{ struct sockaddr_in v4; #ifdef ENABLE_IPV6 struct sockaddr_in6 v6; #endif } _addr; int newfd; socklen_t addrlen; struct sockaddr *addr; // Accept until OS returns - nothing to accept anymore // - this is required due to our EVDP abstraction. (which handles on listening sockets similar to epoll's EPOLLET flag.) while(1){ #ifdef ENABLE_IPV6 if(listener->v6 == true){ addrlen = sizeof(_addr.v6); addr = (struct sockaddr*)&_addr.v6; }else{ #endif addrlen = sizeof(_addr.v4); addr = (struct sockaddr*)&_addr.v4; #ifdef ENABLE_IPV6 } #endif #ifdef HAVE_ACCEPT4 newfd = accept4(fd, addr, &addrlen, SOCK_NONBLOCK); #else newfd = accept(fd, addr, &addrlen); #endif if(newfd == -1){ if(errno == EAGAIN || errno == EWOULDBLOCK) break; // this is fully valid & whished., se explaination on top of while(1) // Otherwis .. we have serious problems :( seems tahat our listner has gone away.. // @TODO handle this .. ShowError("_network_accept: accept() returned error. closing listener. (errno: %u / %s)\n", errno, strerror(errno)); return false; // will call disconnect after return. //break; } #ifndef HAVE_ACCEPT4 // no accept4 means, we have to set nonblock by ourself. .. if(_setnonblock(newfd) == false){ ShowError("_network_accept: failed to set newly accepted connection nonblocking (errno: %u / %s). - disconnecting.\n", errno, strerror(errno)); close(newfd); continue; } #endif // Check connection limits. if(newfd >= MAXCONN){ ShowError("_network_accept: failed to accept connection - MAXCONN (%u) exceeded.\n", MAXCONN); close(newfd); continue; // we have to loop over the events (and disconnect them too ..) but otherwise we would leak event notifications. } // Create new Session. s = &g_Session[newfd]; s->type = NST_CLIENT; // The new connection inherits listenr's handlers. s->onDisconnect = listener->onDisconnect; s->onConnect = listener->onConnect; // maybe useless but .. fear the future .. :~ // Register the new connection @ EVDP if( evdp_addclient(newfd, &s->evdp_data) == false){ ShowError("_network_accept: failed to accept connection - event subsystem returned an error.\n"); close(newfd); s->type = NST_FREE; } // Call the onConnect handler on the listener. if( listener->onConnect(newfd) == false ){ // Resfused by onConnect handler.. evdp_remove(newfd, &s->evdp_data); close(newfd); s->type = NST_FREE; s->data = NULL; // be on the safe side ~ ! continue; } } return true; }//end: _network_accept() void network_disconnect(int32 fd){ SESSION *s = &g_Session[fd]; netbuf b, bn; // Prevent recursive calls // by wrong implemented on disconnect handlers.. and such.. if(s->disconnect_in_progress == true) return; s->disconnect_in_progress = true; // Disconnect Todo: // - Call onDisconnect Handler // - Release all Assigned buffers. // - remove from event system (notifications) // - cleanup session structure // - close connection. // if(s->onDisconnect != NULL && s->type != NST_LISTENER){ s->onDisconnect( fd ); } // Read Buffer if(s->read.buf != NULL){ netbuffer_put(s->read.buf); s->read.buf = NULL; } // Write Buffer(s) b = s->write.buf; while(1){ if(b == NULL) break; bn = b->next; netbuffer_put(b); b = bn; } s->write.buf = NULL; s->write.buf_last = NULL; s->write.n_outstanding = 0; s->write.max_outstanding = 0; // Remove from event system. evdp_remove(fd, &s->evdp_data); // Cleanup Session Structure. s->type = NST_FREE; s->data = NULL; // no application level data assigned s->disconnect_in_progress = false; // Close connection close(fd); }//end: network_disconnect() int32 network_addlistener(bool v6, const char *addr, uint16 port){ SESSION *s; int optval, fd; #if !defined(ENABLE_IPV6) if(v6 == true){ ShowError("network_addlistener(%c, '%s', %u): this release has no IPV6 support.\n", (v6==true?'t':'f'), addr, port); return -1; } #endif #ifdef ENABLE_IPV6 if(v6 == true) fd = socket(AF_INET6, SOCK_STREAM, 0); else #endif fd = socket(AF_INET, SOCK_STREAM, 0); // Error? if(fd == -1){ ShowError("network_addlistener(%c, '%s', %u): socket() failed (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); return -1; } // Too many connections? if(fd >= MAXCONN){ ShowError("network_addlistener(%c, '%s', %u): cannot create listener, exceeds more than supported connections (%u).\n", (v6==true?'t':'f'), addr, port, MAXCONN); close(fd); return -1; } s = &g_Session[fd]; if(s->type != NST_FREE){ // additional checks.. :) ShowError("network_addlistener(%c, '%s', %u): failed, got fd #%u which is already in use in local session table?!\n", (v6==true?'t':'f'), addr, port, fd); close(fd); return -1; } // Fill ip addr structs #ifdef ENABLE_IPV6 if(v6 == true){ memset(&s->addr.v6, 0x00, sizeof(s->addr.v6)); s->addr.v6.sin6_family = AF_INET6; s->addr.v6.sin6_port = htons(port); if(inet_pton(AF_INET6, addr, &s->addr.v6.sin6_addr) != 1){ ShowError("network_addlistener(%c, '%s', %u): failed to parse the given IPV6 address.\n", (v6==true?'t':'f'), addr, port); close(fd); return -1; } }else{ #endif memset(&s->addr.v4, 0x00, sizeof(s->addr.v4)); s->addr.v4.sin_family = AF_INET; s->addr.v4.sin_port = htons(port); s->addr.v4.sin_addr.s_addr = inet_addr(addr); #ifdef ENABLE_IPV6 } #endif // if OS has support for SO_REUSEADDR, apply the flag // so the address could be used when there're still time_wait sockets outstanding from previous application run. #ifdef SO_REUSEADDR optval=1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); #endif // Bind #ifdef ENABLE_IPV6 if(v6 == true){ if( bind(fd, (struct sockaddr*)&s->addr.v6, sizeof(s->addr.v6)) == -1) { ShowError("network_addlistener(%c, '%s', %u): bind failed (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); close(fd); return -1; } }else{ #endif if( bind(fd, (struct sockaddr*)&s->addr.v4, sizeof(s->addr.v4)) == -1) { ShowError("network_addlistener(%c, '%s', %u): bind failed (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); close(fd); return -1; } #ifdef ENABLE_IPV6 } #endif if( listen(fd, l_ListenBacklog) == -1){ ShowError("network_addlistener(%c, '%s', %u): listen failed (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); close(fd); return -1; } // Set to nonblock! if(_setnonblock(fd) == false){ ShowError("network_addlistener(%c, '%s', %u): cannot set to nonblock (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); close(fd); return -1; } // Rgister @ evdp. if( evdp_addlistener(fd, &s->evdp_data) != true){ ShowError("network_addlistener(%c, '%s', %u): eventdispatcher subsystem returned an error.\n", (v6==true?'t':'f'), addr, port); close(fd); return -1; } // Apply flags on Session array for this conneciton. if(v6 == true) s->v6 = true; else s->v6 = false; s->type = NST_LISTENER; s->onRecv = _network_accept; ShowStatus("Added Listener on '%s':%u\n", addr, port, (v6==true ? "(ipv6)":"(ipv4)") ); return fd; }//end: network_addlistener() static bool _network_connect_establishedHandler(int32 fd){ register SESSION *s = &g_Session[fd]; int val; socklen_t val_len; if(s->type == NST_FREE) return true; // due to multiple non coalesced event notifications // this can happen .. when a previous handled event has already disconnected the connection // within the same cycle.. val = -1; val_len = sizeof(val); getsockopt(fd, SOL_SOCKET, SO_ERROR, &val, &val_len); if(val != 0){ // :( .. cleanup session.. s->type = NST_FREE; s->onSend = NULL; s->onConnect = NULL; s->onDisconnect = NULL; evdp_remove(fd, &s->evdp_data); close(fd); return true; // we CANT return false, // becuase the normal disconnect procedure would execute the ondisconnect handler, which we dont want .. in this case. }else{ // ok if(s->onConnect(fd) == false) { // onConnect handler has refused the connection .. // cleanup .. and ok s->type = NST_FREE; s->onSend = NULL; s->onConnect = NULL; s->onDisconnect = NULL; evdp_remove(fd, &s->evdp_data); close(fd); return true; // we dnot want the ondisconnect handler to be executed, so its okay to handle this by ourself. } // connection established ! // if( evdp_outgoingconnection_established(fd, &s->evdp_data) == false ){ return false; // we want the normal disconnect procedure.. with call to ondisconnect handler. } s->onSend = NULL; ShowStatus("#%u connection successfull!\n", fd); } return true; }//end: _network_connect_establishedHandler() int32 network_connect(bool v6, const char *addr, uint16 port, const char *from_addr, uint16 from_port, bool (*onConnectionEstablishedHandler)(int32 fd), void (*onConnectionLooseHandler)(int32 fd) ){ register SESSION *s; int32 fd, optval, ret; struct sockaddr_in ip4; #ifdef ENABLE_IPV6 struct sockaddr_in6 ip6; #endif #ifdef ENABLE_IPV6 if(v6 == true) fd = socket(AF_INET6, SOCK_STREAM, 0); else #endif fd = socket(AF_INET, SOCK_STREAM, 0); #ifndef ENABLE_IPV6 // check.. if(v6 == true){ ShowError("network_connect(%c, '%s', %u...): tried to create an ipv6 connection, IPV6 is not supported in this release.\n", (v6==true?'t':'f'), addr, port); return -1; } #endif // check connection limits. if(fd >= MAXCONN){ ShowError("network_connect(%c, '%s', %u...): cannot create new connection, exceeeds more than supported connections (%u)\n", (v6==true?'t':'f'), addr, port ); close(fd); return -1; } // Originating IP/Port pair given ? if(from_addr != NULL && *from_addr != 0){ //.. #ifdef SO_REUSEADDR optval=1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)); #endif #ifdef ENABLE_IPV6 if(v6 == true){ memset(&ip6, 0x00, sizeof(ip6)); ip6.sin6_family = AF_INET6; ip6.sin6_port = htons(from_port); if(inet_pton(AF_INET6, from_addr, &ip6.sin6_addr) != 1){ ShowError("network_connect(%c, '%s', %u...): cannot parse originating (from) IPV6 address (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); close(fd); return -1; } ret = bind(fd, (struct sockaddr*)&ip6, sizeof(ip6)); }else{ #endif memset(&ip4, 0x00, sizeof(ip4)); ip4.sin_family = AF_INET; ip4.sin_port = htons(from_port); ip4.sin_addr.s_addr = inet_addr(from_addr); ret = bind(fd, (struct sockaddr*)&ip4, sizeof(ip4)); #ifdef ENABLE_IPV6 } #endif } // Set non block if(_setnonblock(fd) == false){ ShowError("network_connect(%c, '%s', %u...): cannot set socket to nonblocking (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); close(fd); return -1; } // Create ip addr block to connect to .. #ifdef ENABLE_IPV6 if(v6 == true){ memset(&ip6, 0x00, sizeof(ip6)); ip6.sin6_family = AF_INET6; ip6.sin6_port = htons(port); if(inet_pton(AF_INET6, addr, &ip6.sin6_addr) != 1){ ShowError("network_connect(%c, '%s', %u...): cannot parse destination IPV6 address (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); close(fd); return -1; } }else{ #endif memset(&ip4, 0x00, sizeof(ip4)); ip4.sin_family = AF_INET; ip4.sin_port = htons(port); ip4.sin_addr.s_addr = inet_addr(addr); #ifdef ENABLE_IPV6 } #endif // Assign Session.. s = &g_Session[fd]; s->type = NST_OUTGOING; s->v6 = v6; s->onConnect = onConnectionEstablishedHandler; s->onDisconnect = onConnectionLooseHandler; s->onRecv = NULL; s->onSend = _network_connect_establishedHandler; #ifdef ENABLE_IPV6 if(v6 == true) memcpy(&s->addr.v6, &ip6, sizeof(ip6)); else #endif memcpy(&s->addr.v4, &ip4, sizeof(ip4)); // Register @ EVDP. as outgoing (see doc of the function) if(evdp_addconnecting(fd, &s->evdp_data) == false){ ShowError("network_connect(%c, '%s', %u...): eventdispatcher subsystem returned an error.\n", (v6==true?'t':'f'), addr, port); // cleanup session x.x.. s->type = NST_FREE; s->onConnect = NULL; s->onDisconnect = NULL; s->onSend = NULL; // close, return error code. close(fd); return -1; } #ifdef ENABLE_IPV6 if(v6 == true) ret = connect(fd, (struct sockaddr*)&ip6, sizeof(ip6)); else #endif ret = connect(fd, (struct sockaddr*)&ip4, sizeof(ip4)); // if(ret != 0 && errno != EINPROGRESS){ ShowWarning("network_connect(%c, '%s', %u...): connection failed (errno: %u / %s)\n", (v6==true?'t':'f'), addr, port, errno, strerror(errno)); // Cleanup session .. s->type = NST_FREE; s->onConnect = NULL; s->onDisconnect = NULL; s->onSend = NULL; // .. remove from evdp and close fd. evdp_remove(fd, &s->evdp_data); close(fd); return -1; } // ! The Info Message :~D ShowStatus("network_connect fd#%u (%s:%u) in progress.. \n", fd, addr, port); return fd; }//end: network_connect() static bool _onSend(int32 fd){ register SESSION *s = &g_Session[fd]; register netbuf buf, buf_next; register uint32 szNeeded; register int wLen; if(s->type == NST_FREE) return true; // Possible due to multipl non coalsced event notifications // so onSend gets called after disconnect caused by an previous vent. // we can ignore the call to onSend, then. buf = s->write.buf; while(1){ if(buf == NULL) break; buf_next = buf->next; szNeeded = (buf->dataLen - s->write.dataPos); // using th session-local .dataPos member, due to shared write buffer support. // try to write. wLen = write(fd, &buf->buf[s->write.dataPos], szNeeded); if(wLen == 0){ return false; // eof. }else if(wLen == -1){ if(errno == EAGAIN || errno == EWOULDBLOCK) return true; // dont disconnect / try again later. // all other errors. . return false; } // Wrote data.. => szNeeded -= wLen; if(szNeeded > 0){ // still data left .. // s->write.dataPos += wLen; // fix offset. return true; }else{ // this buffer has been written successfully // could be returned to pool. netbuffer_put(buf); s->write.n_outstanding--; // When threadsafe -> Interlocked here. s->write.dataPos = 0; } buf = buf_next; } // okay, // reaching this part means: // while interrupted by break - // which means all buffers are written, nothing left // s->write.buf_last = NULL; s->write.buf = NULL; s->write.n_outstanding = 0; s->write.dataPos = 0; // Remove from event dispatcher (write notification) // evdp_writable_remove(fd, &s->evdp_data); return true; }//end: _onSend() static bool _onRORecv(int32 fd){ register SESSION *s = &g_Session[fd]; register uint32 szNeeded; register char *p; register int rLen; if(s->type == NST_FREE) return true; // Possible due to multiple non coalesced events by evdp. // simply ignore this call returning positive result. // Initialize p and szNeeded depending on change // switch(s->read.state){ case NRS_WAITOP: szNeeded = s->read.head_left; p = ((char*)&s->read.head[0]) + (2-szNeeded); break; case NRS_WAITLEN: szNeeded = s->read.head_left; p = ((char*)&s->read.head[1]) + (2-szNeeded); break; case NRS_WAITDATA:{ register netbuf buf = s->read.buf; szNeeded = (buf->dataLen - buf->dataPos); p = (char*)&buf->buf[ buf->dataPos ]; } break; default: // .. the impossible gets possible .. ShowError("_onRORecv: fd #%u has unknown read.state (%d) - disconnecting\n", fd, s->read.state); return false; break; } // rLen = read(fd, p, szNeeded); if(rLen == 0){ // eof.. return false; }else if(rLen == -1){ if(errno == EAGAIN || errno == EWOULDBLOCK){ // try again later .. (this case shouldnt happen, because we're event trigered.. but .. sometimes it happens :) return true; } // an additional interesting case would be // EINTR, this 'could' be handled .. but: // posix says that its possible that data gets currupted during irq // or data gor read and not reported.., so we'd have a data loss.. // (which shouldnt happen with stream based protocols such as tcp) // its better to disonnect the client in that case. return false; } // // Got Data: // next action also depends on current state .. // szNeeded -= rLen; switch(s->read.state){ case NRS_WAITOP: if(szNeeded > 0){ // still data missing .. s->read.head_left = szNeeded; return true; // wait for completion. }else{ // complete .. // next state depends on packet type. s->read.head[1] = ((uint16*)s->netparser_data)[ s->read.head[0] ]; // store lenght of packet by opcode head[0] to head[1] if(s->read.head[1] == ROPACKET_UNKNOWN){ // unknown packet - disconnect ShowWarning("_onRORecv: fd #%u got unlnown packet 0x%04x - disconnecting.\n", fd, s->read.head[0]); return false; } else if(s->read.head[1] == ROPACKET_DYNLEN){ // dynamic length // next state: requrie len. s->read.state = NRS_WAITLEN; s->read.head_left = 2; return true; // } else if(s->read.head[1] == 2){ // packet has no data (only opcode) register netbuf buf = netbuffer_get(2); // :D whoohoo its giant! NBUFW(buf, 0) = s->read.head[0]; // store opcode @ packet begin. buf->dataPos = 2; buf->dataLen = 2; buf->next = NULL; // Back to initial state -> Need opcode. s->read.state = NRS_WAITOP; s->read.head_left = 2; s->read.buf = NULL; // Call completion routine here. s->onPacketComplete(fd, s->read.head[0], 2, buf); return true; // done :) } else{ // paket needs .. data .. register netbuf buf = netbuffer_get( s->read.head[1] ); NBUFW(buf, 0) = s->read.head[0]; // store opcode @ packet begin. buf->dataPos = 2; buf->dataLen = s->read.head[1]; buf->next = NULL; // attach buffer. s->read.buf = buf; // set state: s->read.state = NRS_WAITDATA; return true; } }//endif: szNeeded > 0 (opcode read completed?) break; case NRS_WAITLEN: if(szNeeded > 0){ // incomplete .. s->read.head_left = szNeeded; return true; }else{ if(s->read.head[1] == 4){ // packet has no data (only opcode + length) register netbuf buf = netbuffer_get( 4 ); NBUFL(buf, 0) = *((uint32*)&s->read.head[0]); // copy Opcode + length to netbuffer using MOVL buf->dataPos = 4; buf->dataLen = 4; buf->next = NULL; // set initial state (need opcode) s->read.state = NRS_WAITOP; s->read.head_left = 2; s->read.buf = NULL; // call completion routine. s->onPacketComplete(fd, s->read.head[0], 4, buf); return true; } else if(s->read.head[1] < 4){ // invalid header. ShowWarning("_onRORecv: fd #%u invalid header - got packet 0x%04x, reported length < 4 - INVALID - disconnecting\n", fd, s->read.head[0]); return false; } else{ // Data needed // next state -> waitdata! register netbuf buf = netbuffer_get( s->read.head[1] ); NBUFL(buf, 0) = *((uint32*)&s->read.head[0]); // copy Opcode + length to netbuffer using MOVL buf->dataPos = 4; buf->dataLen = s->read.head[1]; buf->next = NULL; // attach to session: s->read.buf = buf; s->read.state = NRS_WAITDATA; return true; } }//endif: szNeeded > 0 (length read complete?) break; case NRS_WAITDATA: if(szNeeded == 0){ // Packet finished! // compltion. register netbuf buf = s->read.buf; // set initial state. s->read.state = NRS_WAITOP; s->read.head_left = 2; s->read.buf = NULL; // Call completion routine. s->onPacketComplete(fd, NBUFW(buf, 0), buf->dataLen, buf); return true; }else{ // still data needed s->read.buf->dataPos += rLen; return true; } break; // default: ShowError("_onRORecv: fd #%u has unknown read.state (%d) [2] - disconnecting\n", fd, s->read.state); return false; break; } return false; }//end: _onRORecv() void network_send(int32 fd, netbuf buf){ register SESSION *s = &g_Session[fd]; #ifdef PARANOID_CHECKS if(fd >= MAXCONN){ ShowError("network_send: tried to attach buffer to connection idientifer #%u which is out of bounds.\n", fd); _network_free_netbuf_async(buf); return; } #endif if(s->type == NST_FREE) return; // Check Max Outstanding buffers limit. if( (s->write.max_outstanding > 0) && (s->write.n_outstanding >= s->write.max_outstanding) ){ ShowWarning("network_send: fd #%u max Outstanding buffers exceeded. - disconnecting.\n", fd); network_disconnect(fd); // _network_free_netbuf_async(buf); return; } // Attach to the end: buf->next = NULL; if(s->write.buf_last != NULL){ s->write.buf_last->next = buf; s->write.buf_last = buf; }else{ // currently no buffer attached. s->write.buf = s->write.buf_last = buf; // register @ evdp for writable notification. evdp_writable_add(fd, &s->evdp_data); // } // s->write.n_outstanding++; }//end: network_send() void network_parser_set_ro(int32 fd, int16 *packetlentable, void (*onPacketCompleteProc)(int32 fd, uint16 op, uint16 len, netbuf buf) ){ register SESSION *s = &g_Session[fd]; register netbuf b, nb; // used for potential free attached buffers. if(s->type == NST_FREE) return; s->onPacketComplete = onPacketCompleteProc; s->onRecv = _onRORecv; // .. s->onSend = _onSend; // Using the normal generic netbuf based send function. s->netparser_data = packetlentable; // Initial State -> Need Packet OPCode. s->read.state = NRS_WAITOP; s->read.head_left = 2; // Detach (if..) all buffers. if(s->read.buf != NULL){ _network_free_netbuf_async(s->read.buf); // s->read.buf = NULL; } if(s->write.buf != NULL){ b = s->write.buf; while(1){ nb = b->next; _network_free_netbuf_async(b); b = nb; } s->write.buf = NULL; s->write.buf_last = NULL; s->write.n_outstanding = 0; } // not changing any limits on outstanding .. // }//end: network_parser_set_ro()