typedef struct PmaWriter PmaWriter; /* Incrementally write on PMA */
typedef struct SorterRecord SorterRecord; /* A record being sorted */
typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */
+typedef struct SorterFile SorterFile;
+typedef struct IncrMerger IncrMerger;
/*
#define SORT_SUBTASK_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pTemp1 */
#define SORT_SUBTASK_CONS 3 /* Consolidate multiple PMAs */
+struct SorterFile {
+ sqlite3_file *pFd;
+ i64 iEof;
+};
+
/*
** Sorting is divided up into smaller subtasks. Each subtask is controlled
** by an instance of this object. A Subtask might run in either the main thread
int bDone; /* Set to true by pTask when finished */
sqlite3 *db; /* Database connection */
+ VdbeSorter *pSorter; /* Sorter */
KeyInfo *pKeyInfo; /* How to compare records */
UnpackedRecord *pUnpacked; /* Space to unpack a record */
int pgsz; /* Main database page size */
int nInMemory; /* Expected size of PMA based on pList */
u8 *aListMemory; /* Records memory (or NULL) */
- int nPMA; /* Number of PMAs currently in pTemp1 */
- i64 iTemp1Off; /* Offset to write to in pTemp1 */
- sqlite3_file *pTemp1; /* File to write PMAs to, or NULL */
+ int nPMA; /* Number of PMAs currently in file */
+ SorterFile file;
};
int mnPmaSize; /* Minimum PMA size, in bytes */
int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
int bUsePMA; /* True if one or more PMAs created */
+ int bUseThreads; /* True if one or more PMAs created */
SorterRecord *pRecord; /* Head of in-memory record list */
- MergeEngine *pMerger; /* For final merge of PMAs (by caller) */
+ PmaReader *pReader; /* Read data from here after Rewind() */
+ UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */
u8 *aMemory; /* Block of memory to alloc records from */
int iMemory; /* Offset of first free byte in aMemory */
int nMemory; /* Size of aMemory allocation in bytes */
u8 *aBuffer; /* Current read buffer */
int nBuffer; /* Size of read buffer in bytes */
u8 *aMap; /* Pointer to mapping of entire file */
+ IncrMerger *pIncr; /* Incremental merger */
+};
+
+struct IncrMerger {
+ int mxSz; /* Maximum size of files */
+ SortSubtask *pTask; /* Task that owns this merger */
+ int bEof; /* Set to true when merge is finished */
+ SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */
+ MergeEngine *pMerger; /* Merge engine thread reads data from */
+ SQLiteThread *pThread; /* Thread currently populating aFile[1] */
};
/*
/* Maximum number of PMAs that a single MergeEngine can merge */
#define SORTER_MAX_MERGE_COUNT 16
+static int vdbeIncrSwap(IncrMerger*);
+static void vdbeIncrFree(IncrMerger*);
+
/*
** Free all memory belonging to the PmaReader object passed as the second
** argument. All structure fields are set to zero before returning.
sqlite3_free(pIter->aAlloc);
sqlite3_free(pIter->aBuffer);
if( pIter->aMap ) sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
+ if( pIter->pIncr ) vdbeIncrFree(pIter->pIncr);
memset(pIter, 0, sizeof(PmaReader));
}
/* Extend the p->aAlloc[] allocation if required. */
if( p->nAlloc<nByte ){
u8 *aNew;
- int nNew = p->nAlloc*2;
+ int nNew = MAX(128, p->nAlloc*2);
while( nByte>nNew ) nNew = nNew*2;
aNew = sqlite3Realloc(p->aAlloc, nNew);
if( !aNew ) return SQLITE_NOMEM;
return SQLITE_OK;
}
+static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
+ int rc = SQLITE_OK;
+ if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
+ rc = sqlite3OsFetch(pFile->pFd, 0, pFile->iEof, (void**)pp);
+ }
+ return rc;
+}
+
+static int vdbePmaReaderReinit(PmaReader *pIter){
+ IncrMerger *pIncr = pIter->pIncr;
+ SortSubtask *pTask = pIncr->pTask;
+ int rc = SQLITE_OK;
+
+ assert( pIncr->bEof==0 );
+
+ if( pIter->aMap ){
+ sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
+ pIter->aMap = 0;
+ }
+ pIter->iReadOff = 0;
+ pIter->iEof = pIncr->aFile[0].iEof;
+ pIter->pFile = pIncr->aFile[0].pFd;
+
+ rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
+ if( rc==SQLITE_OK ){
+ if( pIter->aMap==0 && pIter->aBuffer==0 ){
+ pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
+ if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
+ pIter->nBuffer = pTask->pgsz;
+ }
+ }
+
+ return rc;
+}
+
/*
** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
** no error occurs, or an SQLite error code if one does.
*/
static int vdbePmaReaderNext(PmaReader *pIter){
- int rc; /* Return Code */
+ int rc = SQLITE_OK; /* Return Code */
u64 nRec = 0; /* Size of record in bytes */
if( pIter->iReadOff>=pIter->iEof ){
- /* This is an EOF condition */
- vdbePmaReaderClear(pIter);
- return SQLITE_OK;
+ int bEof = 1;
+ if( pIter->pIncr ){
+ rc = vdbeIncrSwap(pIter->pIncr);
+ if( rc==SQLITE_OK && pIter->pIncr->bEof==0 ){
+ rc = vdbePmaReaderReinit(pIter);
+ bEof = 0;
+ }
+ }
+
+ if( bEof ){
+ /* This is an EOF condition */
+ vdbePmaReaderClear(pIter);
+ return rc;
+ }
}
- rc = vdbePmaReadVarint(pIter, &nRec);
+ if( rc==SQLITE_OK ){
+ rc = vdbePmaReadVarint(pIter, &nRec);
+ }
if( rc==SQLITE_OK ){
pIter->nKey = (int)nRec;
rc = vdbePmaReadBlob(pIter, (int)nRec, &pIter->aKey);
** starting at offset iStart and ending at offset iEof-1. This function
** leaves the iterator pointing to the first key in the PMA (or EOF if the
** PMA is empty).
+**
+** If the pnByte parameter is NULL, then it is assumed that the file
+** contains a single PMA, and that that PMA omits the initial length varint.
*/
static int vdbePmaReaderInit(
- SortSubtask *pTask, /* Thread context */
- i64 iStart, /* Start offset in pTask->pTemp1 */
+ SortSubtask *pTask, /* Task context */
+ SorterFile *pFile, /* Sorter file to read from */
+ i64 iStart, /* Start offset in pFile */
PmaReader *pIter, /* Iterator to populate */
i64 *pnByte /* IN/OUT: Increment this value by PMA size */
){
int nBuf = pTask->pgsz;
void *pMap = 0; /* Mapping of temp file */
- assert( pTask->iTemp1Off>iStart );
+ assert( pFile->iEof>iStart );
assert( pIter->aAlloc==0 );
assert( pIter->aBuffer==0 );
- pIter->pFile = pTask->pTemp1;
+ pIter->pFile = pFile->pFd;
pIter->iReadOff = iStart;
pIter->nAlloc = 128;
pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
if( pIter->aAlloc ){
/* Try to xFetch() a mapping of the entire temp file. If this is possible,
** the PMA will be read via the mapping. Otherwise, use xRead(). */
- if( pTask->iTemp1Off<=(i64)(pTask->db->nMaxSorterMmap) ){
- rc = sqlite3OsFetch(pIter->pFile, 0, pTask->iTemp1Off, &pMap);
+ if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
+ rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iEof, &pMap);
}
}else{
rc = SQLITE_NOMEM;
int iBuf = iStart % nBuf;
if( iBuf ){
int nRead = nBuf - iBuf;
- if( (iStart + nRead) > pTask->iTemp1Off ){
- nRead = (int)(pTask->iTemp1Off - iStart);
+ if( (iStart + nRead) > pFile->iEof ){
+ nRead = (int)(pFile->iEof - iStart);
}
rc = sqlite3OsRead(
- pTask->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
- );
+ pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
+ );
assert( rc!=SQLITE_IOERR_SHORT_READ );
}
}
if( rc==SQLITE_OK ){
u64 nByte; /* Size of PMA in bytes */
- pIter->iEof = pTask->iTemp1Off;
+ pIter->iEof = pFile->iEof;
rc = vdbePmaReadVarint(pIter, &nByte);
pIter->iEof = pIter->iReadOff + nByte;
*pnByte += nByte;
pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
pSorter->nTask = nWorker + 1;
+ pSorter->bUseThreads = (pSorter->nTask>1);
for(i=0; i<pSorter->nTask; i++){
SortSubtask *pTask = &pSorter->aTask[i];
pTask->pKeyInfo = pKeyInfo;
pTask->pgsz = pgsz;
pTask->db = db;
+ pTask->pSorter = pSorter;
}
if( !sqlite3TempInMemory(db) ){
pTask->aListMemory = 0;
}
pTask->pList = 0;
- if( pTask->pTemp1 ){
- sqlite3OsCloseFree(pTask->pTemp1);
- pTask->pTemp1 = 0;
+ if( pTask->file.pFd ){
+ sqlite3OsCloseFree(pTask->file.pFd);
+ pTask->file.pFd = 0;
+ pTask->file.iEof = 0;
}
}
int nByte; /* Total bytes of space to allocate */
MergeEngine *pNew; /* Pointer to allocated object to return */
- assert( nIter<=SORTER_MAX_MERGE_COUNT );
+ /* assert( nIter<=SORTER_MAX_MERGE_COUNT ); */
+
while( N<nIter ) N += N;
nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
int i;
(void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
- vdbeMergeEngineFree(pSorter->pMerger);
- pSorter->pMerger = 0;
+ if( pSorter->pReader ){
+ vdbePmaReaderClear(pSorter->pReader);
+ sqlite3DbFree(db, pSorter->pReader);
+ pSorter->pReader = 0;
+ }
for(i=0; i<pSorter->nTask; i++){
SortSubtask *pTask = &pSorter->aTask[i];
vdbeSortSubtaskCleanup(db, pTask);
pSorter->nInMemory = 0;
pSorter->bUsePMA = 0;
pSorter->iMemory = 0;
+ sqlite3DbFree(db, pSorter->pUnpacked);
+ pSorter->pUnpacked = 0;
}
/*
VdbeSorter *pSorter = pCsr->pSorter;
if( pSorter ){
sqlite3VdbeSorterReset(db, pSorter);
- vdbeMergeEngineFree(pSorter->pMerger);
sqlite3_free(pSorter->aMemory);
sqlite3DbFree(db, pSorter);
pCsr->pSorter = 0;
assert( pTask->nInMemory>0 );
/* If the first temporary PMA file has not been opened, open it now. */
- if( pTask->pTemp1==0 ){
- rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->pTemp1);
- assert( rc!=SQLITE_OK || pTask->pTemp1 );
- assert( pTask->iTemp1Off==0 );
+ if( pTask->file.pFd==0 ){
+ rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file.pFd);
+ assert( rc!=SQLITE_OK || pTask->file.pFd );
+ assert( pTask->file.iEof==0 );
assert( pTask->nPMA==0 );
}
/* Try to get the file to memory map */
if( rc==SQLITE_OK ){
vdbeSorterExtendFile(pTask->db,
- pTask->pTemp1, pTask->iTemp1Off + pTask->nInMemory + 9
+ pTask->file.pFd, pTask->file.iEof + pTask->nInMemory + 9
);
}
SorterRecord *p;
SorterRecord *pNext = 0;
- vdbePmaWriterInit(pTask->pTemp1, &writer, pTask->pgsz,
- pTask->iTemp1Off);
+ vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz,
+ pTask->file.iEof);
pTask->nPMA++;
vdbePmaWriteVarint(&writer, pTask->nInMemory);
for(p=pTask->pList; p; p=pNext){
if( pTask->aListMemory==0 ) sqlite3_free(p);
}
pTask->pList = p;
- rc = vdbePmaWriterFinish(&writer, &pTask->iTemp1Off);
+ rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
}
assert( pTask->pList==0 || rc!=SQLITE_OK );
return rc;
}
+#if 0
+static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
+ i64 t;
+ int iTask = (pTask - pTask->pSorter->aTask);
+ sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
+ fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
+}
+static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
+ i64 t;
+ sqlite3OsCurrentTimeInt64(db->pVfs, &t);
+ fprintf(stderr, "%lld:X %s\n", t, zEvent);
+}
+#else
+# define vdbeSorterWorkDebug(x,y)
+# define vdbeSorterRewindDebug(x,y)
+#endif
+
/*
** The main routine for sorter-thread operations.
*/
);
assert( pTask->bDone==0 );
+ vdbeSorterWorkDebug(pTask, "enter");
+
if( pTask->pUnpacked==0 ){
char *pFree;
pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
/* Open a second temp file to write merged data to */
rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2);
if( rc==SQLITE_OK ){
- vdbeSorterExtendFile(pTask->db, pTemp2, pTask->iTemp1Off);
+ vdbeSorterExtendFile(pTask->db, pTemp2, pTask->file.iEof);
}else{
vdbeMergeEngineFree(pMerger);
break;
int iIter;
for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
PmaReader *pIter = &pMerger->aIter[iIter];
- rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nOut);
+ rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nOut);
iReadOff = pIter->iEof;
- if( iReadOff>=pTask->iTemp1Off || rc!=SQLITE_OK ) break;
+ if( iReadOff>=pTask->file.iEof || rc!=SQLITE_OK ) break;
}
for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
rc = vdbeSorterDoCompare(pTask, pMerger, iIter);
}
vdbeMergeEngineFree(pMerger);
- sqlite3OsCloseFree(pTask->pTemp1);
- pTask->pTemp1 = pTemp2;
+ sqlite3OsCloseFree(pTask->file.pFd);
+ pTask->file.pFd = pTemp2;
pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT);
- pTask->iTemp1Off = iWriteOff;
+ pTask->file.iEof = iWriteOff;
}
}else{
/* Sort the pTask->pList list */
#ifdef SQLITE_DEBUG
i64 nExpect = pTask->nInMemory
+ sqlite3VarintLen(pTask->nInMemory)
- + pTask->iTemp1Off;
+ + pTask->file.iEof;
#endif
rc = vdbeSorterListToPMA(pTask);
- assert( rc!=SQLITE_OK || (nExpect==pTask->iTemp1Off) );
+ assert( rc!=SQLITE_OK || (nExpect==pTask->file.iEof) );
}
}
assert( pTask->pUnpacked->errCode==SQLITE_NOMEM );
rc = SQLITE_NOMEM;
}
+ vdbeSorterWorkDebug(pTask, "exit");
return SQLITE_INT_TO_PTR(rc);
}
return nPMA;
}
+/*
+** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
+** of the data stored in aFile[1] is the same as that used by regular PMAs,
+** except that the number-of-bytes varint is omitted from the start.
+*/
+static int vdbeIncrPopulate(IncrMerger *pIncr){
+ int rc = SQLITE_OK;
+ int rc2;
+ SorterFile *pOut = &pIncr->aFile[1];
+ MergeEngine *pMerger = pIncr->pMerger;
+ PmaWriter writer;
+ assert( pIncr->bEof==0 );
+
+ vdbePmaWriterInit(pIncr->aFile[1].pFd, &writer, pIncr->pTask->pgsz, 0);
+ while( rc==SQLITE_OK ){
+ int dummy;
+ PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ];
+ int nKey = pReader->nKey;
+ i64 iEof = writer.iWriteOff + writer.iBufEnd;
+
+ /* Check if the output file is full or if the input has been exhausted.
+ ** In either case exit the loop. */
+ if( pReader->pFile==0 ) break;
+ if( iEof && (iEof + nKey)>pIncr->mxSz ) break;
+
+ /* Write the next key to the output. */
+ vdbePmaWriteVarint(&writer, nKey);
+ vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
+ rc = vdbeSorterNext(pIncr->pTask, pIncr->pMerger, &dummy);
+ }
+
+ rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
+ if( rc==SQLITE_OK ) rc = rc2;
+ return rc;
+}
+
+static void *vdbeIncrPopulateThreadMain(void *pCtx){
+ IncrMerger *pIncr = (IncrMerger*)pCtx;
+ return SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
+}
+
+static int vdbeIncrBgPopulate(IncrMerger *pIncr){
+ int rc;
+ assert( pIncr->pThread==0 );
+ if( pIncr->pTask->pSorter->bUseThreads==0 ){
+ rc = vdbeIncrPopulate(pIncr);
+ }else{
+ void *pCtx = (void*)pIncr;
+ rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx);
+ }
+ return rc;
+}
+
+static int vdbeIncrSwap(IncrMerger *pIncr){
+ int rc = SQLITE_OK;
+
+ if( pIncr->pThread ){
+ void *pRet;
+ rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
+ if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
+ pIncr->pThread = 0;
+ }
+
+ if( rc==SQLITE_OK ){
+ SorterFile f0 = pIncr->aFile[0];
+ pIncr->aFile[0] = pIncr->aFile[1];
+ pIncr->aFile[1] = f0;
+
+ if( pIncr->aFile[0].iEof==0 ){
+ pIncr->bEof = 1;
+ }else{
+ rc = vdbeIncrBgPopulate(pIncr);
+ }
+ }
+
+ return rc;
+}
+
+static void vdbeIncrFree(IncrMerger *pIncr){
+ if( pIncr->pThread ){
+ void *pRet;
+ sqlite3ThreadJoin(pIncr->pThread, &pRet);
+ }
+ if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
+ if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
+ vdbeMergeEngineFree(pIncr->pMerger);
+ sqlite3_free(pIncr);
+}
+
+/*
+** Populate iterator *pIter so that it may be used to iterate through all
+** keys stored in subtask pTask using the incremental merge method.
+*/
+static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){
+ SortSubtask *pTask0 = &pSorter->aTask[0];
+ int rc = SQLITE_OK;
+ MergeEngine *pMerger = 0;
+ IncrMerger *pIncr = 0;
+ int i;
+ int nPMA = 0;
+
+ for(i=0; i<pSorter->nTask; i++){
+ nPMA += pSorter->aTask[i].nPMA;
+ }
+ pMerger = vdbeMergeEngineNew(nPMA);
+ if( pMerger==0 ){
+ rc = SQLITE_NOMEM;
+ }else{
+ int iIter = 0;
+ int iPMA;
+ for(i=0; i<pSorter->nTask; i++){
+ i64 iReadOff = 0;
+ SortSubtask *pTask = &pSorter->aTask[i];
+ for(iPMA=0; iPMA<pTask->nPMA; iPMA++){
+ i64 nDummy = 0;
+ PmaReader *pIter = &pMerger->aIter[iIter++];
+ rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nDummy);
+ iReadOff = pIter->iEof;
+ }
+ }
+ for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
+ rc = vdbeSorterDoCompare(pTask0, pMerger, i);
+ }
+ }
+
+ if( rc==SQLITE_OK ){
+ pIncr = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger));
+ if( pIncr==0 ){
+ rc = SQLITE_NOMEM;
+ }else{
+ memset(pIncr, 0, sizeof(IncrMerger));
+ pIncr->mxSz = (pSorter->mxPmaSize / 2);
+ pIncr->pMerger = pMerger;
+ pIncr->pTask = pTask0;
+ }
+ }
+
+ /* Open the two temp files. */
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[0].pFd);
+ }
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[1].pFd);
+ }
+
+ /* Launch a background thread to populate aFile[1]. */
+ if( rc==SQLITE_OK ){
+ rc = vdbeIncrBgPopulate(pIncr);
+ }
+
+ pIter->pIncr = pIncr;
+ if( rc==SQLITE_OK ){
+ rc = vdbePmaReaderNext(pIter);
+ }
+ return rc;
+}
+
+
/*
** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
** this function is called to prepare for iterating through the records
/* Join all threads */
rc = vdbeSorterJoinAll(pSorter, rc);
- /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
- ** some of them together so that this is no longer the case. */
- if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
- int i;
- for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){
- SortSubtask *pTask = &pSorter->aTask[i];
- if( pTask->pTemp1 ){
- pTask->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nTask;
- pTask->eWork = SORT_SUBTASK_CONS;
+ vdbeSorterRewindDebug(db, "rewind");
-#if SQLITE_MAX_WORKER_THREADS>0
- if( i<(pSorter->nTask-1) ){
- void *pCtx = (void*)pTask;
- rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx);
- }else
-#endif
- {
- rc = vdbeSorterRunTask(pTask);
- }
- }
- }
- }
-
- /* Join all threads */
- rc = vdbeSorterJoinAll(pSorter, rc);
-
- /* Assuming no errors have occurred, set up a merger structure to read
- ** and merge all remaining PMAs. */
- assert( pSorter->pMerger==0 );
+ /* Assuming no errors have occurred, set up a merger structure to
+ ** incrementally read and merge all remaining PMAs. */
+ assert( pSorter->pReader==0 );
if( rc==SQLITE_OK ){
- int nIter = 0; /* Number of iterators used */
- int i;
- MergeEngine *pMerger;
- for(i=0; i<pSorter->nTask; i++){
- nIter += pSorter->aTask[i].nPMA;
- }
-
- pSorter->pMerger = pMerger = vdbeMergeEngineNew(nIter);
- if( pMerger==0 ){
- rc = SQLITE_NOMEM;
- }else{
- int iIter = 0;
- int iThread = 0;
- for(iThread=0; iThread<pSorter->nTask; iThread++){
- int iPMA;
- i64 iReadOff = 0;
- SortSubtask *pTask = &pSorter->aTask[iThread];
- for(iPMA=0; iPMA<pTask->nPMA && rc==SQLITE_OK; iPMA++){
- i64 nDummy = 0;
- PmaReader *pIter = &pMerger->aIter[iIter++];
- rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nDummy);
- iReadOff = pIter->iEof;
- }
- }
-
- for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
- rc = vdbeSorterDoCompare(&pSorter->aTask[0], pMerger, i);
- }
- }
+ PmaReader *pReader;
+ pReader = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
+ pSorter->pReader = pReader;
+ rc = vdbePmaReaderIncrInit(pSorter, pReader);
+ assert( rc!=SQLITE_OK || pReader->pFile );
+ *pbEof = 0;
}
- if( rc==SQLITE_OK ){
- *pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0);
- }
+ vdbeSorterRewindDebug(db, "rewinddone");
return rc;
}
VdbeSorter *pSorter = pCsr->pSorter;
int rc; /* Return code */
- if( pSorter->pMerger ){
- rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof);
+ if( pSorter->pReader ){
+ rc = vdbePmaReaderNext(pSorter->pReader);
+ *pbEof = (pSorter->pReader->pFile==0);
}else{
SorterRecord *pFree = pSorter->pRecord;
pSorter->pRecord = pFree->u.pNext;
int *pnKey /* OUT: Size of current key in bytes */
){
void *pKey;
- if( pSorter->pMerger ){
- PmaReader *pIter;
- pIter = &pSorter->pMerger->aIter[ pSorter->pMerger->aTree[1] ];
- *pnKey = pIter->nKey;
- pKey = pIter->aKey;
+ if( pSorter->pReader ){
+ *pnKey = pSorter->pReader->nKey;
+ pKey = pSorter->pReader->aKey;
}else{
*pnKey = pSorter->pRecord->nVal;
pKey = SRVAL(pSorter->pRecord);
int *pRes /* OUT: Result of comparison */
){
VdbeSorter *pSorter = pCsr->pSorter;
- UnpackedRecord *r2 = pSorter->aTask[0].pUnpacked;
+ UnpackedRecord *r2 = pSorter->pUnpacked;
KeyInfo *pKeyInfo = pCsr->pKeyInfo;
int i;
void *pKey; int nKey; /* Sorter key to compare pVal with */
+ if( r2==0 ){
+ char *p;
+ r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo,0,0,&p);
+ assert( pSorter->pUnpacked==(UnpackedRecord*)p );
+ if( r2==0 ) return SQLITE_NOMEM;
+ r2->nField = pKeyInfo->nField-nIgnore;
+ }
assert( r2->nField>=pKeyInfo->nField-nIgnore );
- r2->nField = pKeyInfo->nField-nIgnore;
pKey = vdbeSorterRowkey(pSorter, &nKey);
sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);