From: dan Date: Fri, 11 Apr 2014 19:43:07 +0000 (+0000) Subject: Avoid having the sorter merge too many PMAs at a time when incrementally merging... X-Git-Tag: version-3.8.7~132^2~75^2~4 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=4be4c406faf605d78a9c737795c3abc6bb62621e;p=thirdparty%2Fsqlite.git Avoid having the sorter merge too many PMAs at a time when incrementally merging data following a SorterRewind(). FossilOrigin-Name: 98bf0307b121b0776a7170108cc8d3f948a7ebfe --- diff --git a/manifest b/manifest index 8831053880..f48df9c72d 100644 --- a/manifest +++ b/manifest @@ -1,5 +1,5 @@ -C Experimental\smulti-threaded\ssorting\schanges\sto\sallow\sthe\ssorter\sto\sbegin\sreturning\sitems\sto\sthe\sVDBE\sbefore\sall\sdata\sis\ssorted. -D 2014-04-09T20:04:17.324 +C Avoid\shaving\sthe\ssorter\smerge\stoo\smany\sPMAs\sat\sa\stime\swhen\sincrementally\smerging\sdata\sfollowing\sa\sSorterRewind(). +D 2014-04-11T19:43:07.755 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23 @@ -218,7 +218,7 @@ F src/random.c d10c1f85b6709ca97278428fd5db5bbb9c74eece F src/resolve.c 273d5f47c4e2c05b2d3d2bffeda939551ab59e66 F src/rowset.c 64655f1a627c9c212d9ab497899e7424a34222e0 F src/select.c 20055cf917222e660c4222fea306bd13a0623caa -F src/shell.c afc0b1a5a646d287142ef0c9a2a6e3139d57cba2 +F src/shell.c b44c3f17f0bf41b3431e9cc171706251156ae85f F src/sqlite.h.in 81221c50addbf698c3247154d92efd1095bfd885 F src/sqlite3.rc 11094cc6a157a028b301a9f06b3d03089ea37c3e F src/sqlite3ext.h 886f5a34de171002ad46fae8c36a7d8051c190fc @@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4 F src/vdbeaux.c d8dc38965507a34b0e150c0d7fc82b02f8cf25ea F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447 -F src/vdbesort.c 26823b626c3231a52e45f5e78a18cb8681bb1b88 +F src/vdbesort.c 2984e3624383adf9c762558b8f85a17a626c11a7 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8 @@ -818,7 +818,7 @@ F test/skipscan2.test 5a4db0799c338ddbacb154aaa5589c0254b36a8d F test/soak.test 0b5b6375c9f4110c828070b826b3b4b0bb65cd5f F test/softheap1.test 40562fe6cac6d9827b7b42b86d45aedf12c15e24 F test/sort.test 79dc647c4e9b123a64e57b7080b7f9a2df43f87a -F test/sort2.test bbc2eb244fb862141a900a851056d48705b5997b +F test/sort2.test 04e99d0d028b469c6cfab2c647c6c28755504063 F test/sort3.test c3f88d233452a129de519de311d109a0ad0da0af F test/speed1.test f2974a91d79f58507ada01864c0e323093065452 F test/speed1p.explain d841e650a04728b39e6740296b852dccdca9b2cb @@ -1163,10 +1163,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff -P e54dded2012f0ab486ee138e9bd57c528af33980 -R 803b4ddf4cddf7e21aeddc04109caaf0 -T *branch * threads-experimental -T *sym-threads-experimental * -T -sym-threads * +P f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f +R f6c598c1c558c5930404cda096730209 U dan -Z 3b5c615396ccbaaa23add5a8103bd906 +Z 3e9d4ee1a6e7b343cf831d1b18651067 diff --git a/manifest.uuid b/manifest.uuid index 6cf45357c4..b1943056c1 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f \ No newline at end of file +98bf0307b121b0776a7170108cc8d3f948a7ebfe \ No newline at end of file diff --git a/src/shell.c b/src/shell.c index e032bd36d2..40ac24093a 100644 --- a/src/shell.c +++ b/src/shell.c @@ -3535,7 +3535,7 @@ static void main_init(struct callback_data *data) { sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> "); sqlite3_snprintf(sizeof(continuePrompt), continuePrompt," ...> "); sqlite3_config(SQLITE_CONFIG_MULTITHREAD); - sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 3); + sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 4); } /* diff --git a/src/vdbesort.c b/src/vdbesort.c index e558c42f11..16f6c618c6 100644 --- a/src/vdbesort.c +++ b/src/vdbesort.c @@ -164,7 +164,8 @@ struct SortSubtask { u8 *aListMemory; /* Records memory (or NULL) */ int nPMA; /* Number of PMAs currently in file */ - SorterFile file; + SorterFile file; /* Temp file for level-0 PMAs */ + SorterFile file2; /* Space for other PMAs */ }; @@ -240,6 +241,11 @@ struct MergeEngine { /* ** Main sorter structure. A single instance of this is allocated for each ** sorter cursor created by the VDBE. +** +** mxKeysize: +** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(), +** this variable is updated so as to be set to the size on disk of the +** largest record in the sorter. */ struct VdbeSorter { int nInMemory; /* Current size of pRecord list as PMA */ @@ -249,6 +255,7 @@ struct VdbeSorter { int bUseThreads; /* True if one or more PMAs created */ SorterRecord *pRecord; /* Head of in-memory record list */ PmaReader *pReader; /* Read data from here after Rewind() */ + int mxKeysize; /* Largest serialized key seen so far */ UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */ u8 *aMemory; /* Block of memory to alloc records from */ int iMemory; /* Offset of first free byte in aMemory */ @@ -277,13 +284,21 @@ struct PmaReader { IncrMerger *pIncr; /* Incremental merger */ }; +/* +** Normally, a PmaReader object iterates through an existing PMA stored +** within a temp file. However, if the PmaReader.pIncr variable points to +** an object of the following type, it may be used to iterate/merge through +** multiple PMAs simultaneously. +*/ struct IncrMerger { - int mxSz; /* Maximum size of files */ SortSubtask *pTask; /* Task that owns this merger */ + SQLiteThread *pThread; /* Thread currently populating aFile[1] */ + MergeEngine *pMerger; /* Merge engine thread reads data from */ + i64 iStartOff; /* Offset to start writing file at */ + int mxSz; /* Maximum bytes of data to store */ int bEof; /* Set to true when merge is finished */ + int bUseThread; /* True to use a bg thread for this object */ SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */ - MergeEngine *pMerger; /* Merge engine thread reads data from */ - SQLiteThread *pThread; /* Thread currently populating aFile[1] */ }; /* @@ -506,16 +521,30 @@ static int vdbePmaReaderReinit(PmaReader *pIter){ sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap); pIter->aMap = 0; } - pIter->iReadOff = 0; + pIter->iReadOff = pIncr->iStartOff; pIter->iEof = pIncr->aFile[0].iEof; pIter->pFile = pIncr->aFile[0].pFd; rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap); if( rc==SQLITE_OK ){ - if( pIter->aMap==0 && pIter->aBuffer==0 ){ - pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz); - if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM; - pIter->nBuffer = pTask->pgsz; + if( pIter->aMap==0 ){ + /* TODO: Combine this code with similar code in vdbePmaReaderInit() */ + int iBuf = pIter->iReadOff % pTask->pgsz; + if( pIter->aBuffer==0 ){ + pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz); + if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM; + pIter->nBuffer = pTask->pgsz; + } + if( iBuf ){ + int nRead = pTask->pgsz - iBuf; + if( (pIter->iReadOff + nRead) > pIter->iEof ){ + nRead = (int)(pIter->iEof - pIter->iReadOff); + } + rc = sqlite3OsRead( + pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff + ); + assert( rc!=SQLITE_IOERR_SHORT_READ ); + } } } @@ -577,7 +606,6 @@ static int vdbePmaReaderInit( ){ int rc = SQLITE_OK; int nBuf = pTask->pgsz; - void *pMap = 0; /* Mapping of temp file */ assert( pFile->iEof>iStart ); assert( pIter->aAlloc==0 ); @@ -589,33 +617,27 @@ static int vdbePmaReaderInit( if( pIter->aAlloc ){ /* Try to xFetch() a mapping of the entire temp file. If this is possible, ** the PMA will be read via the mapping. Otherwise, use xRead(). */ - if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){ - rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iEof, &pMap); - } + rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap); }else{ rc = SQLITE_NOMEM; } - if( rc==SQLITE_OK ){ - if( pMap ){ - pIter->aMap = (u8*)pMap; + if( rc==SQLITE_OK && pIter->aMap==0 ){ + pIter->nBuffer = nBuf; + pIter->aBuffer = (u8*)sqlite3Malloc(nBuf); + if( !pIter->aBuffer ){ + rc = SQLITE_NOMEM; }else{ - pIter->nBuffer = nBuf; - pIter->aBuffer = (u8*)sqlite3Malloc(nBuf); - if( !pIter->aBuffer ){ - rc = SQLITE_NOMEM; - }else{ - int iBuf = iStart % nBuf; - if( iBuf ){ - int nRead = nBuf - iBuf; - if( (iStart + nRead) > pFile->iEof ){ - nRead = (int)(pFile->iEof - iStart); - } - rc = sqlite3OsRead( - pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart - ); - assert( rc!=SQLITE_IOERR_SHORT_READ ); + int iBuf = iStart % nBuf; + if( iBuf ){ + int nRead = nBuf - iBuf; + if( (iStart + nRead) > pFile->iEof ){ + nRead = (int)(pFile->iEof - iStart); } + rc = sqlite3OsRead( + pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart + ); + assert( rc!=SQLITE_IOERR_SHORT_READ ); } } } @@ -805,6 +827,11 @@ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ pTask->file.pFd = 0; pTask->file.iEof = 0; } + if( pTask->file2.pFd ){ + sqlite3OsCloseFree(pTask->file2.pFd); + pTask->file2.pFd = 0; + pTask->file2.iEof = 0; + } } /* @@ -839,7 +866,7 @@ static MergeEngine *vdbeMergeEngineNew(int nIter){ int nByte; /* Total bytes of space to allocate */ MergeEngine *pNew; /* Pointer to allocated object to return */ - /* assert( nIter<=SORTER_MAX_MERGE_COUNT ); */ + assert( nIter<=SORTER_MAX_MERGE_COUNT ); while( NnInMemory = 0; pSorter->bUsePMA = 0; pSorter->iMemory = 0; + pSorter->mxKeysize = 0; sqlite3DbFree(db, pSorter->pUnpacked); pSorter->pUnpacked = 0; } @@ -1259,11 +1287,35 @@ static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){ sqlite3OsCurrentTimeInt64(db->pVfs, &t); fprintf(stderr, "%lld:X %s\n", t, zEvent); } +static void vdbeSorterPopulateDebug( + SortSubtask *pTask, + const char *zEvent +){ + i64 t; + int iTask = (pTask - pTask->pSorter->aTask); + sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); + fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent); +} #else # define vdbeSorterWorkDebug(x,y) # define vdbeSorterRewindDebug(x,y) +# define vdbeSorterPopulateDebug(x,y) #endif +static int vdbeSortAllocUnpacked(SortSubtask *pTask){ + if( pTask->pUnpacked==0 ){ + char *pFree; + pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( + pTask->pKeyInfo, 0, 0, &pFree + ); + assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); + if( pFree==0 ) return SQLITE_NOMEM; + pTask->pUnpacked->nField = pTask->pKeyInfo->nField; + pTask->pUnpacked->errCode = 0; + } + return SQLITE_OK; +} + /* ** The main routine for sorter-thread operations. */ @@ -1279,19 +1331,8 @@ static void *vdbeSortSubtaskMain(void *pCtx){ vdbeSorterWorkDebug(pTask, "enter"); - if( pTask->pUnpacked==0 ){ - char *pFree; - pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( - pTask->pKeyInfo, 0, 0, &pFree - ); - assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); - if( pFree==0 ){ - rc = SQLITE_NOMEM; - goto thread_out; - } - pTask->pUnpacked->nField = pTask->pKeyInfo->nField; - pTask->pUnpacked->errCode = 0; - } + rc = vdbeSortAllocUnpacked(pTask); + if( rc!=SQLITE_OK ) goto thread_out; if( pTask->eWork==SORT_SUBTASK_CONS ){ assert( pTask->pList==0 ); @@ -1533,6 +1574,9 @@ int sqlite3VdbeSorterWrite( } pSorter->nInMemory += nPMA; + if( nPMA>pSorter->mxKeysize ){ + pSorter->mxKeysize = nPMA; + } if( pSorter->aMemory ){ int nMin = pSorter->iMemory + nReq; @@ -1591,12 +1635,15 @@ static int vdbeSorterCountPMA(VdbeSorter *pSorter){ static int vdbeIncrPopulate(IncrMerger *pIncr){ int rc = SQLITE_OK; int rc2; + i64 iStart = pIncr->iStartOff; SorterFile *pOut = &pIncr->aFile[1]; MergeEngine *pMerger = pIncr->pMerger; PmaWriter writer; assert( pIncr->bEof==0 ); - vdbePmaWriterInit(pIncr->aFile[1].pFd, &writer, pIncr->pTask->pgsz, 0); + vdbeSorterPopulateDebug(pIncr->pTask, "enter"); + + vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, iStart); while( rc==SQLITE_OK ){ int dummy; PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ]; @@ -1606,7 +1653,7 @@ static int vdbeIncrPopulate(IncrMerger *pIncr){ /* Check if the output file is full or if the input has been exhausted. ** In either case exit the loop. */ if( pReader->pFile==0 ) break; - if( iEof && (iEof + nKey)>pIncr->mxSz ) break; + if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break; /* Write the next key to the output. */ vdbePmaWriteVarint(&writer, nKey); @@ -1616,6 +1663,7 @@ static int vdbeIncrPopulate(IncrMerger *pIncr){ rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof); if( rc==SQLITE_OK ) rc = rc2; + vdbeSorterPopulateDebug(pIncr->pTask, "exit"); return rc; } @@ -1627,34 +1675,50 @@ static void *vdbeIncrPopulateThreadMain(void *pCtx){ static int vdbeIncrBgPopulate(IncrMerger *pIncr){ int rc; assert( pIncr->pThread==0 ); - if( pIncr->pTask->pSorter->bUseThreads==0 ){ + if( pIncr->bUseThread==0 ){ rc = vdbeIncrPopulate(pIncr); - }else{ + } +#if SQLITE_MAX_WORKER_THREADS>0 + else{ void *pCtx = (void*)pIncr; rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx); } +#endif return rc; } static int vdbeIncrSwap(IncrMerger *pIncr){ int rc = SQLITE_OK; - - if( pIncr->pThread ){ - void *pRet; - rc = sqlite3ThreadJoin(pIncr->pThread, &pRet); - if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); - pIncr->pThread = 0; - } - if( rc==SQLITE_OK ){ - SorterFile f0 = pIncr->aFile[0]; - pIncr->aFile[0] = pIncr->aFile[1]; - pIncr->aFile[1] = f0; + if( pIncr->bUseThread ){ +#if SQLITE_MAX_WORKER_THREADS>0 + if( pIncr->pThread ){ + void *pRet; + assert( pIncr->bUseThread ); + rc = sqlite3ThreadJoin(pIncr->pThread, &pRet); + if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + pIncr->pThread = 0; + } +#endif + + if( rc==SQLITE_OK ){ + SorterFile f0 = pIncr->aFile[0]; + pIncr->aFile[0] = pIncr->aFile[1]; + pIncr->aFile[1] = f0; + } - if( pIncr->aFile[0].iEof==0 ){ + if( rc==SQLITE_OK ){ + if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ + pIncr->bEof = 1; + }else{ + rc = vdbeIncrBgPopulate(pIncr); + } + } + }else{ + rc = vdbeIncrPopulate(pIncr); + pIncr->aFile[0] = pIncr->aFile[1]; + if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ pIncr->bEof = 1; - }else{ - rc = vdbeIncrBgPopulate(pIncr); } } @@ -1662,81 +1726,283 @@ static int vdbeIncrSwap(IncrMerger *pIncr){ } static void vdbeIncrFree(IncrMerger *pIncr){ - if( pIncr->pThread ){ - void *pRet; - sqlite3ThreadJoin(pIncr->pThread, &pRet); - } - if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); - if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); - vdbeMergeEngineFree(pIncr->pMerger); - sqlite3_free(pIncr); + if( pIncr ){ +#if SQLITE_MAX_WORKER_THREADS>0 + if( pIncr->pThread ){ + void *pRet; + sqlite3ThreadJoin(pIncr->pThread, &pRet); + } + if( pIncr->bUseThread ){ + if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); + if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); + } +#endif + vdbeMergeEngineFree(pIncr->pMerger); + sqlite3_free(pIncr); + } +} + +static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){ + IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger)); + if( pIncr ){ + memset(pIncr, 0, sizeof(IncrMerger)); + pIncr->pMerger = pMerger; + pIncr->pTask = pTask; + pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2); + pTask->file2.iEof += pIncr->mxSz; + +#if 0 + /* Open the two temp files. */ + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd); + if( rc==SQLITE_OK ){ + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd); + } + if( rc!=SQLITE_OK ){ + vdbeIncrFree(pIncr); + pIncr = 0; + } +#endif + } + return pIncr; +} + +static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){ + if( bUseThread ){ + pIncr->bUseThread = 1; + pIncr->pTask->file2.iEof -= pIncr->mxSz; + } +} + +static int vdbeIncrInit2(PmaReader *pIter){ + int rc = SQLITE_OK; + IncrMerger *pIncr = pIter->pIncr; + if( pIncr ){ + SortSubtask *pTask = pIncr->pTask; + int i; + MergeEngine *pMerger = pIncr->pMerger; + + for(i=0; rc==SQLITE_OK && inTree; i++){ + rc = vdbeIncrInit2(&pMerger->aIter[i]); + } + for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ + rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i); + } + + /* Set up the required files for pIncr */ + if( rc==SQLITE_OK ){ + if( pIncr->bUseThread==0 ){ + if( pTask->file2.pFd==0 ){ + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd); + assert( pTask->file2.iEof>0 ); + if( rc==SQLITE_OK ){ + vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof); + pTask->file2.iEof = 0; + } + } + if( rc==SQLITE_OK ){ + pIncr->aFile[1].pFd = pTask->file2.pFd; + pIncr->iStartOff = pTask->file2.iEof; + pTask->file2.iEof += pIncr->mxSz; + } + }else{ + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd); + if( rc==SQLITE_OK ){ + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd); + } + } + } + + if( rc==SQLITE_OK && pIncr->bUseThread ){ + rc = vdbeIncrBgPopulate(pIncr); + } + + if( rc==SQLITE_OK ){ + rc = vdbePmaReaderNext(pIter); + } + } + return rc; +} + +/* +** Allocate a new MergeEngine object to merge the contents of nPMA level-0 +** PMAs from pTask->file. If no error occurs, set *ppOut to point to +** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut +** to NULL and return an SQLite error code. +** +** When this function is called, *piOffset is set to the offset of the +** first PMA to read from pTask->file. Assuming no error occurs, it is +** set to the offset immediately following the last byte of the last +** PMA before returning. If an error does occur, then the final value of +** *piOffset is undefined. +*/ +static int vdbeMergeEngineLevel0( + SortSubtask *pTask, /* Sorter task to read from */ + int nPMA, /* Number of PMAs to read */ + i64 *piOffset, /* IN/OUT: Read offset in pTask->file */ + MergeEngine **ppOut /* OUT: New merge-engine */ +){ + MergeEngine *pNew; /* Merge engine to return */ + i64 iOff = *piOffset; + int i; + int rc = SQLITE_OK; + + *ppOut = pNew = vdbeMergeEngineNew(nPMA); + if( pNew==0 ) rc = SQLITE_NOMEM; + + for(i=0; iaIter[i]; + rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pIter, &nDummy); + iOff = pIter->iEof; + } + + if( rc!=SQLITE_OK ){ + vdbeMergeEngineFree(pNew); + *ppOut = 0; + } + *piOffset = iOff; + return rc; +} + +typedef struct IncrBuilder IncrBuilder; +struct IncrBuilder { + int nPMA; /* Number of iterators used so far */ + MergeEngine *pMerger; /* Merge engine to populate. */ +}; + +static int vdbeAddToBuilder( + SortSubtask *pTask, + IncrBuilder *pBuilder, + MergeEngine *pMerger +){ + int rc = SQLITE_OK; + IncrMerger *pIncr; + + assert( pMerger ); + if( pBuilder->nPMA==SORTER_MAX_MERGE_COUNT ){ + rc = vdbeAddToBuilder(pTask, &pBuilder[1], pBuilder->pMerger); + pBuilder->pMerger = 0; + pBuilder->nPMA = 0; + } + + if( rc==SQLITE_OK && pBuilder->pMerger==0 ){ + pBuilder->pMerger = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT); + if( pBuilder->pMerger==0 ) rc = SQLITE_NOMEM; + } + + if( rc==SQLITE_OK ){ + pIncr = vdbeIncrNew(pTask, pMerger); + if( pIncr==0 ) rc = SQLITE_NOMEM; + pBuilder->pMerger->aIter[pBuilder->nPMA++].pIncr = pIncr; + } + + if( rc!=SQLITE_OK ){ + vdbeMergeEngineFree(pMerger); + } + + return rc; } /* ** Populate iterator *pIter so that it may be used to iterate through all -** keys stored in subtask pTask using the incremental merge method. +** keys stored in all PMAs created by this sorter. */ static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){ SortSubtask *pTask0 = &pSorter->aTask[0]; + MergeEngine *pMain = 0; + sqlite3 *db = pTask0->db; int rc = SQLITE_OK; - MergeEngine *pMerger = 0; - IncrMerger *pIncr = 0; - int i; - int nPMA = 0; + int iTask; - for(i=0; inTask; i++){ - nPMA += pSorter->aTask[i].nPMA; + IncrBuilder *aMerge; + const int nMerge = 32; + aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge); + if( aMerge==0 ) return SQLITE_NOMEM; + + if( pSorter->nTask>1 ){ + pMain = vdbeMergeEngineNew(pSorter->nTask); + if( pMain==0 ) rc = SQLITE_NOMEM; } - pMerger = vdbeMergeEngineNew(nPMA); - if( pMerger==0 ){ - rc = SQLITE_NOMEM; - }else{ - int iIter = 0; + + for(iTask=0; iTasknTask && rc==SQLITE_OK; iTask++){ + MergeEngine *pRoot = 0; int iPMA; - for(i=0; inTask; i++){ - i64 iReadOff = 0; - SortSubtask *pTask = &pSorter->aTask[i]; - for(iPMA=0; iPMAnPMA; iPMA++){ - i64 nDummy = 0; - PmaReader *pIter = &pMerger->aIter[iIter++]; - rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nDummy); - iReadOff = pIter->iEof; + i64 iReadOff = 0; + SortSubtask *pTask = &pSorter->aTask[iTask]; + if( pTask->nPMA==0 ) continue; + for(iPMA=0; iPMAnPMA; iPMA += SORTER_MAX_MERGE_COUNT){ + MergeEngine *pMerger = 0; + int nReader = MIN(pTask->nPMA - iPMA, SORTER_MAX_MERGE_COUNT); + + rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger); + if( rc!=SQLITE_OK ) break; + + if( iPMA==0 ){ + pRoot = pMerger; + }else{ + if( pRoot ){ + rc = vdbeAddToBuilder(pTask, &aMerge[0], pRoot); + pRoot = 0; + if( rc!=SQLITE_OK ){ + vdbeMergeEngineFree(pMerger); + break; + } + } + rc = vdbeAddToBuilder(pTask, &aMerge[0], pMerger); } } - for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ - rc = vdbeSorterDoCompare(pTask0, pMerger, i); + + if( pRoot==0 ){ + int i; + for(i=0; rc==SQLITE_OK && imxSz = (pSorter->mxPmaSize / 2); - pIncr->pMerger = pMerger; - pIncr->pTask = pTask0; + if( rc==SQLITE_OK ){ + if( pMain==0 ){ + pMain = pRoot; + }else{ + IncrMerger *pNew = vdbeIncrNew(pTask, pRoot); + pMain->aIter[iTask].pIncr = pNew; + if( pNew==0 ) rc = SQLITE_NOMEM; + } + memset(aMerge, 0, nMerge*sizeof(aMerge[0])); } } - /* Open the two temp files. */ if( rc==SQLITE_OK ){ - rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[0].pFd); - } - if( rc==SQLITE_OK ){ - rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[1].pFd); - } + SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1]; - /* Launch a background thread to populate aFile[1]. */ - if( rc==SQLITE_OK ){ - rc = vdbeIncrBgPopulate(pIncr); + rc = vdbeSortAllocUnpacked(pLast); + if( rc==SQLITE_OK ){ + pIter->pIncr = vdbeIncrNew(pLast, pMain); + if( pIter->pIncr==0 ){ + rc = SQLITE_NOMEM; + }else{ + vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads); + for(iTask=0; iTask<(pSorter->nTask-1); iTask++){ + IncrMerger *pIncr; + if( (pIncr = pMain->aIter[iTask].pIncr) ){ + vdbeIncrSetThreads(pIncr, pSorter->bUseThreads); + assert( pIncr->pTask!=pLast ); + } + } + } + } } - - pIter->pIncr = pIncr; if( rc==SQLITE_OK ){ - rc = vdbePmaReaderNext(pIter); + rc = vdbeIncrInit2(pIter); } + + sqlite3_free(aMerge); return rc; } diff --git a/test/sort2.test b/test/sort2.test index 626630050c..4fb6a9462b 100644 --- a/test/sort2.test +++ b/test/sort2.test @@ -15,47 +15,71 @@ set testdir [file dirname $argv0] source $testdir/tester.tcl set testprefix sort2 -db close -sqlite3_shutdown -sqlite3_config_worker_threads 7 -reset_db - -do_execsql_test 1 { - PRAGMA cache_size = 5; - WITH r(x,y) AS ( - SELECT 1, randomblob(100) - UNION ALL - SELECT x+1, randomblob(100) FROM r - LIMIT 100000 - ) - SELECT count(x), length(y) FROM r GROUP BY (x%5) +foreach {tn script} { + 1 { } + 2 { + catch { db close } + sqlite3_shutdown + sqlite3_config_worker_threads 7 + reset_db + } } { - 20000 100 20000 100 20000 100 20000 100 20000 100 -} -do_execsql_test 2.1 { - CREATE TABLE t1(a, b); - WITH r(x,y) AS ( - SELECT 1, randomblob(100) - UNION ALL - SELECT x+1, randomblob(100) FROM r - LIMIT 10000 - ) INSERT INTO t1 SELECT * FROM r; -} + eval $script -do_execsql_test 2.2 { - CREATE UNIQUE INDEX i1 ON t1(b, a); -} + do_execsql_test $tn.1 { + PRAGMA cache_size = 5; + WITH r(x,y) AS ( + SELECT 1, randomblob(100) + UNION ALL + SELECT x+1, randomblob(100) FROM r + LIMIT 100000 + ) + SELECT count(x), length(y) FROM r GROUP BY (x%5) + } { + 20000 100 20000 100 20000 100 20000 100 20000 100 + } -do_execsql_test 2.3 { - CREATE UNIQUE INDEX i2 ON t1(a); -} + do_execsql_test $tn.2.1 { + CREATE TABLE t1(a, b); + WITH r(x,y) AS ( + SELECT 1, randomblob(100) + UNION ALL + SELECT x+1, randomblob(100) FROM r + LIMIT 10000 + ) INSERT INTO t1 SELECT * FROM r; + } + + do_execsql_test $tn.2.2 { + CREATE UNIQUE INDEX i1 ON t1(b, a); + } + + do_execsql_test $tn.2.3 { + CREATE UNIQUE INDEX i2 ON t1(a); + } + + do_execsql_test $tn.2.4 { PRAGMA integrity_check } {ok} + + breakpoint + do_execsql_test $tn.3 { + PRAGMA cache_size = 5; + WITH r(x,y) AS ( + SELECT 1, randomblob(100) + UNION ALL + SELECT x+1, randomblob(100) FROM r + LIMIT 1000000 + ) + SELECT count(x), length(y) FROM r GROUP BY (x%5) + } { + 200000 100 200000 100 200000 100 200000 100 200000 100 + } + + db close + sqlite3_shutdown + sqlite3_config_worker_threads 0 + sqlite3_initialize -do_execsql_test 2.4 { PRAGMA integrity_check } {ok} +} -db close -sqlite3_shutdown -sqlite3_config_worker_threads 0 -sqlite3_initialize finish_test