diff options
-rw-r--r-- | src/common/thread.c | 2 | ||||
-rw-r--r-- | src/config/core.h | 5 | ||||
-rw-r--r-- | src/map/log.c | 58 | ||||
-rw-r--r-- | src/map/log.h | 7 | ||||
-rw-r--r-- | src/map/map.c | 7 | ||||
-rw-r--r-- | src/map/map.h | 17 | ||||
-rw-r--r-- | src/map/script.c | 304 | ||||
-rw-r--r-- | src/map/script.h | 4 |
8 files changed, 385 insertions, 19 deletions
diff --git a/src/common/thread.c b/src/common/thread.c index baf4171da..315b310b2 100644 --- a/src/common/thread.c +++ b/src/common/thread.c @@ -271,7 +271,7 @@ int rathread_get_tid(){ #ifdef WIN32 return (int)GetCurrentThreadId(); #else - return (int)pthread_self(); + return (intptr_t)pthread_self(); #endif #endif diff --git a/src/config/core.h b/src/config/core.h index b0e139f27..fba593dc9 100644 --- a/src/config/core.h +++ b/src/config/core.h @@ -31,6 +31,11 @@ /// your map-server using more resources while this is active, comment the line #define SCRIPT_CALLFUNC_CHECK +/// uncomment to enable query_sql script command and mysql logs to function on it's own thread +/// be aware this feature is under tests and you should use at your own risk, we however +/// welcome any feedback you may have regarding this feature, please send us all bug reports. +//#define BETA_THREAD_TEST + //Uncomment to enable the Cell Stack Limit mod. //It's only config is the battle_config cell_stack_limit. //Only chars affected are those defined in BL_CHAR (mobs and players currently) diff --git a/src/map/log.c b/src/map/log.c index de3c850f3..749bd5c28 100644 --- a/src/map/log.c +++ b/src/map/log.c @@ -136,8 +136,13 @@ void log_branch(struct map_session_data* sd) if( !log_config.branch ) return; - if( log_config.sql_logs ) - { + if( log_config.sql_logs ) { +#ifdef BETA_THREAD_TEST + char entry[512]; + int e_length = 0; + e_length = sprintf(entry, LOG_QUERY " INTO `%s` (`branch_date`, `account_id`, `char_id`, `char_name`, `map`) VALUES (NOW(), '%d', '%d', '%s', '%s')", log_config.log_branch, sd->status.account_id, sd->status.char_id, sd->status.name, mapindex_id2name(sd->mapindex)); + queryThread_log(entry,e_length); +#else SqlStmt* stmt; stmt = SqlStmt_Malloc(logmysql_handle); if( SQL_SUCCESS != SqlStmt_Prepare(stmt, LOG_QUERY " INTO `%s` (`branch_date`, `account_id`, `char_id`, `char_name`, `map`) VALUES (NOW(), '%d', '%d', ?, '%s')", log_config.log_branch, sd->status.account_id, sd->status.char_id, mapindex_id2name(sd->mapindex) ) @@ -149,6 +154,7 @@ void log_branch(struct map_session_data* sd) return; } SqlStmt_Free(stmt); +#endif } else { @@ -179,12 +185,20 @@ void log_pick(int id, int m, e_log_pick_type type, int amount, struct item* itm) if( log_config.sql_logs ) { +#ifdef BETA_THREAD_TEST + char entry[512]; + int e_length = 0; + e_length = sprintf(entry, LOG_QUERY " INTO `%s` (`time`, `char_id`, `type`, `nameid`, `amount`, `refine`, `card0`, `card1`, `card2`, `card3`, `map`) VALUES (NOW(), '%d', '%c', '%d', '%d', '%d', '%d', '%d', '%d', '%d', '%s')", + log_config.log_pick, id, log_picktype2char(type), itm->nameid, amount, itm->refine, itm->card[0], itm->card[1], itm->card[2], itm->card[3], map[m].name?map[m].name:"" ); + queryThread_log(entry,e_length); +#else if( SQL_ERROR == Sql_Query(logmysql_handle, LOG_QUERY " INTO `%s` (`time`, `char_id`, `type`, `nameid`, `amount`, `refine`, `card0`, `card1`, `card2`, `card3`, `map`) VALUES (NOW(), '%d', '%c', '%d', '%d', '%d', '%d', '%d', '%d', '%d', '%s')", log_config.log_pick, id, log_picktype2char(type), itm->nameid, amount, itm->refine, itm->card[0], itm->card[1], itm->card[2], itm->card[3], map[m].name?map[m].name:"") ) { Sql_ShowDebug(logmysql_handle); return; } +#endif } else { @@ -226,12 +240,20 @@ void log_zeny(struct map_session_data* sd, e_log_pick_type type, struct map_sess if( log_config.sql_logs ) { +#ifdef BETA_THREAD_TEST + char entry[512]; + int e_length = 0; + e_length = sprintf(entry, LOG_QUERY " INTO `%s` (`time`, `char_id`, `src_id`, `type`, `amount`, `map`) VALUES (NOW(), '%d', '%d', '%c', '%d', '%s')", + log_config.log_zeny, sd->status.char_id, src_sd->status.char_id, log_picktype2char(type), amount, mapindex_id2name(sd->mapindex)); + queryThread_log(entry,e_length); +#else if( SQL_ERROR == Sql_Query(logmysql_handle, LOG_QUERY " INTO `%s` (`time`, `char_id`, `src_id`, `type`, `amount`, `map`) VALUES (NOW(), '%d', '%d', '%c', '%d', '%s')", log_config.log_zeny, sd->status.char_id, src_sd->status.char_id, log_picktype2char(type), amount, mapindex_id2name(sd->mapindex)) ) { Sql_ShowDebug(logmysql_handle); return; } +#endif } else { @@ -259,12 +281,20 @@ void log_mvpdrop(struct map_session_data* sd, int monster_id, int* log_mvp) if( log_config.sql_logs ) { +#ifdef BETA_THREAD_TEST + char entry[512]; + int e_length = 0; + e_length = sprintf(entry, LOG_QUERY " INTO `%s` (`mvp_date`, `kill_char_id`, `monster_id`, `prize`, `mvpexp`, `map`) VALUES (NOW(), '%d', '%d', '%d', '%d', '%s') ", + log_config.log_mvpdrop, sd->status.char_id, monster_id, log_mvp[0], log_mvp[1], mapindex_id2name(sd->mapindex)); + queryThread_log(entry,e_length); +#else if( SQL_ERROR == Sql_Query(logmysql_handle, LOG_QUERY " INTO `%s` (`mvp_date`, `kill_char_id`, `monster_id`, `prize`, `mvpexp`, `map`) VALUES (NOW(), '%d', '%d', '%d', '%d', '%s') ", log_config.log_mvpdrop, sd->status.char_id, monster_id, log_mvp[0], log_mvp[1], mapindex_id2name(sd->mapindex)) ) { Sql_ShowDebug(logmysql_handle); return; } +#endif } else { @@ -293,6 +323,12 @@ void log_atcommand(struct map_session_data* sd, const char* message) if( log_config.sql_logs ) { +#ifdef BETA_THREAD_TEST + char entry[512]; + int e_length = 0; + e_length = sprintf(entry, LOG_QUERY " INTO `%s` (`atcommand_date`, `account_id`, `char_id`, `char_name`, `map`, `command`) VALUES (NOW(), '%d', '%d', '%s', '%s', '%s')", log_config.log_gm, sd->status.account_id, sd->status.char_id, sd->status.name ,mapindex_id2name(sd->mapindex), message); + queryThread_log(entry,e_length); +#else SqlStmt* stmt; stmt = SqlStmt_Malloc(logmysql_handle); @@ -306,6 +342,7 @@ void log_atcommand(struct map_session_data* sd, const char* message) return; } SqlStmt_Free(stmt); +#endif } else { @@ -333,6 +370,12 @@ void log_npc(struct map_session_data* sd, const char* message) if( log_config.sql_logs ) { +#ifdef BETA_THREAD_TEST + char entry[512]; + int e_length = 0; + e_length = sprintf(entry, LOG_QUERY " INTO `%s` (`npc_date`, `account_id`, `char_id`, `char_name`, `map`, `mes`) VALUES (NOW(), '%d', '%d', '%s', '%s', '%s')", log_config.log_npc, sd->status.account_id, sd->status.char_id, sd->status.name, mapindex_id2name(sd->mapindex), message ); + queryThread_log(entry,e_length); +#else SqlStmt* stmt; stmt = SqlStmt_Malloc(logmysql_handle); if( SQL_SUCCESS != SqlStmt_Prepare(stmt, LOG_QUERY " INTO `%s` (`npc_date`, `account_id`, `char_id`, `char_name`, `map`, `mes`) VALUES (NOW(), '%d', '%d', ?, '%s', ?)", log_config.log_npc, sd->status.account_id, sd->status.char_id, mapindex_id2name(sd->mapindex) ) @@ -345,6 +388,7 @@ void log_npc(struct map_session_data* sd, const char* message) return; } SqlStmt_Free(stmt); +#endif } else { @@ -375,8 +419,13 @@ void log_chat(e_log_chat_type type, int type_id, int src_charid, int src_accid, return; } - if( log_config.sql_logs ) - { + if( log_config.sql_logs ) { +#ifdef BETA_THREAD_TEST + char entry[512]; + int e_length = 0; + e_length = sprintf(entry, LOG_QUERY " INTO `%s` (`time`, `type`, `type_id`, `src_charid`, `src_accountid`, `src_map`, `src_map_x`, `src_map_y`, `dst_charname`, `message`) VALUES (NOW(), '%c', '%d', '%d', '%d', '%s', '%d', '%d', '%s', '%s')", log_config.log_chat, log_chattype2char(type), type_id, src_charid, src_accid, map, x, y, dst_charname, message ); + queryThread_log(entry,e_length); +#else SqlStmt* stmt; stmt = SqlStmt_Malloc(logmysql_handle); @@ -390,6 +439,7 @@ void log_chat(e_log_chat_type type, int type_id, int src_charid, int src_accid, return; } SqlStmt_Free(stmt); +#endif } else { diff --git a/src/map/log.h b/src/map/log.h index ac85b7ccb..a40a3fcf4 100644 --- a/src/map/log.h +++ b/src/map/log.h @@ -79,4 +79,11 @@ extern struct Log_Config } log_config; +#ifdef BETA_THREAD_TEST + struct { + char** entry; + int count; + } logThreadData; +#endif + #endif /* _LOG_H_ */ diff --git a/src/map/map.c b/src/map/map.c index 2239a2f23..30198a32c 100644 --- a/src/map/map.c +++ b/src/map/map.c @@ -3502,19 +3502,20 @@ int map_sql_close(void) ShowStatus("Close Map DB Connection....\n"); Sql_Free(mmysql_handle); mmysql_handle = NULL; - +#ifndef BETA_THREAD_TEST if (log_config.sql_logs) { ShowStatus("Close Log DB Connection....\n"); Sql_Free(logmysql_handle); logmysql_handle = NULL; } - +#endif return 0; } int log_sql_init(void) { +#ifndef BETA_THREAD_TEST // log db connection logmysql_handle = Sql_Malloc(); @@ -3526,7 +3527,7 @@ int log_sql_init(void) if( strlen(default_codepage) > 0 ) if ( SQL_ERROR == Sql_SetEncoding(logmysql_handle, default_codepage) ) Sql_ShowDebug(logmysql_handle); - +#endif return 0; } diff --git a/src/map/map.h b/src/map/map.h index 6b8ffbe5b..118808713 100644 --- a/src/map/map.h +++ b/src/map/map.h @@ -753,6 +753,23 @@ typedef struct elemental_data TBL_ELEM; extern char main_chat_nick[16]; +#ifdef BETA_THREAD_TEST + +extern char default_codepage[32]; +extern int map_server_port; +extern char map_server_ip[32]; +extern char map_server_id[32]; +extern char map_server_pw[32]; +extern char map_server_db[32]; + +extern char log_db_ip[32]; +extern int log_db_port; +extern char log_db_id[32]; +extern char log_db_pw[32]; +extern char log_db_db[32]; + +#endif + #include "../common/sql.h" extern int db_use_sqldbs; diff --git a/src/map/script.c b/src/map/script.c index 4b8afdb24..7b13787ce 100644 --- a/src/map/script.c +++ b/src/map/script.c @@ -61,6 +61,13 @@ #include <setjmp.h> #include <errno.h> +#ifdef BETA_THREAD_TEST + #include "../common/atomic.h" + #include "../common/spinlock.h" + #include "../common/thread.h" + #include "../common/mutex.h" +#endif + /////////////////////////////////////////////////////////////////////////////// //## TODO possible enhancements: [FlavioJS] @@ -306,6 +313,30 @@ extern script_function buildin_func[]; static struct linkdb_node* sleep_db;// int oid -> struct script_state* +#ifdef BETA_THREAD_TEST +/** + * MySQL Query Slave + **/ +static SPIN_LOCK queryThreadLock; +static rAthread queryThread = NULL; +static ramutex queryThreadMutex = NULL; +static racond queryThreadCond = NULL; +static volatile int32 queryThreadTerminate = 0; + +struct queryThreadEntry { + bool ok; + bool type; /* main db or log db? */ + struct script_state *st; +}; + +/* Ladies and Gentleman the Manager! */ +struct { + struct queryThreadEntry **entry;/* array of structs */ + int count; + int timer;/* used to receive processed entries */ +} queryThreadData; +#endif + /*========================================== * ローカルプロトタイプ宣言 (必要な物のみ) *------------------------------------------*/ @@ -3006,6 +3037,7 @@ void script_free_state(struct script_state* st) pop_stack(st, 0, st->stack->sp); aFree(st->stack->stack_data); aFree(st->stack); + st->stack = NULL; st->pos = -1; aFree(st); } @@ -3634,8 +3666,8 @@ static void script_attach_state(struct script_state* st) *------------------------------------------*/ void run_script_main(struct script_state *st) { - int cmdcount=script_config.check_cmdcount; - int gotocount=script_config.check_gotocount; + int cmdcount = script_config.check_cmdcount; + int gotocount = script_config.check_gotocount; TBL_PC *sd; struct script_stack *stack=st->stack; struct npc_data *nd; @@ -3933,8 +3965,175 @@ void script_setarray_pc(struct map_session_data* sd, const char* varname, uint8 refcache[0] = key; } } +#ifdef BETA_THREAD_TEST +int buildin_query_sql_sub(struct script_state* st, Sql* handle); + +/* used to receive items the queryThread has already processed */ +int queryThread_timer(int tid, unsigned int tick, int id, intptr_t data) { + int i, cursor = 0; + bool allOk = true; + + EnterSpinLock(&queryThreadLock); + + for( i = 0; i < queryThreadData.count; i++ ) { + struct queryThreadEntry *entry = queryThreadData.entry[i]; + + if( !entry->ok ) { + allOk = false; + continue; + } + + run_script_main(entry->st); + + entry->st = NULL;/* empty entries */ + aFree(entry); + queryThreadData.entry[i] = NULL; + } + + + if( allOk ) { + /* cancel the repeating timer -- it'll re-create itself when necessary, dont need to remain looping */ + delete_timer(queryThreadData.timer, queryThread_timer); + queryThreadData.timer = INVALID_TIMER; + } + + /* now lets clear the mess. */ + for( i = 0; i < queryThreadData.count; i++ ) { + struct queryThreadEntry *entry = queryThreadData.entry[i]; + if( entry == NULL ) + continue;/* entry on hold */ + + /* move */ + memmove(&queryThreadData.entry[cursor], &queryThreadData.entry[i], sizeof(struct queryThreadEntry*)); + + cursor++; + } + + queryThreadData.count = cursor; + + LeaveSpinLock(&queryThreadLock); + + return 0; +} + +void queryThread_add(struct script_state *st, bool type) { + int idx = 0; + struct queryThreadEntry* entry = NULL; + + EnterSpinLock(&queryThreadLock); + + if( queryThreadData.count++ != 0 ) + RECREATE(queryThreadData.entry, struct queryThreadEntry* , queryThreadData.count); + + idx = queryThreadData.count-1; + + CREATE(queryThreadData.entry[idx],struct queryThreadEntry,1); + + entry = queryThreadData.entry[idx]; + + entry->st = st; + entry->ok = false; + entry->type = type; + if( queryThreadData.timer == INVALID_TIMER ) { /* start the receiver timer */ + queryThreadData.timer = add_timer_interval(gettick() + 100, queryThread_timer, 0, 0, 100); + } + + LeaveSpinLock(&queryThreadLock); + + /* unlock the queryThread */ + racond_signal(queryThreadCond); +} +/* adds a new log to the queue */ +void queryThread_log(char * entry, int length) { + int idx = logThreadData.count; + + EnterSpinLock(&queryThreadLock); + + if( logThreadData.count++ != 0 ) + RECREATE(logThreadData.entry, char* , logThreadData.count); + + CREATE(logThreadData.entry[idx], char, length + 1 ); + safestrncpy(logThreadData.entry[idx], entry, length + 1 ); + + LeaveSpinLock(&queryThreadLock); + + /* unlock the queryThread */ + racond_signal(queryThreadCond); +} + +/* queryThread_main */ +static void *queryThread_main(void *x) { + Sql *queryThread_handle = Sql_Malloc(); + int i; + + if ( SQL_ERROR == Sql_Connect(queryThread_handle, map_server_id, map_server_pw, map_server_ip, map_server_port, map_server_db) ) + exit(EXIT_FAILURE); + + if( strlen(default_codepage) > 0 ) + if ( SQL_ERROR == Sql_SetEncoding(queryThread_handle, default_codepage) ) + Sql_ShowDebug(queryThread_handle); + if( log_config.sql_logs ) { + logmysql_handle = Sql_Malloc(); + + if ( SQL_ERROR == Sql_Connect(logmysql_handle, log_db_id, log_db_pw, log_db_ip, log_db_port, log_db_db) ) + exit(EXIT_FAILURE); + + if( strlen(default_codepage) > 0 ) + if ( SQL_ERROR == Sql_SetEncoding(logmysql_handle, default_codepage) ) + Sql_ShowDebug(logmysql_handle); + } + + while( 1 ) { + + if(queryThreadTerminate > 0) + break; + + EnterSpinLock(&queryThreadLock); + + /* mess with queryThreadData within the lock */ + for( i = 0; i < queryThreadData.count; i++ ) { + struct queryThreadEntry *entry = queryThreadData.entry[i]; + + if( entry->ok ) + continue; + else if ( !entry->st || !entry->st->stack ) { + entry->ok = true;/* dispose */ + continue; + } + + buildin_query_sql_sub(entry->st, entry->type ? logmysql_handle : queryThread_handle); + + entry->ok = true;/* we're done with this */ + } + + /* also check for any logs in need to be sent */ + if( log_config.sql_logs ) { + for( i = 0; i < logThreadData.count; i++ ) { + if( SQL_ERROR == Sql_Query(logmysql_handle, logThreadData.entry[i]) ) + Sql_ShowDebug(logmysql_handle); + aFree(logThreadData.entry[i]); + } + logThreadData.count = 0; + } + + LeaveSpinLock(&queryThreadLock); + + ramutex_lock( queryThreadMutex ); + racond_wait( queryThreadCond, queryThreadMutex, -1 ); + ramutex_unlock( queryThreadMutex ); + } + + Sql_Free(queryThread_handle); + + if( log_config.sql_logs ) { + Sql_Free(logmysql_handle); + } + + return NULL; +} +#endif /*========================================== * 終了 *------------------------------------------*/ @@ -4021,25 +4220,88 @@ int do_final_script() { if( atcmd_binding_count != 0 ) aFree(atcmd_binding); +#ifdef BETA_THREAD_TEST + /* QueryThread */ + InterlockedIncrement(&queryThreadTerminate); + racond_signal(queryThreadCond); + rathread_wait(queryThread, NULL); + + // Destroy cond var and mutex. + racond_destroy( queryThreadCond ); + ramutex_destroy( queryThreadMutex ); + + /* Clear missing vars */ + for( i = 0; i < queryThreadData.count; i++ ) { + aFree(queryThreadData.entry[i]); + } + + aFree(queryThreadData.entry); + + for( i = 0; i < logThreadData.count; i++ ) { + aFree(logThreadData.entry[i]); + } + + aFree(logThreadData.entry); +#endif return 0; } /*========================================== * 初期化 *------------------------------------------*/ -int do_init_script() -{ +int do_init_script() { userfunc_db=strdb_alloc(DB_OPT_DUP_KEY,0); scriptlabel_db=strdb_alloc(DB_OPT_DUP_KEY,50); autobonus_db = strdb_alloc(DB_OPT_DUP_KEY,0); mapreg_init(); +#ifdef BETA_THREAD_TEST + CREATE(queryThreadData.entry, struct queryThreadEntry*, 1); + queryThreadData.count = 0; + CREATE(logThreadData.entry, char *, 1); + logThreadData.count = 0; + /* QueryThread Start */ + + InitializeSpinLock(&queryThreadLock); + queryThreadData.timer = INVALID_TIMER; + queryThreadTerminate = 0; + queryThreadMutex = ramutex_create(); + queryThreadCond = racond_create(); + + queryThread = rathread_create(queryThread_main, NULL); + + if(queryThread == NULL){ + ShowFatalError("do_init_script: cannot spawn Query Thread.\n"); + exit(EXIT_FAILURE); + } + + add_timer_func_list(queryThread_timer, "queryThread_timer"); +#endif return 0; } int script_reload() { int i; + +#ifdef BETA_THREAD_TEST + /* we're reloading so any queries undergoing should be...exterminated. */ + EnterSpinLock(&queryThreadLock); + + for( i = 0; i < queryThreadData.count; i++ ) { + aFree(queryThreadData.entry[i]); + } + queryThreadData.count = 0; + + if( queryThreadData.timer != INVALID_TIMER ) { + delete_timer(queryThreadData.timer, queryThread_timer); + queryThreadData.timer = INVALID_TIMER; + } + + LeaveSpinLock(&queryThreadLock); +#endif + + userfunc_db->clear(userfunc_db, db_script_free_code_sub); db_clear(scriptlabel_db); @@ -13960,6 +14222,7 @@ int buildin_query_sql_sub(struct script_state* st, Sql* handle) // Execute the query query = script_getstr(st,2); + if( SQL_ERROR == Sql_QueryStr(handle, query) ) { Sql_ShowDebug(handle); @@ -14014,24 +14277,43 @@ int buildin_query_sql_sub(struct script_state* st, Sql* handle) // Free data Sql_FreeResult(handle); script_pushint(st, i); + return 0; } -BUILDIN_FUNC(query_sql) -{ +BUILDIN_FUNC(query_sql) { +#ifdef BETA_THREAD_TEST + if( st->state != RERUNLINE ) { + queryThread_add(st,false); + + st->state = RERUNLINE;/* will continue when the query is finished running. */ + } else + st->state = RUN; + + return 0; +#else return buildin_query_sql_sub(st, mmysql_handle); +#endif } -BUILDIN_FUNC(query_logsql) -{ - if( !log_config.sql_logs ) - {// logmysql_handle == NULL +BUILDIN_FUNC(query_logsql) { + if( !log_config.sql_logs ) {// logmysql_handle == NULL ShowWarning("buildin_query_logsql: SQL logs are disabled, query '%s' will not be executed.\n", script_getstr(st,2)); script_pushint(st,-1); return 1; } - +#ifdef BETA_THREAD_TEST + if( st->state != RERUNLINE ) { + queryThread_add(st,true); + + st->state = RERUNLINE;/* will continue when the query is finished running. */ + } else + st->state = RUN; + + return 0; +#else return buildin_query_sql_sub(st, logmysql_handle); +#endif } //Allows escaping of a given string. diff --git a/src/map/script.h b/src/map/script.h index 41c686660..ed56b8ebe 100644 --- a/src/map/script.h +++ b/src/map/script.h @@ -190,4 +190,8 @@ int script_reload(void); // @commands (script based) void setd_sub(struct script_state *st, TBL_PC *sd, const char *varname, int elem, void *value, struct DBMap **ref); +#ifdef BETA_THREAD_TEST +void queryThread_log(char * entry, int length); +#endif + #endif /* _SCRIPT_H_ */ |