]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Use pthreads mutexes and conditions to synchronize threads in bc_test1.
authordan <dan@noemail.net>
Sat, 21 May 2016 18:50:56 +0000 (18:50 +0000)
committerdan <dan@noemail.net>
Sat, 21 May 2016 18:50:56 +0000 (18:50 +0000)
FossilOrigin-Name: f33aa76f074d8686a5a5c0edecabb71cb259c48d

manifest
manifest.uuid
test/bc_test1.c

index 9b43e47d700fc4968a4f1cf45105fc43ef310cbb..8db6dfd33188233f54cb12eef1424e920448c5c3 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Add\soptions\sto\sbc_test1.c\sto\smake\sit\smore\sflexible.
-D 2016-05-07T18:02:53.432
+C Use\spthreads\smutexes\sand\sconditions\sto\ssynchronize\sthreads\sin\sbc_test1.
+D 2016-05-21T18:50:56.299
 F Makefile.in 9e816d0323e418fbc0f8b2c05fc14e0b3763d9e8
 F Makefile.linux-gcc 7bc79876b875010e8c8f9502eb935ca92aa3c434
 F Makefile.msc 71b8b16cf9393f68e2e2035486ca104872558836
@@ -521,7 +521,7 @@ F test/backup_malloc.test 7162d604ec2b4683c4b3799a48657fb8b5e2d450
 F test/badutf.test d5360fc31f643d37a973ab0d8b4fb85799c3169f
 F test/badutf2.test f5bc7f2d280670ecd79b9cf4f0f1760c607fe51f
 F test/bc_common.tcl b5e42d80305be95697e6370e015af571e5333a1c
-F test/bc_test1.c 3f10831d71c7e0bc53640c1989d31ded030a0fcc
+F test/bc_test1.c 874d03ffabbd565a6cf9a7eec23d3deafd3c8f1c
 F test/bestindex1.test 0cf1bd2d7b97d3a3a8c10736125274f64765c4ee
 F test/bestindex2.test 4a06b8922ab2fd09434870da8d1cdf525aaf7060
 F test/between.test 34d375fb5ce1ae283ffe82b6b233e9f38e84fc6c
@@ -1491,7 +1491,7 @@ F vsixtest/vsixtest.tcl 6a9a6ab600c25a91a7acc6293828957a386a8a93
 F vsixtest/vsixtest.vcxproj.data 2ed517e100c66dc455b492e1a33350c1b20fbcdc
 F vsixtest/vsixtest.vcxproj.filters 37e51ffedcdb064aad6ff33b6148725226cd608e
 F vsixtest/vsixtest_TemporaryKey.pfx e5b1b036facdb453873e7084e1cae9102ccc67a0
-P 128c7eaed5479b615d75ebce1d781ea661e0fb2d
-R 86fffa805f6f27d07092161192388b60
+P ec6ef5f2c2dc18e1a19c205f365f4071f0870b68
+R 9d74e8a08d57f3a5262646881b7fcf49
 U dan
-Z aab288310f3155f4f7b4c6e3e47a2d89
+Z 51f0c57ee889307d403d3ab523c9aec8
index c99acbf3cfb1e0db4ea659dd39737f85ac5c4877..2e3f1b7111ad2cd08f9dcd214c4586bce3c9fd6f 100644 (file)
@@ -1 +1 @@
-ec6ef5f2c2dc18e1a19c205f365f4071f0870b68
\ No newline at end of file
+f33aa76f074d8686a5a5c0edecabb71cb259c48d
\ No newline at end of file
index bb41e9d4ed1775e98f1990c4d00e6935a6af722b..290e04067b614626b5b413a2514a9d72b653c3cf 100644 (file)
@@ -26,11 +26,326 @@ struct Config {
   int bMutex;                     /* --mutex */
   int nAutoCkpt;                  /* --autockpt */
   int bRm;                        /* --rm */
-  sqlite3_mutex *pMutex;
+
+  pthread_cond_t cond;
+  pthread_mutex_t mutex;
+  int nCondWait;                  /* Number of threads waiting on hCond */
+  sqlite3_vfs *pVfs;
+};
+
+typedef struct WalHookCtx WalHookCtx;
+struct WalHookCtx {
+  Config *pConfig;
+  Sqlite *pDb;
+  Error *pErr;
+};
+
+typedef struct VfsWrapperFd VfsWrapperFd;
+struct VfsWrapperFd {
+  sqlite3_file base;              /* Base class */
+  int bWriter;                    /* True if holding shm WRITER lock */
+  Config *pConfig;
+  sqlite3_file *pFd;              /* Underlying file descriptor */
 };
 
+/* Methods of the wrapper VFS */
+static int vfsWrapOpen(sqlite3_vfs*, const char*, sqlite3_file*, int, int*);
+static int vfsWrapDelete(sqlite3_vfs*, const char*, int);
+static int vfsWrapAccess(sqlite3_vfs*, const char*, int, int*);
+static int vfsWrapFullPathname(sqlite3_vfs*, const char *, int, char*);
+static void *vfsWrapDlOpen(sqlite3_vfs*, const char*);
+static void vfsWrapDlError(sqlite3_vfs*, int, char*);
+static void (*vfsWrapDlSym(sqlite3_vfs*,void*, const char*))(void);
+static void vfsWrapDlClose(sqlite3_vfs*, void*);
+static int vfsWrapRandomness(sqlite3_vfs*, int, char*);
+static int vfsWrapSleep(sqlite3_vfs*, int);
+static int vfsWrapCurrentTime(sqlite3_vfs*, double*);
+static int vfsWrapGetLastError(sqlite3_vfs*, int, char*);
+static int vfsWrapCurrentTimeInt64(sqlite3_vfs*, sqlite3_int64*);
+static int vfsWrapSetSystemCall(sqlite3_vfs*, const char*, sqlite3_syscall_ptr);
+static sqlite3_syscall_ptr vfsWrapGetSystemCall(sqlite3_vfs*, const char*);
+static const char *vfsWrapNextSystemCall(sqlite3_vfs*, const char*);
+
+/* Methods of wrapper sqlite3_io_methods object (see vfsWrapOpen()) */
+static int vfsWrapClose(sqlite3_file*);
+static int vfsWrapRead(sqlite3_file*, void*, int iAmt, sqlite3_int64 iOfst);
+static int vfsWrapWrite(sqlite3_file*, const void*, int iAmt, sqlite3_int64);
+static int vfsWrapTruncate(sqlite3_file*, sqlite3_int64 size);
+static int vfsWrapSync(sqlite3_file*, int flags);
+static int vfsWrapFileSize(sqlite3_file*, sqlite3_int64 *pSize);
+static int vfsWrapLock(sqlite3_file*, int);
+static int vfsWrapUnlock(sqlite3_file*, int);
+static int vfsWrapCheckReservedLock(sqlite3_file*, int *pResOut);
+static int vfsWrapFileControl(sqlite3_file*, int op, void *pArg);
+static int vfsWrapSectorSize(sqlite3_file*);
+static int vfsWrapDeviceCharacteristics(sqlite3_file*);
+static int vfsWrapShmMap(sqlite3_file*, int iPg, int, int, void volatile**);
+static int vfsWrapShmLock(sqlite3_file*, int offset, int n, int flags);
+static void vfsWrapShmBarrier(sqlite3_file*);
+static int vfsWrapShmUnmap(sqlite3_file*, int deleteFlag);
+static int vfsWrapFetch(sqlite3_file*, sqlite3_int64 iOfst, int iAmt, void **);
+static int vfsWrapUnfetch(sqlite3_file*, sqlite3_int64 iOfst, void *p);
+
+static int vfsWrapOpen(
+  sqlite3_vfs *pVfs, 
+  const char *zName, 
+  sqlite3_file *pFd, 
+  int flags, 
+  int *fout
+){
+  static sqlite3_io_methods methods = {
+    3,
+    vfsWrapClose, vfsWrapRead, vfsWrapWrite,
+    vfsWrapTruncate, vfsWrapSync, vfsWrapFileSize,
+    vfsWrapLock, vfsWrapUnlock, vfsWrapCheckReservedLock,
+    vfsWrapFileControl, vfsWrapSectorSize, vfsWrapDeviceCharacteristics,
+    vfsWrapShmMap, vfsWrapShmLock, vfsWrapShmBarrier,
+    vfsWrapShmUnmap, vfsWrapFetch, vfsWrapUnfetch
+  };
+
+  Config *pConfig = (Config*)pVfs->pAppData;
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  int rc;
+
+  pWrapper->pFd = (sqlite3_file*)&pWrapper[1];
+  pWrapper->pConfig = pConfig;
+  rc = pConfig->pVfs->xOpen(pConfig->pVfs, zName, pWrapper->pFd, flags, fout);
+  if( rc==SQLITE_OK ){
+    pWrapper->base.pMethods = &methods;
+  }
+  return rc;
+}
+
+static int vfsWrapDelete(sqlite3_vfs *pVfs, const char *a, int b){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xDelete(pConfig->pVfs, a, b);
+}
+static int vfsWrapAccess(sqlite3_vfs *pVfs, const char *a, int b, int *c){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xAccess(pConfig->pVfs, a, b, c);
+}
+static int vfsWrapFullPathname(sqlite3_vfs *pVfs, const char *a, int b, char*c){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xFullPathname(pConfig->pVfs, a, b, c);
+}
+static void *vfsWrapDlOpen(sqlite3_vfs *pVfs, const char *a){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xDlOpen(pConfig->pVfs, a);
+}
+static void vfsWrapDlError(sqlite3_vfs *pVfs, int a, char *b){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xDlError(pConfig->pVfs, a, b);
+}
+static void (*vfsWrapDlSym(sqlite3_vfs *pVfs, void *a, const char *b))(void){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xDlSym(pConfig->pVfs, a, b);
+}
+static void vfsWrapDlClose(sqlite3_vfs *pVfs, void *a){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xDlClose(pConfig->pVfs, a);
+}
+static int vfsWrapRandomness(sqlite3_vfs *pVfs, int a, char *b){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xRandomness(pConfig->pVfs, a, b);
+}
+static int vfsWrapSleep(sqlite3_vfs *pVfs, int a){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xSleep(pConfig->pVfs, a);
+}
+static int vfsWrapCurrentTime(sqlite3_vfs *pVfs, double *a){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xCurrentTime(pConfig->pVfs, a);
+}
+static int vfsWrapGetLastError(sqlite3_vfs *pVfs, int a, char *b){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xGetLastError(pConfig->pVfs, a, b);
+}
+static int vfsWrapCurrentTimeInt64(sqlite3_vfs *pVfs, sqlite3_int64 *a){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xCurrentTimeInt64(pConfig->pVfs, a);
+}
+static int vfsWrapSetSystemCall(
+  sqlite3_vfs *pVfs, 
+  const char *a, 
+  sqlite3_syscall_ptr b
+){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xSetSystemCall(pConfig->pVfs, a, b);
+}
+static sqlite3_syscall_ptr vfsWrapGetSystemCall(
+  sqlite3_vfs *pVfs, 
+  const char *a
+){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xGetSystemCall(pConfig->pVfs, a);
+}
+static const char *vfsWrapNextSystemCall(sqlite3_vfs *pVfs, const char *a){
+  Config *pConfig = (Config*)pVfs->pAppData;
+  return pConfig->pVfs->xNextSystemCall(pConfig->pVfs, a);
+}
+
+static int vfsWrapClose(sqlite3_file *pFd){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  pWrapper->pFd->pMethods->xClose(pWrapper->pFd);
+  pWrapper->pFd = 0;
+  return SQLITE_OK;
+}
+static int vfsWrapRead(sqlite3_file *pFd, void *a, int b, sqlite3_int64 c){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xRead(pWrapper->pFd, a, b, c);
+}
+static int vfsWrapWrite(
+  sqlite3_file *pFd, 
+  const void *a, int b, 
+  sqlite3_int64 c
+){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xWrite(pWrapper->pFd, a, b, c);
+}
+static int vfsWrapTruncate(sqlite3_file *pFd, sqlite3_int64 a){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xTruncate(pWrapper->pFd, a);
+}
+static int vfsWrapSync(sqlite3_file *pFd, int a){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xSync(pWrapper->pFd, a);
+}
+static int vfsWrapFileSize(sqlite3_file *pFd, sqlite3_int64 *a){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xFileSize(pWrapper->pFd, a);
+}
+static int vfsWrapLock(sqlite3_file *pFd, int a){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xLock(pWrapper->pFd, a);
+}
+static int vfsWrapUnlock(sqlite3_file *pFd, int a){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xUnlock(pWrapper->pFd, a);
+}
+static int vfsWrapCheckReservedLock(sqlite3_file *pFd, int *a){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xCheckReservedLock(pWrapper->pFd, a);
+}
+static int vfsWrapFileControl(sqlite3_file *pFd, int a, void *b){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xFileControl(pWrapper->pFd, a, b);
+}
+static int vfsWrapSectorSize(sqlite3_file *pFd){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xSectorSize(pWrapper->pFd);
+}
+static int vfsWrapDeviceCharacteristics(sqlite3_file *pFd){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xDeviceCharacteristics(pWrapper->pFd);
+}
+static int vfsWrapShmMap(
+  sqlite3_file *pFd, 
+  int a, int b, int c, 
+  void volatile **d
+){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xShmMap(pWrapper->pFd, a, b, c, d);
+}
+static int vfsWrapShmLock(sqlite3_file *pFd, int offset, int n, int flags){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  Config *pConfig = pWrapper->pConfig;
+  int bMutex = 0;
+  int rc;
+
+  if(  (offset==0 && n==1)
+    && (flags & SQLITE_SHM_LOCK) && (flags & SQLITE_SHM_EXCLUSIVE)
+  ){
+    pthread_mutex_lock(&pConfig->mutex);
+    pWrapper->bWriter = 1;
+    bMutex = 1;
+  }
+
+  if( offset==0 && (flags & SQLITE_SHM_UNLOCK) && pWrapper->bWriter ){
+    pthread_mutex_unlock(&pConfig->mutex);
+    pWrapper->bWriter = 0;
+  }
+
+  rc = pWrapper->pFd->pMethods->xShmLock(pWrapper->pFd, offset, n, flags);
+
+  if( rc!=SQLITE_OK && bMutex ){
+    pthread_mutex_unlock(&pConfig->mutex);
+    pWrapper->bWriter = 0;
+  }
+
+  return rc;
+}
+static void vfsWrapShmBarrier(sqlite3_file *pFd){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xShmBarrier(pWrapper->pFd);
+}
+static int vfsWrapShmUnmap(sqlite3_file *pFd, int a){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xShmUnmap(pWrapper->pFd, a);
+}
+static int vfsWrapFetch(sqlite3_file *pFd, sqlite3_int64 a, int b, void **c){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xFetch(pWrapper->pFd, a, b, c);
+}
+static int vfsWrapUnfetch(sqlite3_file *pFd, sqlite3_int64 a, void *b){
+  VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+  return pWrapper->pFd->pMethods->xUnfetch(pWrapper->pFd, a, b);
+}
+
+static void create_vfs(Config *pConfig){
+  static sqlite3_vfs vfs = {
+    3, 0, 0, 0, "wrapper", 0,
+    vfsWrapOpen, vfsWrapDelete, vfsWrapAccess,
+    vfsWrapFullPathname, vfsWrapDlOpen, vfsWrapDlError,
+    vfsWrapDlSym, vfsWrapDlClose, vfsWrapRandomness,
+    vfsWrapSleep, vfsWrapCurrentTime, vfsWrapGetLastError,
+    vfsWrapCurrentTimeInt64, vfsWrapSetSystemCall, vfsWrapGetSystemCall,
+    vfsWrapNextSystemCall
+  };
+  sqlite3_vfs *pVfs;
+
+  pVfs = sqlite3_vfs_find(0);
+  vfs.mxPathname = pVfs->mxPathname;
+  vfs.szOsFile = pVfs->szOsFile + sizeof(VfsWrapperFd);
+  vfs.pAppData = (void*)pConfig;
+  pConfig->pVfs = pVfs;
+
+  sqlite3_vfs_register(&vfs, 1);
+}
+
+
+/*
+** Wal hook used by connections in thread_main().
+*/
+static int thread_wal_hook(
+  void *pArg,                     /* Pointer to Config object */
+  sqlite3 *db,
+  const char *zDb, 
+  int nFrame
+){
+  WalHookCtx *pCtx = (WalHookCtx*)pArg;
+  Config *pConfig = pCtx->pConfig;
+
+  if( nFrame>=pConfig->nAutoCkpt ){
+    pthread_mutex_lock(&pConfig->mutex);
+    if( pConfig->nCondWait>=0 ){
+      pConfig->nCondWait++;
+      if( pConfig->nCondWait==pConfig->nThread ){
+        execsql(pCtx->pErr, pCtx->pDb, "PRAGMA wal_checkpoint");
+        pthread_cond_broadcast(&pConfig->cond);
+      }else{
+        pthread_cond_wait(&pConfig->cond, &pConfig->mutex);
+      }
+      pConfig->nCondWait--;
+    }
+    pthread_mutex_unlock(&pConfig->mutex);
+  }
+
+  return SQLITE_OK;
+}
+
 
 static char *thread_main(int iTid, void *pArg){
+  WalHookCtx ctx;
   Config *pConfig = (Config*)pArg;
   Error err = {0};                /* Error code and message */
   Sqlite db = {0};                /* SQLite database connection */
@@ -45,6 +360,11 @@ static char *thread_main(int iTid, void *pArg){
       "PRAGMA synchronous = 0;", pConfig->nAutoCkpt
   );
 
+  ctx.pConfig = pConfig;
+  ctx.pErr = &err;
+  ctx.pDb = &db;
+  sqlite3_wal_hook(db.db, thread_wal_hook, (void*)&ctx);
+
   while( !timetostop(&err) ){
     execsql(&err, &db, "BEGIN CONCURRENT");
     for(j=0; j<pConfig->nIPT; j++){
@@ -53,9 +373,7 @@ static char *thread_main(int iTid, void *pArg){
           "(randomblob(10), randomblob(20), randomblob(30), randomblob(200))"
       );
     }
-    sqlite3_mutex_enter(pConfig->pMutex);
     execsql(&err, &db, "COMMIT");
-    sqlite3_mutex_leave(pConfig->pMutex);
     nAttempt++;
     if( err.rc==SQLITE_OK ){
       nCommit++;
@@ -66,6 +384,12 @@ static char *thread_main(int iTid, void *pArg){
   }
 
   closedb(&err, &db);
+
+  pthread_mutex_lock(&pConfig->mutex);
+  pConfig->nCondWait = -1;
+  pthread_cond_broadcast(&pConfig->cond);
+  pthread_mutex_unlock(&pConfig->mutex);
+
   return sqlite3_mprintf("%d/%d successful commits", nCommit, nAttempt);
 }
 
@@ -93,12 +417,14 @@ int main(int argc, const char **argv){
     sqlite3_free(z);
   }
 
+  /* Create the special VFS - "wrapper". And the mutex and condition 
+  ** variable. */
+  create_vfs(&conf);
+  pthread_mutex_init(&conf.mutex, 0);
+  pthread_cond_init(&conf.cond, 0);
+
   /* Ensure the schema has been created */
-  if( conf.bMutex ){
-    conf.pMutex = sqlite3_mutex_alloc(SQLITE_MUTEX_RECURSIVE);
-  }
   opendb(&err, &db, "xyz.db", conf.bRm);
-
   sql_script(&err, &db,
       "PRAGMA journal_mode = wal;"
       "CREATE TABLE IF NOT EXISTS t1(a PRIMARY KEY, b, c, d) WITHOUT ROWID;"
@@ -107,10 +433,16 @@ int main(int argc, const char **argv){
   );
 
   setstoptime(&err, conf.nSecond*1000);
-  for(i=0; i<conf.nThread; i++){
-    launch_thread(&err, &threads, thread_main, (void*)&conf);
+  if( conf.nThread==1 ){
+    char *z = thread_main(0, (void*)&conf);
+    printf("Thread 0 says: %s\n", (z==0 ? "..." : z));
+    fflush(stdout);
+  }else{
+    for(i=0; i<conf.nThread; i++){
+      launch_thread(&err, &threads, thread_main, (void*)&conf);
+    }
+    join_all_threads(&err, &threads);
   }
-  join_all_threads(&err, &threads);
 
   if( err.rc==SQLITE_OK ){
     printf("Database is %dK\n", (int)(filesize(&err, "xyz.db") / 1024));
@@ -120,7 +452,6 @@ int main(int argc, const char **argv){
   }
 
   closedb(&err, &db);
-  sqlite3_mutex_free(conf.pMutex);
   print_and_free_err(&err);
   return 0;
 }