int bMutex; /* --mutex */
int nAutoCkpt; /* --autockpt */
int bRm; /* --rm */
- sqlite3_mutex *pMutex;
+
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+ int nCondWait; /* Number of threads waiting on hCond */
+ sqlite3_vfs *pVfs;
+};
+
+typedef struct WalHookCtx WalHookCtx;
+struct WalHookCtx {
+ Config *pConfig;
+ Sqlite *pDb;
+ Error *pErr;
+};
+
+typedef struct VfsWrapperFd VfsWrapperFd;
+struct VfsWrapperFd {
+ sqlite3_file base; /* Base class */
+ int bWriter; /* True if holding shm WRITER lock */
+ Config *pConfig;
+ sqlite3_file *pFd; /* Underlying file descriptor */
};
+/* Methods of the wrapper VFS */
+static int vfsWrapOpen(sqlite3_vfs*, const char*, sqlite3_file*, int, int*);
+static int vfsWrapDelete(sqlite3_vfs*, const char*, int);
+static int vfsWrapAccess(sqlite3_vfs*, const char*, int, int*);
+static int vfsWrapFullPathname(sqlite3_vfs*, const char *, int, char*);
+static void *vfsWrapDlOpen(sqlite3_vfs*, const char*);
+static void vfsWrapDlError(sqlite3_vfs*, int, char*);
+static void (*vfsWrapDlSym(sqlite3_vfs*,void*, const char*))(void);
+static void vfsWrapDlClose(sqlite3_vfs*, void*);
+static int vfsWrapRandomness(sqlite3_vfs*, int, char*);
+static int vfsWrapSleep(sqlite3_vfs*, int);
+static int vfsWrapCurrentTime(sqlite3_vfs*, double*);
+static int vfsWrapGetLastError(sqlite3_vfs*, int, char*);
+static int vfsWrapCurrentTimeInt64(sqlite3_vfs*, sqlite3_int64*);
+static int vfsWrapSetSystemCall(sqlite3_vfs*, const char*, sqlite3_syscall_ptr);
+static sqlite3_syscall_ptr vfsWrapGetSystemCall(sqlite3_vfs*, const char*);
+static const char *vfsWrapNextSystemCall(sqlite3_vfs*, const char*);
+
+/* Methods of wrapper sqlite3_io_methods object (see vfsWrapOpen()) */
+static int vfsWrapClose(sqlite3_file*);
+static int vfsWrapRead(sqlite3_file*, void*, int iAmt, sqlite3_int64 iOfst);
+static int vfsWrapWrite(sqlite3_file*, const void*, int iAmt, sqlite3_int64);
+static int vfsWrapTruncate(sqlite3_file*, sqlite3_int64 size);
+static int vfsWrapSync(sqlite3_file*, int flags);
+static int vfsWrapFileSize(sqlite3_file*, sqlite3_int64 *pSize);
+static int vfsWrapLock(sqlite3_file*, int);
+static int vfsWrapUnlock(sqlite3_file*, int);
+static int vfsWrapCheckReservedLock(sqlite3_file*, int *pResOut);
+static int vfsWrapFileControl(sqlite3_file*, int op, void *pArg);
+static int vfsWrapSectorSize(sqlite3_file*);
+static int vfsWrapDeviceCharacteristics(sqlite3_file*);
+static int vfsWrapShmMap(sqlite3_file*, int iPg, int, int, void volatile**);
+static int vfsWrapShmLock(sqlite3_file*, int offset, int n, int flags);
+static void vfsWrapShmBarrier(sqlite3_file*);
+static int vfsWrapShmUnmap(sqlite3_file*, int deleteFlag);
+static int vfsWrapFetch(sqlite3_file*, sqlite3_int64 iOfst, int iAmt, void **);
+static int vfsWrapUnfetch(sqlite3_file*, sqlite3_int64 iOfst, void *p);
+
+static int vfsWrapOpen(
+ sqlite3_vfs *pVfs,
+ const char *zName,
+ sqlite3_file *pFd,
+ int flags,
+ int *fout
+){
+ static sqlite3_io_methods methods = {
+ 3,
+ vfsWrapClose, vfsWrapRead, vfsWrapWrite,
+ vfsWrapTruncate, vfsWrapSync, vfsWrapFileSize,
+ vfsWrapLock, vfsWrapUnlock, vfsWrapCheckReservedLock,
+ vfsWrapFileControl, vfsWrapSectorSize, vfsWrapDeviceCharacteristics,
+ vfsWrapShmMap, vfsWrapShmLock, vfsWrapShmBarrier,
+ vfsWrapShmUnmap, vfsWrapFetch, vfsWrapUnfetch
+ };
+
+ Config *pConfig = (Config*)pVfs->pAppData;
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ int rc;
+
+ pWrapper->pFd = (sqlite3_file*)&pWrapper[1];
+ pWrapper->pConfig = pConfig;
+ rc = pConfig->pVfs->xOpen(pConfig->pVfs, zName, pWrapper->pFd, flags, fout);
+ if( rc==SQLITE_OK ){
+ pWrapper->base.pMethods = &methods;
+ }
+ return rc;
+}
+
+static int vfsWrapDelete(sqlite3_vfs *pVfs, const char *a, int b){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xDelete(pConfig->pVfs, a, b);
+}
+static int vfsWrapAccess(sqlite3_vfs *pVfs, const char *a, int b, int *c){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xAccess(pConfig->pVfs, a, b, c);
+}
+static int vfsWrapFullPathname(sqlite3_vfs *pVfs, const char *a, int b, char*c){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xFullPathname(pConfig->pVfs, a, b, c);
+}
+static void *vfsWrapDlOpen(sqlite3_vfs *pVfs, const char *a){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xDlOpen(pConfig->pVfs, a);
+}
+static void vfsWrapDlError(sqlite3_vfs *pVfs, int a, char *b){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xDlError(pConfig->pVfs, a, b);
+}
+static void (*vfsWrapDlSym(sqlite3_vfs *pVfs, void *a, const char *b))(void){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xDlSym(pConfig->pVfs, a, b);
+}
+static void vfsWrapDlClose(sqlite3_vfs *pVfs, void *a){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xDlClose(pConfig->pVfs, a);
+}
+static int vfsWrapRandomness(sqlite3_vfs *pVfs, int a, char *b){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xRandomness(pConfig->pVfs, a, b);
+}
+static int vfsWrapSleep(sqlite3_vfs *pVfs, int a){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xSleep(pConfig->pVfs, a);
+}
+static int vfsWrapCurrentTime(sqlite3_vfs *pVfs, double *a){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xCurrentTime(pConfig->pVfs, a);
+}
+static int vfsWrapGetLastError(sqlite3_vfs *pVfs, int a, char *b){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xGetLastError(pConfig->pVfs, a, b);
+}
+static int vfsWrapCurrentTimeInt64(sqlite3_vfs *pVfs, sqlite3_int64 *a){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xCurrentTimeInt64(pConfig->pVfs, a);
+}
+static int vfsWrapSetSystemCall(
+ sqlite3_vfs *pVfs,
+ const char *a,
+ sqlite3_syscall_ptr b
+){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xSetSystemCall(pConfig->pVfs, a, b);
+}
+static sqlite3_syscall_ptr vfsWrapGetSystemCall(
+ sqlite3_vfs *pVfs,
+ const char *a
+){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xGetSystemCall(pConfig->pVfs, a);
+}
+static const char *vfsWrapNextSystemCall(sqlite3_vfs *pVfs, const char *a){
+ Config *pConfig = (Config*)pVfs->pAppData;
+ return pConfig->pVfs->xNextSystemCall(pConfig->pVfs, a);
+}
+
+static int vfsWrapClose(sqlite3_file *pFd){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ pWrapper->pFd->pMethods->xClose(pWrapper->pFd);
+ pWrapper->pFd = 0;
+ return SQLITE_OK;
+}
+static int vfsWrapRead(sqlite3_file *pFd, void *a, int b, sqlite3_int64 c){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xRead(pWrapper->pFd, a, b, c);
+}
+static int vfsWrapWrite(
+ sqlite3_file *pFd,
+ const void *a, int b,
+ sqlite3_int64 c
+){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xWrite(pWrapper->pFd, a, b, c);
+}
+static int vfsWrapTruncate(sqlite3_file *pFd, sqlite3_int64 a){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xTruncate(pWrapper->pFd, a);
+}
+static int vfsWrapSync(sqlite3_file *pFd, int a){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xSync(pWrapper->pFd, a);
+}
+static int vfsWrapFileSize(sqlite3_file *pFd, sqlite3_int64 *a){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xFileSize(pWrapper->pFd, a);
+}
+static int vfsWrapLock(sqlite3_file *pFd, int a){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xLock(pWrapper->pFd, a);
+}
+static int vfsWrapUnlock(sqlite3_file *pFd, int a){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xUnlock(pWrapper->pFd, a);
+}
+static int vfsWrapCheckReservedLock(sqlite3_file *pFd, int *a){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xCheckReservedLock(pWrapper->pFd, a);
+}
+static int vfsWrapFileControl(sqlite3_file *pFd, int a, void *b){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xFileControl(pWrapper->pFd, a, b);
+}
+static int vfsWrapSectorSize(sqlite3_file *pFd){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xSectorSize(pWrapper->pFd);
+}
+static int vfsWrapDeviceCharacteristics(sqlite3_file *pFd){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xDeviceCharacteristics(pWrapper->pFd);
+}
+static int vfsWrapShmMap(
+ sqlite3_file *pFd,
+ int a, int b, int c,
+ void volatile **d
+){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xShmMap(pWrapper->pFd, a, b, c, d);
+}
+static int vfsWrapShmLock(sqlite3_file *pFd, int offset, int n, int flags){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ Config *pConfig = pWrapper->pConfig;
+ int bMutex = 0;
+ int rc;
+
+ if( (offset==0 && n==1)
+ && (flags & SQLITE_SHM_LOCK) && (flags & SQLITE_SHM_EXCLUSIVE)
+ ){
+ pthread_mutex_lock(&pConfig->mutex);
+ pWrapper->bWriter = 1;
+ bMutex = 1;
+ }
+
+ if( offset==0 && (flags & SQLITE_SHM_UNLOCK) && pWrapper->bWriter ){
+ pthread_mutex_unlock(&pConfig->mutex);
+ pWrapper->bWriter = 0;
+ }
+
+ rc = pWrapper->pFd->pMethods->xShmLock(pWrapper->pFd, offset, n, flags);
+
+ if( rc!=SQLITE_OK && bMutex ){
+ pthread_mutex_unlock(&pConfig->mutex);
+ pWrapper->bWriter = 0;
+ }
+
+ return rc;
+}
+static void vfsWrapShmBarrier(sqlite3_file *pFd){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xShmBarrier(pWrapper->pFd);
+}
+static int vfsWrapShmUnmap(sqlite3_file *pFd, int a){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xShmUnmap(pWrapper->pFd, a);
+}
+static int vfsWrapFetch(sqlite3_file *pFd, sqlite3_int64 a, int b, void **c){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xFetch(pWrapper->pFd, a, b, c);
+}
+static int vfsWrapUnfetch(sqlite3_file *pFd, sqlite3_int64 a, void *b){
+ VfsWrapperFd *pWrapper = (VfsWrapperFd*)pFd;
+ return pWrapper->pFd->pMethods->xUnfetch(pWrapper->pFd, a, b);
+}
+
+static void create_vfs(Config *pConfig){
+ static sqlite3_vfs vfs = {
+ 3, 0, 0, 0, "wrapper", 0,
+ vfsWrapOpen, vfsWrapDelete, vfsWrapAccess,
+ vfsWrapFullPathname, vfsWrapDlOpen, vfsWrapDlError,
+ vfsWrapDlSym, vfsWrapDlClose, vfsWrapRandomness,
+ vfsWrapSleep, vfsWrapCurrentTime, vfsWrapGetLastError,
+ vfsWrapCurrentTimeInt64, vfsWrapSetSystemCall, vfsWrapGetSystemCall,
+ vfsWrapNextSystemCall
+ };
+ sqlite3_vfs *pVfs;
+
+ pVfs = sqlite3_vfs_find(0);
+ vfs.mxPathname = pVfs->mxPathname;
+ vfs.szOsFile = pVfs->szOsFile + sizeof(VfsWrapperFd);
+ vfs.pAppData = (void*)pConfig;
+ pConfig->pVfs = pVfs;
+
+ sqlite3_vfs_register(&vfs, 1);
+}
+
+
+/*
+** Wal hook used by connections in thread_main().
+*/
+static int thread_wal_hook(
+ void *pArg, /* Pointer to Config object */
+ sqlite3 *db,
+ const char *zDb,
+ int nFrame
+){
+ WalHookCtx *pCtx = (WalHookCtx*)pArg;
+ Config *pConfig = pCtx->pConfig;
+
+ if( nFrame>=pConfig->nAutoCkpt ){
+ pthread_mutex_lock(&pConfig->mutex);
+ if( pConfig->nCondWait>=0 ){
+ pConfig->nCondWait++;
+ if( pConfig->nCondWait==pConfig->nThread ){
+ execsql(pCtx->pErr, pCtx->pDb, "PRAGMA wal_checkpoint");
+ pthread_cond_broadcast(&pConfig->cond);
+ }else{
+ pthread_cond_wait(&pConfig->cond, &pConfig->mutex);
+ }
+ pConfig->nCondWait--;
+ }
+ pthread_mutex_unlock(&pConfig->mutex);
+ }
+
+ return SQLITE_OK;
+}
+
static char *thread_main(int iTid, void *pArg){
+ WalHookCtx ctx;
Config *pConfig = (Config*)pArg;
Error err = {0}; /* Error code and message */
Sqlite db = {0}; /* SQLite database connection */
"PRAGMA synchronous = 0;", pConfig->nAutoCkpt
);
+ ctx.pConfig = pConfig;
+ ctx.pErr = &err;
+ ctx.pDb = &db;
+ sqlite3_wal_hook(db.db, thread_wal_hook, (void*)&ctx);
+
while( !timetostop(&err) ){
execsql(&err, &db, "BEGIN CONCURRENT");
for(j=0; j<pConfig->nIPT; j++){
"(randomblob(10), randomblob(20), randomblob(30), randomblob(200))"
);
}
- sqlite3_mutex_enter(pConfig->pMutex);
execsql(&err, &db, "COMMIT");
- sqlite3_mutex_leave(pConfig->pMutex);
nAttempt++;
if( err.rc==SQLITE_OK ){
nCommit++;
}
closedb(&err, &db);
+
+ pthread_mutex_lock(&pConfig->mutex);
+ pConfig->nCondWait = -1;
+ pthread_cond_broadcast(&pConfig->cond);
+ pthread_mutex_unlock(&pConfig->mutex);
+
return sqlite3_mprintf("%d/%d successful commits", nCommit, nAttempt);
}
sqlite3_free(z);
}
+ /* Create the special VFS - "wrapper". And the mutex and condition
+ ** variable. */
+ create_vfs(&conf);
+ pthread_mutex_init(&conf.mutex, 0);
+ pthread_cond_init(&conf.cond, 0);
+
/* Ensure the schema has been created */
- if( conf.bMutex ){
- conf.pMutex = sqlite3_mutex_alloc(SQLITE_MUTEX_RECURSIVE);
- }
opendb(&err, &db, "xyz.db", conf.bRm);
-
sql_script(&err, &db,
"PRAGMA journal_mode = wal;"
"CREATE TABLE IF NOT EXISTS t1(a PRIMARY KEY, b, c, d) WITHOUT ROWID;"
);
setstoptime(&err, conf.nSecond*1000);
- for(i=0; i<conf.nThread; i++){
- launch_thread(&err, &threads, thread_main, (void*)&conf);
+ if( conf.nThread==1 ){
+ char *z = thread_main(0, (void*)&conf);
+ printf("Thread 0 says: %s\n", (z==0 ? "..." : z));
+ fflush(stdout);
+ }else{
+ for(i=0; i<conf.nThread; i++){
+ launch_thread(&err, &threads, thread_main, (void*)&conf);
+ }
+ join_all_threads(&err, &threads);
}
- join_all_threads(&err, &threads);
if( err.rc==SQLITE_OK ){
printf("Database is %dK\n", (int)(filesize(&err, "xyz.db") / 1024));
}
closedb(&err, &db);
- sqlite3_mutex_free(conf.pMutex);
print_and_free_err(&err);
return 0;
}