summaryrefslogblamecommitdiff
path: root/src/common/network.c
blob: a40cbd60296a85165cbe93c67ca57ff265f56a3d (plain) (tree)
1
  

















































                                                                           
                                                                                                             

                                                                                              
 
                 


                                         


















                                                                                                

                      
                     
 


                                                


                       















































                                                                                                                  

                    


                                                       
 
                    

                      



                                           
                  
                                       
      






                                                                                                                                
                  


                                                           
      
                                                                        
                  
                 

                   
                                                                   
     
                                                   
      





                                                                                                                                            
 

                                                                           
                                                                                



                                                                                                                                                                       
      





































                                                                                                                                                  

                         




























































                                                                   

                            

                                                                    
                         


                                                                                                                                        


                  

                                                      
      























                                                                                                                                                                                   
                  









                                                                                                                                                    
      


                                                              
                  
         
      
                 
 
                                                                                                                        
                   
                                                                          
      
               
                  





                                                                                                                                                                 
      



                                                                                                                                                     
                  
         
      































                                                                                                                                                                    

                             




















































                                                                                                                                                               


                                             








                                                                         
                  
                                

                  

                                                      
      
                                                     
                   



                                                                                                                                                                             
      



















































                                                                                                                                                                                                        
                  










                                                                                                                                                                                          
      



                                                
                  
         

      






                                                        
                  

                                                       
      














                                                                                                                                               

                  

                                                                       
      






















                                                                                                                                                               

                         


































































                                                                                                                                              

                 




























































































































































































































                                                                                                                                                                                  

                   

                                             
                      



                                                                                                                             

      































                                                                                                             


                                    











































                                                                                                                                     
                               
//
// Network Subsystem (previously known as socket system)
//
// Author: Florian Wilkemeyer <fw@f-ws.de>
//
// Copyright (c) rAthena Project (www.rathena.org) - Licensed under GNU GPL
// For more information, see LICENCE in the main folder
//
//
//#ifdef HAVE_ACCETP4
#define _GNU_SOURCE
//#endif

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>

#include <sys/types.h>
#include <sys/fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>


#include "../common/cbasetypes.h"
#include "../common/showmsg.h"
#include "../common/timer.h"
#include "../common/evdp.h"
#include "../common/netbuffer.h"

#include "../common/network.h"

#define ENABLE_IPV6
#define HAVE_ACCEPT4
#define EVENTS_PER_CYCLE 10
#define PARANOID_CHECKS

// Local Vars (settings..)
static int l_ListenBacklog = 64;

//
// Global Session Array (previously exported as session[]
//
SESSION g_Session[MAXCONN];


//
static bool onSend(int32 fd);


#define _network_free_netbuf_async( buf ) add_timer( 0, _network_async_free_netbuf_proc, 0,  (intptr_t)(buf))
static int _network_async_free_netbuf_proc(int tid, unsigned int tick, int id, intptr_t data){
	// netbuf is in data
	netbuffer_put( (netbuf)data );

	return 0;
}//end: _network_async_free_netbuf_proc()



void network_init(){
	SESSION *s;
	int32 i;
	
	memset(g_Session, 0x00, (sizeof(SESSION) * MAXCONN) );
	
	for(i = 0; i < MAXCONN; i++){
		s = &g_Session[i];
		
		s->type = NST_FREE;
		s->disconnect_in_progress = false;
				
	}
	
	// Initialize the correspondig event dispatcher
	evdp_init();

	//
	add_timer_func_list(_network_async_free_netbuf_proc, "_network_async_free_netbuf_proc");
		
}//end: network_init()


void network_final(){

	// @TODO:
	// .. disconnect and cleanup everything!
	
	evdp_final();

}//end: network_final()


void network_do(){
	struct EVDP_EVENT l_events[EVENTS_PER_CYCLE];
	register struct EVDP_EVENT *ev;
	register int n, nfds;
	register SESSION *s;
	
	nfds = evdp_wait( l_events,	EVENTS_PER_CYCLE, 1000); // @TODO: timer_getnext()
	
	for(n = 0; n < nfds; n++){
		ev = &l_events[n];
		s = &g_Session[ ev->fd ];
		
		if(ev->events & EVDP_EVENT_HUP){
			network_disconnect( ev->fd );	
			continue; // no further event processing.
		}// endif vent is HUP (disconnect)
		
		
		if(ev->events & EVDP_EVENT_IN){
			
			if(s->onRecv != NULL){
				if( false == s->onRecv(ev->fd) ){
					network_disconnect(ev->fd);
					continue; // ..
				}
			}else{
				ShowError("network_do: fd #%u has no onRecv proc set. - disconnecting\n", ev->fd);
				network_disconnect(ev->fd);
				continue;
			}	
						
		}// endif event is IN (recv)
		
		
		if(ev->events & EVDP_EVENT_OUT){
			if(s->onSend != NULL){
				if( false == s->onSend(ev->fd) ){
					network_disconnect(ev->fd);
					continue;
				}
			}else{
				ShowError("network_do: fd #%u has no onSend proc set. - disconnecting\n", ev->fd);
				network_disconnect(ev->fd);
				continue;
			}
		}// endif event is OUT (send)
		
	}//endfor
			
}//end: network_do()


static bool _setnonblock(int32 fd){
	int flags = fcntl(fd, F_GETFL, 0);
	if(fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0)
		return false;

	return true;
}//end: _setnonblock()


static bool _network_accept(int32 fd){
	SESSION *listener = &g_Session[fd];
	SESSION *s; 
	union{
		struct sockaddr_in v4;
#ifdef ENABLE_IPV6
		struct sockaddr_in6 v6;
#endif
	} _addr;
	int newfd;
	socklen_t addrlen;
	struct sockaddr *addr;

	// Accept until OS returns - nothing to accept anymore
	// - this is required due to our EVDP abstraction. (which handles on listening sockets similar to epoll's EPOLLET flag.)
	while(1){
#ifdef ENABLE_IPV6
		if(listener->v6 == true){
			addrlen = sizeof(_addr.v6);
			addr = (struct sockaddr*)&_addr.v6;
		}else{
#endif
			addrlen = sizeof(_addr.v4);
			addr = (struct sockaddr*)&_addr.v4;		
#ifdef ENABLE_IPV6
		}
#endif

#ifdef HAVE_ACCEPT4
		newfd = accept4(fd, addr, &addrlen, SOCK_NONBLOCK);
#else
		newfd = accept(fd, addr, &addrlen);
#endif

		if(newfd == -1){
			if(errno == EAGAIN || errno == EWOULDBLOCK)
				break; // this is fully valid & whished., se explaination on top of while(1)
			
			// Otherwis .. we have serious problems :( seems tahat our listner has gone away..
			// @TODO handle this .. 
			ShowError("_network_accept: accept() returned error. closing listener. (errno: %u / %s)\n", errno, strerror(errno));

			return false; // will call disconnect after return.
			//break;
		}

#ifndef HAVE_ACCEPT4 // no accept4 means, we have to set nonblock by ourself. ..
		if(_setnonblock(newfd) == false){
			ShowError("_network_accept: failed to set newly accepted connection nonblocking (errno: %u / %s). - disconnecting.\n", errno, strerror(errno));
			close(newfd);
			continue; 
		}
#endif

		// Check connection limits.
		if(newfd >= MAXCONN){
			ShowError("_network_accept: failed to accept connection - MAXCONN (%u) exceeded.\n", MAXCONN);
			close(newfd);
			continue; // we have to loop over the events (and disconnect them too ..) but otherwise we would leak event notifications.
		}


		// Create new Session.
		s = &g_Session[newfd];
		s->type = NST_CLIENT;
		
		// The new connection inherits listenr's handlers.
		s->onDisconnect = listener->onDisconnect;
		s->onConnect = listener->onConnect; // maybe useless but .. fear the future .. :~ 
	
		// Register the new connection @ EVDP
		if( evdp_addclient(newfd, &s->evdp_data) == false){
			ShowError("_network_accept: failed to accept connection - event subsystem returned an error.\n");
			close(newfd);
			s->type = NST_FREE;
		}
		
		// Call the onConnect handler on the listener.
		if( listener->onConnect(newfd) == false ){
			// Resfused by onConnect handler..
			evdp_remove(newfd, &s->evdp_data);
			
			close(newfd);
			s->type = NST_FREE;
			
			s->data = NULL; // be on the safe side ~ !
			continue;
		}
		
		
	}

	return true;
}//end: _network_accept()


void network_disconnect(int32 fd){
	SESSION *s = &g_Session[fd];
	netbuf b, bn;
	
	// Prevent recursive calls 
	// by wrong implemented on disconnect handlers.. and such..
	if(s->disconnect_in_progress == true)
		return; 	
		
	s->disconnect_in_progress = true;
	
	
	// Disconnect Todo:
	//	- Call onDisconnect Handler
	//	- Release all Assigned buffers.
	//	- remove from event system (notifications)
	//	- cleanup session structure
	//	- close connection. 
	//
	
	if(s->onDisconnect != NULL && 
		s->type != NST_LISTENER){
		
		s->onDisconnect( fd );
	}

	// Read Buffer 
	if(s->read.buf != NULL){
		netbuffer_put(s->read.buf);
		s->read.buf = NULL;
	}

	// Write Buffer(s)
	b = s->write.buf;
	while(1){
		if(b == NULL) break;

		bn = b->next;
		
		netbuffer_put(b);
		
		b = bn;
	}
	s->write.buf = NULL;
	s->write.buf_last = NULL;
	
	s->write.n_outstanding = 0;
	s->write.max_outstanding = 0;
	
		
	// Remove from event system.
	evdp_remove(fd, &s->evdp_data);
	
	// Cleanup Session Structure.
	s->type = NST_FREE;
	s->data = NULL; // no application level data assigned
	s->disconnect_in_progress = false;


	// Close connection	
	close(fd);	
	
}//end: network_disconnect()


int32 network_addlistener(bool v6,  const char *addr,  uint16 port){
	SESSION *s;
	int optval, fd;

#if !defined(ENABLE_IPV6)
	if(v6 == true){
		 ShowError("network_addlistener(%c, '%s', %u):  this release has no IPV6 support.\n",  (v6==true?'t':'f'),  addr, port);
		 return -1;
	}
#endif


#ifdef ENABLE_IPV6
	if(v6 == true)
		fd = socket(AF_INET6, SOCK_STREAM, 0);
	else
#endif
		fd = socket(AF_INET, SOCK_STREAM, 0);

	// Error?
	if(fd == -1){
		ShowError("network_addlistener(%c, '%s', %u):  socket() failed (errno: %u / %s)\n",  (v6==true?'t':'f'),  addr, port,    errno, strerror(errno));
		return -1;
	}
	
	// Too many connections?
	if(fd >= MAXCONN){
		ShowError("network_addlistener(%c, '%s', %u):  cannot create listener, exceeds more than supported connections (%u).\n", (v6==true?'t':'f'),  addr, port, MAXCONN);
		close(fd);
		return -1;
	}
	
	
	s = &g_Session[fd];
	if(s->type != NST_FREE){ // additional checks.. :)
			ShowError("network_addlistener(%c, '%s', %u): failed, got fd #%u which is already in use in local session table?!\n", (v6==true?'t':'f'),  addr, port, fd);
			close(fd);
			return -1;
	}
	
	
	// Fill ip addr structs
#ifdef ENABLE_IPV6
	if(v6 == true){
		memset(&s->addr.v6, 0x00, sizeof(s->addr.v6));
		s->addr.v6.sin6_family = AF_INET6;
		s->addr.v6.sin6_port = htons(port);
		if(inet_pton(AF_INET6, addr, &s->addr.v6.sin6_addr) != 1){
			ShowError("network_addlistener(%c, '%s', %u): failed to parse the given IPV6 address.\n",  (v6==true?'t':'f'),  addr, port);
			close(fd);
			return -1;
		}
		
	}else{
#endif
		memset(&s->addr.v4, 0x00, sizeof(s->addr.v4));
		s->addr.v4.sin_family = AF_INET;
		s->addr.v4.sin_port = htons(port);
		s->addr.v4.sin_addr.s_addr = inet_addr(addr);
#ifdef ENABLE_IPV6
	}
#endif
		 

	// if OS has support for SO_REUSEADDR, apply the flag
	// so the address could be used when there're still time_wait sockets outstanding from previous application run.
#ifdef SO_REUSEADDR
	optval=1;
	setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
#endif

	// Bind
#ifdef ENABLE_IPV6
	if(v6 == true){
		if( bind(fd, (struct sockaddr*)&s->addr.v6,  sizeof(s->addr.v6)) == -1) {
			ShowError("network_addlistener(%c, '%s', %u): bind failed (errno: %u / %s)\n",  (v6==true?'t':'f'),  addr, port, errno, strerror(errno));
			close(fd);
			return -1;
		}
	}else{
#endif
		if( bind(fd, (struct sockaddr*)&s->addr.v4,  sizeof(s->addr.v4)) == -1) {
            ShowError("network_addlistener(%c, '%s', %u): bind failed (errno: %u / %s)\n",  (v6==true?'t':'f'),  addr, port, errno, strerror(errno));
			close(fd);
			return -1;
		}		
#ifdef ENABLE_IPV6
	}
#endif

	if( listen(fd, l_ListenBacklog) == -1){
		ShowError("network_addlistener(%c, '%s', %u): listen failed (errno: %u / %s)\n",  (v6==true?'t':'f'),  addr, port, errno, strerror(errno));
		close(fd);
		return -1;
	}
		

	// Set to nonblock!
	if(_setnonblock(fd) == false){
		ShowError("network_addlistener(%c, '%s', %u): cannot set to nonblock (errno: %u / %s)\n",  (v6==true?'t':'f'),  addr, port, errno, strerror(errno));
		close(fd);
		return -1;
	}	


	// Rgister @ evdp.
	if( evdp_addlistener(fd, &s->evdp_data) != true){
		ShowError("network_addlistener(%c, '%s', %u): eventdispatcher subsystem returned an error.\n",  (v6==true?'t':'f'),  addr, port);
		close(fd);
		return -1;
	}
	
	
	// Apply flags on Session array for this conneciton.
	if(v6 == true)	s->v6 = true;
	else			s->v6 = false;
	
	s->type = NST_LISTENER;
	s->onRecv = _network_accept;

	ShowStatus("Added Listener on '%s':%u\n", addr, port, (v6==true ? "(ipv6)":"(ipv4)") );

	return fd;
}//end: network_addlistener()


static bool _network_connect_establishedHandler(int32 fd){
	register SESSION *s = &g_Session[fd];
	int val;
	socklen_t val_len;
	
	if(s->type == NST_FREE)
		return true;	// due to multiple non coalesced event notifications
						// this can happen .. when a previous handled event has already disconnected the connection
						// within the same cycle..
	
	val = -1;
	val_len = sizeof(val);
	getsockopt(fd, SOL_SOCKET, SO_ERROR, &val, &val_len);
	
	if(val != 0){
		// :( .. cleanup session..
		s->type = NST_FREE;
		s->onSend = NULL;
		s->onConnect = NULL;
		s->onDisconnect = NULL;

		evdp_remove(fd, &s->evdp_data);
		close(fd);
		
		return true; // we CANT return false,
					 // becuase the normal disconnect procedure would execute the ondisconnect handler, which we dont want .. in this case.
	}else{
		// ok 
		if(s->onConnect(fd) == false) {
			// onConnect handler has refused the connection .. 
			// cleanup .. and ok
			s->type = NST_FREE;
			s->onSend = NULL;
			s->onConnect = NULL;
			s->onDisconnect = NULL;
			
			evdp_remove(fd, &s->evdp_data);
			close(fd);
			
			return true; // we dnot want the ondisconnect handler to be executed, so its okay to handle this by ourself.
		}
		
		// connection established ! 
		// 
		if( evdp_outgoingconnection_established(fd, &s->evdp_data) == false ){
			return false; // we want the normal disconnect procedure.. with call to ondisconnect handler.
		}
		
		s->onSend = NULL;
		
		ShowStatus("#%u connection successfull!\n", fd);	
	}

	return true;	
}//end: _network_connect_establishedHandler()


int32 network_connect(bool v6,
                        const char *addr,
                        uint16 port,
                        const char *from_addr,
                        uint16 from_port,
                        bool (*onConnectionEstablishedHandler)(int32 fd),
                        void (*onConnectionLooseHandler)(int32 fd)
){
	register SESSION *s;
	int32 fd, optval, ret;
	struct sockaddr_in ip4;
#ifdef ENABLE_IPV6
	struct sockaddr_in6 ip6;
#endif

#ifdef ENABLE_IPV6
	if(v6 == true)
		fd = socket(AF_INET6, SOCK_STREAM, 0);
	else
#endif
		fd = socket(AF_INET, SOCK_STREAM, 0);

#ifndef ENABLE_IPV6
	// check..
	if(v6 == true){
		ShowError("network_connect(%c, '%s', %u...): tried to create an ipv6 connection, IPV6 is not supported in this release.\n", (v6==true?'t':'f'),  addr, port);
		return -1;
	}
#endif

	// check connection limits.
	if(fd >= MAXCONN){
		ShowError("network_connect(%c, '%s', %u...): cannot create new connection, exceeeds more than supported connections (%u)\n", (v6==true?'t':'f'),  addr, port );
		close(fd);
		return -1;
	}

	
	// Originating IP/Port pair given ?
	if(from_addr != NULL && *from_addr != 0){
		//.. 
		#ifdef SO_REUSEADDR
	    optval=1;
	    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
		#endif
		
		#ifdef ENABLE_IPV6
		if(v6 == true){
			memset(&ip6, 0x00, sizeof(ip6));
			ip6.sin6_family = AF_INET6;
			ip6.sin6_port = htons(from_port);
			
			if(inet_pton(AF_INET6, from_addr, &ip6.sin6_addr) != 1){
				ShowError("network_connect(%c, '%s', %u...): cannot parse originating (from) IPV6 address (errno: %u / %s)\n", (v6==true?'t':'f'),  addr, port, errno, strerror(errno));
				close(fd);
				return -1;
 			}
		
 			ret = bind(fd, (struct sockaddr*)&ip6, sizeof(ip6));
		}else{
		#endif
			memset(&ip4, 0x00, sizeof(ip4));
			
			ip4.sin_family = AF_INET;
			ip4.sin_port = htons(from_port);
			ip4.sin_addr.s_addr = inet_addr(from_addr);
			ret = bind(fd, (struct sockaddr*)&ip4, sizeof(ip4));
		#ifdef ENABLE_IPV6
		}
		#endif
		
	}
	

	// Set non block
	if(_setnonblock(fd) == false){
		ShowError("network_connect(%c, '%s', %u...): cannot set socket to nonblocking (errno: %u / %s)\n", (v6==true?'t':'f'),  addr, port, errno, strerror(errno));
		close(fd);
		return -1;
	}


	// Create ip addr block to connect to .. 
#ifdef ENABLE_IPV6
	if(v6 == true){
		memset(&ip6, 0x00, sizeof(ip6));
		ip6.sin6_family = AF_INET6;
		ip6.sin6_port = htons(port);
		
		if(inet_pton(AF_INET6, addr, &ip6.sin6_addr) != 1){
			 ShowError("network_connect(%c, '%s', %u...): cannot parse destination IPV6 address (errno: %u / %s)\n", (v6==true?'t':'f'),  addr, port, errno, strerror(errno));
			 close(fd);
			 return -1;
		}
	
	}else{
#endif
	memset(&ip4, 0x00, sizeof(ip4));
	
	ip4.sin_family = AF_INET;
	ip4.sin_port = htons(port);
	ip4.sin_addr.s_addr = inet_addr(addr);	
#ifdef ENABLE_IPV6
	}
#endif


	// Assign Session..
	s = &g_Session[fd];
	s->type = NST_OUTGOING;
	s->v6 = v6;
	s->onConnect = onConnectionEstablishedHandler;
	s->onDisconnect = onConnectionLooseHandler;
	s->onRecv = NULL;
	s->onSend = _network_connect_establishedHandler;
#ifdef ENABLE_IPV6
	if(v6 == true)
		memcpy(&s->addr.v6, &ip6, sizeof(ip6));
	else
#endif
		memcpy(&s->addr.v4, &ip4, sizeof(ip4));
	
	// Register @ EVDP. as outgoing (see doc of the function)
	if(evdp_addconnecting(fd, &s->evdp_data) == false){
		ShowError("network_connect(%c, '%s', %u...): eventdispatcher subsystem returned an error.\n", (v6==true?'t':'f'),  addr, port);
		
		// cleanup session x.x.. 
		s->type = NST_FREE;
		s->onConnect = NULL;
		s->onDisconnect = NULL;
		s->onSend = NULL;
		
		// close, return error code.
		close(fd);
		return -1;
	}


#ifdef ENABLE_IPV6
	if(v6 == true)
		ret = connect(fd, (struct sockaddr*)&ip6, sizeof(ip6));
	else
#endif
		ret = connect(fd, (struct sockaddr*)&ip4, sizeof(ip4));


	// 
	if(ret != 0 && errno != EINPROGRESS){
		ShowWarning("network_connect(%c, '%s', %u...): connection failed (errno: %u / %s)\n", (v6==true?'t':'f'),  addr, port, errno, strerror(errno));
		
		// Cleanup session ..
		s->type = NST_FREE;
		s->onConnect = NULL;
		s->onDisconnect = NULL;
		s->onSend = NULL;
		
		// .. remove from evdp and close fd.	
		evdp_remove(fd, &s->evdp_data);
		close(fd);
		return -1;
	}


	// ! The Info Message :~D
	ShowStatus("network_connect fd#%u (%s:%u) in progress.. \n", fd, addr, port);

return fd;	
}//end: network_connect()


static bool _onSend(int32 fd){
	register SESSION *s = &g_Session[fd];
	register netbuf buf, buf_next;
	register uint32 szNeeded;
	register int wLen;

	if(s->type == NST_FREE)
		return true; 	// Possible due to multipl non coalsced event notifications 
						// so onSend gets called after disconnect caused by an previous vent. 
						// we can ignore the call to onSend, then. 
	
	buf = s->write.buf;
	while(1){
		if(buf == NULL)
			break;
		
		buf_next = buf->next;
		
		
		szNeeded = (buf->dataLen - s->write.dataPos);	// using th session-local .dataPos member, due to shared write buffer support.
		
		// try to write.
		wLen = write(fd, &buf->buf[s->write.dataPos],  szNeeded);
		if(wLen == 0){
			return false; // eof.
		}else if(wLen == -1){
			if(errno == EAGAIN || errno == EWOULDBLOCK)
				return true; // dont disconnect / try again later.
			
			// all other errors. . 
			return false;
		}
		
		// Wrote data.. =>
		szNeeded -= wLen;
		if(szNeeded > 0){
			// still data left .. 
			// 
			s->write.dataPos += wLen; // fix offset.
			return true;
		}else{
			// this buffer has been written successfully
			// could be returned to pool.
			netbuffer_put(buf);
			s->write.n_outstanding--; // When threadsafe -> Interlocked here.
			s->write.dataPos = 0; 
		}
			
			
		buf = buf_next;
	}

	// okay,
	// reaching this part means:
	// while interrupted by break - 
	// which means all buffers are written, nothing left
	//
	
	s->write.buf_last = NULL;
	s->write.buf = NULL;
	s->write.n_outstanding = 0;
	s->write.dataPos = 0;
	
	// Remove from event dispatcher (write notification)
	//
	evdp_writable_remove(fd, &s->evdp_data);
			
	return true;	
}//end: _onSend()


static bool _onRORecv(int32 fd){
	register SESSION *s = &g_Session[fd];
	register uint32	szNeeded;
	register char *p;
	register int rLen;
	
	if(s->type == NST_FREE)
		return true;	// Possible due to multiple non coalesced events by evdp.
						//  simply ignore this call returning positive result. 
	
	// Initialize p and szNeeded depending on change
	// 
	switch(s->read.state){
		case NRS_WAITOP:
			szNeeded = s->read.head_left;
			p = ((char*)&s->read.head[0]) + (2-szNeeded);
		break;

		case NRS_WAITLEN:
			szNeeded = s->read.head_left;
			p = ((char*)&s->read.head[1]) + (2-szNeeded);
		break;
		
		case NRS_WAITDATA:{
			register netbuf buf = s->read.buf;
			
			szNeeded = (buf->dataLen - buf->dataPos);
			p = (char*)&buf->buf[ buf->dataPos ];
		}
		break;	
		
		default: 
			// .. the impossible gets possible .. 
			ShowError("_onRORecv: fd #%u has unknown read.state (%d) - disconnecting\n", fd, s->read.state);
			return false;
		break;
	}
	
	
	// 
	
	rLen = read(fd, p, szNeeded);
	if(rLen == 0){
		// eof..
		return false;
	}else if(rLen == -1){
		
		if(errno == EAGAIN || errno == EWOULDBLOCK){
			// try again later .. (this case shouldnt happen, because we're event trigered.. but .. sometimes it happens :)
			return true;
		}
		
		// an additional interesting case would be 
		// EINTR, this 'could' be handled .. but:
		//	posix says that its possible that data gets currupted during irq
		//	or data gor read and not reported.., so we'd have a data loss..
		//	(which shouldnt happen with stream based protocols such as tcp)
		// its better to disonnect the client in that case.
		
		return false;
	}
	
	//
	// Got Data:
	//  next action also depends on current state .. 
	// 
	szNeeded -= rLen;
	switch(s->read.state){
		case NRS_WAITOP:

			if(szNeeded > 0){
				// still data missing .. 
				s->read.head_left = szNeeded;
				return true; // wait for completion.
			}else{
				// complete .. 
				//  next state depends on packet type.
				
				s->read.head[1] = ((uint16*)s->netparser_data)[ s->read.head[0] ];  // store lenght of packet by opcode head[0] to head[1]
				
				if(s->read.head[1] == ROPACKET_UNKNOWN){
					// unknown packet - disconnect
					ShowWarning("_onRORecv: fd #%u got unlnown packet 0x%04x - disconnecting.\n", fd, s->read.head[0]);
					return false;
				}
				else if(s->read.head[1] == ROPACKET_DYNLEN){
					// dynamic length
					// next state: requrie len.
					s->read.state = NRS_WAITLEN;
					s->read.head_left = 2;
					return true; //
				}
				else if(s->read.head[1] == 2){ 
					// packet has no data (only opcode)
					register netbuf buf = netbuffer_get(2); // :D whoohoo its giant! 
					
					NBUFW(buf, 0) = s->read.head[0]; // store opcode @ packet begin.
					buf->dataPos = 2;
					buf->dataLen = 2;
					buf->next = NULL;
					
					// Back to initial state -> Need opcode.
					s->read.state = NRS_WAITOP;
					s->read.head_left = 2;
					s->read.buf = NULL;
					
					// Call completion routine here.					
					s->onPacketComplete(fd,  s->read.head[0],  2,  buf); 
					
					return true; // done :)
				}
				else{
					// paket needs .. data ..
					register netbuf buf = netbuffer_get( s->read.head[1] );
					
					NBUFW(buf, 0) = s->read.head[0]; // store opcode @ packet begin.
					buf->dataPos = 2;
					buf->dataLen = s->read.head[1];
					buf->next = NULL;
					
					// attach buffer.
					s->read.buf = buf;
					
					// set state:
					s->read.state = NRS_WAITDATA;	
					
					return true;
				}
				
			}//endif: szNeeded > 0 (opcode read completed?)
			
		break;
		
		
		case NRS_WAITLEN:
			
			if(szNeeded > 0){
				// incomplete .. 
				s->read.head_left = szNeeded;
				return true;
			}else{
				
				if(s->read.head[1] == 4){
					// packet has no data (only opcode + length)
					register netbuf buf = netbuffer_get( 4 );
					
					NBUFL(buf, 0) = *((uint32*)&s->read.head[0]); // copy  Opcode + length to netbuffer using MOVL	
					buf->dataPos = 4;
					buf->dataLen = 4;
					buf->next = NULL;
					
					// set initial state (need opcode)
					s->read.state = NRS_WAITOP;
					s->read.head_left = 2; 
					s->read.buf = NULL;
					
					// call completion routine.
					s->onPacketComplete(fd,  s->read.head[0],  4,  buf);

					return true;					
				}
				else if(s->read.head[1] < 4){
					// invalid header.
					ShowWarning("_onRORecv: fd #%u invalid header - got packet 0x%04x, reported length < 4 - INVALID - disconnecting\n", fd, s->read.head[0]);
					return false;
				}
				else{
					// Data needed
					// next state -> waitdata!
					register netbuf buf = netbuffer_get( s->read.head[1] );
					
					NBUFL(buf, 0) = *((uint32*)&s->read.head[0]); // copy  Opcode + length to netbuffer using MOVL
					buf->dataPos = 4;
					buf->dataLen = s->read.head[1];
					buf->next = NULL;
					
					// attach to session:
					s->read.buf = buf;
					s->read.state = NRS_WAITDATA;
					
					return true;
				}
				
			}//endif: szNeeded > 0 (length read complete?)
			
		break;
		
		
		case NRS_WAITDATA:

			if(szNeeded == 0){
				// Packet finished!
				// compltion.
				register netbuf buf = s->read.buf;
				
				// set initial state.
				s->read.state = NRS_WAITOP;
				s->read.head_left = 2;
				s->read.buf = NULL;
				
				// Call completion routine.
				s->onPacketComplete(fd,  NBUFW(buf, 0),  buf->dataLen,  buf);
				
				return true;
			}else{
				// still data needed 
				s->read.buf->dataPos += rLen; 
				
				return true;				
			}
		break;
		
		
		//
		default:
			ShowError("_onRORecv: fd #%u has unknown read.state (%d) [2] - disconnecting\n", fd, s->read.state);
			return false;
		break;
	}
	
		
	return false;
}//end: _onRORecv()


void network_send(int32 fd,  netbuf buf){
	register SESSION *s = &g_Session[fd];
	
#ifdef PARANOID_CHECKS
	if(fd >= MAXCONN){
		ShowError("network_send: tried to attach buffer to connection idientifer #%u which is out of bounds.\n", fd);
		_network_free_netbuf_async(buf);
		return;
	}
#endif


	if(s->type == NST_FREE)
		return;
	
	// Check Max Outstanding buffers limit.
	if( (s->write.max_outstanding > 0)	&&
		(s->write.n_outstanding >= s->write.max_outstanding) ){
		
		ShowWarning("network_send: fd #%u max Outstanding buffers exceeded. - disconnecting.\n", fd);
		network_disconnect(fd);
		//
		_network_free_netbuf_async(buf);
		return;
	}
	

	// Attach to the end:
	buf->next = NULL;
	if(s->write.buf_last != NULL){
		s->write.buf_last->next = buf; 
		s->write.buf_last = buf;

	}else{
		// currently no buffer attached.
		s->write.buf = s->write.buf_last = buf;
		
		// register @ evdp for writable notification.
		evdp_writable_add(fd, &s->evdp_data); // 
	}
	
	
	//
	s->write.n_outstanding++;
	
}//end: network_send()


void network_parser_set_ro(int32 fd,
							int16 *packetlentable,
							void (*onPacketCompleteProc)(int32 fd,  uint16 op,  uint16 len,  netbuf buf) 
							){
	register SESSION *s = &g_Session[fd];
	register netbuf b, nb; // used for potential free attached buffers.
	
	if(s->type == NST_FREE)
		return;
	
	s->onPacketComplete = onPacketCompleteProc;
	
	s->onRecv = _onRORecv; // ..
	s->onSend = _onSend; // Using the normal generic netbuf based send function.
		
	s->netparser_data = packetlentable;
	
	// Initial State -> Need Packet OPCode.
	s->read.state = NRS_WAITOP;
	s->read.head_left = 2; 

	
	// Detach (if..) all buffers.
	if(s->read.buf != NULL){
		_network_free_netbuf_async(s->read.buf); //
		s->read.buf = NULL;	
	}
	
	if(s->write.buf != NULL){
		b = s->write.buf;
		while(1){
			nb = b->next;
			
			_network_free_netbuf_async(b);
			
			b = nb;
		}
	
		s->write.buf = NULL;
		s->write.buf_last = NULL;
		s->write.n_outstanding = 0;
	}
	
	// not changing any limits on outstanding ..
	//
	
}//end: network_parser_set_ro()