From 4feeab8c61334ec73172fa01cda951dafde2505f Mon Sep 17 00:00:00 2001 From: blacksirius Date: Sun, 3 Jun 2012 18:53:02 +0000 Subject: feature merge bs-coreoptimize->trunk: Atomic Operations, Threading, Spinlock implemnetation. [commit 1/2, windows will followup] - Added Abstractions for Atomic Operations (lock instructions.. windows guy's may now this as Interlocked* stuff ..) - Added Threading api abstraction for Pthread based OS's and Windows - Added Spinlock Implementation (uses CAS / if you need more informations - just read the source - its simple.) - Due to Interlocked(Compare)Exchange64 .. we now require at least i686 (Pentium Pro) for 32Bit Builds :) youll also may feel some performance improvements when using 32bit builsd due to "newer" minimal arch the compiler is now able to use CMOV's .... ================================================================ = Important Warning: ================================================================ Dont use threading at the moment athena is not threadsafe! you'll mess up everthing when accessing data from other threads .., no synchronization is provided. A way to process tasks asynchronously will come up after / with the new socket system. git-svn-id: https://rathena.svn.sourceforge.net/svnroot/rathena/trunk@16221 54d463be-8e91-2dee-dedb-b68131a5f0ec --- src/common/Makefile.in | 2 +- src/common/atomic.h | 157 ++++++++++++++++++++++++++++ src/common/core.c | 5 + src/common/spinlock.h | 104 +++++++++++++++++++ src/common/thread.c | 270 +++++++++++++++++++++++++++++++++++++++++++++++++ src/common/thread.h | 115 +++++++++++++++++++++ 6 files changed, 652 insertions(+), 1 deletion(-) create mode 100644 src/common/atomic.h create mode 100644 src/common/spinlock.h create mode 100644 src/common/thread.c create mode 100644 src/common/thread.h (limited to 'src/common') diff --git a/src/common/Makefile.in b/src/common/Makefile.in index c778ff17b..9b2ffdd39 100644 --- a/src/common/Makefile.in +++ b/src/common/Makefile.in @@ -3,7 +3,7 @@ COMMON_OBJ = obj_all/core.o obj_all/socket.o obj_all/timer.o obj_all/db.o obj_al obj_all/nullpo.o obj_all/malloc.o obj_all/showmsg.o obj_all/strlib.o obj_all/utils.o \ obj_all/grfio.o obj_all/mapindex.o obj_all/ers.o obj_all/md5calc.o \ obj_all/minicore.o obj_all/minisocket.o obj_all/minimalloc.o obj_all/random.o obj_all/des.o \ - obj_all/conf.o + obj_all/conf.o obj_all/thread.o COMMON_H = $(shell ls ../common/*.h) diff --git a/src/common/atomic.h b/src/common/atomic.h new file mode 100644 index 000000000..7a9e8c4cc --- /dev/null +++ b/src/common/atomic.h @@ -0,0 +1,157 @@ +#ifndef _rA_ATOMIC_H_ +#define _rA_ATOMIC_H_ + +// Atomic Operations +// (Interlocked CompareExchange, Add .. and so on ..) +// +// Implementation varies / depends on: +// - Architecture +// - Compiler +// - Operating System +// +// our Abstraction is fully API-Compatible to Microsofts implementation @ NT5.0+ +// +#include "../common/cbasetypes.h" + +#if defined(_MSC_VER) +#include "../common/winapi.h" + +#if !defined(_M_X64) +// When compiling for windows 32bit, the 8byte interlocked operations are not provided by microsoft +// (because they need at least i586 so its not generic enough.. ... ) +forceinline int64 InterlockedCompareExchange64(volatile int64 *dest, int64 exch, int64 _cmp){ + _asm{ + lea esi,_cmp; + lea edi,exch; + + mov eax,[esi]; + mov edx,4[esi]; + mov ebx,[edi]; + mov ecx,4[edi]; + mov esi,dest; + + lock CMPXCHG8B [esi]; + } +} + + +forceinline volatile int64 InterlockedIncrement64(volatile int64 *addend){ + __int64 old; + do{ + old = *addend; + }while(InterlockedCompareExchange64(addend, (old+1), old) != old); + + return (old + 1); +} + + + +forceinline volatile int64 InterlockedDecrement64(volatile int64 *addend){ + __int64 old; + + do{ + old = *addend; + }while(InterlockedCompareExchange64(addend, (old-1), old) != old); + + return (old - 1); +} + +forceinline volatile int64 InterlockedExchangeAdd64(volatile int64 *addend, int64 increment){ + __int64 old; + + do{ + old = *addend; + }while(InterlockedCompareExchange64(addend, (old + increment), old) != old); + + return old; +} + +forceinline volatile int64 InterlockedExchange64(volatile int64 *target, int64 val){ + __int64 old; + do{ + old = *target; + }while(InterlockedCompareExchange64(target, val, old) != old); + + return old; +} + +#endif //endif 32bit windows + +#elif defined(__GNUC__) + +#if !defined(__x86_64__) && !defined(__i386__) +#error Your Target Platfrom is not supported +#endif + +static forceinline volatile int64 InterlockedExchangeAdd64(volatile int64 *addend, int64 increment){ + return __sync_fetch_and_add(addend, increment); +}//end: InterlockedExchangeAdd64() + + +static forceinline volatile int32 InterlockedExchangeAdd(volatile int32 *addend, int32 increment){ + return __sync_fetch_and_add(addend, increment); +}//end: InterlockedExchangeAdd() + + +static forceinline volatile int64 InterlockedIncrement64(volatile int64 *addend){ + return __sync_add_and_fetch(addend, 1); +}//end: InterlockedIncrement64() + + +static forceinline volatile int32 InterlockedIncrement(volatile int32 *addend){ + return __sync_add_and_fetch(addend, 1); +}//end: InterlockedIncrement() + + +static forceinline volatile int64 InterlockedDecrement64(volatile int64 *addend){ + return __sync_sub_and_fetch(addend, 1); +}//end: InterlockedDecrement64() + + +static forceinline volatile int32 InterlockedDecrement(volatile int32 *addend){ + return __sync_sub_and_fetch(addend, 1); +}//end: InterlockedDecrement() + + +static forceinline volatile int64 InterlockedCompareExchange64(volatile int64 *dest, int64 exch, int64 cmp){ + return __sync_val_compare_and_swap(dest, cmp, exch); +}//end: InterlockedCompareExchange64() + + +static forceinline volatile int32 InterlockedCompareExchange(volatile int32 *dest, int32 exch, int32 cmp){ + return __sync_val_compare_and_swap(dest, cmp, exch); +}//end: InterlockedCompareExchnage() + + +static forceinline volatile int64 InterlockedExchange64(volatile int64 *target, int64 val){ + int ret; + + __asm__ __volatile__( + "lock xchg %2, (%1)" + :"=r" (ret) + :"r" (target), "0" (val) + :"memory" + ); + + return ret; +}//end: InterlockedExchange64() + + +static forceinline volatile int32 InterlockedExchange(volatile int32 *target, int32 val){ + int ret; + + __asm__ __volatile__( + "lock xchgl %2, (%1)" + :"=r" (ret) + :"r" (target), "0" (val) + :"memory" + ); + + return ret; +}//end: InterlockedExchange() + + +#endif //endif compiler decission + + +#endif diff --git a/src/common/core.c b/src/common/core.c index 22d36eaab..4e276fcdc 100644 --- a/src/common/core.c +++ b/src/common/core.c @@ -9,6 +9,7 @@ #include "../common/db.h" #include "../common/socket.h" #include "../common/timer.h" +#include "../common/thread.h" #endif #include @@ -278,6 +279,8 @@ int main (int argc, char **argv) display_title(); usercheck(); + rathread_init(); + db_init(); signals_init(); @@ -303,6 +306,8 @@ int main (int argc, char **argv) timer_final(); socket_final(); db_final(); + + rathread_final(); #endif malloc_final(); diff --git a/src/common/spinlock.h b/src/common/spinlock.h new file mode 100644 index 000000000..3419bfdd5 --- /dev/null +++ b/src/common/spinlock.h @@ -0,0 +1,104 @@ +#pragma once +#ifndef _rA_SPINLOCK_H_ +#define _rA_SPINLOCK_H_ + +// +// CAS based Spinlock Implementation +// +// CamelCase names are choosen to be consistent with microsofts winapi +// which implements CriticalSection by this naming... +// +// Author: Florian Wilkemeyer +// +// Copyright (c) rAthena Project (www.rathena.org) - Licensed under GNU GPL +// For more information, see LICENCE in the main folder +// +// + +#ifdef WIN32 +#include "../common/winapi.h" +#endif + +#include "../common/cbasetypes.h" +#include "../common/atomic.h" +#include "../common/thread.h" + +#ifdef WIN32 + +typedef struct __declspec( align(64) ) SPIN_LOCK{ + volatile LONG lock; + volatile LONG nest; + volatile LONG sync_lock; +} SPIN_LOCK, *PSPIN_LOCK; +#else +typedef struct SPIN_LOCK{ + volatile int32 lock; + volatile int32 nest; // nesting level. + + volatile int32 sync_lock; +} __attribute__((aligned(64))) SPIN_LOCK, *PSPIN_LOCK; +#endif + + + +static forceinline void InitializeSpinLock(PSPIN_LOCK lck){ + lck->lock = 0; + lck->nest = 0; + lck->sync_lock = 0; +} + +static forceinline void FinalizeSpinLock(PSPIN_LOCK lck){ + return; +} + + +#define getsynclock(l) { while(1){ if(InterlockedCompareExchange(l, 1, 0) == 0) break; rathread_yield(); } } +#define dropsynclock(l) { InterlockedExchange(l, 0); } + +static forceinline void EnterSpinLock(PSPIN_LOCK lck){ + int tid = rathread_get_tid(); + + // Get Sync Lock && Check if the requester thread already owns the lock. + // if it owns, increase nesting level + getsynclock(&lck->sync_lock); + if(InterlockedCompareExchange(&lck->lock, tid, tid) == tid){ + InterlockedIncrement(&lck->nest); + dropsynclock(&lck->sync_lock); + return; // Got Lock + } + // drop sync lock + dropsynclock(&lck->sync_lock); + + + // Spin until we've got it ! + while(1){ + + if(InterlockedCompareExchange(&lck->lock, tid, 0) == 0){ + + InterlockedIncrement(&lck->nest); + return; // Got Lock + } + + rathread_yield(); // Force ctxswitch to another thread. + } + +} + + +static forceinline void LeaveSpinLock(PSPIN_LOCK lck){ + int tid = rathread_get_tid(); + + getsynclock(&lck->sync_lock); + + if(InterlockedCompareExchange(&lck->lock, tid, tid) == tid){ // this thread owns the lock. + if(InterlockedDecrement(&lck->nest) == 0) + InterlockedExchange(&lck->lock, 0); // Unlock! + } + + dropsynclock(&lck->sync_lock); +} + + + + +#endif diff --git a/src/common/thread.c b/src/common/thread.c new file mode 100644 index 000000000..49accff4c --- /dev/null +++ b/src/common/thread.c @@ -0,0 +1,270 @@ + +#ifdef WIN32 +#include "../common/winapi.h" +#define getpagesize() 4096 // @TODO: implement this properly (GetSystemInfo .. dwPageSize..). (Atm as on all supported win platforms its 4k its static.) +#define __thread __declspec( thread ) +#else +#include +#include +#include +#include +#include +#include +#endif + +#include "cbasetypes.h" +#include "malloc.h" +#include "showmsg.h" +#include "thread.h" + + +#define RA_THREADS_MAX 64 + +struct rAthread { + unsigned int myID; + + RATHREAD_PRIO prio; + rAthreadProc proc; + void *param; + + #ifdef WIN32 + HANDLE hThread; + #else + pthread_t hThread; + #endif +}; + + +__thread int g_rathread_ID = -1; + + +/// +/// Subystem Code +/// +static struct rAthread l_threads[RA_THREADS_MAX]; + +void rathread_init(){ + register unsigned int i; + memset(&l_threads, 0x00, RA_THREADS_MAX * sizeof(struct rAthread) ); + + for(i = 0; i < RA_THREADS_MAX; i++){ + l_threads[i].myID = i; + } + + // now lets init thread id 0, which represnts the main thread + g_rathread_ID = 0; + l_threads[0].prio = RAT_PRIO_NORMAL; + l_threads[0].proc = (rAthreadProc)0xDEADCAFE; + +}//end: rathread_init() + + + +void rathread_final(){ + register unsigned int i; + + // Unterminated Threads Left? + // Should'nt happen .. + // Kill 'em all! + // + for(i = 1; i < RA_THREADS_MAX; i++){ + if(l_threads[i].proc != NULL){ + ShowWarning("rAthread_final: unterminated Thread (tid %u entryPoint %p) - forcing to terminate (kill)\n", i, l_threads[i].proc); + rathread_destroy(&l_threads[i]); + } + } + + +}//end: rathread_final() + + + +// gets called whenever a thread terminated .. +static void rat_thread_terminated( rAthread handle ){ + + int id_backup = handle->myID; + + // Simply set all members to 0 (except the id) + memset(handle, 0x00, sizeof(struct rAthread)); + + handle->myID = id_backup; // done ;) + +}//end: rat_thread_terminated() + +#ifdef WIN32 +DWORD WINAPI _raThreadMainRedirector(LPVOID p){ +#else +static void *_raThreadMainRedirector( void *p ){ + sigset_t set; // on Posix Thread platforms +#endif + void *ret; + + // Update myID @ TLS to right id. + g_rathread_ID = ((rAthread)p)->myID; + +#ifndef WIN32 + // When using posix threads + // the threads inherits the Signal mask from the thread which's spawned + // this thread + // so we've to block everything we dont care about. + sigemptyset(&set); + sigaddset(&set, SIGINT); + sigaddset(&set, SIGTERM); + sigaddset(&set, SIGPIPE); + + pthread_sigmask(SIG_BLOCK, &set, NULL); + +#endif + + + ret = ((rAthread)p)->proc( ((rAthread)p)->param ) ; + +#ifdef WIN32 + CloseHandle( ((rAthread)p)->hThread ); +#endif + + rat_thread_terminated( (rAthread)p ); +#ifdef WIN32 + return (DWORD)ret; +#else + return ret; +#endif +}//end: _raThreadMainRedirector() + + + + + +/// +/// API Level +/// +rAthread rathread_create( rAthreadProc entryPoint, void *param ){ + return rathread_createEx( entryPoint, param, (1<<23) /*8MB*/, RAT_PRIO_NORMAL ); +}//end: rathread_create() + + +rAthread rathread_createEx( rAthreadProc entryPoint, void *param, size_t szStack, RATHREAD_PRIO prio ){ +#ifndef WIN32 + pthread_attr_t attr; +#endif + size_t tmp; + unsigned int i; + rAthread handle = NULL; + + + // given stacksize aligned to systems pagesize? + tmp = szStack % getpagesize(); + if(tmp != 0) + szStack += tmp; + + + // Get a free Thread Slot. + for(i = 0; i < RA_THREADS_MAX; i++){ + if(l_threads[i].proc == NULL){ + handle = &l_threads[i]; + break; + } + } + + if(handle == NULL){ + ShowError("rAthread: cannot create new thread (entryPoint: %p) - no free thread slot found!", entryPoint); + return NULL; + } + + + + handle->proc = entryPoint; + handle->param = param; + +#ifdef WIN32 + handle->hThread = CreateThread(NULL, szStack, _raThreadMainRedirector, (void*)handle, 0, NULL); +#else + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, szStack); + + if(pthread_create(&handle->hThread, &attr, _raThreadMainRedirector, (void*)handle) != 0){ + handle->proc = NULL; + handle->param = NULL; + return NULL; + } + pthread_attr_destroy(&attr); +#endif + + rathread_prio_set( handle, prio ); + + return handle; +}//end: rathread_createEx + + +void rathread_destroy ( rAthread handle ){ +#ifdef WIN32 + if( TerminateThread(handle->hThread, 0) != FALSE){ + CloseHandle(handle->hThread); + rat_thread_terminated(handle); + } +#else + if( pthread_cancel( handle->hThread ) == 0){ + + // We have to join it, otherwise pthread wont re-cycle its internal ressources assoc. with this thread. + // + pthread_join( handle->hThread, NULL ); + + // Tell our manager to release ressources ;) + rat_thread_terminated(handle); + } +#endif +}//end: rathread_destroy() + +rAthread rathread_self( ){ + rAthread handle = &l_threads[g_rathread_ID]; + + if(handle->proc != NULL) // entry point set, so its used! + return handle; + + return NULL; +}//end: rathread_self() + + +int rathread_get_tid(){ + + return g_rathread_ID; + +}//end: rathread_get_tid() + + +bool rathread_wait( rAthread handle, void* *out_exitCode ){ + + // Hint: + // no thread data cleanup routine call here! + // its managed by the callProxy itself.. + // +#ifdef WIN32 + WaitForSingleObject(handle->hThread, INFINITE); + return true; +#else + if(pthread_join(handle->hThread, out_exitCode) == 0) + return true; + return false; +#endif + +}//end: rathread_wait() + + +void rathread_prio_set( rAthread handle, RATHREAD_PRIO prio ){ + handle->prio = RAT_PRIO_NORMAL; + //@TODO +}//end: rathread_prio_set() + + +RATHREAD_PRIO rathread_prio_get( rAthread handle){ + return handle->prio; +}//end: rathread_prio_get() + + +void rathread_yield(){ +#ifdef WIN32 + SwitchToThread(); +#else + sched_yield(); +#endif +}//end: rathread_yield() diff --git a/src/common/thread.h b/src/common/thread.h new file mode 100644 index 000000000..d4027811f --- /dev/null +++ b/src/common/thread.h @@ -0,0 +1,115 @@ +#pragma once +#ifndef _rA_THREAD_H_ +#define _rA_THREAD_H_ + +#include "../common/cbasetypes.h" + +typedef struct rAthread *rAthread; +typedef void* (*rAthreadProc)(void*); + +typedef enum RATHREAD_PRIO { + RAT_PRIO_LOW = 0, + RAT_PRIO_NORMAL, + RAT_PRIO_HIGH +} RATHREAD_PRIO; + + +/** + * Creates a new Thread + * + * @param entyPoint - entryProc, + * @param param - general purpose parameter, would be given as parameter to the thread's entrypoint. + * + * @return not NULL if success + */ +rAthread rathread_create( rAthreadProc entryPoint, void *param ); + + +/** + * Creates a new Thread (with more creation options) + * + * @param entyPoint - entryProc, + * @param param - general purpose parameter, would be given as parameter to the thread's entrypoint + * @param szStack - stack Size in bytes + * @param prio - Priority of the Thread @ OS Scheduler.. + * + * @return not NULL if success + */ +rAthread rathread_createEx( rAthreadProc entryPoint, void *param, size_t szStack, RATHREAD_PRIO prio ); + + +/** + * Destroys the given Thread immediatly + * + * @note The Handle gets invalid after call! dont use it afterwards. + * + * @param handle - thread to destroy. + */ +void rathread_destroy ( rAthread handle ); + + +/** + * Returns the thread handle of the thread calling this function + * + * @note this wont work @ programms main thread + * @note the underlying implementation might not perform very well, cache the value received! + * + * @return not NULL if success + */ +rAthread rathread_self( ); + + +/** + * Returns own thrad id (TID) + * + * @note this is not the operating system THREAD ID! + * + * @return -1 when fails, otherwise >= 0 + */ +int rathread_get_tid(); + + +/** + * Waits for the given thread to terminate + * + * @param handle - thread to wait (join) for + * @param out_Exitcode - [OPTIONAL] - if given => Exitcode (value) of the given thread - if it's terminated + * + * @return true - if the given thread has been terminated. + */ +bool rathread_wait( rAthread handle, void* *out_exitCode ); + + +/** + * Sets the given PRIORITY @ OS Task Scheduler + * + * @param handle - thread to set prio for + * @param rio - the priority (RAT_PRIO_LOW ... ) + */ +void rathread_prio_set( rAthread handle, RATHREAD_PRIO prio ); + + +/** + * Gets the current Prio of the given trhead + * + * @param handle - the thread to get the prio for. + */ +RATHREAD_PRIO rathread_prio_get( rAthread handle); + + +/** + * Tells the OS scheduler to yield the execution of the calling thread + * + * @note: this will not "pause" the thread, + * it just allows the OS to spent the remaining time + * of the slice to another thread. + */ +void rathread_yield(); + + + +void rathread_init(); +void rathread_final(); + + +#endif -- cgit v1.2.3-60-g2f50