-C Reduce\sthe\snumber\sof\smallocs\srequired\sof\swriters\sin\sserver\smode.
-D 2017-07-28T21:02:13.200
+C Update\stest\sprogram\s"tserver"\sto\suse\sa\snative\spthreads\smutex/condition\nvariable\sto\sefficiently\smanage\swal\sfile\scheckpoints\swithout\sthe\swal\sfile\ngrowing\sindefinitely.
+D 2017-07-29T17:01:06.529
F Makefile.in d9873c9925917cca9990ee24be17eb9613a668012c85a343aef7e5536ae266e8
F Makefile.linux-gcc 7bc79876b875010e8c8f9502eb935ca92aa3c434
F Makefile.msc 02b469e9dcd5b7ee63fc1fb05babc174260ee4cfa4e0ef2e48c3c6801567a016
F src/os_unix.c 98df292b72d8c15c6292be663ee917ac0316a0d9ec652e6e55e06bc4d83f84b7
F src/os_win.c 2a6c73eef01c51a048cc4ddccd57f981afbec18a
F src/os_win.h 7b073010f1451abe501be30d12f6bc599824944a
-F src/pager.c d3a15bf7861cc9be235ff424cdddde7fae3cd06fbcacbd967c6dad2d313e1d90
-F src/pager.h 857d21f70acefc76e5bae4584f0133e754a2448b197d1e79b7179f26f9a11bc1
+F src/pager.c 9c2006fb3c4d4a985e79de21cb5e5be22c8ae663159cbd0bf5e708803bf53678
+F src/pager.h 316dac0671fd7555af9e73d4357febd5f2d3ce6a185ffd8d77b7fc0423ac8b1a
F src/parse.y 3a1babd6645a8103898a5e7e239dcf56cdafbdc25fd8133bb4a9160f9471d42d
F src/pcache.c 62835bed959e2914edd26afadfecce29ece0e870
F src/pcache.h 521bb9610d38ef17a3cc9b5ddafd4546c2ea67fa3d0e464823d73c2a28d50e11
F src/resolve.c 4324a94573b1e29286f8121e4881db59eaedc014afeb274c8d3e07ed282e0e20
F src/rowset.c 7b7e7e479212e65b723bf40128c7b36dc5afdfac
F src/select.c c6bf96a7f9d7d68f929de84738c599a30d0a725ab0b54420e70545743cd5ee7b
-F src/server.c cc07e5ee19436c08ae2331e8476db0c968ade42528df68cfa40eb58314cd21e1
+F src/server.c ef4a69ea4124c89fe73ee0889929c089290e240a3a5f27fa28beea51cb855e51
F src/server.h adcc122084f9370c91479bd9f7bbac1ccd7f63784249de40f63dae8a9fae1bfe
F src/shell.c bd6a37cbe8bf64ef6a6a74fdc50f067d3148149b4ce2b4d03154663e66ded55f
F src/shell.c.in b5725acacba95ccefa57b6d068f710e29ba8239c3aa704628a1902a1f729c175
F tool/symbols-mingw.sh 4dbcea7e74768305384c9fd2ed2b41bbf9f0414d
F tool/symbols.sh c5a617b8c61a0926747a56c65f5671ef8ac0e148
F tool/tostr.tcl 96022f35ada2194f6f8ccf6fd95809e90ed277c4
-F tool/tserver.c b3f368766b6bcef6d58b057de2fc3d3e33510b2b92fd0a53a6bfbe243b58d027
+F tool/tserver.c d6b4e074dc9d11edf9aac79cfd925cf1ea71f823b00344bc443e1aaea1f16d5b
F tool/varint.c 5d94cb5003db9dbbcbcc5df08d66f16071aee003
F tool/vdbe-compress.tcl 5926c71f9c12d2ab73ef35c29376e756eb68361c
F tool/vdbe_profile.tcl 246d0da094856d72d2c12efec03250d71639d19f
F vsixtest/vsixtest.vcxproj.data 2ed517e100c66dc455b492e1a33350c1b20fbcdc
F vsixtest/vsixtest.vcxproj.filters 37e51ffedcdb064aad6ff33b6148725226cd608e
F vsixtest/vsixtest_TemporaryKey.pfx e5b1b036facdb453873e7084e1cae9102ccc67a0
-P d0719ad757bdf7cf2d7d7a4f7b0e713c262ffb434b91ddbb12e190e479abd19e
-R 45df4a0aef3166ba81de04b37f949988
+P 60953997f62208f82b1efb53b8a1b0c6a26370411041457f747917e10d9a0e68
+R f90a2e0fad39683a4120989a28a36559
U dan
-Z b12068c542ae27866a718f970ab635de
+Z 371e271b5452f3a4804ed5b5db0b3183
** .run Run all SQL statements in the list.
** .repeats N Configure the number of repeats per ".run".
** .seconds N Configure the number of seconds to ".run" for.
+** .mutex_commit Add a "COMMIT" protected by a g.commit_mutex
+** to the current SQL.
**
** Example input:
**
#include "sqlite3.h"
-/* Database used by this server */
-static char *zDatabaseName = 0;
+#define TSERVER_DEFAULT_CHECKPOINT_THRESHOLD 3900
+
+/* Global variables */
+struct TserverGlobal {
+ char *zDatabaseName; /* Database used by this server */
+ char *zVfs;
+ sqlite3_mutex *commit_mutex;
+
+ /* The following use native pthreads instead of a portable interface. This
+ ** is because a condition variable, as well as a mutex, is required. */
+ pthread_mutex_t ckpt_mutex;
+ pthread_cond_t ckpt_cond;
+ int nThreshold; /* Checkpoint when wal is this large */
+ int bCkptRequired; /* True if wal checkpoint is required */
+ int nRun; /* Number of clients in ".run" */
+ int nWait; /* Number of clients waiting on ckpt_cond */
+};
+
+static struct TserverGlobal g = {0};
-static char *zGlobalVfs = 0;
+typedef struct ClientSql ClientSql;
+struct ClientSql {
+ sqlite3_stmt *pStmt;
+ int bMutex;
+};
typedef struct ClientCtx ClientCtx;
struct ClientCtx {
int fd; /* Client fd */
int nRepeat; /* Number of times to repeat SQL */
int nSecond; /* Number of seconds to run for */
- sqlite3_stmt **apPrepare; /* Array of prepared statements */
+ ClientSql *aPrepare; /* Array of prepared statements */
int nPrepare; /* Valid size of apPrepare[] */
int nAlloc; /* Allocated size of apPrepare[] */
};
while( rc==SQLITE_OK ){
if( p->nPrepare>=p->nAlloc ){
- int nByte = (p->nPrepare+32) * sizeof(sqlite3_stmt*);
- sqlite3_stmt **apNew = sqlite3_realloc(p->apPrepare, nByte);
- if( apNew ){
- p->apPrepare = apNew;
+ int nByte = (p->nPrepare+32) * sizeof(ClientSql);
+ ClientSql *aNew = sqlite3_realloc(p->aPrepare, nByte);
+ if( aNew ){
+ memset(&aNew[p->nPrepare], 0, sizeof(ClientSql)*32);
+ p->aPrepare = aNew;
p->nAlloc = p->nPrepare+32;
}else{
rc = SQLITE_NOMEM;
}
}
rc = sqlite3_prepare_v2(
- p->db, zTail, nTail, &p->apPrepare[p->nPrepare], &zTail
+ p->db, zTail, nTail, &p->aPrepare[p->nPrepare].pStmt, &zTail
);
if( rc!=SQLITE_OK ){
send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
rc = 1;
break;
}
- if( p->apPrepare[p->nPrepare]==0 ){
+ if( p->aPrepare[p->nPrepare].pStmt==0 ){
break;
}
p->nPrepare++;
static void clear_sql(ClientCtx *p){
int j;
for(j=0; j<p->nPrepare; j++){
- sqlite3_finalize(p->apPrepare[j]);
+ sqlite3_finalize(p->aPrepare[j].pStmt);
}
p->nPrepare = 0;
}
+/*
+** The sqlite3_wal_hook() callback used by all client database connections.
+*/
+static int clientWalHook(void *pArg, sqlite3 *db, const char *zDb, int nFrame){
+ if( nFrame>=g.nThreshold ){
+ g.bCkptRequired = 1;
+ }
+ return SQLITE_OK;
+}
+
+static int handle_run_command(ClientCtx *p){
+ int i, j;
+ int nBusy = 0;
+ sqlite3_int64 t0 = get_timer();
+ sqlite3_int64 t1 = t0;
+ int nT1 = 0;
+ int nTBusy1 = 0;
+ int rc = SQLITE_OK;
+
+ pthread_mutex_lock(&g.ckpt_mutex);
+ g.nRun++;
+ pthread_mutex_unlock(&g.ckpt_mutex);
+
+
+ for(j=0; (p->nRepeat<=0 || j<p->nRepeat) && rc==SQLITE_OK; j++){
+ sqlite3_int64 t2;
+
+ for(i=0; i<p->nPrepare && rc==SQLITE_OK; i++){
+ sqlite3_stmt *pStmt = p->aPrepare[i].pStmt;
+
+ /* If the bMutex flag is set, grab g.commit_mutex before executing
+ ** the SQL statement (which is always "COMMIT" in this case). */
+ if( p->aPrepare[i].bMutex ){
+ sqlite3_mutex_enter(g.commit_mutex);
+ }
+
+ /* Execute the statement */
+ while( sqlite3_step(pStmt)==SQLITE_ROW );
+ rc = sqlite3_reset(pStmt);
+
+ /* Relinquish the g.commit_mutex mutex if required. */
+ if( p->aPrepare[i].bMutex ){
+ sqlite3_mutex_leave(g.commit_mutex);
+ }
+
+ if( (rc & 0xFF)==SQLITE_BUSY ){
+ if( sqlite3_get_autocommit(p->db)==0 ){
+ sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0);
+ }
+ nBusy++;
+ rc = SQLITE_OK;
+ break;
+ }
+ else if( rc!=SQLITE_OK ){
+ send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
+ }
+ }
+
+ t2 = get_timer();
+ if( t2>=(t1+1000) ){
+ int nMs = (t2 - t1);
+ int nDone = (j+1 - nBusy - nT1);
+
+ rc = send_message(
+ p, "(%d done @ %d per second, %d busy)\n",
+ nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1
+ );
+ t1 = t2;
+ nT1 = j+1 - nBusy;
+ nTBusy1 = nBusy;
+ if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break;
+ }
+
+ /* Checkpoint handling. */
+ pthread_mutex_lock(&g.ckpt_mutex);
+ if( rc==SQLITE_OK && g.bCkptRequired ){
+ if( g.nWait==g.nRun-1 ){
+ /* All other clients are already waiting on the condition variable.
+ ** Run the checkpoint, signal the condition and move on. */
+ rc = sqlite3_wal_checkpoint(p->db, "main");
+ g.bCkptRequired = 0;
+ pthread_cond_broadcast(&g.ckpt_cond);
+ }else{
+ assert( g.nWait<g.nRun-1 );
+ g.nWait++;
+ pthread_cond_wait(&g.ckpt_cond, &g.ckpt_mutex);
+ g.nWait--;
+ }
+ }
+ pthread_mutex_unlock(&g.ckpt_mutex);
+ }
+
+ if( rc==SQLITE_OK ){
+ send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j);
+ }
+ clear_sql(p);
+
+ pthread_mutex_lock(&g.ckpt_mutex);
+ g.nRun--;
+ pthread_mutex_unlock(&g.ckpt_mutex);
+
+ return rc;
+}
+
static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){
- assert( zCmd[0]=='.' );
int n;
int rc = 0;
const char *z = &zCmd[1];
const char *zArg;
int nArg;
+ assert( zCmd[0]=='.' );
for(n=0; n<(nCmd-1); n++){
if( is_whitespace(z[n]) ) break;
}
if( n>=1 && n<=4 && 0==strncmp(z, "list", n) ){
int i;
for(i=0; rc==0 && i<p->nPrepare; i++){
- const char *zSql = sqlite3_sql(p->apPrepare[i]);
+ const char *zSql = sqlite3_sql(p->aPrepare[i].pStmt);
int nSql = strlen(zSql);
trim_string(&zSql, &nSql);
rc = send_message(p, "%d: %.*s\n", i, nSql, zSql);
}
else if( n>=2 && n<=3 && 0==strncmp(z, "run", n) ){
- int i, j;
- int nBusy = 0;
- sqlite3_int64 t0 = get_timer();
- sqlite3_int64 t1 = t0;
- int nT1 = 0;
- int nTBusy1 = 0;
-
- for(j=0; (p->nRepeat<=0 || j<p->nRepeat) && rc==SQLITE_OK; j++){
- sqlite3_int64 t2;
-
- for(i=0; i<p->nPrepare && rc==SQLITE_OK; i++){
- sqlite3_stmt *pStmt = p->apPrepare[i];
-
- /* Execute the statement */
- while( sqlite3_step(pStmt)==SQLITE_ROW );
- rc = sqlite3_reset(pStmt);
-
- if( (rc & 0xFF)==SQLITE_BUSY ){
- if( sqlite3_get_autocommit(p->db)==0 ){
- sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0);
- }
- nBusy++;
- rc = SQLITE_OK;
- break;
- }
- else if( rc!=SQLITE_OK ){
- send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
- }
- }
-
- t2 = get_timer();
- if( t2>=(t1+1000) ){
- int nMs = (t2 - t1);
- int nDone = (j+1 - nBusy - nT1);
-
- rc = send_message(
- p, "(%d done @ %d per second, %d busy)\n",
- nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1
- );
- t1 = t2;
- nT1 = j+1 - nBusy;
- nTBusy1 = nBusy;
- if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break;
- }
- }
-
- if( rc==SQLITE_OK ){
- send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j);
- }
- clear_sql(p);
+ rc = handle_run_command(p);
}
else if( n>=1 && n<=7 && 0==strncmp(z, "seconds", n) ){
rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat);
}
+ else if( n>=1 && n<=12 && 0==strncmp(z, "mutex_commit", n) ){
+ rc = handle_some_sql(p, "COMMIT;", 7);
+ if( rc==SQLITE_OK ){
+ p->aPrepare[p->nPrepare-1].bMutex = 1;
+ }
+ }
+
else{
send_message(p,
"unrecognized dot command: %.*s\n"
- "should be \"list\", \"run\", \"repeats\", or \"seconds\"\n", n, z
+ "should be \"list\", \"run\", \"repeats\", \"mutex_commit\" "
+ "or \"seconds\"\n", n, z
);
rc = 1;
}
ctx.fd = (int)(intptr_t)pArg;
ctx.nRepeat = 1;
- rc = sqlite3_open_v2(zDatabaseName, &ctx.db,
- SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, zGlobalVfs
+ rc = sqlite3_open_v2(g.zDatabaseName, &ctx.db,
+ SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
);
if( rc!=SQLITE_OK ){
fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(ctx.db));
usleepFunc, 0, 0
);
+ /* Register the wal-hook with the new client connection */
+ sqlite3_wal_hook(ctx.db, clientWalHook, (void*)&ctx);
+
while( rc==SQLITE_OK ){
int i;
int iStart;
fprintf(stdout, "Client %d disconnects\n", ctx.fd);
close(ctx.fd);
clear_sql(&ctx);
- sqlite3_free(ctx.apPrepare);
+ sqlite3_free(ctx.aPrepare);
sqlite3_close(ctx.db);
return 0;
}
if( argc==4 ){
int n = strlen(argv[1]);
if( n<2 || n>4 || memcmp("-vfs", argv[1], 4) ) usage(argv[0]);
- zGlobalVfs = argv[2];
+ g.zVfs = argv[2];
}
- zDatabaseName = argv[argc-1];
+ g.zDatabaseName = argv[argc-1];
+ g.commit_mutex = sqlite3_mutex_alloc(SQLITE_MUTEX_FAST);
+
+ g.nThreshold = TSERVER_DEFAULT_CHECKPOINT_THRESHOLD;
+ pthread_mutex_init(&g.ckpt_mutex, 0);
+ pthread_cond_init(&g.ckpt_cond, 0);
- rc = sqlite3_open_v2(zDatabaseName, &db,
- SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, zGlobalVfs
+ rc = sqlite3_open_v2(g.zDatabaseName, &db,
+ SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
);
if( rc!=SQLITE_OK ){
fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(db));