From: dan Date: Mon, 17 Mar 2014 15:43:05 +0000 (+0000) Subject: Add an experimental multi-threaded capability to vdbesorter.c. X-Git-Tag: version-3.8.7~132^2~99 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=f87684189db4ed779b4d22c16cadc7bea89b814e;p=thirdparty%2Fsqlite.git Add an experimental multi-threaded capability to vdbesorter.c. FossilOrigin-Name: ff0b5c851ba7d04d1836d7c6a3222713e7d8d357 --- diff --git a/manifest b/manifest index 4f0c8a0be4..2de90048c0 100644 --- a/manifest +++ b/manifest @@ -1,11 +1,11 @@ -C Merge\slatest\strunk\schanges\sinto\sthis\sbranch. -D 2014-03-13T15:41:09.146 +C Add\san\sexperimental\smulti-threaded\scapability\sto\svdbesorter.c. +D 2014-03-17T15:43:05.543 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23 F Makefile.msc 153eb9b9725bc7b8e4dbe963219298e0c4a644b0 F Makefile.vxworks db21ed42a01d5740e656b16f92cb5d8d5e5dd315 -F README.md 64f270c43c38c46de749e419c22f0ae2f4499fe8 w README +F README.md 64f270c43c38c46de749e419c22f0ae2f4499fe8 F VERSION c3b0d47c3c5cf25c5bd4ff9e6f3af2f9d7934ea6 F aclocal.m4 a5c22d164aff7ed549d53a90fa56d56955281f50 F addopcodes.awk 9eb448a552d5c0185cf62c463f9c173cedae3811 @@ -108,17 +108,17 @@ F ext/icu/icu.c d415ccf984defeb9df2c0e1afcfaa2f6dc05eacb F ext/icu/sqliteicu.h 728867a802baa5a96de7495e9689a8e01715ef37 F ext/misc/amatch.c 678056a4bfcd83c4e82dea81d37543cd1d6dbee1 F ext/misc/closure.c 636024302cde41b2bf0c542f81c40c624cfb7012 -F ext/misc/fuzzer.c 136533c53cfce0957f0b48fa11dba27e21c5c01d w src/test_fuzzer.c +F ext/misc/fuzzer.c 136533c53cfce0957f0b48fa11dba27e21c5c01d F ext/misc/ieee754.c b0362167289170627659e84173f5d2e8fee8566e F ext/misc/nextchar.c 35c8b8baacb96d92abbb34a83a997b797075b342 F ext/misc/percentile.c bcbee3c061b884eccb80e21651daaae8e1e43c63 F ext/misc/regexp.c af92cdaa5058fcec1451e49becc7ba44dba023dc F ext/misc/rot13.c 1ac6f95f99b575907b9b09c81a349114cf9be45a -F ext/misc/spellfix.c 93f3961074cebe63c31fcefe62ca2a032ee8dfed w src/test_spellfix.c +F ext/misc/spellfix.c 93f3961074cebe63c31fcefe62ca2a032ee8dfed F ext/misc/totype.c 4a167594e791abeed95e0a8db028822b5e8fe512 F ext/misc/vfslog.c fe40fab5c077a40477f7e5eba994309ecac6cc95 F ext/misc/vtshim.c babb0dc2bf116029e3e7c9a618b8a1377045303e -F ext/misc/wholenumber.c 784b12543d60702ebdd47da936e278aa03076212 w src/test_wholenumber.c +F ext/misc/wholenumber.c 784b12543d60702ebdd47da936e278aa03076212 F ext/rtree/README 6315c0d73ebf0ec40dedb5aa0e942bc8b54e3761 F ext/rtree/rtree.c 2d9f95da404d850474e628c720c5ce15d29b47de F ext/rtree/rtree.h 834dbcb82dc85b2481cde6a07cdadfddc99e9b9e @@ -272,7 +272,7 @@ F src/test_thread.c 1e133a40b50e9c035b00174035b846e7eef481cb F src/test_vfs.c e72f555ef7a59080f898fcf1a233deb9eb704ea9 F src/test_vfstrace.c 3a0ab304682fecbceb689e7d9b904211fde11d78 F src/test_wsd.c 41cadfd9d97fe8e3e4e44f61a4a8ccd6f7ca8fe9 -F src/threads.c 2b918d1f4f0b0831e8f41c49bcaa097f01490120 +F src/threads.c cde9d885fd562b5427f89a42a8829085f88b17df F src/tokenize.c 6da2de6e12218ccb0aea5184b56727d011f4bee7 F src/trigger.c 66f3470b03b52b395e839155786966e3e037fddb F src/update.c 5b3e74a03b3811e586b4f2b4cbd7c49f01c93115 @@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4 F src/vdbeaux.c e45e3f9daf38c5be3fd39e9aacc1c9066af57a06 F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447 -F src/vdbesort.c 46801acb342e5e4c07ba1777fe58880c143abb59 +F src/vdbesort.c 1d973fcd0e00c77836e9d505e7c45df02601ffc2 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8 @@ -1157,7 +1157,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff -P c92b0fe1371e7c20a5fbdf5fa96e30da14c40880 6504aa47a8ebb13827be017c4cb4b2dc3c4c55f4 -R c6806fc7f1eff740ed4af1293ffc55d0 +P d17231b63d48c1f9c4dee109c90cec112e2f0fd4 +R 3c79acd4eecb6926efd889b756a27898 U dan -Z b3b09b0f0acb184b570781ded35d947c +Z 226b72b7faaae8e6d7b6586f32d5c5a0 diff --git a/manifest.uuid b/manifest.uuid index c9d158cd10..bf397f3d56 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -d17231b63d48c1f9c4dee109c90cec112e2f0fd4 \ No newline at end of file +ff0b5c851ba7d04d1836d7c6a3222713e7d8d357 \ No newline at end of file diff --git a/src/threads.c b/src/threads.c index 33781a7aac..7cc964250d 100644 --- a/src/threads.c +++ b/src/threads.c @@ -47,7 +47,6 @@ int sqlite3ThreadCreate( void *pIn /* Argument passed into xTask() */ ){ SQLiteThread *p; - int rc; assert( ppThread!=0 ); assert( xTask!=0 ); diff --git a/src/vdbesort.c b/src/vdbesort.c index b9ed97e8b3..e84a33fccc 100644 --- a/src/vdbesort.c +++ b/src/vdbesort.c @@ -20,9 +20,83 @@ typedef struct VdbeSorterIter VdbeSorterIter; +typedef struct SorterThread SorterThread; typedef struct SorterRecord SorterRecord; +typedef struct SorterMerger SorterMerger; typedef struct FileWriter FileWriter; + +/* +** Maximum number of threads to use. Setting this value to 1 forces all +** operations to be single-threaded. +*/ +#ifndef SQLITE_MAX_SORTER_THREAD +# define SQLITE_MAX_SORTER_THREAD 1 +#endif + +/* +** Candidate values for SorterThread.eWork +*/ +#define SORTER_THREAD_SORT 1 +#define SORTER_THREAD_TO_PMA 2 +#define SORTER_THREAD_CONS 3 + +/* +** Much of the work performed in this module to sort the list of records is +** broken down into smaller units that may be peformed in parallel. In order +** to perform such a unit of work, an instance of the following structure +** is configured and passed to vdbeSorterThreadMain() - either directly by +** the main thread or via a background thread. +** +** Exactly SQLITE_MAX_SORTER_THREAD instances of this structure are allocated +** as part of each VdbeSorter object. Instances are never allocated any other +** way. +** +** When a background thread is launched to perform work, SorterThread.bDone +** is set to 0 and the SorterThread.pThread variable set to point to the +** thread handle. SorterThread.bDone is set to 1 (to indicate to the main +** thread that joining SorterThread.pThread will not block) before the thread +** exits. SorterThread.pThread and bDone are always cleared after the +** background thread has been joined. +** +** One object (specifically, VdbeSorter.aThread[SQLITE_MAX_SORTER_THREAD-1]) +** is reserved for the foreground thread. +** +** The nature of the work performed is determined by SorterThread.eWork, +** as follows: +** +** SORTER_THREAD_SORT: +** Sort the linked list of records at SorterThread.pList. +** +** SORTER_THREAD_TO_PMA: +** Sort the linked list of records at SorterThread.pList, and write +** the results to a new PMA in temp file SorterThread.pTemp1. Open +** the temp file if it is not already open. +** +** SORTER_THREAD_CONS: +** Merge existing PMAs until SorterThread.nConsolidate or fewer +** remain in temp file SorterThread.pTemp1. +*/ +struct SorterThread { + SQLiteThread *pThread; /* Thread handle, or NULL */ + int bDone; /* Set to true by pThread when finished */ + + sqlite3_vfs *pVfs; /* VFS used to open temporary files */ + KeyInfo *pKeyInfo; /* How to compare records */ + UnpackedRecord *pUnpacked; /* Space to unpack a record */ + int pgsz; /* Main database page size */ + + u8 eWork; /* One of the SORTER_THREAD_* constants */ + int nConsolidate; /* For THREAD_CONS, max final PMAs */ + SorterRecord *pList; /* List of records for pThread to sort */ + int nInMemory; /* Expected size of PMA based on pList */ + + int nPMA; /* Number of PMAs currently in pTemp1 */ + i64 iTemp1Off; /* Offset to write to in pTemp1 */ + sqlite3_file *pTemp1; /* File to write PMAs to, or NULL */ +}; + + /* ** NOTES ON DATA STRUCTURE USED FOR N-WAY MERGES: ** @@ -92,19 +166,24 @@ typedef struct FileWriter FileWriter; ** key comparison operations are required, where N is the number of segments ** being merged (rounded up to the next power of 2). */ +struct SorterMerger { + int nTree; /* Used size of aTree/aIter (power of 2) */ + int *aTree; /* Current state of incremental merge */ + VdbeSorterIter *aIter; /* Array of iterators to merge data from */ +}; + +/* +** Main sorter structure. A single instance of this is allocated for each +** sorter cursor created by the VDBE. +*/ struct VdbeSorter { - i64 iWriteOff; /* Current write offset within file pTemp1 */ - i64 iReadOff; /* Current read offset within file pTemp1 */ int nInMemory; /* Current size of pRecord list as PMA */ - int nTree; /* Used size of aTree/aIter (power of 2) */ - int nPMA; /* Number of PMAs stored in pTemp1 */ int mnPmaSize; /* Minimum PMA size, in bytes */ int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ - VdbeSorterIter *aIter; /* Array of iterators to merge */ - int *aTree; /* Current state of incremental merge */ - sqlite3_file *pTemp1; /* PMA file 1 */ + int bUsePMA; /* True if one or more PMAs created */ SorterRecord *pRecord; /* Head of in-memory record list */ - UnpackedRecord *pUnpacked; /* Used to unpack keys */ + SorterMerger *pMerger; /* For final merge of PMAs (by caller) */ + SorterThread aThread[SQLITE_MAX_SORTER_THREAD]; }; /* @@ -150,7 +229,8 @@ struct SorterRecord { SorterRecord *pNext; }; -/* Minimum allowable value for the VdbeSorter.nWorking variable */ +/* The minimum PMA size is set to this value multiplied by the database +** page size in bytes. */ #define SORTER_MIN_WORKING 10 /* Maximum number of segments to merge in a single pass. */ @@ -160,9 +240,9 @@ struct SorterRecord { ** Free all memory belonging to the VdbeSorterIter object passed as the second ** argument. All structure fields are set to zero before returning. */ -static void vdbeSorterIterZero(sqlite3 *db, VdbeSorterIter *pIter){ - sqlite3DbFree(db, pIter->aAlloc); - sqlite3DbFree(db, pIter->aBuffer); +static void vdbeSorterIterZero(VdbeSorterIter *pIter){ + sqlite3_free(pIter->aAlloc); + sqlite3_free(pIter->aBuffer); memset(pIter, 0, sizeof(VdbeSorterIter)); } @@ -176,7 +256,6 @@ static void vdbeSorterIterZero(sqlite3 *db, VdbeSorterIter *pIter){ ** next call to this function. */ static int vdbeSorterIterRead( - sqlite3 *db, /* Database handle (for malloc) */ VdbeSorterIter *p, /* Iterator */ int nByte, /* Bytes of data to read */ u8 **ppOut /* OUT: Pointer to buffer containing data */ @@ -222,11 +301,13 @@ static int vdbeSorterIterRead( /* Extend the p->aAlloc[] allocation if required. */ if( p->nAllocnAlloc*2; while( nByte>nNew ) nNew = nNew*2; - p->aAlloc = sqlite3DbReallocOrFree(db, p->aAlloc, nNew); - if( !p->aAlloc ) return SQLITE_NOMEM; + aNew = sqlite3Realloc(p->aAlloc, nNew); + if( !aNew ) return SQLITE_NOMEM; p->nAlloc = nNew; + p->aAlloc = aNew; } /* Copy as much data as is available in the buffer into the start of @@ -244,7 +325,7 @@ static int vdbeSorterIterRead( nCopy = nRem; if( nRem>p->nBuffer ) nCopy = p->nBuffer; - rc = vdbeSorterIterRead(db, p, nCopy, &aNext); + rc = vdbeSorterIterRead(p, nCopy, &aNext); if( rc!=SQLITE_OK ) return rc; assert( aNext!=p->aAlloc ); memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy); @@ -261,7 +342,7 @@ static int vdbeSorterIterRead( ** Read a varint from the stream of data accessed by p. Set *pnOut to ** the value read. */ -static int vdbeSorterIterVarint(sqlite3 *db, VdbeSorterIter *p, u64 *pnOut){ +static int vdbeSorterIterVarint(VdbeSorterIter *p, u64 *pnOut){ int iBuf; iBuf = p->iReadOff % p->nBuffer; @@ -271,7 +352,7 @@ static int vdbeSorterIterVarint(sqlite3 *db, VdbeSorterIter *p, u64 *pnOut){ u8 aVarint[16], *a; int i = 0, rc; do{ - rc = vdbeSorterIterRead(db, p, 1, &a); + rc = vdbeSorterIterRead(p, 1, &a); if( rc ) return rc; aVarint[(i++)&0xf] = a[0]; }while( (a[0]&0x80)!=0 ); @@ -286,23 +367,20 @@ static int vdbeSorterIterVarint(sqlite3 *db, VdbeSorterIter *p, u64 *pnOut){ ** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if ** no error occurs, or an SQLite error code if one does. */ -static int vdbeSorterIterNext( - sqlite3 *db, /* Database handle (for sqlite3DbMalloc() ) */ - VdbeSorterIter *pIter /* Iterator to advance */ -){ +static int vdbeSorterIterNext(VdbeSorterIter *pIter){ int rc; /* Return Code */ u64 nRec = 0; /* Size of record in bytes */ if( pIter->iReadOff>=pIter->iEof ){ /* This is an EOF condition */ - vdbeSorterIterZero(db, pIter); + vdbeSorterIterZero(pIter); return SQLITE_OK; } - rc = vdbeSorterIterVarint(db, pIter, &nRec); + rc = vdbeSorterIterVarint(pIter, &nRec); if( rc==SQLITE_OK ){ pIter->nKey = (int)nRec; - rc = vdbeSorterIterRead(db, pIter, (int)nRec, &pIter->aKey); + rc = vdbeSorterIterRead(pIter, (int)nRec, &pIter->aKey); } return rc; @@ -315,26 +393,23 @@ static int vdbeSorterIterNext( ** PMA is empty). */ static int vdbeSorterIterInit( - sqlite3 *db, /* Database handle */ - const VdbeSorter *pSorter, /* Sorter object */ - i64 iStart, /* Start offset in pFile */ + SorterThread *pThread, /* Thread context */ + i64 iStart, /* Start offset in pThread->pTemp1 */ VdbeSorterIter *pIter, /* Iterator to populate */ i64 *pnByte /* IN/OUT: Increment this value by PMA size */ ){ int rc = SQLITE_OK; - int nBuf; + int nBuf = pThread->pgsz; - nBuf = sqlite3BtreeGetPageSize(db->aDb[0].pBt); - - assert( pSorter->iWriteOff>iStart ); + assert( pThread->iTemp1Off>iStart ); assert( pIter->aAlloc==0 ); assert( pIter->aBuffer==0 ); - pIter->pFile = pSorter->pTemp1; + pIter->pFile = pThread->pTemp1; pIter->iReadOff = iStart; pIter->nAlloc = 128; - pIter->aAlloc = (u8 *)sqlite3DbMallocRaw(db, pIter->nAlloc); + pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc); pIter->nBuffer = nBuf; - pIter->aBuffer = (u8 *)sqlite3DbMallocRaw(db, nBuf); + pIter->aBuffer = (u8*)sqlite3Malloc(nBuf); if( !pIter->aBuffer ){ rc = SQLITE_NOMEM; @@ -344,26 +419,26 @@ static int vdbeSorterIterInit( iBuf = iStart % nBuf; if( iBuf ){ int nRead = nBuf - iBuf; - if( (iStart + nRead) > pSorter->iWriteOff ){ - nRead = (int)(pSorter->iWriteOff - iStart); + if( (iStart + nRead) > pThread->iTemp1Off ){ + nRead = (int)(pThread->iTemp1Off - iStart); } rc = sqlite3OsRead( - pSorter->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart + pThread->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart ); assert( rc!=SQLITE_IOERR_SHORT_READ ); } if( rc==SQLITE_OK ){ - u64 nByte; /* Size of PMA in bytes */ - pIter->iEof = pSorter->iWriteOff; - rc = vdbeSorterIterVarint(db, pIter, &nByte); + u64 nByte; + pIter->iEof = pThread->iTemp1Off; + rc = vdbeSorterIterVarint(pIter, &nByte); pIter->iEof = pIter->iReadOff + nByte; *pnByte += nByte; } } if( rc==SQLITE_OK ){ - rc = vdbeSorterIterNext(db, pIter); + rc = vdbeSorterIterNext(pIter); } return rc; } @@ -385,15 +460,14 @@ static int vdbeSorterIterInit( ** has been allocated and contains an unpacked record that is used as key2. */ static void vdbeSorterCompare( - const VdbeCursor *pCsr, /* Cursor object (for pKeyInfo) */ + SorterThread *pThread, /* Thread context (for pKeyInfo) */ int nIgnore, /* Ignore the last nIgnore fields */ const void *pKey1, int nKey1, /* Left side of comparison */ const void *pKey2, int nKey2, /* Right side of comparison */ int *pRes /* OUT: Result of comparison */ ){ - KeyInfo *pKeyInfo = pCsr->pKeyInfo; - VdbeSorter *pSorter = pCsr->pSorter; - UnpackedRecord *r2 = pSorter->pUnpacked; + KeyInfo *pKeyInfo = pThread->pKeyInfo; + UnpackedRecord *r2 = pThread->pUnpacked; int i; if( pKey2 ){ @@ -420,26 +494,29 @@ static void vdbeSorterCompare( ** multiple b-tree segments. Parameter iOut is the index of the aTree[] ** value to recalculate. */ -static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){ - VdbeSorter *pSorter = pCsr->pSorter; +static int vdbeSorterDoCompare( + SorterThread *pThread, + SorterMerger *pMerger, + int iOut +){ int i1; int i2; int iRes; VdbeSorterIter *p1; VdbeSorterIter *p2; - assert( iOutnTree && iOut>0 ); + assert( iOutnTree && iOut>0 ); - if( iOut>=(pSorter->nTree/2) ){ - i1 = (iOut - pSorter->nTree/2) * 2; + if( iOut>=(pMerger->nTree/2) ){ + i1 = (iOut - pMerger->nTree/2) * 2; i2 = i1 + 1; }else{ - i1 = pSorter->aTree[iOut*2]; - i2 = pSorter->aTree[iOut*2+1]; + i1 = pMerger->aTree[iOut*2]; + i2 = pMerger->aTree[iOut*2+1]; } - p1 = &pSorter->aIter[i1]; - p2 = &pSorter->aIter[i2]; + p1 = &pMerger->aIter[i1]; + p2 = &pMerger->aIter[i2]; if( p1->pFile==0 ){ iRes = i2; @@ -447,9 +524,9 @@ static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){ iRes = i1; }else{ int res; - assert( pCsr->pSorter->pUnpacked!=0 ); /* allocated in vdbeSorterMerge() */ + assert( pThread->pUnpacked!=0 ); /* allocated in vdbeSorterMerge() */ vdbeSorterCompare( - pCsr, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res + pThread, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res ); if( res<=0 ){ iRes = i1; @@ -458,7 +535,7 @@ static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){ } } - pSorter->aTree[iOut] = iRes; + pMerger->aTree[iOut] = iRes; return SQLITE_OK; } @@ -467,22 +544,32 @@ static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){ */ int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){ int pgsz; /* Page size of main database */ + int i; /* Used to iterate through aThread[] */ int mxCache; /* Cache size */ VdbeSorter *pSorter; /* The new sorter */ - char *d; /* Dummy */ + KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */ + int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */ assert( pCsr->pKeyInfo && pCsr->pBt==0 ); - pCsr->pSorter = pSorter = sqlite3DbMallocZero(db, sizeof(VdbeSorter)); + szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*); + pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sizeof(VdbeSorter)+szKeyInfo); + pCsr->pSorter = pSorter; if( pSorter==0 ){ return SQLITE_NOMEM; } - - pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pCsr->pKeyInfo, 0, 0, &d); - if( pSorter->pUnpacked==0 ) return SQLITE_NOMEM; - assert( pSorter->pUnpacked==(UnpackedRecord *)d ); + pKeyInfo = (KeyInfo*)&pSorter[1]; + memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo); + pKeyInfo->db = 0; + pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); + + for(i=0; iaThread[i]; + pThread->pKeyInfo = pKeyInfo; + pThread->pVfs = db->pVfs; + pThread->pgsz = pgsz; + } if( !sqlite3TempInMemory(db) ){ - pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz; mxCache = db->aDb[0].pSchema->cache_size; if( mxCachepUnpacked); + vdbeSorterRecordFree(0, pThread->pList); + if( pThread->pTemp1 ){ + sqlite3OsCloseFree(pThread->pTemp1); + } + memset(pThread, 0, sizeof(SorterThread)); +} + +/* +** Join all threads. +*/ +static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ + int rc = rcin; + int i; + for(i=0; iaThread[i]; + if( pThread->pThread ){ + void *pRet; + int rc2 = sqlite3ThreadJoin(pThread->pThread, &pRet); + pThread->pThread = 0; + pThread->bDone = 0; + if( rc==SQLITE_OK ) rc = rc2; + if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + } + } + return rc; +} + +/* +** Allocate a new SorterMerger object with space for nIter iterators. +*/ +static SorterMerger *vdbeSorterMergerNew(int nIter){ + int N = 2; /* Smallest power of two >= nIter */ + int nByte; /* Total bytes of space to allocate */ + SorterMerger *pNew; /* Pointer to allocated object to return */ + + assert( nIter<=SORTER_MAX_MERGE_COUNT ); + while( NnTree = N; + pNew->aIter = (VdbeSorterIter*)&pNew[1]; + pNew->aTree = (int*)&pNew->aIter[N]; + } + + return pNew; +} + +/* +** Free the SorterMerger object passed as the only argument. +*/ +static void vdbeSorterMergerFree(SorterMerger *pMerger){ + if( pMerger ){ + int i; + for(i=0; inTree; i++){ + vdbeSorterIterZero(&pMerger->aIter[i]); + } + sqlite3_free(pMerger); + } +} + /* ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines. */ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ VdbeSorter *pSorter = pCsr->pSorter; if( pSorter ){ - if( pSorter->aIter ){ - int i; - for(i=0; inTree; i++){ - vdbeSorterIterZero(db, &pSorter->aIter[i]); - } - sqlite3DbFree(db, pSorter->aIter); - } - if( pSorter->pTemp1 ){ - sqlite3OsCloseFree(pSorter->pTemp1); + int i; + vdbeSorterJoinAll(pSorter, SQLITE_OK); + for(i=0; iaThread[i]; + vdbeSorterThreadCleanup(db, pThread); } - vdbeSorterRecordFree(db, pSorter->pRecord); - sqlite3DbFree(db, pSorter->pUnpacked); + + vdbeSorterRecordFree(0, pSorter->pRecord); + vdbeSorterMergerFree(pSorter->pMerger); sqlite3DbFree(db, pSorter); pCsr->pSorter = 0; } @@ -532,9 +684,9 @@ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ ** set *ppFile to point to the malloc'd file-handle and return SQLITE_OK. ** Otherwise, set *ppFile to 0 and return an SQLite error code. */ -static int vdbeSorterOpenTempFile(sqlite3 *db, sqlite3_file **ppFile){ +static int vdbeSorterOpenTempFile(sqlite3_vfs *pVfs, sqlite3_file **ppFile){ int dummy; - return sqlite3OsOpenMalloc(db->pVfs, 0, ppFile, + return sqlite3OsOpenMalloc(pVfs, 0, ppFile, SQLITE_OPEN_TEMP_JOURNAL | SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &dummy @@ -546,7 +698,7 @@ static int vdbeSorterOpenTempFile(sqlite3 *db, sqlite3_file **ppFile){ ** Set *ppOut to the head of the new list. */ static void vdbeSorterMerge( - const VdbeCursor *pCsr, /* For pKeyInfo */ + SorterThread *pThread, /* Calling thread context */ SorterRecord *p1, /* First list to merge */ SorterRecord *p2, /* Second list to merge */ SorterRecord **ppOut /* OUT: Head of merged list */ @@ -557,7 +709,7 @@ static void vdbeSorterMerge( while( p1 && p2 ){ int res; - vdbeSorterCompare(pCsr, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res); + vdbeSorterCompare(pThread, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res); if( res<=0 ){ *pp = p1; pp = &p1->pNext; @@ -576,27 +728,26 @@ static void vdbeSorterMerge( } /* -** Sort the linked list of records headed at pCsr->pRecord. Return SQLITE_OK -** if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if an error -** occurs. +** Sort the linked list of records headed at pThread->pList. Return +** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if +** an error occurs. */ -static int vdbeSorterSort(const VdbeCursor *pCsr){ +static int vdbeSorterSort(SorterThread *pThread){ int i; SorterRecord **aSlot; SorterRecord *p; - VdbeSorter *pSorter = pCsr->pSorter; aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *)); if( !aSlot ){ return SQLITE_NOMEM; } - p = pSorter->pRecord; + p = pThread->pList; while( p ){ SorterRecord *pNext = p->pNext; p->pNext = 0; for(i=0; aSlot[i]; i++){ - vdbeSorterMerge(pCsr, p, aSlot[i], &p); + vdbeSorterMerge(pThread, p, aSlot[i], &p); aSlot[i] = 0; } aSlot[i] = p; @@ -605,9 +756,9 @@ static int vdbeSorterSort(const VdbeCursor *pCsr){ p = 0; for(i=0; i<64; i++){ - vdbeSorterMerge(pCsr, p, aSlot[i], &p); + vdbeSorterMerge(pThread, p, aSlot[i], &p); } - pSorter->pRecord = p; + pThread->pList = p; sqlite3_free(aSlot); return SQLITE_OK; @@ -617,15 +768,13 @@ static int vdbeSorterSort(const VdbeCursor *pCsr){ ** Initialize a file-writer object. */ static void fileWriterInit( - sqlite3 *db, /* Database (for malloc) */ sqlite3_file *pFile, /* File to write to */ FileWriter *p, /* Object to populate */ + int nBuf, /* Buffer size */ i64 iStart /* Offset of pFile to begin writing at */ ){ - int nBuf = sqlite3BtreeGetPageSize(db->aDb[0].pBt); - memset(p, 0, sizeof(FileWriter)); - p->aBuffer = (u8 *)sqlite3DbMallocRaw(db, nBuf); + p->aBuffer = (u8*)sqlite3Malloc(nBuf); if( !p->aBuffer ){ p->eFWErr = SQLITE_NOMEM; }else{ @@ -673,7 +822,7 @@ static void fileWriterWrite(FileWriter *p, u8 *pData, int nData){ ** Before returning, set *piEof to the offset immediately following the ** last byte written to the file. */ -static int fileWriterFinish(sqlite3 *db, FileWriter *p, i64 *piEof){ +static int fileWriterFinish(FileWriter *p, i64 *piEof){ int rc; if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){ p->eFWErr = sqlite3OsWrite(p->pFile, @@ -682,7 +831,7 @@ static int fileWriterFinish(sqlite3 *db, FileWriter *p, i64 *piEof){ ); } *piEof = (p->iWriteOff + p->iBufEnd); - sqlite3DbFree(db, p->aBuffer); + sqlite3_free(p->aBuffer); rc = p->eFWErr; memset(p, 0, sizeof(FileWriter)); return rc; @@ -712,43 +861,234 @@ static void fileWriterWriteVarint(FileWriter *p, u64 iVal){ ** Each record consists of a varint followed by a blob of data (the ** key). The varint is the number of bytes in the blob of data. */ -static int vdbeSorterListToPMA(sqlite3 *db, const VdbeCursor *pCsr){ +static int vdbeSorterListToPMA(SorterThread *pThread){ int rc = SQLITE_OK; /* Return code */ - VdbeSorter *pSorter = pCsr->pSorter; - FileWriter writer; + FileWriter writer; /* Object used to write to the file */ memset(&writer, 0, sizeof(FileWriter)); - - if( pSorter->nInMemory==0 ){ - assert( pSorter->pRecord==0 ); - return rc; - } - - rc = vdbeSorterSort(pCsr); + assert( pThread->nInMemory>0 ); /* If the first temporary PMA file has not been opened, open it now. */ - if( rc==SQLITE_OK && pSorter->pTemp1==0 ){ - rc = vdbeSorterOpenTempFile(db, &pSorter->pTemp1); - assert( rc!=SQLITE_OK || pSorter->pTemp1 ); - assert( pSorter->iWriteOff==0 ); - assert( pSorter->nPMA==0 ); + if( pThread->pTemp1==0 ){ + rc = vdbeSorterOpenTempFile(pThread->pVfs, &pThread->pTemp1); + assert( rc!=SQLITE_OK || pThread->pTemp1 ); + assert( pThread->iTemp1Off==0 ); + assert( pThread->nPMA==0 ); } if( rc==SQLITE_OK ){ SorterRecord *p; SorterRecord *pNext = 0; - fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff); - pSorter->nPMA++; - fileWriterWriteVarint(&writer, pSorter->nInMemory); - for(p=pSorter->pRecord; p; p=pNext){ + fileWriterInit(pThread->pTemp1, &writer, pThread->pgsz, pThread->iTemp1Off); + pThread->nPMA++; + fileWriterWriteVarint(&writer, pThread->nInMemory); + for(p=pThread->pList; p; p=pNext){ pNext = p->pNext; fileWriterWriteVarint(&writer, p->nVal); fileWriterWrite(&writer, p->pVal, p->nVal); - sqlite3DbFree(db, p); + sqlite3_free(p); + } + pThread->pList = p; + rc = fileWriterFinish(&writer, &pThread->iTemp1Off); + } + + return rc; +} + +/* +** Advance the SorterMerger iterator passed as the second argument to +** the next entry. Set *pbEof to true if this means the iterator has +** reached EOF. +** +** Return SQLITE_OK if successful or an error code if an error occurs. +*/ +static int vdbeSorterNext( + SorterThread *pThread, + SorterMerger *pMerger, + int *pbEof +){ + int rc; + int iPrev = pMerger->aTree[1];/* Index of iterator to advance */ + int i; /* Index of aTree[] to recalculate */ + + /* Advance the current iterator */ + rc = vdbeSorterIterNext(&pMerger->aIter[iPrev]); + + /* Update contents of aTree[] */ + for(i=(pMerger->nTree+iPrev)/2; rc==SQLITE_OK && i>0; i=i/2){ + rc = vdbeSorterDoCompare(pThread, pMerger, i); + } + + *pbEof = (pMerger->aIter[pMerger->aTree[1]].pFile==0); + return rc; +} + +/* +** The main routine for sorter-thread operations. +*/ +static void *vdbeSorterThreadMain(void *pCtx){ + int rc = SQLITE_OK; + SorterThread *pThread = (SorterThread*)pCtx; + + assert( pThread->eWork==SORTER_THREAD_SORT + || pThread->eWork==SORTER_THREAD_TO_PMA + || pThread->eWork==SORTER_THREAD_CONS + ); + assert( pThread->bDone==0 ); + + if( pThread->pUnpacked==0 ){ + char *pFree; + pThread->pUnpacked = sqlite3VdbeAllocUnpackedRecord( + pThread->pKeyInfo, 0, 0, &pFree + ); + assert( pThread->pUnpacked==(UnpackedRecord*)pFree ); + if( pFree==0 ){ + rc = SQLITE_NOMEM; + goto thread_out; + } + } + + if( pThread->eWork==SORTER_THREAD_CONS ){ + assert( pThread->pList==0 ); + while( pThread->nPMA>pThread->nConsolidate && rc==SQLITE_OK ){ + int nIter = MIN(pThread->nPMA, SORTER_MAX_MERGE_COUNT); + sqlite3_file *pTemp2 = 0; /* Second temp file to use */ + SorterMerger *pMerger; /* Object for reading/merging PMA data */ + i64 iReadOff = 0; /* Offset in pTemp1 to read from */ + i64 iWriteOff = 0; /* Offset in pTemp2 to write to */ + int i; + + /* Allocate a merger object to merge PMAs together. */ + pMerger = vdbeSorterMergerNew(nIter); + if( pMerger==0 ){ + rc = SQLITE_NOMEM; + break; + } + + /* Open a second temp file to write merged data to */ + rc = vdbeSorterOpenTempFile(pThread->pVfs, &pTemp2); + if( rc!=SQLITE_OK ){ + vdbeSorterMergerFree(pMerger); + break; + } + + /* This loop runs once for each output PMA. Each output PMA is made + ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */ + for(i=0; inPMA; i+=SORTER_MAX_MERGE_COUNT){ + FileWriter writer; /* Object for writing data to pTemp2 */ + i64 nOut = 0; /* Bytes of data in output PMA */ + int bEof = 0; + int rc2; + + /* Configure the merger object to read and merge data from the next + ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs, + ** if that is fewer). */ + int iIter; + for(iIter=0; iIteraIter[iIter]; + rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nOut); + iReadOff = pIter->iEof; + if( iReadOff>=pThread->iTemp1Off || rc!=SQLITE_OK ) break; + } + for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){ + rc = vdbeSorterDoCompare(pThread, pMerger, iIter); + } + + fileWriterInit(pTemp2, &writer, pThread->pgsz, iWriteOff); + fileWriterWriteVarint(&writer, nOut); + while( rc==SQLITE_OK && bEof==0 ){ + VdbeSorterIter *pIter = &pMerger->aIter[ pMerger->aTree[1] ]; + assert( pIter->pFile!=0 ); /* pIter is not at EOF */ + fileWriterWriteVarint(&writer, pIter->nKey); + fileWriterWrite(&writer, pIter->aKey, pIter->nKey); + rc = vdbeSorterNext(pThread, pMerger, &bEof); + } + rc2 = fileWriterFinish(&writer, &iWriteOff); + if( rc==SQLITE_OK ) rc = rc2; + } + + vdbeSorterMergerFree(pMerger); + sqlite3OsCloseFree(pThread->pTemp1); + pThread->pTemp1 = pTemp2; + pThread->nPMA = (i / SORTER_MAX_MERGE_COUNT); + pThread->iTemp1Off = iWriteOff; + } + }else{ + /* Sort the pThread->pList list */ + rc = vdbeSorterSort(pThread); + + /* If required, write the list out to a PMA. */ + if( rc==SQLITE_OK && pThread->eWork==SORTER_THREAD_TO_PMA ){ +#ifdef SQLITE_DEBUG + i64 nExpect = pThread->nInMemory + + sqlite3VarintLen(pThread->nInMemory) + + pThread->iTemp1Off; +#endif + rc = vdbeSorterListToPMA(pThread); + assert( rc!=SQLITE_OK || (nExpect==pThread->iTemp1Off) ); + } + } + + thread_out: + pThread->bDone = 1; + return SQLITE_INT_TO_PTR(rc); +} + +/* +** Run the activity scheduled by the object passed as the only argument +** in the current thread. +*/ +static int vdbeSorterRunThread(SorterThread *pThread){ + int rc = SQLITE_PTR_TO_INT( vdbeSorterThreadMain((void*)pThread) ); + assert( pThread->bDone ); + pThread->bDone = 0; + return rc; +} + +/* +** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly +** using a background thread. +** +** If argument bFg is non-zero, the operation always uses the calling thread. +*/ +static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ + VdbeSorter *pSorter = pCsr->pSorter; + int rc = SQLITE_OK; + int i; + SorterThread *pThread; /* Thread context used to create new PMA */ + + pSorter->bUsePMA = 1; + for(i=0; ALWAYS( iaThread[i]; + if( pThread->bDone ){ + void *pRet; + assert( pThread->pThread ); + rc = sqlite3ThreadJoin(pThread->pThread, &pRet); + pThread->pThread = 0; + pThread->bDone = 0; + if( rc==SQLITE_OK ){ + rc = SQLITE_PTR_TO_INT(pRet); + } + } + if( pThread->pThread==0 ) break; + } + + if( rc==SQLITE_OK ){ + assert( pThread->pThread==0 && pThread->bDone==0 ); + pThread->eWork = SORTER_THREAD_TO_PMA; + pThread->pList = pSorter->pRecord; + pThread->nInMemory = pSorter->nInMemory; + pSorter->nInMemory = 0; + pSorter->pRecord = 0; + + if( bFg || i<(SQLITE_MAX_SORTER_THREAD-1) ){ + void *pCtx = (void*)pThread; + rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSorterThreadMain, pCtx); + }else{ + /* Use the foreground thread for this operation */ + rc = vdbeSorterRunThread(pThread); } - pSorter->pRecord = p; - rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff); } return rc; @@ -769,7 +1109,7 @@ int sqlite3VdbeSorterWrite( assert( pSorter ); pSorter->nInMemory += sqlite3VarintLen(pVal->n) + pVal->n; - pNew = (SorterRecord *)sqlite3DbMallocRaw(db, pVal->n + sizeof(SorterRecord)); + pNew = (SorterRecord *)sqlite3Malloc(pVal->n + sizeof(SorterRecord)); if( pNew==0 ){ rc = SQLITE_NOMEM; }else{ @@ -793,48 +1133,22 @@ int sqlite3VdbeSorterWrite( (pSorter->nInMemory>pSorter->mxPmaSize) || (pSorter->nInMemory>pSorter->mnPmaSize && sqlite3HeapNearlyFull()) )){ -#ifdef SQLITE_DEBUG - i64 nExpect = pSorter->iWriteOff - + sqlite3VarintLen(pSorter->nInMemory) - + pSorter->nInMemory; -#endif - rc = vdbeSorterListToPMA(db, pCsr); - pSorter->nInMemory = 0; - assert( rc!=SQLITE_OK || (nExpect==pSorter->iWriteOff) ); + rc = vdbeSorterFlushPMA(db, pCsr, 0); } return rc; } /* -** Helper function for sqlite3VdbeSorterRewind(). +** Return the total number of PMAs in all temporary files. */ -static int vdbeSorterInitMerge( - sqlite3 *db, /* Database handle */ - const VdbeCursor *pCsr, /* Cursor handle for this sorter */ - i64 *pnByte /* Sum of bytes in all opened PMAs */ -){ - VdbeSorter *pSorter = pCsr->pSorter; - int rc = SQLITE_OK; /* Return code */ - int i; /* Used to iterator through aIter[] */ - i64 nByte = 0; /* Total bytes in all opened PMAs */ - - /* Initialize the iterators. */ - for(i=0; iaIter[i]; - rc = vdbeSorterIterInit(db, pSorter, pSorter->iReadOff, pIter, &nByte); - pSorter->iReadOff = pIter->iEof; - assert( rc!=SQLITE_OK || pSorter->iReadOff<=pSorter->iWriteOff ); - if( rc!=SQLITE_OK || pSorter->iReadOff>=pSorter->iWriteOff ) break; - } - - /* Initialize the aTree[] array. */ - for(i=pSorter->nTree-1; rc==SQLITE_OK && i>0; i--){ - rc = vdbeSorterDoCompare(pCsr, i); +static int vdbeSorterCountPMA(VdbeSorter *pSorter){ + int nPMA = 0; + int i; + for(i=0; iaThread[i].nPMA; } - - *pnByte = nByte; - return rc; + return nPMA; } /* @@ -843,107 +1157,98 @@ static int vdbeSorterInitMerge( */ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ VdbeSorter *pSorter = pCsr->pSorter; - int rc; /* Return code */ - sqlite3_file *pTemp2 = 0; /* Second temp file to use */ - i64 iWrite2 = 0; /* Write offset for pTemp2 */ - int nIter; /* Number of iterators used */ - int nByte; /* Bytes of space required for aIter/aTree */ - int N = 2; /* Power of 2 >= nIter */ + int rc = SQLITE_OK; /* Return code */ assert( pSorter ); /* If no data has been written to disk, then do not do so now. Instead, ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly ** from the in-memory list. */ - if( pSorter->nPMA==0 ){ - *pbEof = !pSorter->pRecord; - assert( pSorter->aTree==0 ); - return vdbeSorterSort(pCsr); + if( pSorter->bUsePMA==0 ){ + if( pSorter->pRecord ){ + SorterThread *pThread = &pSorter->aThread[0]; + *pbEof = 0; + pThread->pList = pSorter->pRecord; + pThread->eWork = SORTER_THREAD_SORT; + rc = vdbeSorterRunThread(pThread); + pSorter->pRecord = pThread->pList; + pThread->pList = 0; + }else{ + *pbEof = 1; + } + return rc; } /* Write the current in-memory list to a PMA. */ - rc = vdbeSorterListToPMA(db, pCsr); - if( rc!=SQLITE_OK ) return rc; + if( pSorter->pRecord ){ + rc = vdbeSorterFlushPMA(db, pCsr, 1); + } - /* Allocate space for aIter[] and aTree[]. */ - nIter = pSorter->nPMA; - if( nIter>SORTER_MAX_MERGE_COUNT ) nIter = SORTER_MAX_MERGE_COUNT; - assert( nIter>0 ); - while( NaIter = (VdbeSorterIter *)sqlite3DbMallocZero(db, nByte); - if( !pSorter->aIter ) return SQLITE_NOMEM; - pSorter->aTree = (int *)&pSorter->aIter[N]; - pSorter->nTree = N; - - do { - int iNew; /* Index of new, merged, PMA */ - - for(iNew=0; - rc==SQLITE_OK && iNew*SORTER_MAX_MERGE_COUNTnPMA; - iNew++ - ){ - int rc2; /* Return code from fileWriterFinish() */ - FileWriter writer; /* Object used to write to disk */ - i64 nWrite; /* Number of bytes in new PMA */ - - memset(&writer, 0, sizeof(FileWriter)); - - /* If there are SORTER_MAX_MERGE_COUNT or less PMAs in file pTemp1, - ** initialize an iterator for each of them and break out of the loop. - ** These iterators will be incrementally merged as the VDBE layer calls - ** sqlite3VdbeSorterNext(). - ** - ** Otherwise, if pTemp1 contains more than SORTER_MAX_MERGE_COUNT PMAs, - ** initialize interators for SORTER_MAX_MERGE_COUNT of them. These PMAs - ** are merged into a single PMA that is written to file pTemp2. - */ - rc = vdbeSorterInitMerge(db, pCsr, &nWrite); - assert( rc!=SQLITE_OK || pSorter->aIter[ pSorter->aTree[1] ].pFile ); - if( rc!=SQLITE_OK || pSorter->nPMA<=SORTER_MAX_MERGE_COUNT ){ - break; + /* Join all threads */ + rc = vdbeSorterJoinAll(pSorter, rc); + + /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge + ** some of them together so that this is no longer the case. */ + assert( SORTER_MAX_MERGE_COUNT>=SQLITE_MAX_SORTER_THREAD ); + if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ + int i; + for(i=0; rc==SQLITE_OK && iaThread[i]; + if( pThread->pTemp1 ){ + pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/SQLITE_MAX_SORTER_THREAD; + pThread->eWork = SORTER_THREAD_CONS; + + if( i<(SQLITE_MAX_SORTER_THREAD-1) ){ + void *pCtx = (void*)pThread; + rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx); + }else{ + rc = vdbeSorterRunThread(pThread); + } } + } + } - /* Open the second temp file, if it is not already open. */ - if( pTemp2==0 ){ - assert( iWrite2==0 ); - rc = vdbeSorterOpenTempFile(db, &pTemp2); - } + /* Join all threads */ + rc = vdbeSorterJoinAll(pSorter, rc); - if( rc==SQLITE_OK ){ - int bEof = 0; - fileWriterInit(db, pTemp2, &writer, iWrite2); - fileWriterWriteVarint(&writer, nWrite); - while( rc==SQLITE_OK && bEof==0 ){ - VdbeSorterIter *pIter = &pSorter->aIter[ pSorter->aTree[1] ]; - assert( pIter->pFile ); + /* Assuming no errors have occurred, set up a merger structure to read + ** and merge all remaining PMAs. */ + assert( pSorter->pMerger==0 ); + if( rc==SQLITE_OK ){ + int nIter = 0; /* Number of iterators used */ + int i; + SorterMerger *pMerger; + for(i=0; iaThread[i].nPMA; + } - fileWriterWriteVarint(&writer, pIter->nKey); - fileWriterWrite(&writer, pIter->aKey, pIter->nKey); - rc = sqlite3VdbeSorterNext(db, pCsr, &bEof); + pSorter->pMerger = pMerger = vdbeSorterMergerNew(nIter); + if( pMerger==0 ){ + rc = SQLITE_NOMEM; + }else{ + int iIter = 0; + int iThread = 0; + for(iThread=0; iThreadaThread[iThread]; + for(iPMA=0; iPMAnPMA && rc==SQLITE_OK; iPMA++){ + i64 nDummy = 0; + VdbeSorterIter *pIter = &pMerger->aIter[iIter++]; + rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nDummy); + iReadOff = pIter->iEof; } - rc2 = fileWriterFinish(db, &writer, &iWrite2); - if( rc==SQLITE_OK ) rc = rc2; } - } - if( pSorter->nPMA<=SORTER_MAX_MERGE_COUNT ){ - break; - }else{ - sqlite3_file *pTmp = pSorter->pTemp1; - pSorter->nPMA = iNew; - pSorter->pTemp1 = pTemp2; - pTemp2 = pTmp; - pSorter->iWriteOff = iWrite2; - pSorter->iReadOff = 0; - iWrite2 = 0; + for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ + rc = vdbeSorterDoCompare(&pSorter->aThread[0], pMerger, i); + } } - }while( rc==SQLITE_OK ); + } - if( pTemp2 ){ - sqlite3OsCloseFree(pTemp2); + if( rc==SQLITE_OK ){ + *pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0); } - *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0); return rc; } @@ -954,16 +1259,8 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ VdbeSorter *pSorter = pCsr->pSorter; int rc; /* Return code */ - if( pSorter->aTree ){ - int iPrev = pSorter->aTree[1];/* Index of iterator to advance */ - int i; /* Index of aTree[] to recalculate */ - - rc = vdbeSorterIterNext(db, &pSorter->aIter[iPrev]); - for(i=(pSorter->nTree+iPrev)/2; rc==SQLITE_OK && i>0; i=i/2){ - rc = vdbeSorterDoCompare(pCsr, i); - } - - *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0); + if( pSorter->pMerger ){ + rc = vdbeSorterNext(&pSorter->aThread[0], pSorter->pMerger, pbEof); }else{ SorterRecord *pFree = pSorter->pRecord; pSorter->pRecord = pFree->pNext; @@ -984,9 +1281,9 @@ static void *vdbeSorterRowkey( int *pnKey /* OUT: Size of current key in bytes */ ){ void *pKey; - if( pSorter->aTree ){ + if( pSorter->pMerger ){ VdbeSorterIter *pIter; - pIter = &pSorter->aIter[ pSorter->aTree[1] ]; + pIter = &pSorter->pMerger->aIter[ pSorter->pMerger->aTree[1] ]; *pnKey = pIter->nKey; pKey = pIter->aKey; }else{ @@ -1031,9 +1328,10 @@ int sqlite3VdbeSorterCompare( int *pRes /* OUT: Result of comparison */ ){ VdbeSorter *pSorter = pCsr->pSorter; + SorterThread *pMain = &pSorter->aThread[0]; void *pKey; int nKey; /* Sorter key to compare pVal with */ pKey = vdbeSorterRowkey(pSorter, &nKey); - vdbeSorterCompare(pCsr, nIgnore, pVal->z, pVal->n, pKey, nKey, pRes); + vdbeSorterCompare(pMain, nIgnore, pVal->z, pVal->n, pKey, nKey, pRes); return SQLITE_OK; }