From d30ab3d9dd17d37f152dfad1d7b9ffd7fd96bddc Mon Sep 17 00:00:00 2001 From: dan Date: Wed, 9 Apr 2014 20:04:17 +0000 Subject: [PATCH] Experimental multi-threaded sorting changes to allow the sorter to begin returning items to the VDBE before all data is sorted. FossilOrigin-Name: f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f --- manifest | 19 ++- manifest.uuid | 2 +- src/vdbesort.c | 443 ++++++++++++++++++++++++++++++++++++------------ test/sort2.test | 6 + 4 files changed, 348 insertions(+), 122 deletions(-) diff --git a/manifest b/manifest index 2eb2b1da22..8831053880 100644 --- a/manifest +++ b/manifest @@ -1,5 +1,5 @@ -C Fix\sharmless\scompiler\swarnings. -D 2014-04-04T22:44:59.018 +C Experimental\smulti-threaded\ssorting\schanges\sto\sallow\sthe\ssorter\sto\sbegin\sreturning\sitems\sto\sthe\sVDBE\sbefore\sall\sdata\sis\ssorted. +D 2014-04-09T20:04:17.324 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23 @@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4 F src/vdbeaux.c d8dc38965507a34b0e150c0d7fc82b02f8cf25ea F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447 -F src/vdbesort.c 8da916fc74e78edd5bc95653206942e01710ac09 +F src/vdbesort.c 26823b626c3231a52e45f5e78a18cb8681bb1b88 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8 @@ -818,7 +818,7 @@ F test/skipscan2.test 5a4db0799c338ddbacb154aaa5589c0254b36a8d F test/soak.test 0b5b6375c9f4110c828070b826b3b4b0bb65cd5f F test/softheap1.test 40562fe6cac6d9827b7b42b86d45aedf12c15e24 F test/sort.test 79dc647c4e9b123a64e57b7080b7f9a2df43f87a -F test/sort2.test 21cd865e31adecdc8fc81c8d95431e629676a8d8 +F test/sort2.test bbc2eb244fb862141a900a851056d48705b5997b F test/sort3.test c3f88d233452a129de519de311d109a0ad0da0af F test/speed1.test f2974a91d79f58507ada01864c0e323093065452 F test/speed1p.explain d841e650a04728b39e6740296b852dccdca9b2cb @@ -1163,7 +1163,10 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff -P 5e3dfa27c71a666e122e3cf64897038ff8424800 -R bbcde0d30a2bb025a3c15f4a10b2b404 -U drh -Z 9376e61a8443d421f4f6f69d3d5500ac +P e54dded2012f0ab486ee138e9bd57c528af33980 +R 803b4ddf4cddf7e21aeddc04109caaf0 +T *branch * threads-experimental +T *sym-threads-experimental * +T -sym-threads * +U dan +Z 3b5c615396ccbaaa23add5a8103bd906 diff --git a/manifest.uuid b/manifest.uuid index 44109c7358..6cf45357c4 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -e54dded2012f0ab486ee138e9bd57c528af33980 \ No newline at end of file +f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f \ No newline at end of file diff --git a/src/vdbesort.c b/src/vdbesort.c index f59e8f51f5..e558c42f11 100644 --- a/src/vdbesort.c +++ b/src/vdbesort.c @@ -96,6 +96,8 @@ typedef struct PmaReader PmaReader; /* Incrementally read one PMA */ typedef struct PmaWriter PmaWriter; /* Incrementally write on PMA */ typedef struct SorterRecord SorterRecord; /* A record being sorted */ typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ +typedef struct SorterFile SorterFile; +typedef struct IncrMerger IncrMerger; /* @@ -105,6 +107,11 @@ typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ #define SORT_SUBTASK_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pTemp1 */ #define SORT_SUBTASK_CONS 3 /* Consolidate multiple PMAs */ +struct SorterFile { + sqlite3_file *pFd; + i64 iEof; +}; + /* ** Sorting is divided up into smaller subtasks. Each subtask is controlled ** by an instance of this object. A Subtask might run in either the main thread @@ -145,6 +152,7 @@ struct SortSubtask { int bDone; /* Set to true by pTask when finished */ sqlite3 *db; /* Database connection */ + VdbeSorter *pSorter; /* Sorter */ KeyInfo *pKeyInfo; /* How to compare records */ UnpackedRecord *pUnpacked; /* Space to unpack a record */ int pgsz; /* Main database page size */ @@ -155,9 +163,8 @@ struct SortSubtask { int nInMemory; /* Expected size of PMA based on pList */ u8 *aListMemory; /* Records memory (or NULL) */ - int nPMA; /* Number of PMAs currently in pTemp1 */ - i64 iTemp1Off; /* Offset to write to in pTemp1 */ - sqlite3_file *pTemp1; /* File to write PMAs to, or NULL */ + int nPMA; /* Number of PMAs currently in file */ + SorterFile file; }; @@ -239,8 +246,10 @@ struct VdbeSorter { int mnPmaSize; /* Minimum PMA size, in bytes */ int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ int bUsePMA; /* True if one or more PMAs created */ + int bUseThreads; /* True if one or more PMAs created */ SorterRecord *pRecord; /* Head of in-memory record list */ - MergeEngine *pMerger; /* For final merge of PMAs (by caller) */ + PmaReader *pReader; /* Read data from here after Rewind() */ + UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */ u8 *aMemory; /* Block of memory to alloc records from */ int iMemory; /* Offset of first free byte in aMemory */ int nMemory; /* Size of aMemory allocation in bytes */ @@ -265,6 +274,16 @@ struct PmaReader { u8 *aBuffer; /* Current read buffer */ int nBuffer; /* Size of read buffer in bytes */ u8 *aMap; /* Pointer to mapping of entire file */ + IncrMerger *pIncr; /* Incremental merger */ +}; + +struct IncrMerger { + int mxSz; /* Maximum size of files */ + SortSubtask *pTask; /* Task that owns this merger */ + int bEof; /* Set to true when merge is finished */ + SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */ + MergeEngine *pMerger; /* Merge engine thread reads data from */ + SQLiteThread *pThread; /* Thread currently populating aFile[1] */ }; /* @@ -326,6 +345,9 @@ struct SorterRecord { /* Maximum number of PMAs that a single MergeEngine can merge */ #define SORTER_MAX_MERGE_COUNT 16 +static int vdbeIncrSwap(IncrMerger*); +static void vdbeIncrFree(IncrMerger*); + /* ** Free all memory belonging to the PmaReader object passed as the second ** argument. All structure fields are set to zero before returning. @@ -334,6 +356,7 @@ static void vdbePmaReaderClear(PmaReader *pIter){ sqlite3_free(pIter->aAlloc); sqlite3_free(pIter->aBuffer); if( pIter->aMap ) sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap); + if( pIter->pIncr ) vdbeIncrFree(pIter->pIncr); memset(pIter, 0, sizeof(PmaReader)); } @@ -400,7 +423,7 @@ static int vdbePmaReadBlob( /* Extend the p->aAlloc[] allocation if required. */ if( p->nAllocnAlloc*2; + int nNew = MAX(128, p->nAlloc*2); while( nByte>nNew ) nNew = nNew*2; aNew = sqlite3Realloc(p->aAlloc, nNew); if( !aNew ) return SQLITE_NOMEM; @@ -464,22 +487,70 @@ static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){ return SQLITE_OK; } +static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){ + int rc = SQLITE_OK; + if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){ + rc = sqlite3OsFetch(pFile->pFd, 0, pFile->iEof, (void**)pp); + } + return rc; +} + +static int vdbePmaReaderReinit(PmaReader *pIter){ + IncrMerger *pIncr = pIter->pIncr; + SortSubtask *pTask = pIncr->pTask; + int rc = SQLITE_OK; + + assert( pIncr->bEof==0 ); + + if( pIter->aMap ){ + sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap); + pIter->aMap = 0; + } + pIter->iReadOff = 0; + pIter->iEof = pIncr->aFile[0].iEof; + pIter->pFile = pIncr->aFile[0].pFd; + + rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap); + if( rc==SQLITE_OK ){ + if( pIter->aMap==0 && pIter->aBuffer==0 ){ + pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz); + if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM; + pIter->nBuffer = pTask->pgsz; + } + } + + return rc; +} + /* ** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if ** no error occurs, or an SQLite error code if one does. */ static int vdbePmaReaderNext(PmaReader *pIter){ - int rc; /* Return Code */ + int rc = SQLITE_OK; /* Return Code */ u64 nRec = 0; /* Size of record in bytes */ if( pIter->iReadOff>=pIter->iEof ){ - /* This is an EOF condition */ - vdbePmaReaderClear(pIter); - return SQLITE_OK; + int bEof = 1; + if( pIter->pIncr ){ + rc = vdbeIncrSwap(pIter->pIncr); + if( rc==SQLITE_OK && pIter->pIncr->bEof==0 ){ + rc = vdbePmaReaderReinit(pIter); + bEof = 0; + } + } + + if( bEof ){ + /* This is an EOF condition */ + vdbePmaReaderClear(pIter); + return rc; + } } - rc = vdbePmaReadVarint(pIter, &nRec); + if( rc==SQLITE_OK ){ + rc = vdbePmaReadVarint(pIter, &nRec); + } if( rc==SQLITE_OK ){ pIter->nKey = (int)nRec; rc = vdbePmaReadBlob(pIter, (int)nRec, &pIter->aKey); @@ -493,10 +564,14 @@ static int vdbePmaReaderNext(PmaReader *pIter){ ** starting at offset iStart and ending at offset iEof-1. This function ** leaves the iterator pointing to the first key in the PMA (or EOF if the ** PMA is empty). +** +** If the pnByte parameter is NULL, then it is assumed that the file +** contains a single PMA, and that that PMA omits the initial length varint. */ static int vdbePmaReaderInit( - SortSubtask *pTask, /* Thread context */ - i64 iStart, /* Start offset in pTask->pTemp1 */ + SortSubtask *pTask, /* Task context */ + SorterFile *pFile, /* Sorter file to read from */ + i64 iStart, /* Start offset in pFile */ PmaReader *pIter, /* Iterator to populate */ i64 *pnByte /* IN/OUT: Increment this value by PMA size */ ){ @@ -504,18 +579,18 @@ static int vdbePmaReaderInit( int nBuf = pTask->pgsz; void *pMap = 0; /* Mapping of temp file */ - assert( pTask->iTemp1Off>iStart ); + assert( pFile->iEof>iStart ); assert( pIter->aAlloc==0 ); assert( pIter->aBuffer==0 ); - pIter->pFile = pTask->pTemp1; + pIter->pFile = pFile->pFd; pIter->iReadOff = iStart; pIter->nAlloc = 128; pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc); if( pIter->aAlloc ){ /* Try to xFetch() a mapping of the entire temp file. If this is possible, ** the PMA will be read via the mapping. Otherwise, use xRead(). */ - if( pTask->iTemp1Off<=(i64)(pTask->db->nMaxSorterMmap) ){ - rc = sqlite3OsFetch(pIter->pFile, 0, pTask->iTemp1Off, &pMap); + if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){ + rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iEof, &pMap); } }else{ rc = SQLITE_NOMEM; @@ -533,12 +608,12 @@ static int vdbePmaReaderInit( int iBuf = iStart % nBuf; if( iBuf ){ int nRead = nBuf - iBuf; - if( (iStart + nRead) > pTask->iTemp1Off ){ - nRead = (int)(pTask->iTemp1Off - iStart); + if( (iStart + nRead) > pFile->iEof ){ + nRead = (int)(pFile->iEof - iStart); } rc = sqlite3OsRead( - pTask->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart - ); + pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart + ); assert( rc!=SQLITE_IOERR_SHORT_READ ); } } @@ -547,7 +622,7 @@ static int vdbePmaReaderInit( if( rc==SQLITE_OK ){ u64 nByte; /* Size of PMA in bytes */ - pIter->iEof = pTask->iTemp1Off; + pIter->iEof = pFile->iEof; rc = vdbePmaReadVarint(pIter, &nByte); pIter->iEof = pIter->iReadOff + nByte; *pnByte += nByte; @@ -669,11 +744,13 @@ int sqlite3VdbeSorterInit( pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); pSorter->nTask = nWorker + 1; + pSorter->bUseThreads = (pSorter->nTask>1); for(i=0; inTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; pTask->pKeyInfo = pKeyInfo; pTask->pgsz = pgsz; pTask->db = db; + pTask->pSorter = pSorter; } if( !sqlite3TempInMemory(db) ){ @@ -723,9 +800,10 @@ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ pTask->aListMemory = 0; } pTask->pList = 0; - if( pTask->pTemp1 ){ - sqlite3OsCloseFree(pTask->pTemp1); - pTask->pTemp1 = 0; + if( pTask->file.pFd ){ + sqlite3OsCloseFree(pTask->file.pFd); + pTask->file.pFd = 0; + pTask->file.iEof = 0; } } @@ -761,7 +839,8 @@ static MergeEngine *vdbeMergeEngineNew(int nIter){ int nByte; /* Total bytes of space to allocate */ MergeEngine *pNew; /* Pointer to allocated object to return */ - assert( nIter<=SORTER_MAX_MERGE_COUNT ); + /* assert( nIter<=SORTER_MAX_MERGE_COUNT ); */ + while( NpMerger); - pSorter->pMerger = 0; + if( pSorter->pReader ){ + vdbePmaReaderClear(pSorter->pReader); + sqlite3DbFree(db, pSorter->pReader); + pSorter->pReader = 0; + } for(i=0; inTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; vdbeSortSubtaskCleanup(db, pTask); @@ -806,6 +888,8 @@ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ pSorter->nInMemory = 0; pSorter->bUsePMA = 0; pSorter->iMemory = 0; + sqlite3DbFree(db, pSorter->pUnpacked); + pSorter->pUnpacked = 0; } /* @@ -815,7 +899,6 @@ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ VdbeSorter *pSorter = pCsr->pSorter; if( pSorter ){ sqlite3VdbeSorterReset(db, pSorter); - vdbeMergeEngineFree(pSorter->pMerger); sqlite3_free(pSorter->aMemory); sqlite3DbFree(db, pSorter); pCsr->pSorter = 0; @@ -1053,17 +1136,17 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){ assert( pTask->nInMemory>0 ); /* If the first temporary PMA file has not been opened, open it now. */ - if( pTask->pTemp1==0 ){ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->pTemp1); - assert( rc!=SQLITE_OK || pTask->pTemp1 ); - assert( pTask->iTemp1Off==0 ); + if( pTask->file.pFd==0 ){ + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file.pFd); + assert( rc!=SQLITE_OK || pTask->file.pFd ); + assert( pTask->file.iEof==0 ); assert( pTask->nPMA==0 ); } /* Try to get the file to memory map */ if( rc==SQLITE_OK ){ vdbeSorterExtendFile(pTask->db, - pTask->pTemp1, pTask->iTemp1Off + pTask->nInMemory + 9 + pTask->file.pFd, pTask->file.iEof + pTask->nInMemory + 9 ); } @@ -1071,8 +1154,8 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){ SorterRecord *p; SorterRecord *pNext = 0; - vdbePmaWriterInit(pTask->pTemp1, &writer, pTask->pgsz, - pTask->iTemp1Off); + vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz, + pTask->file.iEof); pTask->nPMA++; vdbePmaWriteVarint(&writer, pTask->nInMemory); for(p=pTask->pList; p; p=pNext){ @@ -1082,7 +1165,7 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){ if( pTask->aListMemory==0 ) sqlite3_free(p); } pTask->pList = p; - rc = vdbePmaWriterFinish(&writer, &pTask->iTemp1Off); + rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof); } assert( pTask->pList==0 || rc!=SQLITE_OK ); @@ -1164,6 +1247,23 @@ static int vdbeSorterNext( return rc; } +#if 0 +static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){ + i64 t; + int iTask = (pTask - pTask->pSorter->aTask); + sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); + fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent); +} +static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){ + i64 t; + sqlite3OsCurrentTimeInt64(db->pVfs, &t); + fprintf(stderr, "%lld:X %s\n", t, zEvent); +} +#else +# define vdbeSorterWorkDebug(x,y) +# define vdbeSorterRewindDebug(x,y) +#endif + /* ** The main routine for sorter-thread operations. */ @@ -1177,6 +1277,8 @@ static void *vdbeSortSubtaskMain(void *pCtx){ ); assert( pTask->bDone==0 ); + vdbeSorterWorkDebug(pTask, "enter"); + if( pTask->pUnpacked==0 ){ char *pFree; pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( @@ -1211,7 +1313,7 @@ static void *vdbeSortSubtaskMain(void *pCtx){ /* Open a second temp file to write merged data to */ rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2); if( rc==SQLITE_OK ){ - vdbeSorterExtendFile(pTask->db, pTemp2, pTask->iTemp1Off); + vdbeSorterExtendFile(pTask->db, pTemp2, pTask->file.iEof); }else{ vdbeMergeEngineFree(pMerger); break; @@ -1231,9 +1333,9 @@ static void *vdbeSortSubtaskMain(void *pCtx){ int iIter; for(iIter=0; iIteraIter[iIter]; - rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nOut); + rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nOut); iReadOff = pIter->iEof; - if( iReadOff>=pTask->iTemp1Off || rc!=SQLITE_OK ) break; + if( iReadOff>=pTask->file.iEof || rc!=SQLITE_OK ) break; } for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){ rc = vdbeSorterDoCompare(pTask, pMerger, iIter); @@ -1253,10 +1355,10 @@ static void *vdbeSortSubtaskMain(void *pCtx){ } vdbeMergeEngineFree(pMerger); - sqlite3OsCloseFree(pTask->pTemp1); - pTask->pTemp1 = pTemp2; + sqlite3OsCloseFree(pTask->file.pFd); + pTask->file.pFd = pTemp2; pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT); - pTask->iTemp1Off = iWriteOff; + pTask->file.iEof = iWriteOff; } }else{ /* Sort the pTask->pList list */ @@ -1267,10 +1369,10 @@ static void *vdbeSortSubtaskMain(void *pCtx){ #ifdef SQLITE_DEBUG i64 nExpect = pTask->nInMemory + sqlite3VarintLen(pTask->nInMemory) - + pTask->iTemp1Off; + + pTask->file.iEof; #endif rc = vdbeSorterListToPMA(pTask); - assert( rc!=SQLITE_OK || (nExpect==pTask->iTemp1Off) ); + assert( rc!=SQLITE_OK || (nExpect==pTask->file.iEof) ); } } @@ -1280,6 +1382,7 @@ static void *vdbeSortSubtaskMain(void *pCtx){ assert( pTask->pUnpacked->errCode==SQLITE_NOMEM ); rc = SQLITE_NOMEM; } + vdbeSorterWorkDebug(pTask, "exit"); return SQLITE_INT_TO_PTR(rc); } @@ -1480,6 +1583,164 @@ static int vdbeSorterCountPMA(VdbeSorter *pSorter){ return nPMA; } +/* +** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format +** of the data stored in aFile[1] is the same as that used by regular PMAs, +** except that the number-of-bytes varint is omitted from the start. +*/ +static int vdbeIncrPopulate(IncrMerger *pIncr){ + int rc = SQLITE_OK; + int rc2; + SorterFile *pOut = &pIncr->aFile[1]; + MergeEngine *pMerger = pIncr->pMerger; + PmaWriter writer; + assert( pIncr->bEof==0 ); + + vdbePmaWriterInit(pIncr->aFile[1].pFd, &writer, pIncr->pTask->pgsz, 0); + while( rc==SQLITE_OK ){ + int dummy; + PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ]; + int nKey = pReader->nKey; + i64 iEof = writer.iWriteOff + writer.iBufEnd; + + /* Check if the output file is full or if the input has been exhausted. + ** In either case exit the loop. */ + if( pReader->pFile==0 ) break; + if( iEof && (iEof + nKey)>pIncr->mxSz ) break; + + /* Write the next key to the output. */ + vdbePmaWriteVarint(&writer, nKey); + vdbePmaWriteBlob(&writer, pReader->aKey, nKey); + rc = vdbeSorterNext(pIncr->pTask, pIncr->pMerger, &dummy); + } + + rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof); + if( rc==SQLITE_OK ) rc = rc2; + return rc; +} + +static void *vdbeIncrPopulateThreadMain(void *pCtx){ + IncrMerger *pIncr = (IncrMerger*)pCtx; + return SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) ); +} + +static int vdbeIncrBgPopulate(IncrMerger *pIncr){ + int rc; + assert( pIncr->pThread==0 ); + if( pIncr->pTask->pSorter->bUseThreads==0 ){ + rc = vdbeIncrPopulate(pIncr); + }else{ + void *pCtx = (void*)pIncr; + rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx); + } + return rc; +} + +static int vdbeIncrSwap(IncrMerger *pIncr){ + int rc = SQLITE_OK; + + if( pIncr->pThread ){ + void *pRet; + rc = sqlite3ThreadJoin(pIncr->pThread, &pRet); + if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + pIncr->pThread = 0; + } + + if( rc==SQLITE_OK ){ + SorterFile f0 = pIncr->aFile[0]; + pIncr->aFile[0] = pIncr->aFile[1]; + pIncr->aFile[1] = f0; + + if( pIncr->aFile[0].iEof==0 ){ + pIncr->bEof = 1; + }else{ + rc = vdbeIncrBgPopulate(pIncr); + } + } + + return rc; +} + +static void vdbeIncrFree(IncrMerger *pIncr){ + if( pIncr->pThread ){ + void *pRet; + sqlite3ThreadJoin(pIncr->pThread, &pRet); + } + if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); + if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); + vdbeMergeEngineFree(pIncr->pMerger); + sqlite3_free(pIncr); +} + +/* +** Populate iterator *pIter so that it may be used to iterate through all +** keys stored in subtask pTask using the incremental merge method. +*/ +static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){ + SortSubtask *pTask0 = &pSorter->aTask[0]; + int rc = SQLITE_OK; + MergeEngine *pMerger = 0; + IncrMerger *pIncr = 0; + int i; + int nPMA = 0; + + for(i=0; inTask; i++){ + nPMA += pSorter->aTask[i].nPMA; + } + pMerger = vdbeMergeEngineNew(nPMA); + if( pMerger==0 ){ + rc = SQLITE_NOMEM; + }else{ + int iIter = 0; + int iPMA; + for(i=0; inTask; i++){ + i64 iReadOff = 0; + SortSubtask *pTask = &pSorter->aTask[i]; + for(iPMA=0; iPMAnPMA; iPMA++){ + i64 nDummy = 0; + PmaReader *pIter = &pMerger->aIter[iIter++]; + rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nDummy); + iReadOff = pIter->iEof; + } + } + for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ + rc = vdbeSorterDoCompare(pTask0, pMerger, i); + } + } + + if( rc==SQLITE_OK ){ + pIncr = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger)); + if( pIncr==0 ){ + rc = SQLITE_NOMEM; + }else{ + memset(pIncr, 0, sizeof(IncrMerger)); + pIncr->mxSz = (pSorter->mxPmaSize / 2); + pIncr->pMerger = pMerger; + pIncr->pTask = pTask0; + } + } + + /* Open the two temp files. */ + if( rc==SQLITE_OK ){ + rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[0].pFd); + } + if( rc==SQLITE_OK ){ + rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[1].pFd); + } + + /* Launch a background thread to populate aFile[1]. */ + if( rc==SQLITE_OK ){ + rc = vdbeIncrBgPopulate(pIncr); + } + + pIter->pIncr = pIncr; + if( rc==SQLITE_OK ){ + rc = vdbePmaReaderNext(pIter); + } + return rc; +} + + /* ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite, ** this function is called to prepare for iterating through the records @@ -1520,70 +1781,21 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ /* Join all threads */ rc = vdbeSorterJoinAll(pSorter, rc); - /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge - ** some of them together so that this is no longer the case. */ - if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ - int i; - for(i=0; rc==SQLITE_OK && inTask; i++){ - SortSubtask *pTask = &pSorter->aTask[i]; - if( pTask->pTemp1 ){ - pTask->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nTask; - pTask->eWork = SORT_SUBTASK_CONS; + vdbeSorterRewindDebug(db, "rewind"); -#if SQLITE_MAX_WORKER_THREADS>0 - if( i<(pSorter->nTask-1) ){ - void *pCtx = (void*)pTask; - rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx); - }else -#endif - { - rc = vdbeSorterRunTask(pTask); - } - } - } - } - - /* Join all threads */ - rc = vdbeSorterJoinAll(pSorter, rc); - - /* Assuming no errors have occurred, set up a merger structure to read - ** and merge all remaining PMAs. */ - assert( pSorter->pMerger==0 ); + /* Assuming no errors have occurred, set up a merger structure to + ** incrementally read and merge all remaining PMAs. */ + assert( pSorter->pReader==0 ); if( rc==SQLITE_OK ){ - int nIter = 0; /* Number of iterators used */ - int i; - MergeEngine *pMerger; - for(i=0; inTask; i++){ - nIter += pSorter->aTask[i].nPMA; - } - - pSorter->pMerger = pMerger = vdbeMergeEngineNew(nIter); - if( pMerger==0 ){ - rc = SQLITE_NOMEM; - }else{ - int iIter = 0; - int iThread = 0; - for(iThread=0; iThreadnTask; iThread++){ - int iPMA; - i64 iReadOff = 0; - SortSubtask *pTask = &pSorter->aTask[iThread]; - for(iPMA=0; iPMAnPMA && rc==SQLITE_OK; iPMA++){ - i64 nDummy = 0; - PmaReader *pIter = &pMerger->aIter[iIter++]; - rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nDummy); - iReadOff = pIter->iEof; - } - } - - for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ - rc = vdbeSorterDoCompare(&pSorter->aTask[0], pMerger, i); - } - } + PmaReader *pReader; + pReader = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader)); + pSorter->pReader = pReader; + rc = vdbePmaReaderIncrInit(pSorter, pReader); + assert( rc!=SQLITE_OK || pReader->pFile ); + *pbEof = 0; } - if( rc==SQLITE_OK ){ - *pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0); - } + vdbeSorterRewindDebug(db, "rewinddone"); return rc; } @@ -1594,8 +1806,9 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ VdbeSorter *pSorter = pCsr->pSorter; int rc; /* Return code */ - if( pSorter->pMerger ){ - rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof); + if( pSorter->pReader ){ + rc = vdbePmaReaderNext(pSorter->pReader); + *pbEof = (pSorter->pReader->pFile==0); }else{ SorterRecord *pFree = pSorter->pRecord; pSorter->pRecord = pFree->u.pNext; @@ -1616,11 +1829,9 @@ static void *vdbeSorterRowkey( int *pnKey /* OUT: Size of current key in bytes */ ){ void *pKey; - if( pSorter->pMerger ){ - PmaReader *pIter; - pIter = &pSorter->pMerger->aIter[ pSorter->pMerger->aTree[1] ]; - *pnKey = pIter->nKey; - pKey = pIter->aKey; + if( pSorter->pReader ){ + *pnKey = pSorter->pReader->nKey; + pKey = pSorter->pReader->aKey; }else{ *pnKey = pSorter->pRecord->nVal; pKey = SRVAL(pSorter->pRecord); @@ -1669,13 +1880,19 @@ int sqlite3VdbeSorterCompare( int *pRes /* OUT: Result of comparison */ ){ VdbeSorter *pSorter = pCsr->pSorter; - UnpackedRecord *r2 = pSorter->aTask[0].pUnpacked; + UnpackedRecord *r2 = pSorter->pUnpacked; KeyInfo *pKeyInfo = pCsr->pKeyInfo; int i; void *pKey; int nKey; /* Sorter key to compare pVal with */ + if( r2==0 ){ + char *p; + r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo,0,0,&p); + assert( pSorter->pUnpacked==(UnpackedRecord*)p ); + if( r2==0 ) return SQLITE_NOMEM; + r2->nField = pKeyInfo->nField-nIgnore; + } assert( r2->nField>=pKeyInfo->nField-nIgnore ); - r2->nField = pKeyInfo->nField-nIgnore; pKey = vdbeSorterRowkey(pSorter, &nKey); sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2); diff --git a/test/sort2.test b/test/sort2.test index f8bfb0fe51..626630050c 100644 --- a/test/sort2.test +++ b/test/sort2.test @@ -47,6 +47,12 @@ do_execsql_test 2.2 { CREATE UNIQUE INDEX i1 ON t1(b, a); } +do_execsql_test 2.3 { + CREATE UNIQUE INDEX i2 ON t1(a); +} + +do_execsql_test 2.4 { PRAGMA integrity_check } {ok} + db close sqlite3_shutdown sqlite3_config_worker_threads 0 -- 2.39.5