u8 *aListMemory; /* Records memory (or NULL) */
int nPMA; /* Number of PMAs currently in file */
- SorterFile file;
+ SorterFile file; /* Temp file for level-0 PMAs */
+ SorterFile file2; /* Space for other PMAs */
};
/*
** Main sorter structure. A single instance of this is allocated for each
** sorter cursor created by the VDBE.
+**
+** mxKeysize:
+** As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
+** this variable is updated so as to be set to the size on disk of the
+** largest record in the sorter.
*/
struct VdbeSorter {
int nInMemory; /* Current size of pRecord list as PMA */
int bUseThreads; /* True if one or more PMAs created */
SorterRecord *pRecord; /* Head of in-memory record list */
PmaReader *pReader; /* Read data from here after Rewind() */
+ int mxKeysize; /* Largest serialized key seen so far */
UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */
u8 *aMemory; /* Block of memory to alloc records from */
int iMemory; /* Offset of first free byte in aMemory */
IncrMerger *pIncr; /* Incremental merger */
};
+/*
+** Normally, a PmaReader object iterates through an existing PMA stored
+** within a temp file. However, if the PmaReader.pIncr variable points to
+** an object of the following type, it may be used to iterate/merge through
+** multiple PMAs simultaneously.
+*/
struct IncrMerger {
- int mxSz; /* Maximum size of files */
SortSubtask *pTask; /* Task that owns this merger */
+ SQLiteThread *pThread; /* Thread currently populating aFile[1] */
+ MergeEngine *pMerger; /* Merge engine thread reads data from */
+ i64 iStartOff; /* Offset to start writing file at */
+ int mxSz; /* Maximum bytes of data to store */
int bEof; /* Set to true when merge is finished */
+ int bUseThread; /* True to use a bg thread for this object */
SorterFile aFile[2]; /* aFile[0] for reading, [1] for writing */
- MergeEngine *pMerger; /* Merge engine thread reads data from */
- SQLiteThread *pThread; /* Thread currently populating aFile[1] */
};
/*
sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
pIter->aMap = 0;
}
- pIter->iReadOff = 0;
+ pIter->iReadOff = pIncr->iStartOff;
pIter->iEof = pIncr->aFile[0].iEof;
pIter->pFile = pIncr->aFile[0].pFd;
rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
if( rc==SQLITE_OK ){
- if( pIter->aMap==0 && pIter->aBuffer==0 ){
- pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
- if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
- pIter->nBuffer = pTask->pgsz;
+ if( pIter->aMap==0 ){
+ /* TODO: Combine this code with similar code in vdbePmaReaderInit() */
+ int iBuf = pIter->iReadOff % pTask->pgsz;
+ if( pIter->aBuffer==0 ){
+ pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
+ if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
+ pIter->nBuffer = pTask->pgsz;
+ }
+ if( iBuf ){
+ int nRead = pTask->pgsz - iBuf;
+ if( (pIter->iReadOff + nRead) > pIter->iEof ){
+ nRead = (int)(pIter->iEof - pIter->iReadOff);
+ }
+ rc = sqlite3OsRead(
+ pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
+ );
+ assert( rc!=SQLITE_IOERR_SHORT_READ );
+ }
}
}
){
int rc = SQLITE_OK;
int nBuf = pTask->pgsz;
- void *pMap = 0; /* Mapping of temp file */
assert( pFile->iEof>iStart );
assert( pIter->aAlloc==0 );
if( pIter->aAlloc ){
/* Try to xFetch() a mapping of the entire temp file. If this is possible,
** the PMA will be read via the mapping. Otherwise, use xRead(). */
- if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
- rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iEof, &pMap);
- }
+ rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
}else{
rc = SQLITE_NOMEM;
}
- if( rc==SQLITE_OK ){
- if( pMap ){
- pIter->aMap = (u8*)pMap;
+ if( rc==SQLITE_OK && pIter->aMap==0 ){
+ pIter->nBuffer = nBuf;
+ pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
+ if( !pIter->aBuffer ){
+ rc = SQLITE_NOMEM;
}else{
- pIter->nBuffer = nBuf;
- pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
- if( !pIter->aBuffer ){
- rc = SQLITE_NOMEM;
- }else{
- int iBuf = iStart % nBuf;
- if( iBuf ){
- int nRead = nBuf - iBuf;
- if( (iStart + nRead) > pFile->iEof ){
- nRead = (int)(pFile->iEof - iStart);
- }
- rc = sqlite3OsRead(
- pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
- );
- assert( rc!=SQLITE_IOERR_SHORT_READ );
+ int iBuf = iStart % nBuf;
+ if( iBuf ){
+ int nRead = nBuf - iBuf;
+ if( (iStart + nRead) > pFile->iEof ){
+ nRead = (int)(pFile->iEof - iStart);
}
+ rc = sqlite3OsRead(
+ pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
+ );
+ assert( rc!=SQLITE_IOERR_SHORT_READ );
}
}
}
pTask->file.pFd = 0;
pTask->file.iEof = 0;
}
+ if( pTask->file2.pFd ){
+ sqlite3OsCloseFree(pTask->file2.pFd);
+ pTask->file2.pFd = 0;
+ pTask->file2.iEof = 0;
+ }
}
/*
int nByte; /* Total bytes of space to allocate */
MergeEngine *pNew; /* Pointer to allocated object to return */
- /* assert( nIter<=SORTER_MAX_MERGE_COUNT ); */
+ assert( nIter<=SORTER_MAX_MERGE_COUNT );
while( N<nIter ) N += N;
nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
pSorter->nInMemory = 0;
pSorter->bUsePMA = 0;
pSorter->iMemory = 0;
+ pSorter->mxKeysize = 0;
sqlite3DbFree(db, pSorter->pUnpacked);
pSorter->pUnpacked = 0;
}
sqlite3OsCurrentTimeInt64(db->pVfs, &t);
fprintf(stderr, "%lld:X %s\n", t, zEvent);
}
+static void vdbeSorterPopulateDebug(
+ SortSubtask *pTask,
+ const char *zEvent
+){
+ i64 t;
+ int iTask = (pTask - pTask->pSorter->aTask);
+ sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
+ fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
+}
#else
# define vdbeSorterWorkDebug(x,y)
# define vdbeSorterRewindDebug(x,y)
+# define vdbeSorterPopulateDebug(x,y)
#endif
+static int vdbeSortAllocUnpacked(SortSubtask *pTask){
+ if( pTask->pUnpacked==0 ){
+ char *pFree;
+ pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
+ pTask->pKeyInfo, 0, 0, &pFree
+ );
+ assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
+ if( pFree==0 ) return SQLITE_NOMEM;
+ pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
+ pTask->pUnpacked->errCode = 0;
+ }
+ return SQLITE_OK;
+}
+
/*
** The main routine for sorter-thread operations.
*/
vdbeSorterWorkDebug(pTask, "enter");
- if( pTask->pUnpacked==0 ){
- char *pFree;
- pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
- pTask->pKeyInfo, 0, 0, &pFree
- );
- assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
- if( pFree==0 ){
- rc = SQLITE_NOMEM;
- goto thread_out;
- }
- pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
- pTask->pUnpacked->errCode = 0;
- }
+ rc = vdbeSortAllocUnpacked(pTask);
+ if( rc!=SQLITE_OK ) goto thread_out;
if( pTask->eWork==SORT_SUBTASK_CONS ){
assert( pTask->pList==0 );
}
pSorter->nInMemory += nPMA;
+ if( nPMA>pSorter->mxKeysize ){
+ pSorter->mxKeysize = nPMA;
+ }
if( pSorter->aMemory ){
int nMin = pSorter->iMemory + nReq;
static int vdbeIncrPopulate(IncrMerger *pIncr){
int rc = SQLITE_OK;
int rc2;
+ i64 iStart = pIncr->iStartOff;
SorterFile *pOut = &pIncr->aFile[1];
MergeEngine *pMerger = pIncr->pMerger;
PmaWriter writer;
assert( pIncr->bEof==0 );
- vdbePmaWriterInit(pIncr->aFile[1].pFd, &writer, pIncr->pTask->pgsz, 0);
+ vdbeSorterPopulateDebug(pIncr->pTask, "enter");
+
+ vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, iStart);
while( rc==SQLITE_OK ){
int dummy;
PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ];
/* Check if the output file is full or if the input has been exhausted.
** In either case exit the loop. */
if( pReader->pFile==0 ) break;
- if( iEof && (iEof + nKey)>pIncr->mxSz ) break;
+ if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
/* Write the next key to the output. */
vdbePmaWriteVarint(&writer, nKey);
rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
if( rc==SQLITE_OK ) rc = rc2;
+ vdbeSorterPopulateDebug(pIncr->pTask, "exit");
return rc;
}
static int vdbeIncrBgPopulate(IncrMerger *pIncr){
int rc;
assert( pIncr->pThread==0 );
- if( pIncr->pTask->pSorter->bUseThreads==0 ){
+ if( pIncr->bUseThread==0 ){
rc = vdbeIncrPopulate(pIncr);
- }else{
+ }
+#if SQLITE_MAX_WORKER_THREADS>0
+ else{
void *pCtx = (void*)pIncr;
rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx);
}
+#endif
return rc;
}
static int vdbeIncrSwap(IncrMerger *pIncr){
int rc = SQLITE_OK;
-
- if( pIncr->pThread ){
- void *pRet;
- rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
- if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
- pIncr->pThread = 0;
- }
- if( rc==SQLITE_OK ){
- SorterFile f0 = pIncr->aFile[0];
- pIncr->aFile[0] = pIncr->aFile[1];
- pIncr->aFile[1] = f0;
+ if( pIncr->bUseThread ){
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pIncr->pThread ){
+ void *pRet;
+ assert( pIncr->bUseThread );
+ rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
+ if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
+ pIncr->pThread = 0;
+ }
+#endif
+
+ if( rc==SQLITE_OK ){
+ SorterFile f0 = pIncr->aFile[0];
+ pIncr->aFile[0] = pIncr->aFile[1];
+ pIncr->aFile[1] = f0;
+ }
- if( pIncr->aFile[0].iEof==0 ){
+ if( rc==SQLITE_OK ){
+ if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
+ pIncr->bEof = 1;
+ }else{
+ rc = vdbeIncrBgPopulate(pIncr);
+ }
+ }
+ }else{
+ rc = vdbeIncrPopulate(pIncr);
+ pIncr->aFile[0] = pIncr->aFile[1];
+ if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
pIncr->bEof = 1;
- }else{
- rc = vdbeIncrBgPopulate(pIncr);
}
}
}
static void vdbeIncrFree(IncrMerger *pIncr){
- if( pIncr->pThread ){
- void *pRet;
- sqlite3ThreadJoin(pIncr->pThread, &pRet);
- }
- if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
- if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
- vdbeMergeEngineFree(pIncr->pMerger);
- sqlite3_free(pIncr);
+ if( pIncr ){
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pIncr->pThread ){
+ void *pRet;
+ sqlite3ThreadJoin(pIncr->pThread, &pRet);
+ }
+ if( pIncr->bUseThread ){
+ if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
+ if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
+ }
+#endif
+ vdbeMergeEngineFree(pIncr->pMerger);
+ sqlite3_free(pIncr);
+ }
+}
+
+static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
+ IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
+ if( pIncr ){
+ memset(pIncr, 0, sizeof(IncrMerger));
+ pIncr->pMerger = pMerger;
+ pIncr->pTask = pTask;
+ pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
+ pTask->file2.iEof += pIncr->mxSz;
+
+#if 0
+ /* Open the two temp files. */
+ rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
+ }
+ if( rc!=SQLITE_OK ){
+ vdbeIncrFree(pIncr);
+ pIncr = 0;
+ }
+#endif
+ }
+ return pIncr;
+}
+
+static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
+ if( bUseThread ){
+ pIncr->bUseThread = 1;
+ pIncr->pTask->file2.iEof -= pIncr->mxSz;
+ }
+}
+
+static int vdbeIncrInit2(PmaReader *pIter){
+ int rc = SQLITE_OK;
+ IncrMerger *pIncr = pIter->pIncr;
+ if( pIncr ){
+ SortSubtask *pTask = pIncr->pTask;
+ int i;
+ MergeEngine *pMerger = pIncr->pMerger;
+
+ for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
+ rc = vdbeIncrInit2(&pMerger->aIter[i]);
+ }
+ for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
+ rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
+ }
+
+ /* Set up the required files for pIncr */
+ if( rc==SQLITE_OK ){
+ if( pIncr->bUseThread==0 ){
+ if( pTask->file2.pFd==0 ){
+ rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
+ assert( pTask->file2.iEof>0 );
+ if( rc==SQLITE_OK ){
+ vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof);
+ pTask->file2.iEof = 0;
+ }
+ }
+ if( rc==SQLITE_OK ){
+ pIncr->aFile[1].pFd = pTask->file2.pFd;
+ pIncr->iStartOff = pTask->file2.iEof;
+ pTask->file2.iEof += pIncr->mxSz;
+ }
+ }else{
+ rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
+ }
+ }
+ }
+
+ if( rc==SQLITE_OK && pIncr->bUseThread ){
+ rc = vdbeIncrBgPopulate(pIncr);
+ }
+
+ if( rc==SQLITE_OK ){
+ rc = vdbePmaReaderNext(pIter);
+ }
+ }
+ return rc;
+}
+
+/*
+** Allocate a new MergeEngine object to merge the contents of nPMA level-0
+** PMAs from pTask->file. If no error occurs, set *ppOut to point to
+** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
+** to NULL and return an SQLite error code.
+**
+** When this function is called, *piOffset is set to the offset of the
+** first PMA to read from pTask->file. Assuming no error occurs, it is
+** set to the offset immediately following the last byte of the last
+** PMA before returning. If an error does occur, then the final value of
+** *piOffset is undefined.
+*/
+static int vdbeMergeEngineLevel0(
+ SortSubtask *pTask, /* Sorter task to read from */
+ int nPMA, /* Number of PMAs to read */
+ i64 *piOffset, /* IN/OUT: Read offset in pTask->file */
+ MergeEngine **ppOut /* OUT: New merge-engine */
+){
+ MergeEngine *pNew; /* Merge engine to return */
+ i64 iOff = *piOffset;
+ int i;
+ int rc = SQLITE_OK;
+
+ *ppOut = pNew = vdbeMergeEngineNew(nPMA);
+ if( pNew==0 ) rc = SQLITE_NOMEM;
+
+ for(i=0; i<nPMA && rc==SQLITE_OK; i++){
+ i64 nDummy;
+ PmaReader *pIter = &pNew->aIter[i];
+ rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pIter, &nDummy);
+ iOff = pIter->iEof;
+ }
+
+ if( rc!=SQLITE_OK ){
+ vdbeMergeEngineFree(pNew);
+ *ppOut = 0;
+ }
+ *piOffset = iOff;
+ return rc;
+}
+
+typedef struct IncrBuilder IncrBuilder;
+struct IncrBuilder {
+ int nPMA; /* Number of iterators used so far */
+ MergeEngine *pMerger; /* Merge engine to populate. */
+};
+
+static int vdbeAddToBuilder(
+ SortSubtask *pTask,
+ IncrBuilder *pBuilder,
+ MergeEngine *pMerger
+){
+ int rc = SQLITE_OK;
+ IncrMerger *pIncr;
+
+ assert( pMerger );
+ if( pBuilder->nPMA==SORTER_MAX_MERGE_COUNT ){
+ rc = vdbeAddToBuilder(pTask, &pBuilder[1], pBuilder->pMerger);
+ pBuilder->pMerger = 0;
+ pBuilder->nPMA = 0;
+ }
+
+ if( rc==SQLITE_OK && pBuilder->pMerger==0 ){
+ pBuilder->pMerger = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
+ if( pBuilder->pMerger==0 ) rc = SQLITE_NOMEM;
+ }
+
+ if( rc==SQLITE_OK ){
+ pIncr = vdbeIncrNew(pTask, pMerger);
+ if( pIncr==0 ) rc = SQLITE_NOMEM;
+ pBuilder->pMerger->aIter[pBuilder->nPMA++].pIncr = pIncr;
+ }
+
+ if( rc!=SQLITE_OK ){
+ vdbeMergeEngineFree(pMerger);
+ }
+
+ return rc;
}
/*
** Populate iterator *pIter so that it may be used to iterate through all
-** keys stored in subtask pTask using the incremental merge method.
+** keys stored in all PMAs created by this sorter.
*/
static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){
SortSubtask *pTask0 = &pSorter->aTask[0];
+ MergeEngine *pMain = 0;
+ sqlite3 *db = pTask0->db;
int rc = SQLITE_OK;
- MergeEngine *pMerger = 0;
- IncrMerger *pIncr = 0;
- int i;
- int nPMA = 0;
+ int iTask;
- for(i=0; i<pSorter->nTask; i++){
- nPMA += pSorter->aTask[i].nPMA;
+ IncrBuilder *aMerge;
+ const int nMerge = 32;
+ aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge);
+ if( aMerge==0 ) return SQLITE_NOMEM;
+
+ if( pSorter->nTask>1 ){
+ pMain = vdbeMergeEngineNew(pSorter->nTask);
+ if( pMain==0 ) rc = SQLITE_NOMEM;
}
- pMerger = vdbeMergeEngineNew(nPMA);
- if( pMerger==0 ){
- rc = SQLITE_NOMEM;
- }else{
- int iIter = 0;
+
+ for(iTask=0; iTask<pSorter->nTask && rc==SQLITE_OK; iTask++){
+ MergeEngine *pRoot = 0;
int iPMA;
- for(i=0; i<pSorter->nTask; i++){
- i64 iReadOff = 0;
- SortSubtask *pTask = &pSorter->aTask[i];
- for(iPMA=0; iPMA<pTask->nPMA; iPMA++){
- i64 nDummy = 0;
- PmaReader *pIter = &pMerger->aIter[iIter++];
- rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nDummy);
- iReadOff = pIter->iEof;
+ i64 iReadOff = 0;
+ SortSubtask *pTask = &pSorter->aTask[iTask];
+ if( pTask->nPMA==0 ) continue;
+ for(iPMA=0; iPMA<pTask->nPMA; iPMA += SORTER_MAX_MERGE_COUNT){
+ MergeEngine *pMerger = 0;
+ int nReader = MIN(pTask->nPMA - iPMA, SORTER_MAX_MERGE_COUNT);
+
+ rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
+ if( rc!=SQLITE_OK ) break;
+
+ if( iPMA==0 ){
+ pRoot = pMerger;
+ }else{
+ if( pRoot ){
+ rc = vdbeAddToBuilder(pTask, &aMerge[0], pRoot);
+ pRoot = 0;
+ if( rc!=SQLITE_OK ){
+ vdbeMergeEngineFree(pMerger);
+ break;
+ }
+ }
+ rc = vdbeAddToBuilder(pTask, &aMerge[0], pMerger);
}
}
- for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
- rc = vdbeSorterDoCompare(pTask0, pMerger, i);
+
+ if( pRoot==0 ){
+ int i;
+ for(i=0; rc==SQLITE_OK && i<nMerge; i++){
+ if( aMerge[i].pMerger ){
+ if( pRoot ){
+ rc = vdbeAddToBuilder(pTask, &aMerge[i], pRoot);
+ if( rc!=SQLITE_OK ) break;
+ }
+ pRoot = aMerge[i].pMerger;
+ aMerge[i].pMerger = 0;
+ }
+ }
}
- }
- if( rc==SQLITE_OK ){
- pIncr = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger));
- if( pIncr==0 ){
- rc = SQLITE_NOMEM;
- }else{
- memset(pIncr, 0, sizeof(IncrMerger));
- pIncr->mxSz = (pSorter->mxPmaSize / 2);
- pIncr->pMerger = pMerger;
- pIncr->pTask = pTask0;
+ if( rc==SQLITE_OK ){
+ if( pMain==0 ){
+ pMain = pRoot;
+ }else{
+ IncrMerger *pNew = vdbeIncrNew(pTask, pRoot);
+ pMain->aIter[iTask].pIncr = pNew;
+ if( pNew==0 ) rc = SQLITE_NOMEM;
+ }
+ memset(aMerge, 0, nMerge*sizeof(aMerge[0]));
}
}
- /* Open the two temp files. */
if( rc==SQLITE_OK ){
- rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[0].pFd);
- }
- if( rc==SQLITE_OK ){
- rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[1].pFd);
- }
+ SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
- /* Launch a background thread to populate aFile[1]. */
- if( rc==SQLITE_OK ){
- rc = vdbeIncrBgPopulate(pIncr);
+ rc = vdbeSortAllocUnpacked(pLast);
+ if( rc==SQLITE_OK ){
+ pIter->pIncr = vdbeIncrNew(pLast, pMain);
+ if( pIter->pIncr==0 ){
+ rc = SQLITE_NOMEM;
+ }else{
+ vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads);
+ for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
+ IncrMerger *pIncr;
+ if( (pIncr = pMain->aIter[iTask].pIncr) ){
+ vdbeIncrSetThreads(pIncr, pSorter->bUseThreads);
+ assert( pIncr->pTask!=pLast );
+ }
+ }
+ }
+ }
}
-
- pIter->pIncr = pIncr;
if( rc==SQLITE_OK ){
- rc = vdbePmaReaderNext(pIter);
+ rc = vdbeIncrInit2(pIter);
}
+
+ sqlite3_free(aMerge);
return rc;
}