From: dan Date: Sat, 12 Apr 2014 19:34:44 +0000 (+0000) Subject: Fix many issues with new code. X-Git-Tag: version-3.8.7~132^2~75^2~3 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=82a8a9f1204ac4fe3ba6b658199884f1852a2b4a;p=thirdparty%2Fsqlite.git Fix many issues with new code. FossilOrigin-Name: 62c406a042d7246f6df6b943421182a88483b2e3 --- diff --git a/manifest b/manifest index f48df9c72d..bca9ab362e 100644 --- a/manifest +++ b/manifest @@ -1,5 +1,5 @@ -C Avoid\shaving\sthe\ssorter\smerge\stoo\smany\sPMAs\sat\sa\stime\swhen\sincrementally\smerging\sdata\sfollowing\sa\sSorterRewind(). -D 2014-04-11T19:43:07.755 +C Fix\smany\sissues\swith\snew\scode. +D 2014-04-12T19:34:44.467 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23 @@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4 F src/vdbeaux.c d8dc38965507a34b0e150c0d7fc82b02f8cf25ea F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447 -F src/vdbesort.c 2984e3624383adf9c762558b8f85a17a626c11a7 +F src/vdbesort.c bc0d90e00abcc88997f463d4d41b7ba4a10cfd88 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8 @@ -1163,7 +1163,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff -P f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f -R f6c598c1c558c5930404cda096730209 +P 98bf0307b121b0776a7170108cc8d3f948a7ebfe +R f3107fdb117ba86f9bee4609a5b08bfd U dan -Z 3e9d4ee1a6e7b343cf831d1b18651067 +Z 5a1b16a83fca264558f06f5fb6536949 diff --git a/manifest.uuid b/manifest.uuid index b1943056c1..e7ab0df5c2 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -98bf0307b121b0776a7170108cc8d3f948a7ebfe \ No newline at end of file +62c406a042d7246f6df6b943421182a88483b2e3 \ No newline at end of file diff --git a/src/vdbesort.c b/src/vdbesort.c index 16f6c618c6..1889c8fde1 100644 --- a/src/vdbesort.c +++ b/src/vdbesort.c @@ -88,6 +88,15 @@ #include "sqliteInt.h" #include "vdbeInt.h" +/* +** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various +** messages to stderr that may be helpful in understanding the performance +** characteristics of the sorter in multi-threaded mode. +*/ +#if 0 +# define SQLITE_DEBUG_SORTER_THREADS 1 +#endif + /* ** Private objects used by the sorter */ @@ -97,19 +106,48 @@ typedef struct PmaWriter PmaWriter; /* Incrementally write on PMA */ typedef struct SorterRecord SorterRecord; /* A record being sorted */ typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ typedef struct SorterFile SorterFile; +typedef struct SorterThread SorterThread; +typedef struct SorterList SorterList; typedef struct IncrMerger IncrMerger; +/* +** A container for a temp file handle and the current amount of data +** stored in the file. +*/ +struct SorterFile { + sqlite3_file *pFd; /* File handle */ + i64 iEof; /* Bytes of data stored in pFd */ +}; /* -** Candidate values for SortSubtask.eWork +** An object of this type is used to store the thread handle for each +** background thread launched by the sorter. Before the thread is launched, +** variable bDone is set to 0. Then, right before it exits, the thread +** itself sets bDone to 1. +** +** This is then used for two purposes: +** +** 1. When flushing the contents of memory to a level-0 PMA on disk, to +** attempt to select a SortSubtask for which there is not already an +** active background thread (since doing so causes the main thread +** to block until it finishes). +** +** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call +** to sqlite3ThreadJoin() is likely to block. +** +** In both cases, the effects of the main thread seeing (bDone==0) even +** after the thread has finished are not dire. So we don't worry about +** memory barriers and such here. */ -#define SORT_SUBTASK_SORT 1 /* Sort records on pList */ -#define SORT_SUBTASK_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pTemp1 */ -#define SORT_SUBTASK_CONS 3 /* Consolidate multiple PMAs */ +struct SorterThread { + SQLiteThread *pThread; + int bDone; +}; -struct SorterFile { - sqlite3_file *pFd; - i64 iEof; +struct SorterList { + SorterRecord *pList; /* Linked list of records */ + u8 *aMemory; /* If non-NULL, blob of memory for pList */ + int szPMA; /* Size of pList as PMA in bytes */ }; /* @@ -148,21 +186,13 @@ struct SorterFile { ** remain in temp file SortSubtask.pTemp1. */ struct SortSubtask { - SQLiteThread *pThread; /* Thread handle, or NULL */ - int bDone; /* Set to true by pTask when finished */ - + SorterThread thread; sqlite3 *db; /* Database connection */ VdbeSorter *pSorter; /* Sorter */ KeyInfo *pKeyInfo; /* How to compare records */ UnpackedRecord *pUnpacked; /* Space to unpack a record */ int pgsz; /* Main database page size */ - - u8 eWork; /* One of the SORT_SUBTASK_* constants */ - int nConsolidate; /* For SORT_SUBTASK_CONS, max final PMAs */ - SorterRecord *pList; /* List of records for pTask to sort */ - int nInMemory; /* Expected size of PMA based on pList */ - u8 *aListMemory; /* Records memory (or NULL) */ - + SorterList list; /* List for thread to write to a PMA */ int nPMA; /* Number of PMAs currently in file */ SorterFile file; /* Temp file for level-0 PMAs */ SorterFile file2; /* Space for other PMAs */ @@ -248,16 +278,19 @@ struct MergeEngine { ** largest record in the sorter. */ struct VdbeSorter { - int nInMemory; /* Current size of pRecord list as PMA */ int mnPmaSize; /* Minimum PMA size, in bytes */ int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ int bUsePMA; /* True if one or more PMAs created */ int bUseThreads; /* True if one or more PMAs created */ - SorterRecord *pRecord; /* Head of in-memory record list */ PmaReader *pReader; /* Read data from here after Rewind() */ int mxKeysize; /* Largest serialized key seen so far */ UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */ +#if 0 + int nInMemory; /* Current size of pRecord list as PMA */ + SorterRecord *pRecord; /* Head of in-memory record list */ u8 *aMemory; /* Block of memory to alloc records from */ +#endif + SorterList list; int iMemory; /* Offset of first free byte in aMemory */ int nMemory; /* Size of aMemory allocation in bytes */ int iPrev; /* Previous thread used to flush PMA */ @@ -292,7 +325,7 @@ struct PmaReader { */ struct IncrMerger { SortSubtask *pTask; /* Task that owns this merger */ - SQLiteThread *pThread; /* Thread currently populating aFile[1] */ + SorterThread thread; /* Thread for populating aFile[1] */ MergeEngine *pMerger; /* Merge engine thread reads data from */ i64 iStartOff; /* Offset to start writing file at */ int mxSz; /* Maximum bytes of data to store */ @@ -787,8 +820,8 @@ int sqlite3VdbeSorterInit( if( sqlite3GlobalConfig.pHeap==0 ){ assert( pSorter->iMemory==0 ); pSorter->nMemory = pgsz; - pSorter->aMemory = (u8*)sqlite3Malloc(pgsz); - if( !pSorter->aMemory ) rc = SQLITE_NOMEM; + pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz); + if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM; } } } @@ -815,13 +848,13 @@ static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ sqlite3DbFree(db, pTask->pUnpacked); pTask->pUnpacked = 0; - if( pTask->aListMemory==0 ){ - vdbeSorterRecordFree(0, pTask->pList); + if( pTask->list.aMemory==0 ){ + vdbeSorterRecordFree(0, pTask->list.pList); }else{ - sqlite3_free(pTask->aListMemory); - pTask->aListMemory = 0; + sqlite3_free(pTask->list.aMemory); + pTask->list.aMemory = 0; } - pTask->pList = 0; + pTask->list.pList = 0; if( pTask->file.pFd ){ sqlite3OsCloseFree(pTask->file.pFd); pTask->file.pFd = 0; @@ -834,28 +867,96 @@ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ } } +#ifdef SQLITE_DEBUG_SORTER_THREADS +static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){ + i64 t; + int iTask = (pTask - pTask->pSorter->aTask); + sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); + fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent); +} +static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){ + i64 t; + sqlite3OsCurrentTimeInt64(db->pVfs, &t); + fprintf(stderr, "%lld:X %s\n", t, zEvent); +} +static void vdbeSorterPopulateDebug( + SortSubtask *pTask, + const char *zEvent +){ + i64 t; + int iTask = (pTask - pTask->pSorter->aTask); + sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); + fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent); +} +static void vdbeSorterBlockDebug( + SortSubtask *pTask, + int bBlocked, + const char *zEvent +){ + if( bBlocked ){ + i64 t; + sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); + fprintf(stderr, "%lld:main %s\n", t, zEvent); + } +} +#else +# define vdbeSorterWorkDebug(x,y) +# define vdbeSorterRewindDebug(x,y) +# define vdbeSorterPopulateDebug(x,y) +# define vdbeSorterBlockDebug(x,y,z) +#endif + +#if SQLITE_MAX_WORKER_THREADS>0 /* -** Join all threads. +** Join thread p. +*/ +static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){ + int rc = SQLITE_OK; + if( p->pThread ){ +#ifdef SQLITE_DEBUG_SORTER_THREADS + int bDone = p->bDone; +#endif + void *pRet; + vdbeSorterBlockDebug(pTask, !bDone, "enter"); + rc = sqlite3ThreadJoin(p->pThread, &pRet); + vdbeSorterBlockDebug(pTask, !bDone, "exit"); + if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + assert( p->bDone==1 ); + p->bDone = 0; + p->pThread = 0; + } + return rc; +} + +/* +** Launch a background thread to run xTask(pIn). +*/ +static int vdbeSorterCreateThread( + SorterThread *p, /* Thread object to populate */ + void *(*xTask)(void*), /* Routine to run in a separate thread */ + void *pIn /* Argument passed into xTask() */ +){ + assert( p->pThread==0 && p->bDone==0 ); + return sqlite3ThreadCreate(&p->pThread, xTask, pIn); +} + +/* +** Join all outstanding threads launched by SorterWrite() to create +** level-0 PMAs. */ -#if SQLITE_MAX_WORKER_THREADS>0 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ int rc = rcin; int i; for(i=0; inTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; - if( pTask->pThread ){ - void *pRet; - int rc2 = sqlite3ThreadJoin(pTask->pThread, &pRet); - pTask->pThread = 0; - pTask->bDone = 0; - if( rc==SQLITE_OK ) rc = rc2; - if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); - } + int rc2 = vdbeSorterJoinThread(pTask, &pTask->thread); + if( rc==SQLITE_OK ) rc = rc2; } return rc; } #else # define vdbeSorterJoinAll(x,rcin) (rcin) +# define vdbeSorterJoinThread(pTask,p) SQLITE_OK #endif /* @@ -908,11 +1009,11 @@ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ SortSubtask *pTask = &pSorter->aTask[i]; vdbeSortSubtaskCleanup(db, pTask); } - if( pSorter->aMemory==0 ){ - vdbeSorterRecordFree(0, pSorter->pRecord); + if( pSorter->list.aMemory==0 ){ + vdbeSorterRecordFree(0, pSorter->list.pList); } - pSorter->pRecord = 0; - pSorter->nInMemory = 0; + pSorter->list.pList = 0; + pSorter->list.szPMA = 0; pSorter->bUsePMA = 0; pSorter->iMemory = 0; pSorter->mxKeysize = 0; @@ -927,7 +1028,7 @@ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ VdbeSorter *pSorter = pCsr->pSorter; if( pSorter ){ sqlite3VdbeSorterReset(db, pSorter); - sqlite3_free(pSorter->aMemory); + sqlite3_free(pSorter->list.aMemory); sqlite3DbFree(db, pSorter); pCsr->pSorter = 0; } @@ -952,6 +1053,21 @@ static int vdbeSorterOpenTempFile(sqlite3_vfs *pVfs, sqlite3_file **ppFile){ return rc; } +static int vdbeSortAllocUnpacked(SortSubtask *pTask){ + if( pTask->pUnpacked==0 ){ + char *pFree; + pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( + pTask->pKeyInfo, 0, 0, &pFree + ); + assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); + if( pFree==0 ) return SQLITE_NOMEM; + pTask->pUnpacked->nField = pTask->pKeyInfo->nField; + pTask->pUnpacked->errCode = 0; + } + return SQLITE_OK; +} + + /* ** Merge the two sorted lists p1 and p2 into a single list. ** Set *ppOut to the head of the new list. @@ -991,25 +1107,29 @@ static void vdbeSorterMerge( ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if ** an error occurs. */ -static int vdbeSorterSort(SortSubtask *pTask){ +static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){ int i; SorterRecord **aSlot; SorterRecord *p; + int rc; + + rc = vdbeSortAllocUnpacked(pTask); + if( rc!=SQLITE_OK ) return rc; aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *)); if( !aSlot ){ return SQLITE_NOMEM; } - p = pTask->pList; + p = pList->pList; while( p ){ SorterRecord *pNext; - if( pTask->aListMemory ){ - if( (u8*)p==pTask->aListMemory ){ + if( pList->aMemory ){ + if( (u8*)p==pList->aMemory ){ pNext = 0; }else{ - assert( p->u.iNextaListMemory) ); - pNext = (SorterRecord*)&pTask->aListMemory[p->u.iNext]; + assert( p->u.iNextaMemory) ); + pNext = (SorterRecord*)&pList->aMemory[p->u.iNext]; } }else{ pNext = p->u.pNext; @@ -1028,9 +1148,13 @@ static int vdbeSorterSort(SortSubtask *pTask){ for(i=0; i<64; i++){ vdbeSorterMerge(pTask, p, aSlot[i], &p); } - pTask->pList = p; + pList->pList = p; sqlite3_free(aSlot); + if( pTask->pUnpacked->errCode ){ + assert( pTask->pUnpacked->errCode==SQLITE_NOMEM ); + return SQLITE_NOMEM; + } return SQLITE_OK; } @@ -1144,8 +1268,9 @@ static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){ /* -** Write the current contents of the in-memory linked-list to a PMA. Return -** SQLITE_OK if successful, or an SQLite error code otherwise. +** Write the current contents of in-memory linked-list pList to a level-0 +** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if +** successful, or an SQLite error code otherwise. ** ** The format of a PMA is: ** @@ -1156,12 +1281,19 @@ static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){ ** Each record consists of a varint followed by a blob of data (the ** key). The varint is the number of bytes in the blob of data. */ -static int vdbeSorterListToPMA(SortSubtask *pTask){ +static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){ int rc = SQLITE_OK; /* Return code */ PmaWriter writer; /* Object used to write to the file */ +#ifdef SQLITE_DEBUG + /* Set iSz to the expected size of file pTask->file after writing the PMA. + ** This is used by an assert() statement at the end of this function. */ + i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof; +#endif + + vdbeSorterWorkDebug(pTask, "enter"); memset(&writer, 0, sizeof(PmaWriter)); - assert( pTask->nInMemory>0 ); + assert( pList->szPMA>0 ); /* If the first temporary PMA file has not been opened, open it now. */ if( pTask->file.pFd==0 ){ @@ -1174,10 +1306,15 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){ /* Try to get the file to memory map */ if( rc==SQLITE_OK ){ vdbeSorterExtendFile(pTask->db, - pTask->file.pFd, pTask->file.iEof + pTask->nInMemory + 9 + pTask->file.pFd, pTask->file.iEof + pList->szPMA + 9 ); } + /* Sort the list */ + if( rc==SQLITE_OK ){ + rc = vdbeSorterSort(pTask, pList); + } + if( rc==SQLITE_OK ){ SorterRecord *p; SorterRecord *pNext = 0; @@ -1185,18 +1322,20 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){ vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz, pTask->file.iEof); pTask->nPMA++; - vdbePmaWriteVarint(&writer, pTask->nInMemory); - for(p=pTask->pList; p; p=pNext){ + vdbePmaWriteVarint(&writer, pList->szPMA); + for(p=pList->pList; p; p=pNext){ pNext = p->u.pNext; vdbePmaWriteVarint(&writer, p->nVal); vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal); - if( pTask->aListMemory==0 ) sqlite3_free(p); + if( pList->aMemory==0 ) sqlite3_free(p); } - pTask->pList = p; + pList->pList = p; rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof); } - assert( pTask->pList==0 || rc!=SQLITE_OK ); + vdbeSorterWorkDebug(pTask, "exit"); + assert( rc!=SQLITE_OK || pList->pList==0 ); + assert( rc!=SQLITE_OK || pTask->file.iEof==iSz ); return rc; } @@ -1275,249 +1414,84 @@ static int vdbeSorterNext( return rc; } -#if 0 -static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){ - i64 t; - int iTask = (pTask - pTask->pSorter->aTask); - sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); - fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent); -} -static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){ - i64 t; - sqlite3OsCurrentTimeInt64(db->pVfs, &t); - fprintf(stderr, "%lld:X %s\n", t, zEvent); -} -static void vdbeSorterPopulateDebug( - SortSubtask *pTask, - const char *zEvent -){ - i64 t; - int iTask = (pTask - pTask->pSorter->aTask); - sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); - fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent); -} -#else -# define vdbeSorterWorkDebug(x,y) -# define vdbeSorterRewindDebug(x,y) -# define vdbeSorterPopulateDebug(x,y) -#endif - -static int vdbeSortAllocUnpacked(SortSubtask *pTask){ - if( pTask->pUnpacked==0 ){ - char *pFree; - pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( - pTask->pKeyInfo, 0, 0, &pFree - ); - assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); - if( pFree==0 ) return SQLITE_NOMEM; - pTask->pUnpacked->nField = pTask->pKeyInfo->nField; - pTask->pUnpacked->errCode = 0; - } - return SQLITE_OK; -} - /* ** The main routine for sorter-thread operations. */ -static void *vdbeSortSubtaskMain(void *pCtx){ - int rc = SQLITE_OK; +static void *vdbeSorterFlushThread(void *pCtx){ SortSubtask *pTask = (SortSubtask*)pCtx; - - assert( pTask->eWork==SORT_SUBTASK_SORT - || pTask->eWork==SORT_SUBTASK_TO_PMA - || pTask->eWork==SORT_SUBTASK_CONS - ); - assert( pTask->bDone==0 ); - - vdbeSorterWorkDebug(pTask, "enter"); - - rc = vdbeSortAllocUnpacked(pTask); - if( rc!=SQLITE_OK ) goto thread_out; - - if( pTask->eWork==SORT_SUBTASK_CONS ){ - assert( pTask->pList==0 ); - while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){ - int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT); - sqlite3_file *pTemp2 = 0; /* Second temp file to use */ - MergeEngine *pMerger; /* Object for reading/merging PMA data */ - i64 iReadOff = 0; /* Offset in pTemp1 to read from */ - i64 iWriteOff = 0; /* Offset in pTemp2 to write to */ - int i; - - /* Allocate a merger object to merge PMAs together. */ - pMerger = vdbeMergeEngineNew(nIter); - if( pMerger==0 ){ - rc = SQLITE_NOMEM; - break; - } - - /* Open a second temp file to write merged data to */ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2); - if( rc==SQLITE_OK ){ - vdbeSorterExtendFile(pTask->db, pTemp2, pTask->file.iEof); - }else{ - vdbeMergeEngineFree(pMerger); - break; - } - - /* This loop runs once for each output PMA. Each output PMA is made - ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */ - for(i=0; rc==SQLITE_OK && inPMA; i+=SORTER_MAX_MERGE_COUNT){ - PmaWriter writer; /* Object for writing data to pTemp2 */ - i64 nOut = 0; /* Bytes of data in output PMA */ - int bEof = 0; - int rc2; - - /* Configure the merger object to read and merge data from the next - ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs, - ** if that is fewer). */ - int iIter; - for(iIter=0; iIteraIter[iIter]; - rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nOut); - iReadOff = pIter->iEof; - if( iReadOff>=pTask->file.iEof || rc!=SQLITE_OK ) break; - } - for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){ - rc = vdbeSorterDoCompare(pTask, pMerger, iIter); - } - - vdbePmaWriterInit(pTemp2, &writer, pTask->pgsz, iWriteOff); - vdbePmaWriteVarint(&writer, nOut); - while( rc==SQLITE_OK && bEof==0 ){ - PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ]; - assert( pIter->pFile!=0 ); /* pIter is not at EOF */ - vdbePmaWriteVarint(&writer, pIter->nKey); - vdbePmaWriteBlob(&writer, pIter->aKey, pIter->nKey); - rc = vdbeSorterNext(pTask, pMerger, &bEof); - } - rc2 = vdbePmaWriterFinish(&writer, &iWriteOff); - if( rc==SQLITE_OK ) rc = rc2; - } - - vdbeMergeEngineFree(pMerger); - sqlite3OsCloseFree(pTask->file.pFd); - pTask->file.pFd = pTemp2; - pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT); - pTask->file.iEof = iWriteOff; - } - }else{ - /* Sort the pTask->pList list */ - rc = vdbeSorterSort(pTask); - - /* If required, write the list out to a PMA. */ - if( rc==SQLITE_OK && pTask->eWork==SORT_SUBTASK_TO_PMA ){ -#ifdef SQLITE_DEBUG - i64 nExpect = pTask->nInMemory - + sqlite3VarintLen(pTask->nInMemory) - + pTask->file.iEof; -#endif - rc = vdbeSorterListToPMA(pTask); - assert( rc!=SQLITE_OK || (nExpect==pTask->file.iEof) ); - } - } - - thread_out: - pTask->bDone = 1; - if( rc==SQLITE_OK && pTask->pUnpacked->errCode ){ - assert( pTask->pUnpacked->errCode==SQLITE_NOMEM ); - rc = SQLITE_NOMEM; - } - vdbeSorterWorkDebug(pTask, "exit"); + int rc; /* Return code */ + assert( pTask->thread.bDone==0 ); + rc = vdbeSorterListToPMA(pTask, &pTask->list); + pTask->thread.bDone = 1; return SQLITE_INT_TO_PTR(rc); } /* -** Run the activity scheduled by the object passed as the only argument -** in the current thread. -*/ -static int vdbeSorterRunTask(SortSubtask *pTask){ - int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pTask) ); - assert( pTask->bDone ); - pTask->bDone = 0; - return rc; -} - -/* -** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly +** Flush the current contents of VdbeSorter.list to a new PMA, possibly ** using a background thread. -** -** If argument bFg is non-zero, the operation always uses the calling thread. */ -static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ - VdbeSorter *pSorter = pCsr->pSorter; +static int vdbeSorterFlushPMA(VdbeSorter *pSorter){ +#if SQLITE_MAX_WORKER_THREADS==0 + pSorter->bUsePMA = 1; + return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list); +#else int rc = SQLITE_OK; int i; SortSubtask *pTask = 0; /* Thread context used to create new PMA */ int nWorker = (pSorter->nTask-1); + /* Set the flag to indicate that at least one PMA has been written. + ** Or will be, anyhow. */ pSorter->bUsePMA = 1; + + /* Select a sub-task to sort and flush the current list of in-memory + ** records to disk. If the sorter is running in multi-threaded mode, + ** round-robin between the first (pSorter->nTask-1) tasks. Except, if + ** the background thread from a sub-tasks previous turn is still running, + ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy, + ** fall back to using the final sub-task. The first (pSorter->nTask-1) + ** sub-tasks are prefered as they use background threads - the final + ** sub-task uses the main thread. */ for(i=0; iiPrev + i + 1) % nWorker; pTask = &pSorter->aTask[iTest]; -#if SQLITE_MAX_WORKER_THREADS>0 - if( pTask->bDone ){ - void *pRet; - assert( pTask->pThread ); - rc = sqlite3ThreadJoin(pTask->pThread, &pRet); - pTask->pThread = 0; - pTask->bDone = 0; - if( rc==SQLITE_OK ){ - rc = SQLITE_PTR_TO_INT(pRet); - } + if( pTask->thread.bDone ){ + rc = vdbeSorterJoinThread(pTask, &pTask->thread); } -#endif - if( pTask->pThread==0 ) break; - pTask = 0; - } - if( pTask==0 ){ - pTask = &pSorter->aTask[nWorker]; + if( pTask->thread.pThread==0 || rc!=SQLITE_OK ) break; } - pSorter->iPrev = (pTask - pSorter->aTask); if( rc==SQLITE_OK ){ - assert( pTask->pThread==0 && pTask->bDone==0 ); - pTask->eWork = SORT_SUBTASK_TO_PMA; - pTask->pList = pSorter->pRecord; - pTask->nInMemory = pSorter->nInMemory; - pSorter->nInMemory = 0; - pSorter->pRecord = 0; - - if( pSorter->aMemory ){ - u8 *aMem = pTask->aListMemory; - pTask->aListMemory = pSorter->aMemory; - pSorter->aMemory = aMem; - } - -#if SQLITE_MAX_WORKER_THREADS>0 - if( !bFg && pTask!=&pSorter->aTask[nWorker] ){ + if( i==nWorker ){ + /* Use the foreground thread for this operation */ + rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list); + }else{ /* Launch a background thread for this operation */ + u8 *aMem = pTask->list.aMemory; void *pCtx = (void*)pTask; - assert( pSorter->aMemory==0 || pTask->aListMemory!=0 ); - if( pTask->aListMemory ){ - if( pSorter->aMemory==0 ){ - pSorter->aMemory = sqlite3Malloc(pSorter->nMemory); - if( pSorter->aMemory==0 ) return SQLITE_NOMEM; - }else{ - pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory); - } - } - rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx); - }else -#endif - { - /* Use the foreground thread for this operation */ - rc = vdbeSorterRunTask(pTask); - if( rc==SQLITE_OK ){ - u8 *aMem = pTask->aListMemory; - pTask->aListMemory = pSorter->aMemory; - pSorter->aMemory = aMem; - assert( pTask->pList==0 ); + + assert( pTask->thread.pThread==0 && pTask->thread.bDone==0 ); + assert( pTask->list.pList==0 ); + assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 ); + + pSorter->iPrev = (pTask - pSorter->aTask); + pTask->list = pSorter->list; + pSorter->list.pList = 0; + pSorter->list.szPMA = 0; + if( aMem ){ + pSorter->list.aMemory = aMem; + pSorter->nMemory = sqlite3MallocSize(aMem); + }else{ + pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory); + if( !pSorter->list.aMemory ) return SQLITE_NOMEM; } + + rc = vdbeSorterCreateThread(&pTask->thread, vdbeSorterFlushThread, pCtx); } } return rc; +#endif } /* @@ -1557,28 +1531,28 @@ int sqlite3VdbeSorterWrite( nReq = pVal->n + sizeof(SorterRecord); nPMA = pVal->n + sqlite3VarintLen(pVal->n); if( pSorter->mxPmaSize ){ - if( pSorter->aMemory ){ + if( pSorter->list.aMemory ){ bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize; }else{ bFlush = ( - (pSorter->nInMemory > pSorter->mxPmaSize) - || (pSorter->nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) + (pSorter->list.szPMA > pSorter->mxPmaSize) + || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) ); } if( bFlush ){ - rc = vdbeSorterFlushPMA(db, pCsr, 0); - pSorter->nInMemory = 0; + rc = vdbeSorterFlushPMA(pSorter); + pSorter->list.szPMA = 0; pSorter->iMemory = 0; - assert( rc!=SQLITE_OK || pSorter->pRecord==0 ); + assert( rc!=SQLITE_OK || pSorter->list.pList==0 ); } } - pSorter->nInMemory += nPMA; + pSorter->list.szPMA += nPMA; if( nPMA>pSorter->mxKeysize ){ pSorter->mxKeysize = nPMA; } - if( pSorter->aMemory ){ + if( pSorter->list.aMemory ){ int nMin = pSorter->iMemory + nReq; if( nMin>pSorter->nMemory ){ @@ -1588,45 +1562,33 @@ int sqlite3VdbeSorterWrite( if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize; if( nNew < nMin ) nNew = nMin; - aNew = sqlite3Realloc(pSorter->aMemory, nNew); + aNew = sqlite3Realloc(pSorter->list.aMemory, nNew); if( !aNew ) return SQLITE_NOMEM; - pSorter->pRecord = (SorterRecord*)( - aNew + ((u8*)pSorter->pRecord - pSorter->aMemory) + pSorter->list.pList = (SorterRecord*)( + aNew + ((u8*)pSorter->list.pList - pSorter->list.aMemory) ); - pSorter->aMemory = aNew; + pSorter->list.aMemory = aNew; pSorter->nMemory = nNew; } - pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory]; + pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory]; pSorter->iMemory += ROUND8(nReq); - pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory; + pNew->u.iNext = (u8*)(pSorter->list.pList) - pSorter->list.aMemory; }else{ pNew = (SorterRecord *)sqlite3Malloc(nReq); if( pNew==0 ){ return SQLITE_NOMEM; } - pNew->u.pNext = pSorter->pRecord; + pNew->u.pNext = pSorter->list.pList; } memcpy(SRVAL(pNew), pVal->z, pVal->n); pNew->nVal = pVal->n; - pSorter->pRecord = pNew; + pSorter->list.pList = pNew; return rc; } -/* -** Return the total number of PMAs in all temporary files. -*/ -static int vdbeSorterCountPMA(VdbeSorter *pSorter){ - int nPMA = 0; - int i; - for(i=0; inTask; i++){ - nPMA += pSorter->aTask[i].nPMA; - } - return nPMA; -} - /* ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format ** of the data stored in aFile[1] is the same as that used by regular PMAs, @@ -1667,39 +1629,27 @@ static int vdbeIncrPopulate(IncrMerger *pIncr){ return rc; } -static void *vdbeIncrPopulateThreadMain(void *pCtx){ +static void *vdbeIncrPopulateThread(void *pCtx){ IncrMerger *pIncr = (IncrMerger*)pCtx; - return SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) ); + void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) ); + pIncr->thread.bDone = 1; + return pRet; } -static int vdbeIncrBgPopulate(IncrMerger *pIncr){ - int rc; - assert( pIncr->pThread==0 ); - if( pIncr->bUseThread==0 ){ - rc = vdbeIncrPopulate(pIncr); - } #if SQLITE_MAX_WORKER_THREADS>0 - else{ - void *pCtx = (void*)pIncr; - rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx); - } -#endif - return rc; +static int vdbeIncrBgPopulate(IncrMerger *pIncr){ + void *pCtx = (void*)pIncr; + assert( pIncr->bUseThread ); + return vdbeSorterCreateThread(&pIncr->thread, vdbeIncrPopulateThread, pCtx); } +#endif static int vdbeIncrSwap(IncrMerger *pIncr){ int rc = SQLITE_OK; - if( pIncr->bUseThread ){ #if SQLITE_MAX_WORKER_THREADS>0 - if( pIncr->pThread ){ - void *pRet; - assert( pIncr->bUseThread ); - rc = sqlite3ThreadJoin(pIncr->pThread, &pRet); - if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); - pIncr->pThread = 0; - } -#endif + if( pIncr->bUseThread ){ + rc = vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread); if( rc==SQLITE_OK ){ SorterFile f0 = pIncr->aFile[0]; @@ -1714,7 +1664,9 @@ static int vdbeIncrSwap(IncrMerger *pIncr){ rc = vdbeIncrBgPopulate(pIncr); } } - }else{ + }else +#endif + { rc = vdbeIncrPopulate(pIncr); pIncr->aFile[0] = pIncr->aFile[1]; if( pIncr->aFile[0].iEof==pIncr->iStartOff ){ @@ -1728,10 +1680,7 @@ static int vdbeIncrSwap(IncrMerger *pIncr){ static void vdbeIncrFree(IncrMerger *pIncr){ if( pIncr ){ #if SQLITE_MAX_WORKER_THREADS>0 - if( pIncr->pThread ){ - void *pRet; - sqlite3ThreadJoin(pIncr->pThread, &pRet); - } + vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread); if( pIncr->bUseThread ){ if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); @@ -1750,18 +1699,6 @@ static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){ pIncr->pTask = pTask; pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2); pTask->file2.iEof += pIncr->mxSz; - -#if 0 - /* Open the two temp files. */ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd); - if( rc==SQLITE_OK ){ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd); - } - if( rc!=SQLITE_OK ){ - vdbeIncrFree(pIncr); - pIncr = 0; - } -#endif } return pIncr; } @@ -1784,9 +1721,6 @@ static int vdbeIncrInit2(PmaReader *pIter){ for(i=0; rc==SQLITE_OK && inTree; i++){ rc = vdbeIncrInit2(&pMerger->aIter[i]); } - for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ - rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i); - } /* Set up the required files for pIncr */ if( rc==SQLITE_OK ){ @@ -1812,6 +1746,10 @@ static int vdbeIncrInit2(PmaReader *pIter){ } } + for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ + rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i); + } + if( rc==SQLITE_OK && pIncr->bUseThread ){ rc = vdbeIncrBgPopulate(pIncr); } @@ -1998,9 +1936,7 @@ static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){ } } } - if( rc==SQLITE_OK ){ - rc = vdbeIncrInit2(pIter); - } + if( rc==SQLITE_OK ) rc = vdbeIncrInit2(pIter); sqlite3_free(aMerge); return rc; @@ -2022,17 +1958,9 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly ** from the in-memory list. */ if( pSorter->bUsePMA==0 ){ - if( pSorter->pRecord ){ - SortSubtask *pTask = &pSorter->aTask[0]; + if( pSorter->list.pList ){ *pbEof = 0; - pTask->pList = pSorter->pRecord; - pTask->eWork = SORT_SUBTASK_SORT; - assert( pTask->aListMemory==0 ); - pTask->aListMemory = pSorter->aMemory; - rc = vdbeSorterRunTask(pTask); - pTask->aListMemory = 0; - pSorter->pRecord = pTask->pList; - pTask->pList = 0; + rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list); }else{ *pbEof = 1; } @@ -2040,8 +1968,8 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ } /* Write the current in-memory list to a PMA. */ - if( pSorter->pRecord ){ - rc = vdbeSorterFlushPMA(db, pCsr, 1); + if( pSorter->list.pList ){ + rc = vdbeSorterFlushPMA(pSorter); } /* Join all threads */ @@ -2076,11 +2004,11 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ rc = vdbePmaReaderNext(pSorter->pReader); *pbEof = (pSorter->pReader->pFile==0); }else{ - SorterRecord *pFree = pSorter->pRecord; - pSorter->pRecord = pFree->u.pNext; + SorterRecord *pFree = pSorter->list.pList; + pSorter->list.pList = pFree->u.pNext; pFree->u.pNext = 0; - if( pSorter->aMemory==0 ) vdbeSorterRecordFree(db, pFree); - *pbEof = !pSorter->pRecord; + if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree); + *pbEof = !pSorter->list.pList; rc = SQLITE_OK; } return rc; @@ -2099,8 +2027,8 @@ static void *vdbeSorterRowkey( *pnKey = pSorter->pReader->nKey; pKey = pSorter->pReader->aKey; }else{ - *pnKey = pSorter->pRecord->nVal; - pKey = SRVAL(pSorter->pRecord); + *pnKey = pSorter->list.pList->nVal; + pKey = SRVAL(pSorter->list.pList); } return pKey; }