*/
struct unixShmNode {
unixInodeInfo *pInode; /* unixInodeInfo that owns this SHM node */
+#ifdef SQLITE_ENABLE_SETLK_TIMEOUT
+ pthread_cond_t shmcond;
+ pthread_mutex_t shmmutex;
+#else
sqlite3_mutex *pShmMutex; /* Mutex to access this object */
+#endif
char *zFilename; /* Name of the mmapped file */
int hShm; /* Open file descriptor */
int szRegion; /* Size of shared-memory regions */
#endif
};
+#ifdef SQLITE_ENABLE_SETLK_TIMEOUT
+# define ENTER_SHMNODE_MUTEX(p) pthread_mutex_lock(&p->shmmutex)
+# define LEAVE_SHMNODE_MUTEX(p) pthread_mutex_unlock(&p->shmmutex)
+#else
+# define ENTER_SHMNODE_MUTEX(p) sqlite3_mutex_enter(p->pShmMutex)
+# define LEAVE_SHMNODE_MUTEX(p) sqlite3_mutex_leave(p->pShmMutex)
+#endif
+
+
/*
** Structure used internally by this VFS to record the state of an
** open shared memory connection.
f.l_start = UNIX_SHM_BASE + 3;
f.l_len = SQLITE_SHM_NLOCK - 3;
- sqlite3_mutex_enter(pShmNode->pShmMutex);
+ ENTER_SHMNODE_MUTEX(pShmNode);
if( osFcntl(pShmNode->hShm, F_GETLK, &f)<0 ){
rc = SQLITE_IOERR_LOCK;
}else{
*piOut = (f.l_type!=F_UNLCK);
}
- sqlite3_mutex_leave(pShmNode->pShmMutex);
+ LEAVE_SHMNODE_MUTEX(pShmNode);
}
return rc;
** otherwise.
*/
static int unixShmSystemLock(
- unixFile *pFile, /* Open connection to the WAL file */
+ unixFile *pFile, /* Open connection to the database file */
int lockType, /* F_UNLCK, F_RDLCK, or F_WRLCK */
int ofst, /* First byte of the locking range */
int n /* Number of bytes to lock */
/* Access to the unixShmNode object is serialized by the caller */
pShmNode = pFile->pInode->pShmNode;
+#ifndef SQLITE_ENABLE_SETLK_TIMEOUT
assert( pShmNode->nRef==0 || sqlite3_mutex_held(pShmNode->pShmMutex) );
+#endif
assert( pShmNode->nRef>0 || unixMutexHeld() );
/* Shared locks never span more than one byte */
int nShmPerMap = unixShmRegionPerMap();
int i;
assert( p->pInode==pFd->pInode );
+#ifdef SQLITE_ENABLE_SETLK_TIMEOUT
+ pthread_cond_destroy(&p->shmcond);
+ pthread_mutex_destroy(&p->shmmutex);
+#else
sqlite3_mutex_free(p->pShmMutex);
+#endif
for(i=0; i<p->nRegion; i+=nShmPerMap){
if( p->hShm>=0 ){
osMunmap(p->apRegion[i], p->szRegion);
pDbFd->pInode->pShmNode = pShmNode;
pShmNode->pInode = pDbFd->pInode;
if( sqlite3GlobalConfig.bCoreMutex ){
+#ifdef SQLITE_ENABLE_SETLK_TIMEOUT
+ pthread_cond_init(&pShmNode->shmcond, 0);
+ pthread_mutex_init(&pShmNode->shmmutex, 0);
+#else
pShmNode->pShmMutex = sqlite3_mutex_alloc(SQLITE_MUTEX_FAST);
if( pShmNode->pShmMutex==0 ){
rc = SQLITE_NOMEM_BKPT;
goto shm_open_err;
}
+#endif
}
if( pInode->bProcessLock==0 ){
** at pShmNode->pFirst. This must be done while holding the
** pShmNode->pShmMutex.
*/
- sqlite3_mutex_enter(pShmNode->pShmMutex);
+ ENTER_SHMNODE_MUTEX(pShmNode);
p->pNext = pShmNode->pFirst;
pShmNode->pFirst = p;
- sqlite3_mutex_leave(pShmNode->pShmMutex);
+ LEAVE_SHMNODE_MUTEX(pShmNode);
return rc;
/* Jump here on any error */
p = pDbFd->pShm;
pShmNode = p->pShmNode;
- sqlite3_mutex_enter(pShmNode->pShmMutex);
+ ENTER_SHMNODE_MUTEX(pShmNode);
if( pShmNode->isUnlocked ){
rc = unixLockSharedMemory(pDbFd, pShmNode);
if( rc!=SQLITE_OK ) goto shmpage_out;
*pp = 0;
}
if( pShmNode->isReadonly && rc==SQLITE_OK ) rc = SQLITE_READONLY;
- sqlite3_mutex_leave(pShmNode->pShmMutex);
+ LEAVE_SHMNODE_MUTEX(pShmNode);
return rc;
}
static int assertLockingArrayOk(unixShmNode *pShmNode){
unixShm *pX;
int aLock[SQLITE_SHM_NLOCK];
+#ifndef SQLITE_ENABLE_SETLK_TIMEOUT
assert( sqlite3_mutex_held(pShmNode->pShmMutex) );
+#endif
memset(aLock, 0, sizeof(aLock));
for(pX=pShmNode->pFirst; pX; pX=pX->pNext){
u16 mask; /* Mask of locks to take or release */
int *aLock = pShmNode->aLock;
+#ifdef SQLITE_ENABLE_SETLK_TIMEOUT
+ int bRetry;
+ struct timespec sTimespec;
+#endif
+
assert( pShmNode==pDbFd->pInode->pShmNode );
assert( pShmNode->pInode==pDbFd->pInode );
assert( ofst>=0 && ofst+n<=SQLITE_SHM_NLOCK );
&& (ofst!=0 || (p->exclMask|p->sharedMask)<3)
&& (ofst<3 || (p->exclMask|p->sharedMask)<(1<<ofst))
));
+
+ if( pDbFd->iBusyTimeout ){
+ struct timeval tm;
+ memset(&sTimespec, 0, sizeof(sTimespec));
+ gettimeofday(&tm, 0);
+ TIMEVAL_TO_TIMESPEC(&tm, &sTimespec);
+ sTimespec.tv_sec += pDbFd->iBusyTimeout / 1000;
+ sTimespec.tv_nsec += (pDbFd->iBusyTimeout % 1000) * 1000000;
+ if( sTimespec.tv_nsec>(1000*1000000) ){
+ sTimespec.tv_sec++;
+ sTimespec.tv_nsec -= (1000*1000000);
+ }
+ }
#endif
mask = (1<<(ofst+n)) - (1<<ofst);
assert( n>1 || mask==(1<<ofst) );
- sqlite3_mutex_enter(pShmNode->pShmMutex);
- assert( assertLockingArrayOk(pShmNode) );
- if( flags & SQLITE_SHM_UNLOCK ){
- if( (p->exclMask|p->sharedMask) & mask ){
- int ii;
- int bUnlock = 1;
+ ENTER_SHMNODE_MUTEX(pShmNode);
- for(ii=ofst; ii<ofst+n; ii++){
- if( aLock[ii]>((p->sharedMask & (1<<ii)) ? 1 : 0) ){
- bUnlock = 0;
+#ifdef SQLITE_ENABLE_SETLK_TIMEOUT
+ do{
+ bRetry = 0;
+#endif
+
+ assert( assertLockingArrayOk(pShmNode) );
+ if( flags & SQLITE_SHM_UNLOCK ){
+ if( (p->exclMask|p->sharedMask) & mask ){
+ int ii;
+ int bUnlock = 1;
+
+ for(ii=ofst; ii<ofst+n; ii++){
+ if( aLock[ii]>((p->sharedMask & (1<<ii)) ? 1 : 0) ){
+ bUnlock = 0;
+ }
+ }
+
+ if( bUnlock ){
+ rc = unixShmSystemLock(pDbFd, F_UNLCK, ofst+UNIX_SHM_BASE, n);
+ if( rc==SQLITE_OK ){
+ memset(&aLock[ofst], 0, sizeof(int)*n);
+ }
+ }else if( ALWAYS(p->sharedMask & (1<<ofst)) ){
+ assert( n==1 && aLock[ofst]>1 );
+ aLock[ofst]--;
}
+
+ /* Undo the local locks */
+ if( rc==SQLITE_OK ){
+ p->exclMask &= ~mask;
+ p->sharedMask &= ~mask;
+#ifdef SQLITE_ENABLE_SETLK_TIMEOUT
+ pthread_cond_broadcast(&pShmNode->shmcond);
+#endif
+ }
}
-
- if( bUnlock ){
- rc = unixShmSystemLock(pDbFd, F_UNLCK, ofst+UNIX_SHM_BASE, n);
+ }else if( flags & SQLITE_SHM_SHARED ){
+ assert( n==1 );
+ assert( (p->exclMask & (1<<ofst))==0 );
+ if( (p->sharedMask & mask)==0 ){
+ if( aLock[ofst]<0 ){
+ rc = SQLITE_BUSY;
+#ifdef SQLITE_ENABLE_SETLK_TIMEOUT
+ bRetry = (pDbFd->iBusyTimeout!=0);
+#endif
+ }else if( aLock[ofst]==0 ){
+ rc = unixShmSystemLock(pDbFd, F_RDLCK, ofst+UNIX_SHM_BASE, n);
+ }
+
+ /* Get the local shared locks */
if( rc==SQLITE_OK ){
- memset(&aLock[ofst], 0, sizeof(int)*n);
+ p->sharedMask |= mask;
+ aLock[ofst]++;
}
- }else if( ALWAYS(p->sharedMask & (1<<ofst)) ){
- assert( n==1 && aLock[ofst]>1 );
- aLock[ofst]--;
}
-
- /* Undo the local locks */
- if( rc==SQLITE_OK ){
- p->exclMask &= ~mask;
- p->sharedMask &= ~mask;
- }
- }
- }else if( flags & SQLITE_SHM_SHARED ){
- assert( n==1 );
- assert( (p->exclMask & (1<<ofst))==0 );
- if( (p->sharedMask & mask)==0 ){
- if( aLock[ofst]<0 ){
- rc = SQLITE_BUSY;
- }else if( aLock[ofst]==0 ){
- rc = unixShmSystemLock(pDbFd, F_RDLCK, ofst+UNIX_SHM_BASE, n);
+ }else{
+ /* Make sure no sibling connections hold locks that will block this
+ ** lock. If any do, return SQLITE_BUSY right away. */
+ int ii;
+ for(ii=ofst; ii<ofst+n; ii++){
+ assert( (p->sharedMask & mask)==0 );
+ if( ALWAYS((p->exclMask & (1<<ii))==0) && aLock[ii] ){
+ rc = SQLITE_BUSY;
+#ifdef SQLITE_ENABLE_SETLK_TIMEOUT
+ bRetry = (pDbFd->iBusyTimeout!=0);
+#endif
+ break;
+ }
}
-
- /* Get the local shared locks */
+
+ /* Get the exclusive locks at the system level. Then if successful
+ ** also update the in-memory values. */
if( rc==SQLITE_OK ){
- p->sharedMask |= mask;
- aLock[ofst]++;
- }
- }
- }else{
- /* Make sure no sibling connections hold locks that will block this
- ** lock. If any do, return SQLITE_BUSY right away. */
- int ii;
- for(ii=ofst; ii<ofst+n; ii++){
- assert( (p->sharedMask & mask)==0 );
- if( ALWAYS((p->exclMask & (1<<ii))==0) && aLock[ii] ){
- rc = SQLITE_BUSY;
- break;
+ rc = unixShmSystemLock(pDbFd, F_WRLCK, ofst+UNIX_SHM_BASE, n);
+ if( rc==SQLITE_OK ){
+ assert( (p->sharedMask & mask)==0 );
+ p->exclMask |= mask;
+ for(ii=ofst; ii<ofst+n; ii++){
+ aLock[ii] = -1;
+ }
+ }
}
}
- /* Get the exclusive locks at the system level. Then if successful
- ** also update the in-memory values. */
- if( rc==SQLITE_OK ){
- rc = unixShmSystemLock(pDbFd, F_WRLCK, ofst+UNIX_SHM_BASE, n);
- if( rc==SQLITE_OK ){
- assert( (p->sharedMask & mask)==0 );
- p->exclMask |= mask;
- for(ii=ofst; ii<ofst+n; ii++){
- aLock[ii] = -1;
- }
+#ifdef SQLITE_ENABLE_SETLK_TIMEOUT
+ if( bRetry ){
+ int prc;
+ assert( rc==SQLITE_BUSY );
+ prc = pthread_cond_timedwait(
+ &pShmNode->shmcond, &pShmNode->shmmutex, &sTimespec
+ );
+ if( prc==0 ){
+ rc = SQLITE_OK;
+ }else{
+ /* printf("prc=%d (%s)\n", prc, strerror(prc)); */
+ bRetry = 0;
}
}
- }
+ }while( bRetry );
+#endif
+
assert( assertLockingArrayOk(pShmNode) );
- sqlite3_mutex_leave(pShmNode->pShmMutex);
+ LEAVE_SHMNODE_MUTEX(pShmNode);
OSTRACE(("SHM-LOCK shmid-%d, pid-%d got %03x,%03x\n",
p->id, osGetpid(0), p->sharedMask, p->exclMask));
return rc;
/* Remove connection p from the set of connections associated
** with pShmNode */
- sqlite3_mutex_enter(pShmNode->pShmMutex);
+ ENTER_SHMNODE_MUTEX(pShmNode);
for(pp=&pShmNode->pFirst; (*pp)!=p; pp = &(*pp)->pNext){}
*pp = p->pNext;
/* Free the connection p */
sqlite3_free(p);
pDbFd->pShm = 0;
- sqlite3_mutex_leave(pShmNode->pShmMutex);
+ LEAVE_SHMNODE_MUTEX(pShmNode);
/* If pShmNode->nRef has reached 0, then close the underlying
** shared-memory file, too */
print_and_free_err(&err);
}
+typedef struct Walthread6 Walthread6;
+struct Walthread6 {
+ int nInsert;
+ int nBusy;
+ int nMaxFrame;
+};
+
+static int walthread6_walhook(
+ void *pArg, /* Pointer to Walthread6 structure */
+ sqlite3 *db, /* Database handle */
+ const char *zDb, /* "main" */
+ int nFrame /* Frames current in wal file */
+){
+ int rc = SQLITE_OK;
+ Walthread6 *p = (Walthread6*)pArg;
+
+ if( nFrame>p->nMaxFrame ) p->nMaxFrame = nFrame;
+ if( nFrame>1000 ){
+ sqlite3_wal_checkpoint_v2(db, zDb, SQLITE_CHECKPOINT_RESTART, 0, 0);
+ }
+
+ return rc;
+}
+
+static char *walthread6_thread(int iTid, void *pArg){
+ Error err = {0};
+ Sqlite db = {0};
+
+ Walthread6 res;
+ memset(&res, 0, sizeof(res));
+
+ opendb(&err, &db, "test.db", 0);
+ sqlite3_busy_timeout(db.db, 1000);
+ sqlite3_wal_hook(db.db, walthread6_walhook, (void*)&res);
+
+ while( !timetostop(&err) ){
+ int i;
+ execsql(&err, &db, "BEGIN IMMEDIATE");
+ if( err.rc==SQLITE_BUSY ){
+ res.nBusy++;
+ clear_error(&err, SQLITE_BUSY);
+ }else{
+ res.nInsert++;
+ for(i=0; i<20; i++){
+ execsql(&err, &db, "INSERT INTO t1(b) VALUES(random())");
+ }
+ usleep(10*1000);
+ execsql(&err, &db, "COMMIT");
+ }
+ }
+
+ closedb(&err, &db);
+ print_and_free_err(&err);
+ return sqlite3_mprintf(
+ "%d transactions (%d busy), max-wal-size=%d frames",
+ res.nInsert, res.nBusy, res.nMaxFrame
+ );
+}
+
+/*
+**
+*/
+static void walthread6(int nMs){
+ Error err = {0};
+ Sqlite db = {0};
+ Threadset threads = {0};
+
+ int i;
+ int nThread = 2;
+
+ opendb(&err, &db, "test.db", 1);
+ sql_script(&err, &db,
+ "PRAGMA page_size = 1024;"
+ "PRAGMA journal_mode = WAL;"
+ "CREATE TABLE t1(a INTEGER PRIMARY KEY, b TEXT);"
+ );
+ closedb(&err, &db);
+
+ setstoptime(&err, nMs);
+ for(i=0; i<nThread; i++){
+ launch_thread(&err, &threads, walthread6_thread, 0);
+ }
+ join_all_threads(&err, &threads);
+
+ print_and_free_err(&err);
+}
+
#include "tt3_checkpoint.c"
{ walthread3, "walthread3", 20000 },
{ walthread4, "walthread4", 20000 },
{ walthread5, "walthread5", 1000 },
+
+ { walthread6, "walthread6", 5000 },
{ cgt_pager_1, "cgt_pager_1", 0 },
{ dynamic_triggers, "dynamic_triggers", 20000 },