From: dan Date: Mon, 7 Apr 2014 18:44:00 +0000 (+0000) Subject: Experimental multi-threaded sorting changes to begin merging PMAs before SorterRewind... X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=refs%2Fheads%2Fabandoned;p=thirdparty%2Fsqlite.git Experimental multi-threaded sorting changes to begin merging PMAs before SorterRewind() is called. FossilOrigin-Name: cbfc0f6d49b6607bb4eb45bfe4c258e39ac27403 --- diff --git a/manifest b/manifest index 2eb2b1da22..71ccb1cec3 100644 --- a/manifest +++ b/manifest @@ -1,5 +1,5 @@ -C Fix\sharmless\scompiler\swarnings. -D 2014-04-04T22:44:59.018 +C Experimental\smulti-threaded\ssorting\schanges\sto\sbegin\smerging\sPMAs\sbefore\sSorterRewind()\sis\scalled. +D 2014-04-07T18:44:00.443 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23 @@ -218,7 +218,7 @@ F src/random.c d10c1f85b6709ca97278428fd5db5bbb9c74eece F src/resolve.c 273d5f47c4e2c05b2d3d2bffeda939551ab59e66 F src/rowset.c 64655f1a627c9c212d9ab497899e7424a34222e0 F src/select.c 20055cf917222e660c4222fea306bd13a0623caa -F src/shell.c afc0b1a5a646d287142ef0c9a2a6e3139d57cba2 +F src/shell.c b44c3f17f0bf41b3431e9cc171706251156ae85f F src/sqlite.h.in 81221c50addbf698c3247154d92efd1095bfd885 F src/sqlite3.rc 11094cc6a157a028b301a9f06b3d03089ea37c3e F src/sqlite3ext.h 886f5a34de171002ad46fae8c36a7d8051c190fc @@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4 F src/vdbeaux.c d8dc38965507a34b0e150c0d7fc82b02f8cf25ea F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447 -F src/vdbesort.c 8da916fc74e78edd5bc95653206942e01710ac09 +F src/vdbesort.c 07c29a5a61e041eeb1c366eefef9e99dfe5e2174 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8 @@ -1163,7 +1163,10 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff -P 5e3dfa27c71a666e122e3cf64897038ff8424800 -R bbcde0d30a2bb025a3c15f4a10b2b404 -U drh -Z 9376e61a8443d421f4f6f69d3d5500ac +P e54dded2012f0ab486ee138e9bd57c528af33980 +R 020c9d4d2bbea35c99aea5cfddab0eac +T *branch * threads-experimental +T *sym-threads-experimental * +T -sym-threads * +U dan +Z 16c0542d21b377d284993b464ea194ff diff --git a/manifest.uuid b/manifest.uuid index 44109c7358..96a44839db 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -e54dded2012f0ab486ee138e9bd57c528af33980 \ No newline at end of file +cbfc0f6d49b6607bb4eb45bfe4c258e39ac27403 \ No newline at end of file diff --git a/src/shell.c b/src/shell.c index e032bd36d2..40ac24093a 100644 --- a/src/shell.c +++ b/src/shell.c @@ -3535,7 +3535,7 @@ static void main_init(struct callback_data *data) { sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> "); sqlite3_snprintf(sizeof(continuePrompt), continuePrompt," ...> "); sqlite3_config(SQLITE_CONFIG_MULTITHREAD); - sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 3); + sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 4); } /* diff --git a/src/vdbesort.c b/src/vdbesort.c index f59e8f51f5..030c84ae7d 100644 --- a/src/vdbesort.c +++ b/src/vdbesort.c @@ -97,13 +97,42 @@ typedef struct PmaWriter PmaWriter; /* Incrementally write on PMA */ typedef struct SorterRecord SorterRecord; /* A record being sorted */ typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ +typedef struct SortList SortList; +typedef struct SortFile SortFile; +typedef struct SortLevel SortLevel; + /* -** Candidate values for SortSubtask.eWork +** A file containing zero or more PMAs. */ -#define SORT_SUBTASK_SORT 1 /* Sort records on pList */ -#define SORT_SUBTASK_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pTemp1 */ -#define SORT_SUBTASK_CONS 3 /* Consolidate multiple PMAs */ +struct SortFile { + sqlite3_file *pFd; /* File handle */ + i64 iOff; /* Current write offset */ + i64 nByte; /* Actual size of file */ + int nPMA; /* Number of PMA currently in file */ +}; + +/* +** A list of records. +*/ +struct SortList { + SorterRecord *pRecord; /* List of records for pTask to sort */ + int nInMemory; /* Expected size of PMA based on pList */ + u8 *aMemory; /* Records memory (or NULL) */ +}; + +struct SortLevel { + SortSubtask *pTask; /* Sorter task this level is a part of */ + SQLiteThread *pThread; /* Thread handle, or NULL */ + int bDone; /* Set to true by pThread when finished */ + union { + SortFile f; /* Input for level 1 and greater */ + SortList l; /* Input for level 0 */ + } in; + SortFile out; /* Level storage */ + SortLevel *pNext; /* Next level (containing larger PMAs) */ + UnpackedRecord *pUnpacked; /* Space to unpack a record */ +}; /* ** Sorting is divided up into smaller subtasks. Each subtask is controlled @@ -141,23 +170,13 @@ typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ ** remain in temp file SortSubtask.pTemp1. */ struct SortSubtask { - SQLiteThread *pThread; /* Thread handle, or NULL */ - int bDone; /* Set to true by pTask when finished */ - + int iId; /* Sub-task id */ sqlite3 *db; /* Database connection */ + VdbeSorter *pSorter; /* Sorter that owns this object */ KeyInfo *pKeyInfo; /* How to compare records */ - UnpackedRecord *pUnpacked; /* Space to unpack a record */ int pgsz; /* Main database page size */ - - u8 eWork; /* One of the SORT_SUBTASK_* constants */ - int nConsolidate; /* For SORT_SUBTASK_CONS, max final PMAs */ - SorterRecord *pList; /* List of records for pTask to sort */ - int nInMemory; /* Expected size of PMA based on pList */ - u8 *aListMemory; /* Records memory (or NULL) */ - - int nPMA; /* Number of PMAs currently in pTemp1 */ - i64 iTemp1Off; /* Offset to write to in pTemp1 */ - sqlite3_file *pTemp1; /* File to write PMAs to, or NULL */ + int nConsolidate; /* For consolidation, max final PMAs */ + SortLevel *pLevel; /* PMA level 0 */ }; @@ -235,16 +254,15 @@ struct MergeEngine { ** sorter cursor created by the VDBE. */ struct VdbeSorter { - int nInMemory; /* Current size of pRecord list as PMA */ int mnPmaSize; /* Minimum PMA size, in bytes */ int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */ int bUsePMA; /* True if one or more PMAs created */ - SorterRecord *pRecord; /* Head of in-memory record list */ MergeEngine *pMerger; /* For final merge of PMAs (by caller) */ - u8 *aMemory; /* Block of memory to alloc records from */ - int iMemory; /* Offset of first free byte in aMemory */ - int nMemory; /* Size of aMemory allocation in bytes */ - int iPrev; /* Previous thread used to flush PMA */ + UnpackedRecord *pUnpacked; /* Used by sqlite3VdbeSorterCompare */ + int iMemory; /* Offset of free byte in list.aMemory */ + int nMemory; /* Size of list.aMemory allocation in bytes */ + SortList list; /* In memory records */ + int iPrev; /* Previous PMA flushed via task iPrev */ int nTask; /* Size of aTask[] array */ SortSubtask aTask[1]; /* One or more subtasks */ }; @@ -496,6 +514,7 @@ static int vdbePmaReaderNext(PmaReader *pIter){ */ static int vdbePmaReaderInit( SortSubtask *pTask, /* Thread context */ + SortFile *pFile, /* File to read from */ i64 iStart, /* Start offset in pTask->pTemp1 */ PmaReader *pIter, /* Iterator to populate */ i64 *pnByte /* IN/OUT: Increment this value by PMA size */ @@ -504,18 +523,18 @@ static int vdbePmaReaderInit( int nBuf = pTask->pgsz; void *pMap = 0; /* Mapping of temp file */ - assert( pTask->iTemp1Off>iStart ); + assert( pFile->iOff>iStart ); assert( pIter->aAlloc==0 ); assert( pIter->aBuffer==0 ); - pIter->pFile = pTask->pTemp1; + pIter->pFile = pFile->pFd; pIter->iReadOff = iStart; pIter->nAlloc = 128; pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc); if( pIter->aAlloc ){ /* Try to xFetch() a mapping of the entire temp file. If this is possible, ** the PMA will be read via the mapping. Otherwise, use xRead(). */ - if( pTask->iTemp1Off<=(i64)(pTask->db->nMaxSorterMmap) ){ - rc = sqlite3OsFetch(pIter->pFile, 0, pTask->iTemp1Off, &pMap); + if( pFile->iOff<=(i64)(pTask->db->nMaxSorterMmap) ){ + rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iOff, &pMap); } }else{ rc = SQLITE_NOMEM; @@ -533,12 +552,12 @@ static int vdbePmaReaderInit( int iBuf = iStart % nBuf; if( iBuf ){ int nRead = nBuf - iBuf; - if( (iStart + nRead) > pTask->iTemp1Off ){ - nRead = (int)(pTask->iTemp1Off - iStart); + if( (iStart + nRead) > pFile->iOff ){ + nRead = (int)(pFile->iOff - iStart); } rc = sqlite3OsRead( - pTask->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart - ); + pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart + ); assert( rc!=SQLITE_IOERR_SHORT_READ ); } } @@ -547,7 +566,7 @@ static int vdbePmaReaderInit( if( rc==SQLITE_OK ){ u64 nByte; /* Size of PMA in bytes */ - pIter->iEof = pTask->iTemp1Off; + pIter->iEof = pFile->iOff; rc = vdbePmaReadVarint(pIter, &nByte); pIter->iEof = pIter->iReadOff + nByte; *pnByte += nByte; @@ -562,25 +581,25 @@ static int vdbePmaReaderInit( /* ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, -** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences +** size nKey2 bytes). Use pKeyInfo for the collation sequences ** used by the comparison. Return the result of the comparison. ** -** Before returning, object (pTask->pUnpacked) is populated with the +** Before returning, object pUnpacked is populated with the ** unpacked version of key2. Or, if pKey2 is passed a NULL pointer, then it -** is assumed that the (pTask->pUnpacked) structure already contains the +** is assumed that the pUnpacked structure already contains the ** unpacked key to use as key2. ** -** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set +** If an OOM error is encountered, (pUnpacked->error_rc) is set ** to SQLITE_NOMEM. */ static int vdbeSorterCompare( - SortSubtask *pTask, /* Subtask context (for pKeyInfo) */ + KeyInfo *pKeyInfo, + UnpackedRecord *r2, const void *pKey1, int nKey1, /* Left side of comparison */ const void *pKey2, int nKey2 /* Right side of comparison */ ){ - UnpackedRecord *r2 = pTask->pUnpacked; if( pKey2 ){ - sqlite3VdbeRecordUnpack(pTask->pKeyInfo, nKey2, pKey2, r2); + sqlite3VdbeRecordUnpack(pKeyInfo, nKey2, pKey2, r2); } return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0); } @@ -591,7 +610,7 @@ static int vdbeSorterCompare( ** value to recalculate. */ static int vdbeSorterDoCompare( - SortSubtask *pTask, + SortLevel *pLvl, MergeEngine *pMerger, int iOut ){ @@ -620,9 +639,9 @@ static int vdbeSorterDoCompare( iRes = i1; }else{ int res; - assert( pTask->pUnpacked!=0 ); /* allocated in vdbeSortSubtaskMain() */ - res = vdbeSorterCompare( - pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey + assert( pLvl->pUnpacked!=0 ); /* allocated in vdbeSorterThread() */ + res = vdbeSorterCompare(pLvl->pTask->pKeyInfo, pLvl->pUnpacked, + p1->aKey, p1->nKey, p2->aKey, p2->nKey ); if( res<=0 ){ iRes = i1; @@ -674,6 +693,7 @@ int sqlite3VdbeSorterInit( pTask->pKeyInfo = pKeyInfo; pTask->pgsz = pgsz; pTask->db = db; + pTask->pSorter = pSorter; } if( !sqlite3TempInMemory(db) ){ @@ -688,8 +708,8 @@ int sqlite3VdbeSorterInit( if( sqlite3GlobalConfig.pHeap==0 ){ assert( pSorter->iMemory==0 ); pSorter->nMemory = pgsz; - pSorter->aMemory = (u8*)sqlite3Malloc(pgsz); - if( !pSorter->aMemory ) rc = SQLITE_NOMEM; + pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz); + if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM; } } } @@ -710,23 +730,31 @@ static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){ } /* -** Free all resources owned by the object indicated by argument pTask. All -** fields of *pTask are zeroed before returning. +** Free all resources owned by the object indicated by argument pTask. +** This does not include joining any outstanding threads. All fields of +** *pTask are zeroed before returning. */ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){ - sqlite3DbFree(db, pTask->pUnpacked); - pTask->pUnpacked = 0; - if( pTask->aListMemory==0 ){ - vdbeSorterRecordFree(0, pTask->pList); - }else{ - sqlite3_free(pTask->aListMemory); - pTask->aListMemory = 0; - } - pTask->pList = 0; - if( pTask->pTemp1 ){ - sqlite3OsCloseFree(pTask->pTemp1); - pTask->pTemp1 = 0; + SortLevel *pLvl; + SortLevel *pNext; + for(pLvl=pTask->pLevel; pLvl; pLvl=pNext){ + pNext = pLvl->pNext; + assert( pLvl->pThread==0 ); + if( pLvl==pTask->pLevel ){ + if( pLvl->in.l.aMemory==0 ){ + vdbeSorterRecordFree(0, pLvl->in.l.pRecord); + }else{ + sqlite3_free(pLvl->in.l.aMemory); + } + }else{ + if( pLvl->in.f.pFd ) sqlite3OsCloseFree(pLvl->in.f.pFd); + } + if( pLvl->out.pFd ) sqlite3OsCloseFree(pLvl->out.pFd); + sqlite3DbFree(db, pLvl->pUnpacked); + sqlite3_free(pLvl); } + pTask->pLevel = 0; + pTask->nConsolidate = 0; } /* @@ -738,13 +766,16 @@ static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ int i; for(i=0; inTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; - if( pTask->pThread ){ - void *pRet; - int rc2 = sqlite3ThreadJoin(pTask->pThread, &pRet); - pTask->pThread = 0; - pTask->bDone = 0; - if( rc==SQLITE_OK ) rc = rc2; - if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + SortLevel *pLvl; + for(pLvl=pTask->pLevel; pLvl; pLvl=pLvl->pNext){ + if( pLvl->pThread ){ + void *pRet; + int rc2 = sqlite3ThreadJoin(pLvl->pThread, &pRet); + pLvl->pThread = 0; + pLvl->bDone = 0; + if( rc==SQLITE_OK ) rc = rc2; + if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + } } } return rc; @@ -799,11 +830,11 @@ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){ SortSubtask *pTask = &pSorter->aTask[i]; vdbeSortSubtaskCleanup(db, pTask); } - if( pSorter->aMemory==0 ){ - vdbeSorterRecordFree(0, pSorter->pRecord); + if( pSorter->list.aMemory==0 ){ + vdbeSorterRecordFree(0, pSorter->list.pRecord); } - pSorter->pRecord = 0; - pSorter->nInMemory = 0; + pSorter->list.pRecord = 0; + pSorter->list.nInMemory = 0; pSorter->bUsePMA = 0; pSorter->iMemory = 0; } @@ -816,7 +847,8 @@ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){ if( pSorter ){ sqlite3VdbeSorterReset(db, pSorter); vdbeMergeEngineFree(pSorter->pMerger); - sqlite3_free(pSorter->aMemory); + sqlite3_free(pSorter->list.aMemory); + sqlite3DbFree(db, pSorter->pUnpacked); sqlite3DbFree(db, pSorter); pCsr->pSorter = 0; } @@ -846,7 +878,8 @@ static int vdbeSorterOpenTempFile(sqlite3_vfs *pVfs, sqlite3_file **ppFile){ ** Set *ppOut to the head of the new list. */ static void vdbeSorterMerge( - SortSubtask *pTask, /* Calling thread context */ + KeyInfo *pKeyInfo, + UnpackedRecord *r2, SorterRecord *p1, /* First list to merge */ SorterRecord *p2, /* Second list to merge */ SorterRecord **ppOut /* OUT: Head of merged list */ @@ -857,7 +890,7 @@ static void vdbeSorterMerge( while( p1 && p2 ){ int res; - res = vdbeSorterCompare(pTask, SRVAL(p1), p1->nVal, pVal2, p2->nVal); + res = vdbeSorterCompare(pKeyInfo, r2, SRVAL(p1), p1->nVal, pVal2, p2->nVal); if( res<=0 ){ *pp = p1; pp = &p1->u.pNext; @@ -876,11 +909,15 @@ static void vdbeSorterMerge( } /* -** Sort the linked list of records headed at pTask->pList. Return +** Sort the linked list of records headed at pLvl->in.l.pRecord. Return ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if ** an error occurs. */ -static int vdbeSorterSort(SortSubtask *pTask){ +static int vdbeSorterSort( + SortList *pList, + KeyInfo *pKeyInfo, + UnpackedRecord *pUnpacked +){ int i; SorterRecord **aSlot; SorterRecord *p; @@ -890,15 +927,15 @@ static int vdbeSorterSort(SortSubtask *pTask){ return SQLITE_NOMEM; } - p = pTask->pList; + p = pList->pRecord; while( p ){ SorterRecord *pNext; - if( pTask->aListMemory ){ - if( (u8*)p==pTask->aListMemory ){ + if( pList->aMemory ){ + if( (u8*)p==pList->aMemory ){ pNext = 0; }else{ - assert( p->u.iNextaListMemory) ); - pNext = (SorterRecord*)&pTask->aListMemory[p->u.iNext]; + assert( p->u.iNextaMemory) ); + pNext = (SorterRecord*)&pList->aMemory[p->u.iNext]; } }else{ pNext = p->u.pNext; @@ -906,7 +943,7 @@ static int vdbeSorterSort(SortSubtask *pTask){ p->u.pNext = 0; for(i=0; aSlot[i]; i++){ - vdbeSorterMerge(pTask, p, aSlot[i], &p); + vdbeSorterMerge(pKeyInfo, pUnpacked, p, aSlot[i], &p); aSlot[i] = 0; } aSlot[i] = p; @@ -915,9 +952,9 @@ static int vdbeSorterSort(SortSubtask *pTask){ p = 0; for(i=0; i<64; i++){ - vdbeSorterMerge(pTask, p, aSlot[i], &p); + vdbeSorterMerge(pKeyInfo, pUnpacked, p, aSlot[i], &p); } - pTask->pList = p; + pList->pRecord = p; sqlite3_free(aSlot); return SQLITE_OK; @@ -1017,13 +1054,19 @@ static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){ ** Whether or not the file does end up memory mapped of course depends on ** the specific VFS implementation. */ -static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){ - if( nByte<=(i64)(db->nMaxSorterMmap) ){ - int rc = sqlite3OsTruncate(pFile, nByte); +static void vdbeSorterExtendFile( + sqlite3 *db, + SortFile *pFile, + i64 nByte +){ + if( nByte<=(i64)(db->nMaxSorterMmap) && nByte>pFile->nByte ){ + sqlite3_file *pFd = pFile->pFd; + int rc = sqlite3OsTruncate(pFd, nByte); if( rc==SQLITE_OK ){ void *p = 0; - sqlite3OsFetch(pFile, 0, nByte, &p); - sqlite3OsUnfetch(pFile, 0, p); + sqlite3OsFetch(pFd, 0, nByte, &p); + sqlite3OsUnfetch(pFd, 0, p); + pFile->nByte = nByte; } } } @@ -1031,64 +1074,6 @@ static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){ # define vdbeSorterExtendFile(x,y,z) SQLITE_OK #endif - -/* -** Write the current contents of the in-memory linked-list to a PMA. Return -** SQLITE_OK if successful, or an SQLite error code otherwise. -** -** The format of a PMA is: -** -** * A varint. This varint contains the total number of bytes of content -** in the PMA (not including the varint itself). -** -** * One or more records packed end-to-end in order of ascending keys. -** Each record consists of a varint followed by a blob of data (the -** key). The varint is the number of bytes in the blob of data. -*/ -static int vdbeSorterListToPMA(SortSubtask *pTask){ - int rc = SQLITE_OK; /* Return code */ - PmaWriter writer; /* Object used to write to the file */ - - memset(&writer, 0, sizeof(PmaWriter)); - assert( pTask->nInMemory>0 ); - - /* If the first temporary PMA file has not been opened, open it now. */ - if( pTask->pTemp1==0 ){ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->pTemp1); - assert( rc!=SQLITE_OK || pTask->pTemp1 ); - assert( pTask->iTemp1Off==0 ); - assert( pTask->nPMA==0 ); - } - - /* Try to get the file to memory map */ - if( rc==SQLITE_OK ){ - vdbeSorterExtendFile(pTask->db, - pTask->pTemp1, pTask->iTemp1Off + pTask->nInMemory + 9 - ); - } - - if( rc==SQLITE_OK ){ - SorterRecord *p; - SorterRecord *pNext = 0; - - vdbePmaWriterInit(pTask->pTemp1, &writer, pTask->pgsz, - pTask->iTemp1Off); - pTask->nPMA++; - vdbePmaWriteVarint(&writer, pTask->nInMemory); - for(p=pTask->pList; p; p=pNext){ - pNext = p->u.pNext; - vdbePmaWriteVarint(&writer, p->nVal); - vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal); - if( pTask->aListMemory==0 ) sqlite3_free(p); - } - pTask->pList = p; - rc = vdbePmaWriterFinish(&writer, &pTask->iTemp1Off); - } - - assert( pTask->pList==0 || rc!=SQLITE_OK ); - return rc; -} - /* ** Advance the MergeEngine iterator passed as the second argument to ** the next entry. Set *pbEof to true if this means the iterator has @@ -1097,12 +1082,14 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){ ** Return SQLITE_OK if successful or an error code if an error occurs. */ static int vdbeSorterNext( - SortSubtask *pTask, - MergeEngine *pMerger, + SortLevel *pLvl, + MergeEngine *pMerger, int *pbEof ){ int rc; int iPrev = pMerger->aTree[1];/* Index of iterator to advance */ + KeyInfo *pKeyInfo = pLvl->pTask->pKeyInfo; + UnpackedRecord *r2 = pLvl->pUnpacked; /* Advance the current iterator */ rc = vdbePmaReaderNext(&pMerger->aIter[iPrev]); @@ -1128,19 +1115,19 @@ static int vdbeSorterNext( }else if( pIter2->pFile==0 ){ iRes = -1; }else{ - iRes = vdbeSorterCompare(pTask, + iRes = vdbeSorterCompare(pKeyInfo, r2, pIter1->aKey, pIter1->nKey, pKey2, pIter2->nKey ); } /* If pIter1 contained the smaller value, set aTree[i] to its index. ** Then set pIter2 to the next iterator to compare to pIter1. In this - ** case there is no cache of pIter2 in pTask->pUnpacked, so set + ** case there is no cache of pIter2 in pLvl->pUnpacked, so set ** pKey2 to point to the record belonging to pIter2. ** ** Alternatively, if pIter2 contains the smaller of the two values, ** set aTree[i] to its index and update pIter1. If vdbeSorterCompare() - ** was actually called above, then pTask->pUnpacked now contains + ** was actually called above, then pLvl->pUnpacked now contains ** a value equivalent to pIter2. So set pKey2 to NULL to prevent ** vdbeSorterCompare() from decoding pIter2 again. ** @@ -1164,136 +1151,234 @@ static int vdbeSorterNext( return rc; } -/* -** The main routine for sorter-thread operations. -*/ -static void *vdbeSortSubtaskMain(void *pCtx){ - int rc = SQLITE_OK; - SortSubtask *pTask = (SortSubtask*)pCtx; +static UnpackedRecord *vdbeSorterAllocUnpackedRecord(KeyInfo *pKeyInfo){ + char *pFree; + UnpackedRecord *pRet; + pRet = sqlite3VdbeAllocUnpackedRecord(pKeyInfo, 0, 0, &pFree); + assert( pRet==(UnpackedRecord*)pFree ); + if( pRet ){ + pRet->nField = pKeyInfo->nField; + pRet->errCode = 0; + } + return pRet; +} - assert( pTask->eWork==SORT_SUBTASK_SORT - || pTask->eWork==SORT_SUBTASK_TO_PMA - || pTask->eWork==SORT_SUBTASK_CONS - ); - assert( pTask->bDone==0 ); +#if 0 +static void vdbeSorterWorkDebug(SortLevel *pLvl, const char *zEvent){ + i64 t; + SortLevel *p; + SortSubtask *pTask = pLvl->pTask; + int iTask = (pTask - pTask->pSorter->aTask); + int iLvl = 0; + for(p=pTask->pLevel; p!=pLvl; p=p->pNext) iLvl++; + sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t); + fprintf(stderr, "%lld:%d.%d %s\n", t, iTask, iLvl, zEvent); +} +static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){ + i64 t; + sqlite3OsCurrentTimeInt64(db->pVfs, &t); + fprintf(stderr, "%lld:X %s\n", t, zEvent); +} +#else +# define vdbeSorterWorkDebug(x,y) +# define vdbeSorterRewindDebug(x,y) +#endif - if( pTask->pUnpacked==0 ){ - char *pFree; - pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( - pTask->pKeyInfo, 0, 0, &pFree - ); - assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); - if( pFree==0 ){ +/* +** Merge the data currently stored in (pLevel->in), if any, into a new PMA +** stored within (pLevel->out). +*/ +static int vdbeSorterWorkLevel(SortLevel *pLvl){ + int rc = SQLITE_OK; /* Return code */ + SortSubtask *pTask = pLvl->pTask; + MergeEngine *pMerger = 0; + SortFile *pOut = &pLvl->out; /* Write new PMA here */ + i64 nOut = 0; /* Expected size of new PMA */ + PmaWriter writer; /* Used to write new PMA to pOut */ + int bEof = 0; + + vdbeSorterWorkDebug(pLvl, "enter"); + + if( pLvl->pUnpacked==0 ){ + pLvl->pUnpacked = vdbeSorterAllocUnpackedRecord(pTask->pKeyInfo); + if( pLvl->pUnpacked==0 ){ rc = SQLITE_NOMEM; - goto thread_out; + goto work_level_out; } - pTask->pUnpacked->nField = pTask->pKeyInfo->nField; - pTask->pUnpacked->errCode = 0; } - if( pTask->eWork==SORT_SUBTASK_CONS ){ - assert( pTask->pList==0 ); - while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){ - int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT); - sqlite3_file *pTemp2 = 0; /* Second temp file to use */ - MergeEngine *pMerger; /* Object for reading/merging PMA data */ - i64 iReadOff = 0; /* Offset in pTemp1 to read from */ - i64 iWriteOff = 0; /* Offset in pTemp2 to write to */ - int i; - - /* Allocate a merger object to merge PMAs together. */ - pMerger = vdbeMergeEngineNew(nIter); + if( pLvl->out.pFd==0 ){ + assert( pLvl->out.iOff==0 ); + assert( pLvl->out.nByte==0 ); + assert( pLvl->out.nPMA==0 ); + rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pLvl->out.pFd); + if( rc!=SQLITE_OK ) goto work_level_out; + } + + if( pLvl==pTask->pLevel ){ + if( pLvl->in.l.pRecord==0 ){ + bEof = 1; + }else{ + rc = vdbeSorterSort(&pLvl->in.l, pTask->pKeyInfo, pLvl->pUnpacked); + nOut = pLvl->in.l.nInMemory; + } + }else{ + int nPMA = pLvl->in.f.nPMA; + if( nPMA==0 ){ + bEof = 1; + }else{ + pMerger = vdbeMergeEngineNew(nPMA); if( pMerger==0 ){ rc = SQLITE_NOMEM; - break; - } - - /* Open a second temp file to write merged data to */ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2); - if( rc==SQLITE_OK ){ - vdbeSorterExtendFile(pTask->db, pTemp2, pTask->iTemp1Off); }else{ - vdbeMergeEngineFree(pMerger); - break; - } - - /* This loop runs once for each output PMA. Each output PMA is made - ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */ - for(i=0; rc==SQLITE_OK && inPMA; i+=SORTER_MAX_MERGE_COUNT){ - PmaWriter writer; /* Object for writing data to pTemp2 */ - i64 nOut = 0; /* Bytes of data in output PMA */ - int bEof = 0; - int rc2; - - /* Configure the merger object to read and merge data from the next - ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs, - ** if that is fewer). */ + /* Configure the merger object to read and merge data from all + ** PMAs at pLvl. */ int iIter; - for(iIter=0; iIteraIter[iIter]; - rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nOut); + rc = vdbePmaReaderInit(pTask, &pLvl->in.f, iReadOff, pIter, &nOut); iReadOff = pIter->iEof; - if( iReadOff>=pTask->iTemp1Off || rc!=SQLITE_OK ) break; } + for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){ - rc = vdbeSorterDoCompare(pTask, pMerger, iIter); + rc = vdbeSorterDoCompare(pLvl, pMerger, iIter); } + } + } + } + if( rc!=SQLITE_OK ) goto work_level_out; - vdbePmaWriterInit(pTemp2, &writer, pTask->pgsz, iWriteOff); - vdbePmaWriteVarint(&writer, nOut); - while( rc==SQLITE_OK && bEof==0 ){ - PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ]; - assert( pIter->pFile!=0 ); /* pIter is not at EOF */ - vdbePmaWriteVarint(&writer, pIter->nKey); - vdbePmaWriteBlob(&writer, pIter->aKey, pIter->nKey); - rc = vdbeSorterNext(pTask, pMerger, &bEof); - } - rc2 = vdbePmaWriterFinish(&writer, &iWriteOff); - if( rc==SQLITE_OK ) rc = rc2; + /* If mmap is to be used, pre-extend and map the temp file. */ + vdbeSorterExtendFile(pTask->db, &pLvl->out, pLvl->out.iOff + nOut + 9); + + if( bEof==0 ){ + vdbePmaWriterInit(pOut->pFd, &writer, pTask->pgsz, pOut->iOff); + vdbePmaWriteVarint(&writer, nOut); + + while( rc==SQLITE_OK && bEof==0 ){ + u8 *aKey; /* Next key to write to output */ + int nKey; /* Size of aKey[] in bytes */ + if( pMerger==0 ){ + aKey = SRVAL(pLvl->in.l.pRecord); + nKey = pLvl->in.l.pRecord->nVal; + }else{ + PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ]; + assert( pIter->pFile ); /* pIter is not at EOF */ + aKey = pIter->aKey; + nKey = pIter->nKey; } - vdbeMergeEngineFree(pMerger); - sqlite3OsCloseFree(pTask->pTemp1); - pTask->pTemp1 = pTemp2; - pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT); - pTask->iTemp1Off = iWriteOff; + vdbePmaWriteVarint(&writer, nKey); + vdbePmaWriteBlob(&writer, aKey, nKey); + + if( pMerger==0 ){ + SorterRecord *pNext = pLvl->in.l.pRecord->u.pNext; + if( pLvl->in.l.aMemory==0 ) sqlite3_free(pLvl->in.l.pRecord); + pLvl->in.l.pRecord = pNext; + bEof = (pNext==0); + }else{ + rc = vdbeSorterNext(pLvl, pMerger, &bEof); + } } - }else{ - /* Sort the pTask->pList list */ - rc = vdbeSorterSort(pTask); - - /* If required, write the list out to a PMA. */ - if( rc==SQLITE_OK && pTask->eWork==SORT_SUBTASK_TO_PMA ){ -#ifdef SQLITE_DEBUG - i64 nExpect = pTask->nInMemory - + sqlite3VarintLen(pTask->nInMemory) - + pTask->iTemp1Off; -#endif - rc = vdbeSorterListToPMA(pTask); - assert( rc!=SQLITE_OK || (nExpect==pTask->iTemp1Off) ); + rc = vdbePmaWriterFinish(&writer, &pOut->iOff); + pOut->nPMA++; + + if( rc==SQLITE_OK && pMerger ){ + sqlite3OsCloseFree(pLvl->in.f.pFd); + pLvl->in.f.pFd = 0; } + vdbeMergeEngineFree(pMerger); } - thread_out: - pTask->bDone = 1; - if( rc==SQLITE_OK && pTask->pUnpacked->errCode ){ - assert( pTask->pUnpacked->errCode==SQLITE_NOMEM ); + if( rc==SQLITE_OK && ( + (pOut->nPMA>=SORTER_MAX_MERGE_COUNT) + || (pTask->nConsolidate && pLvl->pNext) + || (pTask->nConsolidate && pTask->nConsolidatenPMA) + )){ + SortLevel *pNext = pLvl->pNext; + if( pNext==0 ){ + pNext = (SortLevel*)sqlite3_malloc(sizeof(SortLevel)); + if( pNext==0 ){ + rc = SQLITE_NOMEM; + goto work_level_out; + } + memset(pNext, 0, sizeof(SortLevel)); + pLvl->pNext = pNext; + pNext->pTask = pTask; + } + + /* If there is a thread running on the next level, block on it. */ +#if SQLITE_MAX_WORKER_THREADS>0 + if( pNext->pThread ){ + void *pRet; + rc = sqlite3ThreadJoin(pNext->pThread, &pRet); + pNext->pThread = 0; + pNext->bDone = 0; + if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + if( rc!=SQLITE_OK ) goto work_level_out; + } +#endif + + pNext->in.f = pLvl->out; + memset(&pLvl->out, 0, sizeof(pLvl->out)); + } + + work_level_out: + vdbeSorterWorkDebug(pLvl, "exit"); + if( rc==SQLITE_OK && pLvl->pUnpacked->errCode ){ + assert( pLvl->pUnpacked->errCode==SQLITE_NOMEM ); rc = SQLITE_NOMEM; } - return SQLITE_INT_TO_PTR(rc); + return rc; } /* ** Run the activity scheduled by the object passed as the only argument ** in the current thread. */ -static int vdbeSorterRunTask(SortSubtask *pTask){ - int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pTask) ); - assert( pTask->bDone ); - pTask->bDone = 0; +static int vdbeSorterRun(SortLevel *pLvl){ + int rc; + + assert( pLvl->bDone==0 ); + assert( pLvl->pThread==0 ); + while( 1 ){ + rc = vdbeSorterWorkLevel(pLvl); + if( rc==SQLITE_OK && pLvl->pTask->pLevel==pLvl && pLvl->in.l.aMemory ){ + assert( pLvl->pTask->pSorter->list.aMemory==0 ); + assert( pLvl->in.l.pRecord==0 ); + pLvl->pTask->pSorter->list.aMemory = pLvl->in.l.aMemory; + pLvl->in.l.aMemory = 0; + } + + if( rc!=SQLITE_OK || pLvl->out.nPMA>0 ) break; + pLvl = pLvl->pNext; + assert( pLvl->bDone==0 ); + assert( pLvl->pThread==0 ); + } + + pLvl->bDone = 0; return rc; } +#if SQLITE_MAX_WORKER_THREADS>0 +static void *vdbeSorterThread(void *pCtx){ + int rc; + SortLevel *pLvl = (SortLevel*)pCtx; + + rc = vdbeSorterWorkLevel(pLvl); + if( rc==SQLITE_OK && pLvl->out.nPMA==0 ){ + SortLevel *pNext = pLvl->pNext; + void *pCtx = (void*)pNext; + assert( pNext->pThread==0 ); + rc = sqlite3ThreadCreate(&pNext->pThread, vdbeSorterThread, pCtx); + } + + pLvl->bDone = 1; + return SQLITE_INT_TO_PTR(rc); +} +#endif + /* ** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly ** using a background thread. @@ -1304,71 +1389,74 @@ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){ VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; int i; - SortSubtask *pTask = 0; /* Thread context used to create new PMA */ - int nWorker = (pSorter->nTask-1); + SortSubtask *pTask = 0; /* Sub-task new PMA is written to */ + SortLevel *pLevel; /* Level to write to */ + /* Set the use-temp-files flag. */ pSorter->bUsePMA = 1; - for(i=0; iiPrev + i + 1) % nWorker; - pTask = &pSorter->aTask[iTest]; -#if SQLITE_MAX_WORKER_THREADS>0 - if( pTask->bDone ){ - void *pRet; - assert( pTask->pThread ); - rc = sqlite3ThreadJoin(pTask->pThread, &pRet); - pTask->pThread = 0; - pTask->bDone = 0; - if( rc==SQLITE_OK ){ - rc = SQLITE_PTR_TO_INT(pRet); - } + + /* Select one of the sub-tasks to flush this PMA. In single threaded + ** mode (pSorter->nTask==1), this is always aTask[0]. In multi-threaded mode, + ** it may be any of the pSorter->nTask sub-tasks. */ + for(i=0; inTask; i++){ + pTask = &pSorter->aTask[i]; + if( pTask->pLevel==0 + || pTask->pLevel->pThread==0 + || pTask->pLevel->bDone + ){ + break; } -#endif - if( pTask->pThread==0 ) break; - pTask = 0; } - if( pTask==0 ){ - pTask = &pSorter->aTask[nWorker]; - } - pSorter->iPrev = (pTask - pSorter->aTask); - if( rc==SQLITE_OK ){ - assert( pTask->pThread==0 && pTask->bDone==0 ); - pTask->eWork = SORT_SUBTASK_TO_PMA; - pTask->pList = pSorter->pRecord; - pTask->nInMemory = pSorter->nInMemory; - pSorter->nInMemory = 0; - pSorter->pRecord = 0; - - if( pSorter->aMemory ){ - u8 *aMem = pTask->aListMemory; - pTask->aListMemory = pSorter->aMemory; - pSorter->aMemory = aMem; + /* If the first level for this task has not been allocated, allocate it. */ + if( pTask->pLevel==0 ){ + SortLevel *pNew = (SortLevel*)sqlite3_malloc(sizeof(SortLevel)); + if( pNew==0 ){ + rc = SQLITE_NOMEM; + }else{ + memset(pNew, 0, sizeof(SortLevel)); + pNew->pTask = pTask; + pTask->pLevel = pNew; } + } + pLevel = pTask->pLevel; + /* If there is a background thread using the selected task, wait for + ** it to finish. */ #if SQLITE_MAX_WORKER_THREADS>0 - if( !bFg && pTask!=&pSorter->aTask[nWorker] ){ - /* Launch a background thread for this operation */ - void *pCtx = (void*)pTask; - assert( pSorter->aMemory==0 || pTask->aListMemory!=0 ); - if( pTask->aListMemory ){ - if( pSorter->aMemory==0 ){ - pSorter->aMemory = sqlite3Malloc(pSorter->nMemory); - if( pSorter->aMemory==0 ) return SQLITE_NOMEM; - }else{ - pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory); - } + if( rc==SQLITE_OK && pLevel->pThread ){ + void *pRet = 0; + rc = sqlite3ThreadJoin(pLevel->pThread, &pRet); + pLevel->pThread = 0; + pLevel->bDone = 0; + if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); + } +#endif + + if( rc==SQLITE_OK ){ + u8 *aNewMem = 0; + if( pSorter->list.aMemory && pSorter->nTask>1 ){ + aNewMem = pLevel->in.l.aMemory; + if( aNewMem==0 ){ + aNewMem = sqlite3_malloc(pSorter->mxPmaSize); + if( aNewMem==0 ) rc = SQLITE_NOMEM; } - rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx); - }else + } + assert( pLevel->in.l.pRecord==0 ); + pLevel->in.l = pSorter->list; + pSorter->list.pRecord = 0; + pSorter->list.nInMemory = 0; + pSorter->list.aMemory = aNewMem; + if( rc==SQLITE_OK ){ +#if SQLITE_MAX_WORKER_THREADS>0 + if( pSorter->nTask>1 ){ + void *pCtx = (void*)pLevel; + rc = sqlite3ThreadCreate(&pLevel->pThread, vdbeSorterThread, pCtx); + pSorter->nMemory = aNewMem ? sqlite3MallocSize(aNewMem) : 0; + }else #endif - { - /* Use the foreground thread for this operation */ - rc = vdbeSorterRunTask(pTask); - if( rc==SQLITE_OK ){ - u8 *aMem = pTask->aListMemory; - pTask->aListMemory = pSorter->aMemory; - pSorter->aMemory = aMem; - assert( pTask->pList==0 ); + { + rc = vdbeSorterRun(pLevel); } } } @@ -1413,25 +1501,26 @@ int sqlite3VdbeSorterWrite( nReq = pVal->n + sizeof(SorterRecord); nPMA = pVal->n + sqlite3VarintLen(pVal->n); if( pSorter->mxPmaSize ){ - if( pSorter->aMemory ){ + if( pSorter->list.aMemory ){ bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize; }else{ + int nInMemory = pSorter->list.nInMemory; bFlush = ( - (pSorter->nInMemory > pSorter->mxPmaSize) - || (pSorter->nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) + (nInMemory > pSorter->mxPmaSize) + || (nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull()) ); } if( bFlush ){ rc = vdbeSorterFlushPMA(db, pCsr, 0); - pSorter->nInMemory = 0; + pSorter->list.nInMemory = 0; pSorter->iMemory = 0; - assert( rc!=SQLITE_OK || pSorter->pRecord==0 ); + assert( rc!=SQLITE_OK || pSorter->list.pRecord==0 ); } } - pSorter->nInMemory += nPMA; + pSorter->list.nInMemory += nPMA; - if( pSorter->aMemory ){ + if( pSorter->list.aMemory ){ int nMin = pSorter->iMemory + nReq; if( nMin>pSorter->nMemory ){ @@ -1441,44 +1530,33 @@ int sqlite3VdbeSorterWrite( if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize; if( nNew < nMin ) nNew = nMin; - aNew = sqlite3Realloc(pSorter->aMemory, nNew); + aNew = sqlite3Realloc(pSorter->list.aMemory, nNew); if( !aNew ) return SQLITE_NOMEM; - pSorter->pRecord = (SorterRecord*)( - aNew + ((u8*)pSorter->pRecord - pSorter->aMemory) + pSorter->list.pRecord = (SorterRecord*)( + aNew + ((u8*)pSorter->list.pRecord - pSorter->list.aMemory) ); - pSorter->aMemory = aNew; + pSorter->list.aMemory = aNew; pSorter->nMemory = nNew; } - pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory]; + pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory]; pSorter->iMemory += ROUND8(nReq); - pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory; + pNew->u.iNext = (u8*)(pSorter->list.pRecord) - pSorter->list.aMemory; }else{ pNew = (SorterRecord *)sqlite3Malloc(nReq); if( pNew==0 ){ return SQLITE_NOMEM; } - pNew->u.pNext = pSorter->pRecord; + pNew->u.pNext = pSorter->list.pRecord; } memcpy(SRVAL(pNew), pVal->z, pVal->n); pNew->nVal = pVal->n; - pSorter->pRecord = pNew; + pSorter->list.pRecord = pNew; return rc; } -/* -** Return the total number of PMAs in all temporary files. -*/ -static int vdbeSorterCountPMA(VdbeSorter *pSorter){ - int nPMA = 0; - int i; - for(i=0; inTask; i++){ - nPMA += pSorter->aTask[i].nPMA; - } - return nPMA; -} /* ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite, @@ -1488,24 +1566,22 @@ static int vdbeSorterCountPMA(VdbeSorter *pSorter){ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ VdbeSorter *pSorter = pCsr->pSorter; int rc = SQLITE_OK; /* Return code */ + int nTask = 0; + int i; assert( pSorter ); /* If no data has been written to disk, then do not do so now. Instead, - ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly - ** from the in-memory list. */ + ** sort the VdbeSorter.list.pRecord list. The vdbe layer will read data + ** directly from the in-memory list. */ + *pbEof = 0; if( pSorter->bUsePMA==0 ){ - if( pSorter->pRecord ){ + if( pSorter->list.pRecord ){ SortSubtask *pTask = &pSorter->aTask[0]; - *pbEof = 0; - pTask->pList = pSorter->pRecord; - pTask->eWork = SORT_SUBTASK_SORT; - assert( pTask->aListMemory==0 ); - pTask->aListMemory = pSorter->aMemory; - rc = vdbeSorterRunTask(pTask); - pTask->aListMemory = 0; - pSorter->pRecord = pTask->pList; - pTask->pList = 0; + UnpackedRecord *pUnpack = vdbeSorterAllocUnpackedRecord(pTask->pKeyInfo); + if( pUnpack==0 ) return SQLITE_NOMEM; + rc = vdbeSorterSort(&pSorter->list, pTask->pKeyInfo, pUnpack); + sqlite3DbFree(db, pUnpack); }else{ *pbEof = 1; } @@ -1513,32 +1589,33 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ } /* Write the current in-memory list to a PMA. */ - if( pSorter->pRecord ){ - rc = vdbeSorterFlushPMA(db, pCsr, 1); + if( pSorter->list.pRecord ){ + rc = vdbeSorterFlushPMA(db, pCsr, 0); } /* Join all threads */ rc = vdbeSorterJoinAll(pSorter, rc); - /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge - ** some of them together so that this is no longer the case. */ - if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){ - int i; - for(i=0; rc==SQLITE_OK && inTask; i++){ - SortSubtask *pTask = &pSorter->aTask[i]; - if( pTask->pTemp1 ){ - pTask->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nTask; - pTask->eWork = SORT_SUBTASK_CONS; + vdbeSorterRewindDebug(db, "rewind"); + + for(i=0; inTask; i++){ + if( pSorter->aTask[i].pLevel ) nTask++; + } + for(i=0; rc==SQLITE_OK && inTask; i++){ + SortSubtask *pTask = &pSorter->aTask[i]; + if( pTask->pLevel ){ + SortLevel *pLvl = pTask->pLevel; + pTask->nConsolidate = (SORTER_MAX_MERGE_COUNT / nTask); #if SQLITE_MAX_WORKER_THREADS>0 - if( i<(pSorter->nTask-1) ){ - void *pCtx = (void*)pTask; - rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx); - }else + if( i<(pSorter->nTask-1) ){ + void *pCtx = (void*)pLvl; + rc = sqlite3ThreadCreate(&pLvl->pThread, vdbeSorterThread, pCtx); + }else #endif - { - rc = vdbeSorterRunTask(pTask); - } + { + assert( pLvl->pThread==0 ); + rc = vdbeSorterRun(pLvl); } } } @@ -1554,7 +1631,14 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ int i; MergeEngine *pMerger; for(i=0; inTask; i++){ - nIter += pSorter->aTask[i].nPMA; + SortSubtask *pTask = &pSorter->aTask[i]; + if( pTask->pLevel ){ + SortLevel *pLvl; + for(pLvl=pTask->pLevel; pLvl->pNext; pLvl=pLvl->pNext){ + assert( pLvl->out.nPMA==0 ); + } + nIter += pLvl->out.nPMA; + } } pSorter->pMerger = pMerger = vdbeMergeEngineNew(nIter); @@ -1562,21 +1646,25 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ rc = SQLITE_NOMEM; }else{ int iIter = 0; - int iThread = 0; - for(iThread=0; iThreadnTask; iThread++){ - int iPMA; - i64 iReadOff = 0; - SortSubtask *pTask = &pSorter->aTask[iThread]; - for(iPMA=0; iPMAnPMA && rc==SQLITE_OK; iPMA++){ - i64 nDummy = 0; - PmaReader *pIter = &pMerger->aIter[iIter++]; - rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nDummy); - iReadOff = pIter->iEof; + for(i=0; inTask; i++){ + SortSubtask *pTask = &pSorter->aTask[i]; + if( pTask->pLevel ){ + int iPMA; + i64 iReadOff = 0; + SortLevel *pLvl; + for(pLvl=pTask->pLevel; pLvl->pNext; pLvl=pLvl->pNext); + + for(iPMA=0; iPMAout.nPMA && rc==SQLITE_OK; iPMA++){ + i64 nDummy = 0; + PmaReader *pIter = &pMerger->aIter[iIter++]; + rc = vdbePmaReaderInit(pTask, &pLvl->out, iReadOff, pIter, &nDummy); + iReadOff = pIter->iEof; + } } } for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){ - rc = vdbeSorterDoCompare(&pSorter->aTask[0], pMerger, i); + rc = vdbeSorterDoCompare(pSorter->aTask[0].pLevel, pMerger, i); } } } @@ -1584,6 +1672,7 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ if( rc==SQLITE_OK ){ *pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0); } + vdbeSorterRewindDebug(db, "rewinddone"); return rc; } @@ -1595,13 +1684,13 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){ int rc; /* Return code */ if( pSorter->pMerger ){ - rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof); + rc = vdbeSorterNext(pSorter->aTask[0].pLevel, pSorter->pMerger, pbEof); }else{ - SorterRecord *pFree = pSorter->pRecord; - pSorter->pRecord = pFree->u.pNext; + SorterRecord *pFree = pSorter->list.pRecord; + pSorter->list.pRecord = pFree->u.pNext; pFree->u.pNext = 0; - if( pSorter->aMemory==0 ) vdbeSorterRecordFree(db, pFree); - *pbEof = !pSorter->pRecord; + if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree); + *pbEof = !pSorter->list.pRecord; rc = SQLITE_OK; } return rc; @@ -1622,8 +1711,8 @@ static void *vdbeSorterRowkey( *pnKey = pIter->nKey; pKey = pIter->aKey; }else{ - *pnKey = pSorter->pRecord->nVal; - pKey = SRVAL(pSorter->pRecord); + *pnKey = pSorter->list.pRecord->nVal; + pKey = SRVAL(pSorter->list.pRecord); } return pKey; } @@ -1669,13 +1758,19 @@ int sqlite3VdbeSorterCompare( int *pRes /* OUT: Result of comparison */ ){ VdbeSorter *pSorter = pCsr->pSorter; - UnpackedRecord *r2 = pSorter->aTask[0].pUnpacked; + UnpackedRecord *r2 = pSorter->pUnpacked; KeyInfo *pKeyInfo = pCsr->pKeyInfo; int i; void *pKey; int nKey; /* Sorter key to compare pVal with */ - assert( r2->nField>=pKeyInfo->nField-nIgnore ); - r2->nField = pKeyInfo->nField-nIgnore; + if( r2==0 ){ + r2 = vdbeSorterAllocUnpackedRecord(pSorter->aTask[0].pKeyInfo); + if( r2==0 ) return SQLITE_NOMEM; + pSorter->pUnpacked = r2; + assert( r2->nField>=pKeyInfo->nField-nIgnore ); + r2->nField = pKeyInfo->nField-nIgnore; + } + assert( r2->nField==pKeyInfo->nField-nIgnore ); pKey = vdbeSorterRowkey(pSorter, &nKey); sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);