- C Attempt\sto\smake\sthe\sxDelete\smethod\sof\sthe\sunix\sVFS\smore\srobust\son\sVxWorks.
- D 2014-09-01T13:37:55.088
-C Disable\sworker\sthreads\swhen\sSQLITE_THREADSAFE=0.\s\sSet\sthe\sdefault\scompile-time\nmaximum\snumber\sof\sworker\sthreads\sto\s8\sand\shonor\sthe\nSQLITE_DEFAULT_WORKER_THREADS\scompile-time\sconstant\s(which\sdefaults\sto\s0).
-D 2014-08-29T19:06:07.922
++C Add\ssupport\sfor\susing\sseparate\sworker\sthreads\sto\sspeed\slarge\ssorts.\nThe\sSQLITE_MAX_WORKER_THREADS\sand\sSQLITE_DEFAULT_WORKER_THREADS\scompile-time\noptions\sand\sthe\sSQLITE_LIMIT_WORKER_THREADS\sargument\sto\s\nsqlite3_limit()\sand\sthe\s"PRAGMA\sthreads=N"\spragma\sare\sadded.
++D 2014-09-01T17:36:46.754
F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
- F Makefile.in 5eb79e334a5de69c87740edd56af6527dd219308
+ F Makefile.in cf57f673d77606ab0f2d9627ca52a9ba1464146a
F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
- F Makefile.msc 5b04e657cf08a9ac7fc47d876c5c8be962c47d6b
+ F Makefile.msc e31dee24038965fb6269d6d61073fd6b7e331dec
F Makefile.vxworks 034289efa9d591b04b1a73598623119c306cbba0
F README.md 64f270c43c38c46de749e419c22f0ae2f4499fe8
F VERSION 53a0b870e7f16d3b06623c31d233a304c163a6af
F src/backup.c a31809c65623cc41849b94d368917f8bb66e6a7e
F src/bitvec.c 19a4ba637bd85f8f63fc8c9bae5ade9fb05ec1cb
F src/btmutex.c ec9d3f1295dafeb278c3830211cc5584132468f4
- F src/btree.c 4d0427bab54229030fc5b8577d2e5ffdc8129030
-F src/btree.c a10ceaccf04fbc1670907e1d79013e1a7a89edee
++F src/btree.c 2a483a8045118faa99867a8679da42754b532318
F src/btree.h a79aa6a71e7f1055f01052b7f821bd1c2dce95c8
F src/btreeInt.h cf180d86b2e9e418f638d65baa425c4c69c0e0e3
- F src/build.c 058e3aadb1376521ff291735237edf4c10f438fb
+ F src/build.c c26b233dcdb1e2c8f468d49236c266f9f3de96d8
F src/callback.c b97d0695ffcf6a8710ee445ffe56ee387d4d8a6f
F src/complete.c dc1d136c0feee03c2f7550bafc0d29075e36deac
F src/ctime.c 0231df905e2c4abba4483ee18ffc05adc321df2a
F src/os.h 60d419395e32a8029fa380a80a3da2e9030f635e
F src/os_common.h 92815ed65f805560b66166e3583470ff94478f04
F src/os_setup.h c9d4553b5aaa6f73391448b265b89bed0b890faa
-F src/os_unix.c bd7df3094a60915c148517504c76df4fca24e542
+F src/os_unix.c 8525ca79457c5b4673a5fda2774ee39fe155f40f
- F src/os_win.c d067fce558a5032e6e6afe62899e5397bf63cf3e
+ F src/os_win.c 08ce5616a5755da9400931fb39146e4a97801a2a
F src/os_win.h 09e751b20bbc107ffbd46e13555dc73576d88e21
F src/pager.c 3e732d2bbdd8d8d95fed0c5ae7e718d73153c4c5
F src/pager.h ffd5607f7b3e4590b415b007a4382f693334d428
F src/random.c d10c1f85b6709ca97278428fd5db5bbb9c74eece
F src/resolve.c 0ea356d32a5e884add23d1b9b4e8736681dd5697
F src/rowset.c a9c9aae3234b44a6d7c6f5a3cadf90dce1e627be
- F src/select.c 1c4667571f2c9e339b5a5c5b152a9ea7b0bc4163
+ F src/select.c 89e569b263535662f54b537eb9118b2c554ae7aa
-F src/shell.c 88378cef39aba4b4a1df82793dcb1daf9276bb81
+F src/shell.c 713cef4d73c05fc8e12f4960072329d767a05d50
- F src/sqlite.h.in ed9d35990c61f0388ca6405706455c4095310553
+ F src/sqlite.h.in 74b42237f0d2b010779cc1b1a00190452b31a2ec
F src/sqlite3.rc 992c9f5fb8285ae285d6be28240a7e8d3a7f2bad
F src/sqlite3ext.h 886f5a34de171002ad46fae8c36a7d8051c190fc
- F src/sqliteInt.h d8a9be2aa123a78c90ad4aba09b23e7dd3f8cc9f
+ F src/sqliteInt.h 6244ee9052752e26d1275ab20c9b774385aa57d2
F src/sqliteLimit.h 164b0e6749d31e0daa1a4589a169d31c0dec7b3d
F src/status.c 7ac05a5c7017d0b9f0b4bcd701228b784f987158
F src/table.c 2cd62736f845d82200acfa1287e33feb3c15d62e
F src/utf.c 77abb5e6d27f3d236e50f7c8fff1d00e15262359
F src/util.c 068dcd26354a3898ccc64ad5c4bdb95a7a15d33a
F src/vacuum.c 3728d74919d4fb1356f9e9a13e27773db60b7179
- F src/vdbe.c fd193824d1cc4a71c631eb792ce837aab530210a
+ F src/vdbe.c 90db7ad740b6d3f7ab446e6244dbc17ce495cca6
F src/vdbe.h c63fad052c9e7388d551e556e119c0bcf6bebdf8
- F src/vdbeInt.h 4653bb420abb7acdc215659cdcedd3a59f336191
+ F src/vdbeInt.h cdc8e421f85beb1ac9b4669ec5beadab6faa15e0
F src/vdbeapi.c 09677a53dd8c71bcd670b0bd073bb9aefa02b441
- F src/vdbeaux.c dba006f67c9fd1b1d07ee7fb0fb38aa1905161d1
+ F src/vdbeaux.c cef5d34a64ae3a65b56d96d3fd663246ec8e1c36
F src/vdbeblob.c 848238dc73e93e48432991bb5651bf87d865eca4
F src/vdbemem.c 921d5468a68ac06f369810992e84ca22cc730a62
- F src/vdbesort.c f7f5563bf7d4695ca8f3203f3bf9de96d04ed0b3
-F src/vdbesort.c f92628f3d5d4432f751b15a5f39bacc3c6a64a03
++F src/vdbesort.c 02646a9f86421776ae5d7594f620f9ed669d3698
F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767
F src/vtab.c 019dbfd0406a7447c990e1f7bd1dfcdb8895697f
F src/wal.c 264df50a1b33124130b23180ded2e2c5663c652a
F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
F tool/warnings.sh 0abfd78ceb09b7f7c27c688c8e3fe93268a13b32
F tool/win/sqlite.vsix deb315d026cc8400325c5863eef847784a219a2f
- P 839c7996eecd5480152c514555b9aa1121a69ce0
- R aa6b1a81952b002633810b3bb14c199e
-P 2ab4b5adc60b52bf2d2b79968d226b8dd7d2ab3b
-R 97462c8661f494fef9aea0705c0547b9
++P b0f6b91f36b503d8ba8d5257bb194f8c1afb4833 33fa0410499900dd8beb44b9a8ffbd9f4b68c8d8
++R 91d41afdb912bf253c979082c1abeeae
++T +closed 33fa0410499900dd8beb44b9a8ffbd9f4b68c8d8
U drh
- Z 8d5a13ba98607c42fffb4d1d92841eac
-Z 369e082ce9db2aa4e0cc673a60a91a43
++Z 8ac04e78c2df72246f6a97d0d813c221
SorterRecord *p;
SorterRecord *pNext = 0;
- fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff);
- pSorter->nPMA++;
- fileWriterWriteVarint(&writer, pSorter->nInMemory);
- for(p=pSorter->pRecord; p; p=pNext){
- pNext = p->pNext;
- fileWriterWriteVarint(&writer, p->nVal);
- fileWriterWrite(&writer, p->pVal, p->nVal);
- sqlite3DbFree(db, p);
+ vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
+ pTask->file.iEof);
+ pTask->nPMA++;
+ vdbePmaWriteVarint(&writer, pList->szPMA);
+ for(p=pList->pList; p; p=pNext){
+ pNext = p->u.pNext;
+ vdbePmaWriteVarint(&writer, p->nVal);
+ vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
+ if( pList->aMemory==0 ) sqlite3_free(p);
+ }
+ pList->pList = p;
+ rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
+ }
+
+ vdbeSorterWorkDebug(pTask, "exit");
+ assert( rc!=SQLITE_OK || pList->pList==0 );
+ assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
+ return rc;
+ }
+
+ /*
+ ** Advance the MergeEngine to its next entry.
+ ** Set *pbEof to true there is no next entry because
+ ** the MergeEngine has reached the end of all its inputs.
+ **
+ ** Return SQLITE_OK if successful or an error code if an error occurs.
+ */
+ static int vdbeMergeEngineStep(
+ MergeEngine *pMerger, /* The merge engine to advance to the next row */
+ int *pbEof /* Set TRUE at EOF. Set false for more content */
+ ){
+ int rc;
+ int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
+ SortSubtask *pTask = pMerger->pTask;
+
+ /* Advance the current PmaReader */
+ rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
+
+ /* Update contents of aTree[] */
+ if( rc==SQLITE_OK ){
+ int i; /* Index of aTree[] to recalculate */
+ PmaReader *pReadr1; /* First PmaReader to compare */
+ PmaReader *pReadr2; /* Second PmaReader to compare */
+ u8 *pKey2; /* To pReadr2->aKey, or 0 if record cached */
+
+ /* Find the first two PmaReaders to compare. The one that was just
+ ** advanced (iPrev) and the one next to it in the array. */
+ pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
+ pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
+ pKey2 = pReadr2->aKey;
+
+ for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
+ /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
+ int iRes;
+ if( pReadr1->pFd==0 ){
+ iRes = +1;
+ }else if( pReadr2->pFd==0 ){
+ iRes = -1;
+ }else{
+ iRes = vdbeSorterCompare(pTask,
+ pReadr1->aKey, pReadr1->nKey, pKey2, pReadr2->nKey
+ );
+ }
+
+ /* If pReadr1 contained the smaller value, set aTree[i] to its index.
+ ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
+ ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
+ ** pKey2 to point to the record belonging to pReadr2.
+ **
+ ** Alternatively, if pReadr2 contains the smaller of the two values,
+ ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
+ ** was actually called above, then pTask->pUnpacked now contains
+ ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
+ ** vdbeSorterCompare() from decoding pReadr2 again.
+ **
+ ** If the two values were equal, then the value from the oldest
+ ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
+ ** is sorted from oldest to newest, so pReadr1 contains older values
+ ** than pReadr2 iff (pReadr1<pReadr2). */
+ if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
+ pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
+ pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
+ pKey2 = pReadr2->aKey;
+ }else{
+ if( pReadr1->pFd ) pKey2 = 0;
+ pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
+ pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
+ }
+ }
+ *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
+ }
+
+ return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
+ }
+
+ #if SQLITE_MAX_WORKER_THREADS>0
+ /*
+ ** The main routine for background threads that write level-0 PMAs.
+ */
+ static void *vdbeSorterFlushThread(void *pCtx){
+ SortSubtask *pTask = (SortSubtask*)pCtx;
+ int rc; /* Return code */
+ assert( pTask->bDone==0 );
+ rc = vdbeSorterListToPMA(pTask, &pTask->list);
+ pTask->bDone = 1;
+ return SQLITE_INT_TO_PTR(rc);
+ }
+ #endif /* SQLITE_MAX_WORKER_THREADS>0 */
+
+ /*
+ ** Flush the current contents of VdbeSorter.list to a new PMA, possibly
+ ** using a background thread.
+ */
+ static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
+ #if SQLITE_MAX_WORKER_THREADS==0
+ pSorter->bUsePMA = 1;
+ return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
+ #else
+ int rc = SQLITE_OK;
+ int i;
+ SortSubtask *pTask = 0; /* Thread context used to create new PMA */
+ int nWorker = (pSorter->nTask-1);
+
+ /* Set the flag to indicate that at least one PMA has been written.
+ ** Or will be, anyhow. */
+ pSorter->bUsePMA = 1;
+
+ /* Select a sub-task to sort and flush the current list of in-memory
+ ** records to disk. If the sorter is running in multi-threaded mode,
+ ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
+ ** the background thread from a sub-tasks previous turn is still running,
+ ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
+ ** fall back to using the final sub-task. The first (pSorter->nTask-1)
+ ** sub-tasks are prefered as they use background threads - the final
+ ** sub-task uses the main thread. */
+ for(i=0; i<nWorker; i++){
+ int iTest = (pSorter->iPrev + i + 1) % nWorker;
+ pTask = &pSorter->aTask[iTest];
+ if( pTask->bDone ){
+ rc = vdbeSorterJoinThread(pTask);
+ }
+ if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
+ }
+
+ if( rc==SQLITE_OK ){
+ if( i==nWorker ){
+ /* Use the foreground thread for this operation */
+ rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
+ }else{
+ /* Launch a background thread for this operation */
+ u8 *aMem = pTask->list.aMemory;
+ void *pCtx = (void*)pTask;
+
+ assert( pTask->pThread==0 && pTask->bDone==0 );
+ assert( pTask->list.pList==0 );
+ assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
+
- pSorter->iPrev = (pTask - pSorter->aTask);
++ pSorter->iPrev = (u8)(pTask - pSorter->aTask);
+ pTask->list = pSorter->list;
+ pSorter->list.pList = 0;
+ pSorter->list.szPMA = 0;
+ if( aMem ){
+ pSorter->list.aMemory = aMem;
+ pSorter->nMemory = sqlite3MallocSize(aMem);
+ }else if( pSorter->list.aMemory ){
+ pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
+ if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
+ }
+
+ rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
}
- pSorter->pRecord = p;
- rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff);
}
return rc;
}
/*
- ** Helper function for sqlite3VdbeSorterRewind().
+ ** Allocate and return a new IncrMerger object to read data from pMerger.
+ **
+ ** If an OOM condition is encountered, return NULL. In this case free the
+ ** pMerger argument before returning.
*/
- static int vdbeSorterInitMerge(
- sqlite3 *db, /* Database handle */
- const VdbeCursor *pCsr, /* Cursor handle for this sorter */
- i64 *pnByte /* Sum of bytes in all opened PMAs */
+ static int vdbeIncrMergerNew(
+ SortSubtask *pTask, /* The thread that will be using the new IncrMerger */
+ MergeEngine *pMerger, /* The MergeEngine that the IncrMerger will control */
+ IncrMerger **ppOut /* Write the new IncrMerger here */
+ ){
+ int rc = SQLITE_OK;
+ IncrMerger *pIncr = *ppOut = (IncrMerger*)
+ (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr)));
+ if( pIncr ){
+ pIncr->pMerger = pMerger;
+ pIncr->pTask = pTask;
+ pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
+ pTask->file2.iEof += pIncr->mxSz;
+ }else{
+ vdbeMergeEngineFree(pMerger);
+ rc = SQLITE_NOMEM;
+ }
+ return rc;
+ }
+
+ #if SQLITE_MAX_WORKER_THREADS>0
+ /*
+ ** Set the "use-threads" flag on object pIncr.
+ */
+ static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
+ pIncr->bUseThread = 1;
+ pIncr->pTask->file2.iEof -= pIncr->mxSz;
+ }
+ #endif /* SQLITE_MAX_WORKER_THREADS>0 */
+
+
+
+ /*
+ ** Recompute pMerger->aTree[iOut] by comparing the next keys on the
+ ** two PmaReaders that feed that entry. Neither of the PmaReaders
+ ** are advanced. This routine merely does the comparison.
+ */
+ static void vdbeMergeEngineCompare(
+ MergeEngine *pMerger, /* Merge engine containing PmaReaders to compare */
+ int iOut /* Store the result in pMerger->aTree[iOut] */
+ ){
+ int i1;
+ int i2;
+ int iRes;
+ PmaReader *p1;
+ PmaReader *p2;
+
+ assert( iOut<pMerger->nTree && iOut>0 );
+
+ if( iOut>=(pMerger->nTree/2) ){
+ i1 = (iOut - pMerger->nTree/2) * 2;
+ i2 = i1 + 1;
+ }else{
+ i1 = pMerger->aTree[iOut*2];
+ i2 = pMerger->aTree[iOut*2+1];
+ }
+
+ p1 = &pMerger->aReadr[i1];
+ p2 = &pMerger->aReadr[i2];
+
+ if( p1->pFd==0 ){
+ iRes = i2;
+ }else if( p2->pFd==0 ){
+ iRes = i1;
+ }else{
+ int res;
+ assert( pMerger->pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */
+ res = vdbeSorterCompare(
+ pMerger->pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey
+ );
+ if( res<=0 ){
+ iRes = i1;
+ }else{
+ iRes = i2;
+ }
+ }
+
+ pMerger->aTree[iOut] = iRes;
+ }
+
+ /*
+ ** Allowed values for the eMode parameter to vdbeMergeEngineInit()
+ ** and vdbePmaReaderIncrMergeInit().
+ **
+ ** Only INCRINIT_NORMAL is valid in single-threaded builds (when
+ ** SQLITE_MAX_WORKER_THREADS==0). The other values are only used
+ ** when there exists one or more separate worker threads.
+ */
+ #define INCRINIT_NORMAL 0
+ #define INCRINIT_TASK 1
+ #define INCRINIT_ROOT 2
+
+ /* Forward reference.
+ ** The vdbeIncrMergeInit() and vdbePmaReaderIncrMergeInit() routines call each
+ ** other (when building a merge tree).
+ */
+ static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode);
+
+ /*
+ ** Initialize the MergeEngine object passed as the second argument. Once this
+ ** function returns, the first key of merged data may be read from the
+ ** MergeEngine object in the usual fashion.
+ **
+ ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
+ ** objects attached to the PmaReader objects that the merger reads from have
+ ** already been populated, but that they have not yet populated aFile[0] and
+ ** set the PmaReader objects up to read from it. In this case all that is
+ ** required is to call vdbePmaReaderNext() on each PmaReader to point it at
+ ** its first key.
+ **
+ ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use
+ ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data
+ ** to pMerger.
+ **
+ ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
+ */
+ static int vdbeMergeEngineInit(
+ SortSubtask *pTask, /* Thread that will run pMerger */
+ MergeEngine *pMerger, /* MergeEngine to initialize */
+ int eMode /* One of the INCRINIT_XXX constants */
){
- VdbeSorter *pSorter = pCsr->pSorter;
int rc = SQLITE_OK; /* Return code */
- int i; /* Used to iterator through aIter[] */
- i64 nByte = 0; /* Total bytes in all opened PMAs */
+ int i; /* For looping over PmaReader objects */
+ int nTree = pMerger->nTree;
+
+ /* eMode is always INCRINIT_NORMAL in single-threaded mode */
+ assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
+
+ /* Verify that the MergeEngine is assigned to a single thread */
- assert( pMerger->pTask==0 ); // || pMerger->pTask==pTask );
++ assert( pMerger->pTask==0 );
+ pMerger->pTask = pTask;
+
+ for(i=0; i<nTree; i++){
+ if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){
+ /* PmaReaders should be normally initialized in order, as if they are
+ ** reading from the same temp file this makes for more linear file IO.
+ ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
+ ** in use it will block the vdbePmaReaderNext() call while it uses
+ ** the main thread to fill its buffer. So calling PmaReaderNext()
+ ** on this PmaReader before any of the multi-threaded PmaReaders takes
+ ** better advantage of multi-processor hardware. */
+ rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
+ }else{
+ rc = vdbePmaReaderIncrMergeInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
+ }
+ if( rc!=SQLITE_OK ) return rc;
+ }
- /* Initialize the iterators. */
- for(i=0; i<SORTER_MAX_MERGE_COUNT; i++){
- VdbeSorterIter *pIter = &pSorter->aIter[i];
- rc = vdbeSorterIterInit(db, pSorter, pSorter->iReadOff, pIter, &nByte);
- pSorter->iReadOff = pIter->iEof;
- assert( rc!=SQLITE_OK || pSorter->iReadOff<=pSorter->iWriteOff );
- if( rc!=SQLITE_OK || pSorter->iReadOff>=pSorter->iWriteOff ) break;
+ for(i=pMerger->nTree-1; i>0; i--){
+ vdbeMergeEngineCompare(pMerger, i);
}
+ return pTask->pUnpacked->errCode;
+ }
+
+ /*
+ ** Initialize the IncrMerge field of a PmaReader.
+ **
+ ** If the PmaReader passed as the first argument is not an incremental-reader
+ ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it serves
+ ** to open and/or initialize the temp file related fields of the IncrMerge
+ ** object at (pReadr->pIncr).
+ **
+ ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
+ ** in the sub-tree headed by pReadr are also initialized. Data is then loaded
+ ** into the buffers belonging to pReadr and it is set to
+ ** point to the first key in its range.
+ **
+ ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
+ ** to be a multi-threaded PmaReader and this function is being called in a
+ ** background thread. In this case all PmaReaders in the sub-tree are
+ ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
+ ** pReadr is populated. However, pReadr itself is not set up to point
+ ** to its first key. A call to vdbePmaReaderNext() is still required to do
+ ** that.
+ **
+ ** The reason this function does not call vdbePmaReaderNext() immediately
+ ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
+ ** to block on thread (pTask->thread) before accessing aFile[1]. But, since
+ ** this entire function is being run by thread (pTask->thread), that will
+ ** lead to the current background thread attempting to join itself.
+ **
+ ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
+ ** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all
+ ** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
+ ** In this case vdbePmaReaderNext() is called on all child PmaReaders and
+ ** the current PmaReader set to point to the first key in its range.
+ **
+ ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
+ */
+ static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
+ int rc = SQLITE_OK;
+ IncrMerger *pIncr = pReadr->pIncr;
- /* Initialize the aTree[] array. */
- for(i=pSorter->nTree-1; rc==SQLITE_OK && i>0; i--){
- rc = vdbeSorterDoCompare(pCsr, i);
+ /* eMode is always INCRINIT_NORMAL in single-threaded mode */
+ assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
+
+ if( pIncr ){
+ SortSubtask *pTask = pIncr->pTask;
+ sqlite3 *db = pTask->pSorter->db;
+
+ rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
+
+ /* Set up the required files for pIncr. A multi-theaded IncrMerge object
+ ** requires two temp files to itself, whereas a single-threaded object
+ ** only requires a region of pTask->file2. */
+ if( rc==SQLITE_OK ){
+ int mxSz = pIncr->mxSz;
+ #if SQLITE_MAX_WORKER_THREADS>0
+ if( pIncr->bUseThread ){
+ rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
+ }
+ }else
+ #endif
+ /*if( !pIncr->bUseThread )*/{
+ if( pTask->file2.pFd==0 ){
+ assert( pTask->file2.iEof>0 );
+ rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
+ pTask->file2.iEof = 0;
+ }
+ if( rc==SQLITE_OK ){
+ pIncr->aFile[1].pFd = pTask->file2.pFd;
+ pIncr->iStartOff = pTask->file2.iEof;
+ pTask->file2.iEof += mxSz;
+ }
+ }
+ }
+
+ #if SQLITE_MAX_WORKER_THREADS>0
+ if( rc==SQLITE_OK && pIncr->bUseThread ){
+ /* Use the current thread to populate aFile[1], even though this
+ ** PmaReader is multi-threaded. The reason being that this function
+ ** is already running in background thread pIncr->pTask->thread. */
+ assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
+ rc = vdbeIncrPopulate(pIncr);
+ }
+ #endif
+
+ if( rc==SQLITE_OK
+ && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK)
+ ){
+ rc = vdbePmaReaderNext(pReadr);
+ }
}
+ return rc;
+ }
- *pnByte = nByte;
+ #if SQLITE_MAX_WORKER_THREADS>0
+ /*
+ ** The main routine for vdbePmaReaderIncrMergeInit() operations run in
+ ** background threads.
+ */
+ static void *vdbePmaReaderBgInit(void *pCtx){
+ PmaReader *pReader = (PmaReader*)pCtx;
+ void *pRet = SQLITE_INT_TO_PTR(
+ vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
+ );
+ pReader->pIncr->pTask->bDone = 1;
+ return pRet;
+ }
+
+ /*
+ ** Use a background thread to invoke vdbePmaReaderIncrMergeInit(INCRINIT_TASK)
+ ** on the the PmaReader object passed as the first argument.
+ **
+ ** This call will initialize the various fields of the pReadr->pIncr
+ ** structure and, if it is a multi-threaded IncrMerger, launch a
+ ** background thread to populate aFile[1].
+ */
+ static int vdbePmaReaderBgIncrInit(PmaReader *pReadr){
+ void *pCtx = (void*)pReadr;
+ return vdbeSorterCreateThread(pReadr->pIncr->pTask, vdbePmaReaderBgInit, pCtx);
+ }
+ #endif
+
+ /*
+ ** Allocate a new MergeEngine object to merge the contents of nPMA level-0
+ ** PMAs from pTask->file. If no error occurs, set *ppOut to point to
+ ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
+ ** to NULL and return an SQLite error code.
+ **
+ ** When this function is called, *piOffset is set to the offset of the
+ ** first PMA to read from pTask->file. Assuming no error occurs, it is
+ ** set to the offset immediately following the last byte of the last
+ ** PMA before returning. If an error does occur, then the final value of
+ ** *piOffset is undefined.
+ */
+ static int vdbeMergeEngineLevel0(
+ SortSubtask *pTask, /* Sorter task to read from */
+ int nPMA, /* Number of PMAs to read */
+ i64 *piOffset, /* IN/OUT: Readr offset in pTask->file */
+ MergeEngine **ppOut /* OUT: New merge-engine */
+ ){
+ MergeEngine *pNew; /* Merge engine to return */
+ i64 iOff = *piOffset;
+ int i;
+ int rc = SQLITE_OK;
+
+ *ppOut = pNew = vdbeMergeEngineNew(nPMA);
+ if( pNew==0 ) rc = SQLITE_NOMEM;
+
+ for(i=0; i<nPMA && rc==SQLITE_OK; i++){
+ i64 nDummy;
+ PmaReader *pReadr = &pNew->aReadr[i];
+ rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy);
+ iOff = pReadr->iEof;
+ }
+
+ if( rc!=SQLITE_OK ){
+ vdbeMergeEngineFree(pNew);
+ *ppOut = 0;
+ }
+ *piOffset = iOff;
return rc;
}