From: drh Date: Mon, 1 Sep 2014 17:36:46 +0000 (+0000) Subject: Add support for using separate worker threads to speed large sorts. X-Git-Tag: version-3.8.7~132 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=0f8f26716cc3d1fa848f798756f6488f4c3760d3;p=thirdparty%2Fsqlite.git Add support for using separate worker threads to speed large sorts. The SQLITE_MAX_WORKER_THREADS and SQLITE_DEFAULT_WORKER_THREADS compile-time options and the SQLITE_LIMIT_WORKER_THREADS argument to sqlite3_limit() and the "PRAGMA threads=N" pragma are added. FossilOrigin-Name: b1c0f0bc1bd8a3477cd7a7ab510f0442ac88b517 --- 0f8f26716cc3d1fa848f798756f6488f4c3760d3 diff --cc manifest index 5851cdd221,6c7fe672c1..cbe9919f0e --- a/manifest +++ b/manifest @@@ -1,9 -1,9 +1,9 @@@ - C Attempt\sto\smake\sthe\sxDelete\smethod\sof\sthe\sunix\sVFS\smore\srobust\son\sVxWorks. - D 2014-09-01T13:37:55.088 -C Disable\sworker\sthreads\swhen\sSQLITE_THREADSAFE=0.\s\sSet\sthe\sdefault\scompile-time\nmaximum\snumber\sof\sworker\sthreads\sto\s8\sand\shonor\sthe\nSQLITE_DEFAULT_WORKER_THREADS\scompile-time\sconstant\s(which\sdefaults\sto\s0). -D 2014-08-29T19:06:07.922 ++C Add\ssupport\sfor\susing\sseparate\sworker\sthreads\sto\sspeed\slarge\ssorts.\nThe\sSQLITE_MAX_WORKER_THREADS\sand\sSQLITE_DEFAULT_WORKER_THREADS\scompile-time\noptions\sand\sthe\sSQLITE_LIMIT_WORKER_THREADS\sargument\sto\s\nsqlite3_limit()\sand\sthe\s"PRAGMA\sthreads=N"\spragma\sare\sadded. 
++D 2014-09-01T17:36:46.754 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f - F Makefile.in 5eb79e334a5de69c87740edd56af6527dd219308 + F Makefile.in cf57f673d77606ab0f2d9627ca52a9ba1464146a F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23 - F Makefile.msc 5b04e657cf08a9ac7fc47d876c5c8be962c47d6b + F Makefile.msc e31dee24038965fb6269d6d61073fd6b7e331dec F Makefile.vxworks 034289efa9d591b04b1a73598623119c306cbba0 F README.md 64f270c43c38c46de749e419c22f0ae2f4499fe8 F VERSION 53a0b870e7f16d3b06623c31d233a304c163a6af @@@ -168,10 -168,10 +168,10 @@@ F src/auth.c 523da7fb4979469955d822ff92 F src/backup.c a31809c65623cc41849b94d368917f8bb66e6a7e F src/bitvec.c 19a4ba637bd85f8f63fc8c9bae5ade9fb05ec1cb F src/btmutex.c ec9d3f1295dafeb278c3830211cc5584132468f4 - F src/btree.c 4d0427bab54229030fc5b8577d2e5ffdc8129030 -F src/btree.c a10ceaccf04fbc1670907e1d79013e1a7a89edee ++F src/btree.c 2a483a8045118faa99867a8679da42754b532318 F src/btree.h a79aa6a71e7f1055f01052b7f821bd1c2dce95c8 F src/btreeInt.h cf180d86b2e9e418f638d65baa425c4c69c0e0e3 - F src/build.c 058e3aadb1376521ff291735237edf4c10f438fb + F src/build.c c26b233dcdb1e2c8f468d49236c266f9f3de96d8 F src/callback.c b97d0695ffcf6a8710ee445ffe56ee387d4d8a6f F src/complete.c dc1d136c0feee03c2f7550bafc0d29075e36deac F src/ctime.c 0231df905e2c4abba4483ee18ffc05adc321df2a @@@ -208,8 -208,8 +208,8 @@@ F src/os.c 1b147e4cf7cc39e618115c14a086 F src/os.h 60d419395e32a8029fa380a80a3da2e9030f635e F src/os_common.h 92815ed65f805560b66166e3583470ff94478f04 F src/os_setup.h c9d4553b5aaa6f73391448b265b89bed0b890faa -F src/os_unix.c bd7df3094a60915c148517504c76df4fca24e542 +F src/os_unix.c 8525ca79457c5b4673a5fda2774ee39fe155f40f - F src/os_win.c d067fce558a5032e6e6afe62899e5397bf63cf3e + F src/os_win.c 08ce5616a5755da9400931fb39146e4a97801a2a F src/os_win.h 09e751b20bbc107ffbd46e13555dc73576d88e21 F src/pager.c 3e732d2bbdd8d8d95fed0c5ae7e718d73153c4c5 F src/pager.h 
ffd5607f7b3e4590b415b007a4382f693334d428 @@@ -223,12 -223,12 +223,12 @@@ F src/printf.c 00986c86ddfffefc2fd3c736 F src/random.c d10c1f85b6709ca97278428fd5db5bbb9c74eece F src/resolve.c 0ea356d32a5e884add23d1b9b4e8736681dd5697 F src/rowset.c a9c9aae3234b44a6d7c6f5a3cadf90dce1e627be - F src/select.c 1c4667571f2c9e339b5a5c5b152a9ea7b0bc4163 + F src/select.c 89e569b263535662f54b537eb9118b2c554ae7aa -F src/shell.c 88378cef39aba4b4a1df82793dcb1daf9276bb81 +F src/shell.c 713cef4d73c05fc8e12f4960072329d767a05d50 - F src/sqlite.h.in ed9d35990c61f0388ca6405706455c4095310553 + F src/sqlite.h.in 74b42237f0d2b010779cc1b1a00190452b31a2ec F src/sqlite3.rc 992c9f5fb8285ae285d6be28240a7e8d3a7f2bad F src/sqlite3ext.h 886f5a34de171002ad46fae8c36a7d8051c190fc - F src/sqliteInt.h d8a9be2aa123a78c90ad4aba09b23e7dd3f8cc9f + F src/sqliteInt.h 6244ee9052752e26d1275ab20c9b774385aa57d2 F src/sqliteLimit.h 164b0e6749d31e0daa1a4589a169d31c0dec7b3d F src/status.c 7ac05a5c7017d0b9f0b4bcd701228b784f987158 F src/table.c 2cd62736f845d82200acfa1287e33feb3c15d62e @@@ -284,14 -285,14 +285,14 @@@ F src/update.c ea336ce7b8b3fc5e316ba8f0 F src/utf.c 77abb5e6d27f3d236e50f7c8fff1d00e15262359 F src/util.c 068dcd26354a3898ccc64ad5c4bdb95a7a15d33a F src/vacuum.c 3728d74919d4fb1356f9e9a13e27773db60b7179 - F src/vdbe.c fd193824d1cc4a71c631eb792ce837aab530210a + F src/vdbe.c 90db7ad740b6d3f7ab446e6244dbc17ce495cca6 F src/vdbe.h c63fad052c9e7388d551e556e119c0bcf6bebdf8 - F src/vdbeInt.h 4653bb420abb7acdc215659cdcedd3a59f336191 + F src/vdbeInt.h cdc8e421f85beb1ac9b4669ec5beadab6faa15e0 F src/vdbeapi.c 09677a53dd8c71bcd670b0bd073bb9aefa02b441 - F src/vdbeaux.c dba006f67c9fd1b1d07ee7fb0fb38aa1905161d1 + F src/vdbeaux.c cef5d34a64ae3a65b56d96d3fd663246ec8e1c36 F src/vdbeblob.c 848238dc73e93e48432991bb5651bf87d865eca4 F src/vdbemem.c 921d5468a68ac06f369810992e84ca22cc730a62 - F src/vdbesort.c f7f5563bf7d4695ca8f3203f3bf9de96d04ed0b3 -F src/vdbesort.c f92628f3d5d4432f751b15a5f39bacc3c6a64a03 ++F src/vdbesort.c 
02646a9f86421776ae5d7594f620f9ed669d3698 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767 F src/vtab.c 019dbfd0406a7447c990e1f7bd1dfcdb8895697f F src/wal.c 264df50a1b33124130b23180ded2e2c5663c652a @@@ -1188,7 -1193,7 +1193,8 @@@ F tool/vdbe_profile.tcl 67746953071a9f8 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4 F tool/warnings.sh 0abfd78ceb09b7f7c27c688c8e3fe93268a13b32 F tool/win/sqlite.vsix deb315d026cc8400325c5863eef847784a219a2f - P 839c7996eecd5480152c514555b9aa1121a69ce0 - R aa6b1a81952b002633810b3bb14c199e -P 2ab4b5adc60b52bf2d2b79968d226b8dd7d2ab3b -R 97462c8661f494fef9aea0705c0547b9 ++P b0f6b91f36b503d8ba8d5257bb194f8c1afb4833 33fa0410499900dd8beb44b9a8ffbd9f4b68c8d8 ++R 91d41afdb912bf253c979082c1abeeae ++T +closed 33fa0410499900dd8beb44b9a8ffbd9f4b68c8d8 U drh - Z 8d5a13ba98607c42fffb4d1d92841eac -Z 369e082ce9db2aa4e0cc673a60a91a43 ++Z 8ac04e78c2df72246f6a97d0d813c221 diff --cc manifest.uuid index 9f24642c58,680e153df5..754fc696d7 --- a/manifest.uuid +++ b/manifest.uuid @@@ -1,1 -1,1 +1,1 @@@ - b0f6b91f36b503d8ba8d5257bb194f8c1afb4833 -33fa0410499900dd8beb44b9a8ffbd9f4b68c8d8 ++b1c0f0bc1bd8a3477cd7a7ab510f0442ac88b517 diff --cc src/vdbesort.c index 6a5855f2ef,158fa440fb..7318ea409e --- a/src/vdbesort.c +++ b/src/vdbesort.c @@@ -753,17 -1404,177 +1404,177 @@@ static int vdbeSorterListToPMA(SortSubt SorterRecord *p; SorterRecord *pNext = 0; - fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff); - pSorter->nPMA++; - fileWriterWriteVarint(&writer, pSorter->nInMemory); - for(p=pSorter->pRecord; p; p=pNext){ - pNext = p->pNext; - fileWriterWriteVarint(&writer, p->nVal); - fileWriterWrite(&writer, p->pVal, p->nVal); - sqlite3DbFree(db, p); + vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz, + pTask->file.iEof); + pTask->nPMA++; + vdbePmaWriteVarint(&writer, pList->szPMA); + for(p=pList->pList; p; p=pNext){ + pNext = p->u.pNext; + vdbePmaWriteVarint(&writer, p->nVal); + vdbePmaWriteBlob(&writer, 
SRVAL(p), p->nVal); + if( pList->aMemory==0 ) sqlite3_free(p); + } + pList->pList = p; + rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof); + } + + vdbeSorterWorkDebug(pTask, "exit"); + assert( rc!=SQLITE_OK || pList->pList==0 ); + assert( rc!=SQLITE_OK || pTask->file.iEof==iSz ); + return rc; + } + + /* + ** Advance the MergeEngine to its next entry. + ** Set *pbEof to true if there is no next entry because + ** the MergeEngine has reached the end of all its inputs. + ** + ** Return SQLITE_OK if successful or an error code if an error occurs. + */ + static int vdbeMergeEngineStep( + MergeEngine *pMerger, /* The merge engine to advance to the next row */ + int *pbEof /* Set TRUE at EOF. Set false for more content */ + ){ + int rc; + int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */ + SortSubtask *pTask = pMerger->pTask; + + /* Advance the current PmaReader */ + rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]); + + /* Update contents of aTree[] */ + if( rc==SQLITE_OK ){ + int i; /* Index of aTree[] to recalculate */ + PmaReader *pReadr1; /* First PmaReader to compare */ + PmaReader *pReadr2; /* Second PmaReader to compare */ + u8 *pKey2; /* To pReadr2->aKey, or 0 if record cached */ + + /* Find the first two PmaReaders to compare. The one that was just + ** advanced (iPrev) and the one next to it in the array. */ + pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)]; + pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)]; + pKey2 = pReadr2->aKey; + + for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){ + /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */ + int iRes; + if( pReadr1->pFd==0 ){ + iRes = +1; + }else if( pReadr2->pFd==0 ){ + iRes = -1; + }else{ + iRes = vdbeSorterCompare(pTask, + pReadr1->aKey, pReadr1->nKey, pKey2, pReadr2->nKey + ); + } + + /* If pReadr1 contained the smaller value, set aTree[i] to its index. + ** Then set pReadr2 to the next PmaReader to compare to pReadr1. 
In this + ** case there is no cache of pReadr2 in pTask->pUnpacked, so set + ** pKey2 to point to the record belonging to pReadr2. + ** + ** Alternatively, if pReadr2 contains the smaller of the two values, + ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare() + ** was actually called above, then pTask->pUnpacked now contains + ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent + ** vdbeSorterCompare() from decoding pReadr2 again. + ** + ** If the two values were equal, then the value from the oldest + ** PMA should be considered smaller. The VdbeSorter.aReadr[] array + ** is sorted from oldest to newest, so pReadr1 contains older values + ** than pReadr2 iff (pReadr1<pReadr2). */ + if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){ + pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr); + pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; + pKey2 = pReadr2->aKey; + }else{ + if( pReadr1->pFd ) pKey2 = 0; + pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr); + pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ]; + } + } + *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0); + } + + return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc); + } + + #if SQLITE_MAX_WORKER_THREADS>0 + /* + ** The main routine for background threads that write level-0 PMAs. + */ + static void *vdbeSorterFlushThread(void *pCtx){ + SortSubtask *pTask = (SortSubtask*)pCtx; + int rc; /* Return code */ + assert( pTask->bDone==0 ); + rc = vdbeSorterListToPMA(pTask, &pTask->list); + pTask->bDone = 1; + return SQLITE_INT_TO_PTR(rc); + } + #endif /* SQLITE_MAX_WORKER_THREADS>0 */ + + /* + ** Flush the current contents of VdbeSorter.list to a new PMA, possibly + ** using a background thread. 
+ */ + static int vdbeSorterFlushPMA(VdbeSorter *pSorter){ + #if SQLITE_MAX_WORKER_THREADS==0 + pSorter->bUsePMA = 1; + return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list); + #else + int rc = SQLITE_OK; + int i; + SortSubtask *pTask = 0; /* Thread context used to create new PMA */ + int nWorker = (pSorter->nTask-1); + + /* Set the flag to indicate that at least one PMA has been written. + ** Or will be, anyhow. */ + pSorter->bUsePMA = 1; + + /* Select a sub-task to sort and flush the current list of in-memory + ** records to disk. If the sorter is running in multi-threaded mode, + ** round-robin between the first (pSorter->nTask-1) tasks. Except, if + ** the background thread from a sub-task's previous turn is still running, + ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy, + ** fall back to using the final sub-task. The first (pSorter->nTask-1) + ** sub-tasks are preferred as they use background threads - the final + ** sub-task uses the main thread. */ + for(i=0; i<nWorker; i++){ + int iTest = (pSorter->iPrev + i + 1) % nWorker; + pTask = &pSorter->aTask[iTest]; + if( pTask->bDone ){ + rc = vdbeSorterJoinThread(pTask); + } + if( rc!=SQLITE_OK || pTask->pThread==0 ) break; + } + + if( rc==SQLITE_OK ){ + if( i==nWorker ){ + /* Use the foreground thread for this operation */ + rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list); + }else{ + /* Launch a background thread for this operation */ + u8 *aMem = pTask->list.aMemory; + void *pCtx = (void*)pTask; + + assert( pTask->pThread==0 && pTask->bDone==0 ); + assert( pTask->list.pList==0 ); + assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 ); + - pSorter->iPrev = (pTask - pSorter->aTask); ++ pSorter->iPrev = (u8)(pTask - pSorter->aTask); + pTask->list = pSorter->list; + pSorter->list.pList = 0; + pSorter->list.szPMA = 0; + if( aMem ){ + pSorter->list.aMemory = aMem; + pSorter->nMemory = sqlite3MallocSize(aMem); + }else if( pSorter->list.aMemory ){ + pSorter->list.aMemory = 
sqlite3Malloc(pSorter->nMemory); + if( !pSorter->list.aMemory ) return SQLITE_NOMEM; + } + + rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx); } - pSorter->pRecord = p; - rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff); } return rc; @@@ -822,33 -1790,325 +1790,325 @@@ static int vdbeIncrSwap(IncrMerger *pIn } /* - ** Helper function for sqlite3VdbeSorterRewind(). + ** Allocate and return a new IncrMerger object to read data from pMerger. + ** + ** If an OOM condition is encountered, return NULL. In this case free the + ** pMerger argument before returning. */ - static int vdbeSorterInitMerge( - sqlite3 *db, /* Database handle */ - const VdbeCursor *pCsr, /* Cursor handle for this sorter */ - i64 *pnByte /* Sum of bytes in all opened PMAs */ + static int vdbeIncrMergerNew( + SortSubtask *pTask, /* The thread that will be using the new IncrMerger */ + MergeEngine *pMerger, /* The MergeEngine that the IncrMerger will control */ + IncrMerger **ppOut /* Write the new IncrMerger here */ + ){ + int rc = SQLITE_OK; + IncrMerger *pIncr = *ppOut = (IncrMerger*) + (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr))); + if( pIncr ){ + pIncr->pMerger = pMerger; + pIncr->pTask = pTask; + pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2); + pTask->file2.iEof += pIncr->mxSz; + }else{ + vdbeMergeEngineFree(pMerger); + rc = SQLITE_NOMEM; + } + return rc; + } + + #if SQLITE_MAX_WORKER_THREADS>0 + /* + ** Set the "use-threads" flag on object pIncr. + */ + static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){ + pIncr->bUseThread = 1; + pIncr->pTask->file2.iEof -= pIncr->mxSz; + } + #endif /* SQLITE_MAX_WORKER_THREADS>0 */ + + + + /* + ** Recompute pMerger->aTree[iOut] by comparing the next keys on the + ** two PmaReaders that feed that entry. Neither of the PmaReaders + ** are advanced. This routine merely does the comparison. 
+ */ + static void vdbeMergeEngineCompare( + MergeEngine *pMerger, /* Merge engine containing PmaReaders to compare */ + int iOut /* Store the result in pMerger->aTree[iOut] */ + ){ + int i1; + int i2; + int iRes; + PmaReader *p1; + PmaReader *p2; + + assert( iOut<pMerger->nTree && iOut>0 ); + + if( iOut>=(pMerger->nTree/2) ){ + i1 = (iOut - pMerger->nTree/2) * 2; + i2 = i1 + 1; + }else{ + i1 = pMerger->aTree[iOut*2]; + i2 = pMerger->aTree[iOut*2+1]; + } + + p1 = &pMerger->aReadr[i1]; + p2 = &pMerger->aReadr[i2]; + + if( p1->pFd==0 ){ + iRes = i2; + }else if( p2->pFd==0 ){ + iRes = i1; + }else{ + int res; + assert( pMerger->pTask->pUnpacked!=0 ); /* from vdbeSortSubtaskMain() */ + res = vdbeSorterCompare( + pMerger->pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey + ); + if( res<=0 ){ + iRes = i1; + }else{ + iRes = i2; + } + } + + pMerger->aTree[iOut] = iRes; + } + + /* + ** Allowed values for the eMode parameter to vdbeMergeEngineInit() + ** and vdbePmaReaderIncrMergeInit(). + ** + ** Only INCRINIT_NORMAL is valid in single-threaded builds (when + ** SQLITE_MAX_WORKER_THREADS==0). The other values are only used + ** when there exists one or more separate worker threads. + */ + #define INCRINIT_NORMAL 0 + #define INCRINIT_TASK 1 + #define INCRINIT_ROOT 2 + + /* Forward reference. + ** The vdbeIncrMergeInit() and vdbePmaReaderIncrMergeInit() routines call each + ** other (when building a merge tree). + */ + static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode); + + /* + ** Initialize the MergeEngine object passed as the second argument. Once this + ** function returns, the first key of merged data may be read from the + ** MergeEngine object in the usual fashion. + ** + ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge + ** objects attached to the PmaReader objects that the merger reads from have + ** already been populated, but that they have not yet populated aFile[0] and + ** set the PmaReader objects up to read from it. 
In this case all that is + ** required is to call vdbePmaReaderNext() on each PmaReader to point it at + ** its first key. + ** + ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use + ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data + ** to pMerger. + ** + ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. + */ + static int vdbeMergeEngineInit( + SortSubtask *pTask, /* Thread that will run pMerger */ + MergeEngine *pMerger, /* MergeEngine to initialize */ + int eMode /* One of the INCRINIT_XXX constants */ ){ - VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; /* Return code */ - int i; /* Used to iterator through aIter[] */ - i64 nByte = 0; /* Total bytes in all opened PMAs */ + int i; /* For looping over PmaReader objects */ + int nTree = pMerger->nTree; + + /* eMode is always INCRINIT_NORMAL in single-threaded mode */ + assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL ); + + /* Verify that the MergeEngine is assigned to a single thread */ - assert( pMerger->pTask==0 ); // || pMerger->pTask==pTask ); ++ assert( pMerger->pTask==0 ); + pMerger->pTask = pTask; + + for(i=0; i<nTree; i++){ + if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){ + /* PmaReaders should be normally initialized in order, as if they are + ** reading from the same temp file this makes for more linear file IO. + ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is + ** in use it will block the vdbePmaReaderNext() call while it uses + ** the main thread to fill its buffer. So calling PmaReaderNext() + ** on this PmaReader before any of the multi-threaded PmaReaders takes + ** better advantage of multi-processor hardware. */ + rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]); + }else{ + rc = vdbePmaReaderIncrMergeInit(&pMerger->aReadr[i], INCRINIT_NORMAL); + } + if( rc!=SQLITE_OK ) return rc; + } - /* Initialize the iterators. 
*/ - for(i=0; i<SORTER_MAX_MERGE_COUNT; i++){ - VdbeSorterIter *pIter = &pSorter->aIter[i]; - rc = vdbeSorterIterInit(db, pSorter, pSorter->iReadOff, pIter, &nByte); - pSorter->iReadOff = pIter->iEof; - assert( rc!=SQLITE_OK || pSorter->iReadOff<=pSorter->iWriteOff ); - if( rc!=SQLITE_OK || pSorter->iReadOff>=pSorter->iWriteOff ) break; + for(i=pMerger->nTree-1; i>0; i--){ + vdbeMergeEngineCompare(pMerger, i); } + return pTask->pUnpacked->errCode; + } + + /* + ** Initialize the IncrMerge field of a PmaReader. + ** + ** If the PmaReader passed as the first argument is not an incremental-reader + ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it serves + ** to open and/or initialize the temp file related fields of the IncrMerge + ** object at (pReadr->pIncr). + ** + ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders + ** in the sub-tree headed by pReadr are also initialized. Data is then loaded + ** into the buffers belonging to pReadr and it is set to + ** point to the first key in its range. + ** + ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed + ** to be a multi-threaded PmaReader and this function is being called in a + ** background thread. In this case all PmaReaders in the sub-tree are + ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to + ** pReadr is populated. However, pReadr itself is not set up to point + ** to its first key. A call to vdbePmaReaderNext() is still required to do + ** that. + ** + ** The reason this function does not call vdbePmaReaderNext() immediately + ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has + ** to block on thread (pTask->thread) before accessing aFile[1]. But, since + ** this entire function is being run by thread (pTask->thread), that will + ** lead to the current background thread attempting to join itself. 
+ ** + ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed + ** that pReadr->pIncr is a multi-threaded IncrMerge object, and that all + ** child-trees have already been initialized using IncrInit(INCRINIT_TASK). + ** In this case vdbePmaReaderNext() is called on all child PmaReaders and + ** the current PmaReader is set to point to the first key in its range. + ** + ** SQLITE_OK is returned if successful, or an SQLite error code otherwise. + */ + static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){ + int rc = SQLITE_OK; + IncrMerger *pIncr = pReadr->pIncr; - /* Initialize the aTree[] array. */ - for(i=pSorter->nTree-1; rc==SQLITE_OK && i>0; i--){ - rc = vdbeSorterDoCompare(pCsr, i); + /* eMode is always INCRINIT_NORMAL in single-threaded mode */ + assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL ); + + if( pIncr ){ + SortSubtask *pTask = pIncr->pTask; + sqlite3 *db = pTask->pSorter->db; + + rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode); + + /* Set up the required files for pIncr. A multi-threaded IncrMerge object + ** requires two temp files to itself, whereas a single-threaded object + ** only requires a region of pTask->file2. 
*/ + if( rc==SQLITE_OK ){ + int mxSz = pIncr->mxSz; + #if SQLITE_MAX_WORKER_THREADS>0 + if( pIncr->bUseThread ){ + rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd); + if( rc==SQLITE_OK ){ + rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd); + } + }else + #endif + /*if( !pIncr->bUseThread )*/{ + if( pTask->file2.pFd==0 ){ + assert( pTask->file2.iEof>0 ); + rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd); + pTask->file2.iEof = 0; + } + if( rc==SQLITE_OK ){ + pIncr->aFile[1].pFd = pTask->file2.pFd; + pIncr->iStartOff = pTask->file2.iEof; + pTask->file2.iEof += mxSz; + } + } + } + + #if SQLITE_MAX_WORKER_THREADS>0 + if( rc==SQLITE_OK && pIncr->bUseThread ){ + /* Use the current thread to populate aFile[1], even though this + ** PmaReader is multi-threaded. The reason being that this function + ** is already running in background thread pIncr->pTask->thread. */ + assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK ); + rc = vdbeIncrPopulate(pIncr); + } + #endif + + if( rc==SQLITE_OK + && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) + ){ + rc = vdbePmaReaderNext(pReadr); + } } + return rc; + } - *pnByte = nByte; + #if SQLITE_MAX_WORKER_THREADS>0 + /* + ** The main routine for vdbePmaReaderIncrMergeInit() operations run in + ** background threads. + */ + static void *vdbePmaReaderBgInit(void *pCtx){ + PmaReader *pReader = (PmaReader*)pCtx; + void *pRet = SQLITE_INT_TO_PTR( + vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK) + ); + pReader->pIncr->pTask->bDone = 1; + return pRet; + } + + /* + ** Use a background thread to invoke vdbePmaReaderIncrMergeInit(INCRINIT_TASK) + ** on the PmaReader object passed as the first argument. + ** + ** This call will initialize the various fields of the pReadr->pIncr + ** structure and, if it is a multi-threaded IncrMerger, launch a + ** background thread to populate aFile[1]. 
+ */ + static int vdbePmaReaderBgIncrInit(PmaReader *pReadr){ + void *pCtx = (void*)pReadr; + return vdbeSorterCreateThread(pReadr->pIncr->pTask, vdbePmaReaderBgInit, pCtx); + } + #endif + + /* + ** Allocate a new MergeEngine object to merge the contents of nPMA level-0 + ** PMAs from pTask->file. If no error occurs, set *ppOut to point to + ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut + ** to NULL and return an SQLite error code. + ** + ** When this function is called, *piOffset is set to the offset of the + ** first PMA to read from pTask->file. Assuming no error occurs, it is + ** set to the offset immediately following the last byte of the last + ** PMA before returning. If an error does occur, then the final value of + ** *piOffset is undefined. + */ + static int vdbeMergeEngineLevel0( + SortSubtask *pTask, /* Sorter task to read from */ + int nPMA, /* Number of PMAs to read */ + i64 *piOffset, /* IN/OUT: Readr offset in pTask->file */ + MergeEngine **ppOut /* OUT: New merge-engine */ + ){ + MergeEngine *pNew; /* Merge engine to return */ + i64 iOff = *piOffset; + int i; + int rc = SQLITE_OK; + + *ppOut = pNew = vdbeMergeEngineNew(nPMA); + if( pNew==0 ) rc = SQLITE_NOMEM; + + for(i=0; i<nPMA && rc==SQLITE_OK; i++){ + i64 nDummy; + PmaReader *pReadr = &pNew->aReadr[i]; + rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy); + iOff = pReadr->iEof; + } + + if( rc!=SQLITE_OK ){ + vdbeMergeEngineFree(pNew); + *ppOut = 0; + } + *piOffset = iOff; return rc; }