From: dan Date: Sat, 29 Jul 2017 17:01:06 +0000 (+0000) Subject: Update test program "tserver" to use a native pthreads mutex/condition X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=af12f8e2c94edca25d6ee8b2e2f16c0b592542ab;p=thirdparty%2Fsqlite.git Update test program "tserver" to use a native pthreads mutex/condition variable to efficiently manage wal file checkpoints without the wal file growing indefinitely. FossilOrigin-Name: 8299bdb7cbede30c665dda131bdcbd1d260b3ae9bd16d9b414d8c3776b08f1b3 --- diff --git a/manifest b/manifest index 24090665cf..0d9e30de88 100644 --- a/manifest +++ b/manifest @@ -1,5 +1,5 @@ -C Reduce\sthe\snumber\sof\smallocs\srequired\sof\swriters\sin\sserver\smode. -D 2017-07-28T21:02:13.200 +C Update\stest\sprogram\s"tserver"\sto\suse\sa\snative\spthreads\smutex/condition\nvariable\sto\sefficiently\smanage\swal\sfile\scheckpoints\swithout\sthe\swal\sfile\ngrowing\sindefinitely. +D 2017-07-29T17:01:06.529 F Makefile.in d9873c9925917cca9990ee24be17eb9613a668012c85a343aef7e5536ae266e8 F Makefile.linux-gcc 7bc79876b875010e8c8f9502eb935ca92aa3c434 F Makefile.msc 02b469e9dcd5b7ee63fc1fb05babc174260ee4cfa4e0ef2e48c3c6801567a016 @@ -439,8 +439,8 @@ F src/os_setup.h 0dbaea40a7d36bf311613d31342e0b99e2536586 F src/os_unix.c 98df292b72d8c15c6292be663ee917ac0316a0d9ec652e6e55e06bc4d83f84b7 F src/os_win.c 2a6c73eef01c51a048cc4ddccd57f981afbec18a F src/os_win.h 7b073010f1451abe501be30d12f6bc599824944a -F src/pager.c d3a15bf7861cc9be235ff424cdddde7fae3cd06fbcacbd967c6dad2d313e1d90 -F src/pager.h 857d21f70acefc76e5bae4584f0133e754a2448b197d1e79b7179f26f9a11bc1 +F src/pager.c 9c2006fb3c4d4a985e79de21cb5e5be22c8ae663159cbd0bf5e708803bf53678 +F src/pager.h 316dac0671fd7555af9e73d4357febd5f2d3ce6a185ffd8d77b7fc0423ac8b1a F src/parse.y 3a1babd6645a8103898a5e7e239dcf56cdafbdc25fd8133bb4a9160f9471d42d F src/pcache.c 62835bed959e2914edd26afadfecce29ece0e870 F src/pcache.h 521bb9610d38ef17a3cc9b5ddafd4546c2ea67fa3d0e464823d73c2a28d50e11 @@ -453,7 +453,7 @@ F src/random.c 80f5d666f23feb3e6665a6ce04c7197212a88384 F src/resolve.c 4324a94573b1e29286f8121e4881db59eaedc014afeb274c8d3e07ed282e0e20 F src/rowset.c 7b7e7e479212e65b723bf40128c7b36dc5afdfac F src/select.c c6bf96a7f9d7d68f929de84738c599a30d0a725ab0b54420e70545743cd5ee7b -F src/server.c cc07e5ee19436c08ae2331e8476db0c968ade42528df68cfa40eb58314cd21e1 +F src/server.c ef4a69ea4124c89fe73ee0889929c089290e240a3a5f27fa28beea51cb855e51 F src/server.h adcc122084f9370c91479bd9f7bbac1ccd7f63784249de40f63dae8a9fae1bfe F src/shell.c bd6a37cbe8bf64ef6a6a74fdc50f067d3148149b4ce2b4d03154663e66ded55f F src/shell.c.in b5725acacba95ccefa57b6d068f710e29ba8239c3aa704628a1902a1f729c175 @@ -1618,7 +1618,7 @@ F tool/stack_usage.tcl f8e71b92cdb099a147dad572375595eae55eca43 F tool/symbols-mingw.sh 4dbcea7e74768305384c9fd2ed2b41bbf9f0414d F tool/symbols.sh c5a617b8c61a0926747a56c65f5671ef8ac0e148 F tool/tostr.tcl 96022f35ada2194f6f8ccf6fd95809e90ed277c4 -F tool/tserver.c b3f368766b6bcef6d58b057de2fc3d3e33510b2b92fd0a53a6bfbe243b58d027 +F tool/tserver.c d6b4e074dc9d11edf9aac79cfd925cf1ea71f823b00344bc443e1aaea1f16d5b F tool/varint.c 5d94cb5003db9dbbcbcc5df08d66f16071aee003 F tool/vdbe-compress.tcl 5926c71f9c12d2ab73ef35c29376e756eb68361c F tool/vdbe_profile.tcl 246d0da094856d72d2c12efec03250d71639d19f @@ -1646,7 +1646,7 @@ F vsixtest/vsixtest.tcl 6a9a6ab600c25a91a7acc6293828957a386a8a93 F vsixtest/vsixtest.vcxproj.data 2ed517e100c66dc455b492e1a33350c1b20fbcdc F vsixtest/vsixtest.vcxproj.filters 37e51ffedcdb064aad6ff33b6148725226cd608e F vsixtest/vsixtest_TemporaryKey.pfx e5b1b036facdb453873e7084e1cae9102ccc67a0 -P d0719ad757bdf7cf2d7d7a4f7b0e713c262ffb434b91ddbb12e190e479abd19e -R 45df4a0aef3166ba81de04b37f949988 +P 60953997f62208f82b1efb53b8a1b0c6a26370411041457f747917e10d9a0e68 +R f90a2e0fad39683a4120989a28a36559 U dan -Z b12068c542ae27866a718f970ab635de +Z 371e271b5452f3a4804ed5b5db0b3183 diff --git a/manifest.uuid b/manifest.uuid index 4dccf405fb..03504fbc9e 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -60953997f62208f82b1efb53b8a1b0c6a26370411041457f747917e10d9a0e68 \ No newline at end of file +8299bdb7cbede30c665dda131bdcbd1d260b3ae9bd16d9b414d8c3776b08f1b3 \ No newline at end of file diff --git a/src/pager.c b/src/pager.c index 14ec8321b5..ab232f8c60 100644 --- a/src/pager.c +++ b/src/pager.c @@ -7747,9 +7747,6 @@ int sqlite3PagerWalFramesize(Pager *pPager){ #endif #ifdef SQLITE_SERVER_EDITION -int sqlite3PagerIsServer(Pager *pPager){ - return pagerIsServer(pPager); -} int sqlite3PagerPagelock(Pager *pPager, Pgno pgno, int bWrite){ if( pagerIsServer(pPager)==0 ) return SQLITE_OK; return sqlite3ServerLock(pPager->pServer, pgno, bWrite, 0); diff --git a/src/pager.h b/src/pager.h index bc9405837b..b214eadb3f 100644 --- a/src/pager.h +++ b/src/pager.h @@ -238,7 +238,6 @@ void *sqlite3PagerCodec(DbPage *); #ifdef SQLITE_SERVER_EDITION int sqlite3PagerRollbackJournal(Pager*, sqlite3_file*); - int sqlite3PagerIsServer(Pager *pPager); int sqlite3PagerPagelock(Pager *pPager, Pgno, int); void sqlite3PagerServerJournal(Pager*, sqlite3_file*, const char*); #endif diff --git a/src/server.c b/src/server.c index 5632f771bf..2e0ac47cfc 100644 --- a/src/server.c +++ b/src/server.c @@ -289,7 +289,7 @@ void sqlite3ServerDisconnect(Server *p, sqlite3_file *dbfd){ for(pp=&g_server.pDb; *pp!=pDb; pp=&((*pp)->pNext)); *pp = pDb->pNext; sqlite3_mutex_free(pDb->mutex); - while( pFree=pDb->pFree ){ + while( (pFree = pDb->pFree) ){ pDb->pFree = pFree->pNext; sqlite3_free(pFree); } diff --git a/tool/tserver.c b/tool/tserver.c index ba44f0df3c..71f5b93e73 100644 --- a/tool/tserver.c +++ b/tool/tserver.c @@ -32,6 +32,8 @@ ** .run Run all SQL statements in the list. ** .repeats N Configure the number of repeats per ".run". ** .seconds N Configure the number of seconds to ".run" for. +** .mutex_commit Add a "COMMIT" protected by a g.commit_mutex +** to the current SQL. ** ** Example input: ** @@ -60,10 +62,31 @@ #include "sqlite3.h" -/* Database used by this server */ -static char *zDatabaseName = 0; +#define TSERVER_DEFAULT_CHECKPOINT_THRESHOLD 3900 + +/* Global variables */ +struct TserverGlobal { + char *zDatabaseName; /* Database used by this server */ + char *zVfs; + sqlite3_mutex *commit_mutex; + + /* The following use native pthreads instead of a portable interface. This + ** is because a condition variable, as well as a mutex, is required. */ + pthread_mutex_t ckpt_mutex; + pthread_cond_t ckpt_cond; + int nThreshold; /* Checkpoint when wal is this large */ + int bCkptRequired; /* True if wal checkpoint is required */ + int nRun; /* Number of clients in ".run" */ + int nWait; /* Number of clients waiting on ckpt_cond */ +}; + +static struct TserverGlobal g = {0}; -static char *zGlobalVfs = 0; +typedef struct ClientSql ClientSql; +struct ClientSql { + sqlite3_stmt *pStmt; + int bMutex; +}; typedef struct ClientCtx ClientCtx; struct ClientCtx { @@ -71,7 +94,7 @@ struct ClientCtx { int fd; /* Client fd */ int nRepeat; /* Number of times to repeat SQL */ int nSecond; /* Number of seconds to run for */ - sqlite3_stmt **apPrepare; /* Array of prepared statements */ + ClientSql *aPrepare; /* Array of prepared statements */ int nPrepare; /* Valid size of apPrepare[] */ int nAlloc; /* Allocated size of apPrepare[] */ }; @@ -137,10 +160,11 @@ static int handle_some_sql(ClientCtx *p, const char *zSql, int nSql){ while( rc==SQLITE_OK ){ if( p->nPrepare>=p->nAlloc ){ - int nByte = (p->nPrepare+32) * sizeof(sqlite3_stmt*); - sqlite3_stmt **apNew = sqlite3_realloc(p->apPrepare, nByte); - if( apNew ){ - p->apPrepare = apNew; + int nByte = (p->nPrepare+32) * sizeof(ClientSql); + ClientSql *aNew = sqlite3_realloc(p->aPrepare, nByte); + if( aNew ){ + memset(&aNew[p->nPrepare], 0, sizeof(ClientSql)*32); + p->aPrepare = aNew; p->nAlloc = p->nPrepare+32; }else{ rc = SQLITE_NOMEM; @@ -148,14 +172,14 @@ static int handle_some_sql(ClientCtx *p, const char *zSql, int nSql){ } } rc = sqlite3_prepare_v2( - p->db, zTail, nTail, &p->apPrepare[p->nPrepare], &zTail + p->db, zTail, nTail, &p->aPrepare[p->nPrepare].pStmt, &zTail ); if( rc!=SQLITE_OK ){ send_message(p, "error - %s\n", sqlite3_errmsg(p->db)); rc = 1; break; } - if( p->apPrepare[p->nPrepare]==0 ){ + if( p->aPrepare[p->nPrepare].pStmt==0 ){ break; } p->nPrepare++; @@ -175,19 +199,123 @@ static sqlite3_int64 get_timer(void){ static void clear_sql(ClientCtx *p){ int j; for(j=0; jnPrepare; j++){ - sqlite3_finalize(p->apPrepare[j]); + sqlite3_finalize(p->aPrepare[j].pStmt); } p->nPrepare = 0; } +/* +** The sqlite3_wal_hook() callback used by all client database connections. +*/ +static int clientWalHook(void *pArg, sqlite3 *db, const char *zDb, int nFrame){ + if( nFrame>=g.nThreshold ){ + g.bCkptRequired = 1; + } + return SQLITE_OK; +} + +static int handle_run_command(ClientCtx *p){ + int i, j; + int nBusy = 0; + sqlite3_int64 t0 = get_timer(); + sqlite3_int64 t1 = t0; + int nT1 = 0; + int nTBusy1 = 0; + int rc = SQLITE_OK; + + pthread_mutex_lock(&g.ckpt_mutex); + g.nRun++; + pthread_mutex_unlock(&g.ckpt_mutex); + + + for(j=0; (p->nRepeat<=0 || jnRepeat) && rc==SQLITE_OK; j++){ + sqlite3_int64 t2; + + for(i=0; inPrepare && rc==SQLITE_OK; i++){ + sqlite3_stmt *pStmt = p->aPrepare[i].pStmt; + + /* If the bMutex flag is set, grab g.commit_mutex before executing + ** the SQL statement (which is always "COMMIT" in this case). */ + if( p->aPrepare[i].bMutex ){ + sqlite3_mutex_enter(g.commit_mutex); + } + + /* Execute the statement */ + while( sqlite3_step(pStmt)==SQLITE_ROW ); + rc = sqlite3_reset(pStmt); + + /* Relinquish the g.commit_mutex mutex if required. */ + if( p->aPrepare[i].bMutex ){ + sqlite3_mutex_leave(g.commit_mutex); + } + + if( (rc & 0xFF)==SQLITE_BUSY ){ + if( sqlite3_get_autocommit(p->db)==0 ){ + sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0); + } + nBusy++; + rc = SQLITE_OK; + break; + } + else if( rc!=SQLITE_OK ){ + send_message(p, "error - %s\n", sqlite3_errmsg(p->db)); + } + } + + t2 = get_timer(); + if( t2>=(t1+1000) ){ + int nMs = (t2 - t1); + int nDone = (j+1 - nBusy - nT1); + + rc = send_message( + p, "(%d done @ %d per second, %d busy)\n", + nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1 + ); + t1 = t2; + nT1 = j+1 - nBusy; + nTBusy1 = nBusy; + if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break; + } + + /* Checkpoint handling. */ + pthread_mutex_lock(&g.ckpt_mutex); + if( rc==SQLITE_OK && g.bCkptRequired ){ + if( g.nWait==g.nRun-1 ){ + /* All other clients are already waiting on the condition variable. + ** Run the checkpoint, signal the condition and move on. */ + rc = sqlite3_wal_checkpoint(p->db, "main"); + g.bCkptRequired = 0; + pthread_cond_broadcast(&g.ckpt_cond); + }else{ + assert( g.nWait=1 && n<=4 && 0==strncmp(z, "list", n) ){ int i; for(i=0; rc==0 && inPrepare; i++){ - const char *zSql = sqlite3_sql(p->apPrepare[i]); + const char *zSql = sqlite3_sql(p->aPrepare[i].pStmt); int nSql = strlen(zSql); trim_string(&zSql, &nSql); rc = send_message(p, "%d: %.*s\n", i, nSql, zSql); @@ -219,56 +347,7 @@ static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){ } else if( n>=2 && n<=3 && 0==strncmp(z, "run", n) ){ - int i, j; - int nBusy = 0; - sqlite3_int64 t0 = get_timer(); - sqlite3_int64 t1 = t0; - int nT1 = 0; - int nTBusy1 = 0; - - for(j=0; (p->nRepeat<=0 || jnRepeat) && rc==SQLITE_OK; j++){ - sqlite3_int64 t2; - - for(i=0; inPrepare && rc==SQLITE_OK; i++){ - sqlite3_stmt *pStmt = p->apPrepare[i]; - - /* Execute the statement */ - while( sqlite3_step(pStmt)==SQLITE_ROW ); - rc = sqlite3_reset(pStmt); - - if( (rc & 0xFF)==SQLITE_BUSY ){ - if( sqlite3_get_autocommit(p->db)==0 ){ - sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0); - } - nBusy++; - rc = SQLITE_OK; - break; - } - else if( rc!=SQLITE_OK ){ - send_message(p, "error - %s\n", sqlite3_errmsg(p->db)); - } - } - - t2 = get_timer(); - if( t2>=(t1+1000) ){ - int nMs = (t2 - t1); - int nDone = (j+1 - nBusy - nT1); - - rc = send_message( - p, "(%d done @ %d per second, %d busy)\n", - nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1 - ); - t1 = t2; - nT1 = j+1 - nBusy; - nTBusy1 = nBusy; - if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break; - } - } - - if( rc==SQLITE_OK ){ - send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j); - } - clear_sql(p); + rc = handle_run_command(p); } else if( n>=1 && n<=7 && 0==strncmp(z, "seconds", n) ){ @@ -279,10 +358,18 @@ static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){ rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat); } + else if( n>=1 && n<=12 && 0==strncmp(z, "mutex_commit", n) ){ + rc = handle_some_sql(p, "COMMIT;", 7); + if( rc==SQLITE_OK ){ + p->aPrepare[p->nPrepare-1].bMutex = 1; + } + } + else{ send_message(p, "unrecognized dot command: %.*s\n" - "should be \"list\", \"run\", \"repeats\", or \"seconds\"\n", n, z + "should be \"list\", \"run\", \"repeats\", \"mutex_commit\" " + "or \"seconds\"\n", n, z ); rc = 1; } @@ -301,8 +388,8 @@ static void *handle_client(void *pArg){ ctx.fd = (int)(intptr_t)pArg; ctx.nRepeat = 1; - rc = sqlite3_open_v2(zDatabaseName, &ctx.db, - SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, zGlobalVfs + rc = sqlite3_open_v2(g.zDatabaseName, &ctx.db, + SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs ); if( rc!=SQLITE_OK ){ fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(ctx.db)); @@ -313,6 +400,9 @@ static void *handle_client(void *pArg){ usleepFunc, 0, 0 ); + /* Register the wal-hook with the new client connection */ + sqlite3_wal_hook(ctx.db, clientWalHook, (void*)&ctx); + while( rc==SQLITE_OK ){ int i; int iStart; @@ -379,7 +469,7 @@ static void *handle_client(void *pArg){ fprintf(stdout, "Client %d disconnects\n", ctx.fd); close(ctx.fd); clear_sql(&ctx); - sqlite3_free(ctx.apPrepare); + sqlite3_free(ctx.aPrepare); sqlite3_close(ctx.db); return 0; } @@ -406,12 +496,17 @@ int main(int argc, char *argv[]) { if( argc==4 ){ int n = strlen(argv[1]); if( n<2 || n>4 || memcmp("-vfs", argv[1], 4) ) usage(argv[0]); - zGlobalVfs = argv[2]; + g.zVfs = argv[2]; } - zDatabaseName = argv[argc-1]; + g.zDatabaseName = argv[argc-1]; + g.commit_mutex = sqlite3_mutex_alloc(SQLITE_MUTEX_FAST); + + g.nThreshold = TSERVER_DEFAULT_CHECKPOINT_THRESHOLD; + pthread_mutex_init(&g.ckpt_mutex, 0); + pthread_cond_init(&g.ckpt_cond, 0); - rc = sqlite3_open_v2(zDatabaseName, &db, - SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, zGlobalVfs + rc = sqlite3_open_v2(g.zDatabaseName, &db, + SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs ); if( rc!=SQLITE_OK ){ fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(db));