]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Update test program "tserver" to use a native pthreads mutex/condition
authordan <dan@noemail.net>
Sat, 29 Jul 2017 17:01:06 +0000 (17:01 +0000)
committerdan <dan@noemail.net>
Sat, 29 Jul 2017 17:01:06 +0000 (17:01 +0000)
variable to efficiently manage wal file checkpoints without the wal file
growing indefinitely.

FossilOrigin-Name: 8299bdb7cbede30c665dda131bdcbd1d260b3ae9bd16d9b414d8c3776b08f1b3

manifest
manifest.uuid
src/pager.c
src/pager.h
src/server.c
tool/tserver.c

index 24090665cf7955e4be5274dd76ee017e7e33cd6b..0d9e30de885cbce6f214cb0ebf3c01381362c592 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Reduce\sthe\snumber\sof\smallocs\srequired\sof\swriters\sin\sserver\smode.
-D 2017-07-28T21:02:13.200
+C Update\stest\sprogram\s"tserver"\sto\suse\sa\snative\spthreads\smutex/condition\nvariable\sto\sefficiently\smanage\swal\sfile\scheckpoints\swithout\sthe\swal\sfile\ngrowing\sindefinitely.
+D 2017-07-29T17:01:06.529
 F Makefile.in d9873c9925917cca9990ee24be17eb9613a668012c85a343aef7e5536ae266e8
 F Makefile.linux-gcc 7bc79876b875010e8c8f9502eb935ca92aa3c434
 F Makefile.msc 02b469e9dcd5b7ee63fc1fb05babc174260ee4cfa4e0ef2e48c3c6801567a016
@@ -439,8 +439,8 @@ F src/os_setup.h 0dbaea40a7d36bf311613d31342e0b99e2536586
 F src/os_unix.c 98df292b72d8c15c6292be663ee917ac0316a0d9ec652e6e55e06bc4d83f84b7
 F src/os_win.c 2a6c73eef01c51a048cc4ddccd57f981afbec18a
 F src/os_win.h 7b073010f1451abe501be30d12f6bc599824944a
-F src/pager.c d3a15bf7861cc9be235ff424cdddde7fae3cd06fbcacbd967c6dad2d313e1d90
-F src/pager.h 857d21f70acefc76e5bae4584f0133e754a2448b197d1e79b7179f26f9a11bc1
+F src/pager.c 9c2006fb3c4d4a985e79de21cb5e5be22c8ae663159cbd0bf5e708803bf53678
+F src/pager.h 316dac0671fd7555af9e73d4357febd5f2d3ce6a185ffd8d77b7fc0423ac8b1a
 F src/parse.y 3a1babd6645a8103898a5e7e239dcf56cdafbdc25fd8133bb4a9160f9471d42d
 F src/pcache.c 62835bed959e2914edd26afadfecce29ece0e870
 F src/pcache.h 521bb9610d38ef17a3cc9b5ddafd4546c2ea67fa3d0e464823d73c2a28d50e11
@@ -453,7 +453,7 @@ F src/random.c 80f5d666f23feb3e6665a6ce04c7197212a88384
 F src/resolve.c 4324a94573b1e29286f8121e4881db59eaedc014afeb274c8d3e07ed282e0e20
 F src/rowset.c 7b7e7e479212e65b723bf40128c7b36dc5afdfac
 F src/select.c c6bf96a7f9d7d68f929de84738c599a30d0a725ab0b54420e70545743cd5ee7b
-F src/server.c cc07e5ee19436c08ae2331e8476db0c968ade42528df68cfa40eb58314cd21e1
+F src/server.c ef4a69ea4124c89fe73ee0889929c089290e240a3a5f27fa28beea51cb855e51
 F src/server.h adcc122084f9370c91479bd9f7bbac1ccd7f63784249de40f63dae8a9fae1bfe
 F src/shell.c bd6a37cbe8bf64ef6a6a74fdc50f067d3148149b4ce2b4d03154663e66ded55f
 F src/shell.c.in b5725acacba95ccefa57b6d068f710e29ba8239c3aa704628a1902a1f729c175
@@ -1618,7 +1618,7 @@ F tool/stack_usage.tcl f8e71b92cdb099a147dad572375595eae55eca43
 F tool/symbols-mingw.sh 4dbcea7e74768305384c9fd2ed2b41bbf9f0414d
 F tool/symbols.sh c5a617b8c61a0926747a56c65f5671ef8ac0e148
 F tool/tostr.tcl 96022f35ada2194f6f8ccf6fd95809e90ed277c4
-F tool/tserver.c b3f368766b6bcef6d58b057de2fc3d3e33510b2b92fd0a53a6bfbe243b58d027
+F tool/tserver.c d6b4e074dc9d11edf9aac79cfd925cf1ea71f823b00344bc443e1aaea1f16d5b
 F tool/varint.c 5d94cb5003db9dbbcbcc5df08d66f16071aee003
 F tool/vdbe-compress.tcl 5926c71f9c12d2ab73ef35c29376e756eb68361c
 F tool/vdbe_profile.tcl 246d0da094856d72d2c12efec03250d71639d19f
@@ -1646,7 +1646,7 @@ F vsixtest/vsixtest.tcl 6a9a6ab600c25a91a7acc6293828957a386a8a93
 F vsixtest/vsixtest.vcxproj.data 2ed517e100c66dc455b492e1a33350c1b20fbcdc
 F vsixtest/vsixtest.vcxproj.filters 37e51ffedcdb064aad6ff33b6148725226cd608e
 F vsixtest/vsixtest_TemporaryKey.pfx e5b1b036facdb453873e7084e1cae9102ccc67a0
-P d0719ad757bdf7cf2d7d7a4f7b0e713c262ffb434b91ddbb12e190e479abd19e
-R 45df4a0aef3166ba81de04b37f949988
+P 60953997f62208f82b1efb53b8a1b0c6a26370411041457f747917e10d9a0e68
+R f90a2e0fad39683a4120989a28a36559
 U dan
-Z b12068c542ae27866a718f970ab635de
+Z 371e271b5452f3a4804ed5b5db0b3183
index 4dccf405fb2936a7fd30879cba300cb3a088ded5..03504fbc9ec8f30f3d3ce123b797e51f6388adf3 100644 (file)
@@ -1 +1 @@
-60953997f62208f82b1efb53b8a1b0c6a26370411041457f747917e10d9a0e68
\ No newline at end of file
+8299bdb7cbede30c665dda131bdcbd1d260b3ae9bd16d9b414d8c3776b08f1b3
\ No newline at end of file
index 14ec8321b572ca3ad1bf141d30dcfa5c28d11e82..ab232f8c602726a0517205ec2109931dd40ad542 100644 (file)
@@ -7747,9 +7747,6 @@ int sqlite3PagerWalFramesize(Pager *pPager){
 #endif
 
 #ifdef SQLITE_SERVER_EDITION
-int sqlite3PagerIsServer(Pager *pPager){
-  return pagerIsServer(pPager);
-}
 int sqlite3PagerPagelock(Pager *pPager, Pgno pgno, int bWrite){
   if( pagerIsServer(pPager)==0 ) return SQLITE_OK;
   return sqlite3ServerLock(pPager->pServer, pgno, bWrite, 0);
index bc9405837bbe0bfbf6154acdf8da4ada2a8928fd..b214eadb3f243dd370219f943d0e862efe83e677 100644 (file)
@@ -238,7 +238,6 @@ void *sqlite3PagerCodec(DbPage *);
 
 #ifdef SQLITE_SERVER_EDITION
   int sqlite3PagerRollbackJournal(Pager*, sqlite3_file*);
-  int sqlite3PagerIsServer(Pager *pPager);
   int sqlite3PagerPagelock(Pager *pPager, Pgno, int);
   void sqlite3PagerServerJournal(Pager*, sqlite3_file*, const char*);
 #endif
index 5632f771bffd8a16334a9b81ba8c25229129f602..2e0ac47cfca0e9f68d2700a711940c4f23189493 100644 (file)
@@ -289,7 +289,7 @@ void sqlite3ServerDisconnect(Server *p, sqlite3_file *dbfd){
     for(pp=&g_server.pDb; *pp!=pDb; pp=&((*pp)->pNext));
     *pp = pDb->pNext;
     sqlite3_mutex_free(pDb->mutex);
-    while( pFree=pDb->pFree ){
+    while( (pFree = pDb->pFree) ){
       pDb->pFree = pFree->pNext;
       sqlite3_free(pFree);
     }
index ba44f0df3c77f9f89b6d0e556d61d0fcc1214358..71f5b93e73a5b0e31ab741bc879924444ecaaed0 100644 (file)
@@ -32,6 +32,8 @@
 **   .run                     Run all SQL statements in the list.
 **   .repeats N               Configure the number of repeats per ".run".
 **   .seconds N               Configure the number of seconds to ".run" for.
+**   .mutex_commit            Add a "COMMIT" protected by a g.commit_mutex
+**                            to the current SQL.
 **
 ** Example input:
 **
 
 #include "sqlite3.h"
 
-/* Database used by this server */
-static char *zDatabaseName = 0;
+#define TSERVER_DEFAULT_CHECKPOINT_THRESHOLD 3900
+
+/* Global variables */
+struct TserverGlobal {
+  char *zDatabaseName;             /* Database used by this server */
+  char *zVfs;
+  sqlite3_mutex *commit_mutex;
+
+  /* The following use native pthreads instead of a portable interface. This
+  ** is because a condition variable, as well as a mutex, is required.  */
+  pthread_mutex_t ckpt_mutex;
+  pthread_cond_t ckpt_cond;
+  int nThreshold;                  /* Checkpoint when wal is this large */
+  int bCkptRequired;               /* True if wal checkpoint is required */
+  int nRun;                        /* Number of clients in ".run" */
+  int nWait;                       /* Number of clients waiting on ckpt_cond */
+};
+
+static struct TserverGlobal g = {0};
 
-static char *zGlobalVfs = 0;
+typedef struct ClientSql ClientSql;
+struct ClientSql {
+  sqlite3_stmt *pStmt;
+  int bMutex;
+};
 
 typedef struct ClientCtx ClientCtx;
 struct ClientCtx {
@@ -71,7 +94,7 @@ struct ClientCtx {
   int fd;                         /* Client fd */
   int nRepeat;                    /* Number of times to repeat SQL */
   int nSecond;                    /* Number of seconds to run for */
-  sqlite3_stmt **apPrepare;       /* Array of prepared statements */
+  ClientSql *aPrepare;            /* Array of prepared statements */
   int nPrepare;                   /* Valid size of apPrepare[] */
   int nAlloc;                     /* Allocated size of apPrepare[] */
 };
@@ -137,10 +160,11 @@ static int handle_some_sql(ClientCtx *p, const char *zSql, int nSql){
 
   while( rc==SQLITE_OK ){
     if( p->nPrepare>=p->nAlloc ){
-      int nByte = (p->nPrepare+32) * sizeof(sqlite3_stmt*);
-      sqlite3_stmt **apNew = sqlite3_realloc(p->apPrepare, nByte);
-      if( apNew ){
-        p->apPrepare = apNew;
+      int nByte = (p->nPrepare+32) * sizeof(ClientSql);
+      ClientSql *aNew = sqlite3_realloc(p->aPrepare, nByte);
+      if( aNew ){
+        memset(&aNew[p->nPrepare], 0, sizeof(ClientSql)*32);
+        p->aPrepare = aNew;
         p->nAlloc = p->nPrepare+32;
       }else{
         rc = SQLITE_NOMEM;
@@ -148,14 +172,14 @@ static int handle_some_sql(ClientCtx *p, const char *zSql, int nSql){
       }
     }
     rc = sqlite3_prepare_v2(
-        p->db, zTail, nTail, &p->apPrepare[p->nPrepare], &zTail
+        p->db, zTail, nTail, &p->aPrepare[p->nPrepare].pStmt, &zTail
     );
     if( rc!=SQLITE_OK ){
       send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
       rc = 1;
       break;
     }
-    if( p->apPrepare[p->nPrepare]==0 ){
+    if( p->aPrepare[p->nPrepare].pStmt==0 ){
       break;
     }
     p->nPrepare++;
@@ -175,19 +199,123 @@ static sqlite3_int64 get_timer(void){
 static void clear_sql(ClientCtx *p){
   int j;
   for(j=0; j<p->nPrepare; j++){
-    sqlite3_finalize(p->apPrepare[j]);
+    sqlite3_finalize(p->aPrepare[j].pStmt);
   }
   p->nPrepare = 0;
 }
 
+/*
+** The sqlite3_wal_hook() callback used by all client database connections.
+*/
+static int clientWalHook(void *pArg, sqlite3 *db, const char *zDb, int nFrame){
+  if( nFrame>=g.nThreshold ){
+    g.bCkptRequired = 1;
+  }
+  return SQLITE_OK;
+}
+
+static int handle_run_command(ClientCtx *p){
+  int i, j;
+  int nBusy = 0;
+  sqlite3_int64 t0 = get_timer();
+  sqlite3_int64 t1 = t0;
+  int nT1 = 0;
+  int nTBusy1 = 0;
+  int rc = SQLITE_OK;
+
+  pthread_mutex_lock(&g.ckpt_mutex);
+  g.nRun++;
+  pthread_mutex_unlock(&g.ckpt_mutex);
+
+
+  for(j=0; (p->nRepeat<=0 || j<p->nRepeat) && rc==SQLITE_OK; j++){
+    sqlite3_int64 t2;
+
+    for(i=0; i<p->nPrepare && rc==SQLITE_OK; i++){
+      sqlite3_stmt *pStmt = p->aPrepare[i].pStmt;
+
+      /* If the bMutex flag is set, grab g.commit_mutex before executing
+      ** the SQL statement (which is always "COMMIT" in this case). */
+      if( p->aPrepare[i].bMutex ){
+        sqlite3_mutex_enter(g.commit_mutex);
+      }
+
+      /* Execute the statement */
+      while( sqlite3_step(pStmt)==SQLITE_ROW );
+      rc = sqlite3_reset(pStmt);
+
+      /* Relinquish the g.commit_mutex mutex if required. */
+      if( p->aPrepare[i].bMutex ){
+        sqlite3_mutex_leave(g.commit_mutex);
+      }
+
+      if( (rc & 0xFF)==SQLITE_BUSY ){
+        if( sqlite3_get_autocommit(p->db)==0 ){
+          sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0);
+        }
+        nBusy++;
+        rc = SQLITE_OK;
+        break;
+      }
+      else if( rc!=SQLITE_OK ){
+        send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
+      }
+    }
+
+    t2 = get_timer();
+    if( t2>=(t1+1000) ){
+      int nMs = (t2 - t1);
+      int nDone = (j+1 - nBusy - nT1);
+
+      rc = send_message(
+          p, "(%d done @ %d per second, %d busy)\n", 
+          nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1
+          );
+      t1 = t2;
+      nT1 = j+1 - nBusy;
+      nTBusy1 = nBusy;
+      if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break;
+    }
+
+    /* Checkpoint handling. */
+    pthread_mutex_lock(&g.ckpt_mutex);
+    if( rc==SQLITE_OK && g.bCkptRequired ){
+      if( g.nWait==g.nRun-1 ){
+        /* All other clients are already waiting on the condition variable.
+        ** Run the checkpoint, signal the condition and move on.  */
+        rc = sqlite3_wal_checkpoint(p->db, "main");
+        g.bCkptRequired = 0;
+        pthread_cond_broadcast(&g.ckpt_cond);
+      }else{
+        assert( g.nWait<g.nRun-1 );
+        g.nWait++;
+        pthread_cond_wait(&g.ckpt_cond, &g.ckpt_mutex);
+        g.nWait--;
+      }
+    }
+    pthread_mutex_unlock(&g.ckpt_mutex);
+  }
+
+  if( rc==SQLITE_OK ){
+    send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j);
+  }
+  clear_sql(p);
+
+  pthread_mutex_lock(&g.ckpt_mutex);
+  g.nRun--;
+  pthread_mutex_unlock(&g.ckpt_mutex);
+
+  return rc;
+}
+
 static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){
-  assert( zCmd[0]=='.' );
   int n;
   int rc = 0;
   const char *z = &zCmd[1];
   const char *zArg;
   int nArg;
 
+  assert( zCmd[0]=='.' );
   for(n=0; n<(nCmd-1); n++){
     if( is_whitespace(z[n]) ) break;
   }
@@ -199,7 +327,7 @@ static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){
   if( n>=1 && n<=4 && 0==strncmp(z, "list", n) ){
     int i;
     for(i=0; rc==0 && i<p->nPrepare; i++){
-      const char *zSql = sqlite3_sql(p->apPrepare[i]);
+      const char *zSql = sqlite3_sql(p->aPrepare[i].pStmt);
       int nSql = strlen(zSql);
       trim_string(&zSql, &nSql);
       rc = send_message(p, "%d: %.*s\n", i, nSql, zSql);
@@ -219,56 +347,7 @@ static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){
   }
 
   else if( n>=2 && n<=3 && 0==strncmp(z, "run", n) ){
-    int i, j;
-    int nBusy = 0;
-    sqlite3_int64 t0 = get_timer();
-    sqlite3_int64 t1 = t0;
-    int nT1 = 0;
-    int nTBusy1 = 0;
-
-    for(j=0; (p->nRepeat<=0 || j<p->nRepeat) && rc==SQLITE_OK; j++){
-      sqlite3_int64 t2;
-
-      for(i=0; i<p->nPrepare && rc==SQLITE_OK; i++){
-        sqlite3_stmt *pStmt = p->apPrepare[i];
-
-        /* Execute the statement */
-        while( sqlite3_step(pStmt)==SQLITE_ROW );
-        rc = sqlite3_reset(pStmt);
-
-        if( (rc & 0xFF)==SQLITE_BUSY ){
-          if( sqlite3_get_autocommit(p->db)==0 ){
-            sqlite3_exec(p->db, "ROLLBACK", 0, 0, 0);
-          }
-          nBusy++;
-          rc = SQLITE_OK;
-          break;
-        }
-        else if( rc!=SQLITE_OK ){
-          send_message(p, "error - %s\n", sqlite3_errmsg(p->db));
-        }
-      }
-
-      t2 = get_timer();
-      if( t2>=(t1+1000) ){
-        int nMs = (t2 - t1);
-        int nDone = (j+1 - nBusy - nT1);
-
-        rc = send_message(
-            p, "(%d done @ %d per second, %d busy)\n", 
-            nDone, (1000*nDone + nMs/2) / nMs, nBusy - nTBusy1
-        );
-        t1 = t2;
-        nT1 = j+1 - nBusy;
-        nTBusy1 = nBusy;
-        if( p->nSecond>0 && (p->nSecond*1000)<=t1-t0 ) break;
-      }
-    }
-
-    if( rc==SQLITE_OK ){
-      send_message(p, "ok (%d/%d SQLITE_BUSY)\n", nBusy, j);
-    }
-    clear_sql(p);
+    rc = handle_run_command(p);
   }
 
   else if( n>=1 && n<=7 && 0==strncmp(z, "seconds", n) ){
@@ -279,10 +358,18 @@ static int handle_dot_command(ClientCtx *p, const char *zCmd, int nCmd){
     rc = send_message(p, "ok (repeat=%d)\n", p->nRepeat);
   }
 
+  else if( n>=1 && n<=12 && 0==strncmp(z, "mutex_commit", n) ){
+    rc = handle_some_sql(p, "COMMIT;", 7);
+    if( rc==SQLITE_OK ){
+      p->aPrepare[p->nPrepare-1].bMutex = 1;
+    }
+  }
+
   else{
     send_message(p, 
         "unrecognized dot command: %.*s\n"
-        "should be \"list\", \"run\", \"repeats\", or \"seconds\"\n", n, z
+        "should be \"list\", \"run\", \"repeats\", \"mutex_commit\" "
+        "or \"seconds\"\n", n, z
     );
     rc = 1;
   }
@@ -301,8 +388,8 @@ static void *handle_client(void *pArg){
 
   ctx.fd = (int)(intptr_t)pArg;
   ctx.nRepeat = 1;
-  rc = sqlite3_open_v2(zDatabaseName, &ctx.db,
-      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, zGlobalVfs
+  rc = sqlite3_open_v2(g.zDatabaseName, &ctx.db,
+      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
   );
   if( rc!=SQLITE_OK ){
     fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(ctx.db));
@@ -313,6 +400,9 @@ static void *handle_client(void *pArg){
       usleepFunc, 0, 0
   );
 
+  /* Register the wal-hook with the new client connection */
+  sqlite3_wal_hook(ctx.db, clientWalHook, (void*)&ctx);
+
   while( rc==SQLITE_OK ){
     int i;
     int iStart;
@@ -379,7 +469,7 @@ static void *handle_client(void *pArg){
   fprintf(stdout, "Client %d disconnects\n", ctx.fd);
   close(ctx.fd);
   clear_sql(&ctx);
-  sqlite3_free(ctx.apPrepare);
+  sqlite3_free(ctx.aPrepare);
   sqlite3_close(ctx.db);
   return 0;
 } 
@@ -406,12 +496,17 @@ int main(int argc, char *argv[]) {
   if( argc==4 ){
     int n = strlen(argv[1]);
     if( n<2 || n>4 || memcmp("-vfs", argv[1], 4) ) usage(argv[0]);
-    zGlobalVfs = argv[2];
+    g.zVfs = argv[2];
   }
-  zDatabaseName = argv[argc-1];
+  g.zDatabaseName = argv[argc-1];
+  g.commit_mutex = sqlite3_mutex_alloc(SQLITE_MUTEX_FAST);
+
+  g.nThreshold = TSERVER_DEFAULT_CHECKPOINT_THRESHOLD;
+  pthread_mutex_init(&g.ckpt_mutex, 0);
+  pthread_cond_init(&g.ckpt_cond, 0);
 
-  rc = sqlite3_open_v2(zDatabaseName, &db,
-      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, zGlobalVfs
+  rc = sqlite3_open_v2(g.zDatabaseName, &db,
+      SQLITE_OPEN_READWRITE|SQLITE_OPEN_CREATE, g.zVfs
   );
   if( rc!=SQLITE_OK ){
     fprintf(stderr, "sqlite3_open(): %s\n", sqlite3_errmsg(db));