struct VdbeSorter {
int mnPmaSize; /* Minimum PMA size, in bytes */
int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
- int bUsePMA; /* True if one or more PMAs created */
- int bUseThreads; /* True if one or more PMAs created */
PmaReader *pReader; /* Read data from here after Rewind() */
+ MergeEngine *pMerger; /* Or here, if bUseThreads==0 */
int mxKeysize; /* Largest serialized key seen so far */
UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */
-#if 0
- int nInMemory; /* Current size of pRecord list as PMA */
- SorterRecord *pRecord; /* Head of in-memory record list */
- u8 *aMemory; /* Block of memory to alloc records from */
-#endif
- SorterList list;
- int iMemory; /* Offset of first free byte in aMemory */
- int nMemory; /* Size of aMemory allocation in bytes */
- int iPrev; /* Previous thread used to flush PMA */
- int nTask; /* Size of aTask[] array */
+ SorterList list; /* List of in-memory records */
+ int iMemory; /* Offset of free space in list.aMemory */
+ int nMemory; /* Size of list.aMemory allocation in bytes */
+ u8 bUsePMA; /* True if one or more PMAs created */
+ u8 bUseThreads; /* True to use background threads */
+ u8 iPrev; /* Previous thread used to flush PMA */
+ u8 nTask; /* Size of aTask[] array */
SortSubtask aTask[1]; /* One or more subtasks */
};
sqlite3DbFree(db, pSorter->pReader);
pSorter->pReader = 0;
}
+ vdbeMergeEngineFree(pSorter->pMerger);
+ pSorter->pMerger = 0;
for(i=0; i<pSorter->nTask; i++){
SortSubtask *pTask = &pSorter->aTask[i];
vdbeSortSubtaskCleanup(db, pTask);
#define INCRINIT2_NORMAL 0
#define INCRINIT2_TASK 1
#define INCRINIT2_ROOT 2
+
+static int vdbeIncrInit2(PmaReader *pIter, int eMode);
+
+static int vdbeIncrInitMerger(
+ SortSubtask *pTask,
+ MergeEngine *pMerger,
+ int eMode
+){
+ int i;
+ int rc = SQLITE_OK;
+
+ for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
+ if( eMode==INCRINIT2_ROOT ){
+ rc = vdbePmaReaderNext(&pMerger->aIter[i]);
+ }else{
+ rc = vdbeIncrInit2(&pMerger->aIter[i], INCRINIT2_NORMAL);
+ }
+ }
+
+ for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
+ rc = vdbeSorterDoCompare(pTask, pMerger, i);
+ }
+
+ return rc;
+}
+
static int vdbeIncrInit2(PmaReader *pIter, int eMode){
int rc = SQLITE_OK;
IncrMerger *pIncr = pIter->pIncr;
if( pIncr ){
SortSubtask *pTask = pIncr->pTask;
- int i;
- MergeEngine *pMerger = pIncr->pMerger;
- for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
- IncrMerger *p;
- if( eMode==INCRINIT2_ROOT ){
- rc = vdbePmaReaderNext(&pMerger->aIter[i]);
- }else{
- rc = vdbeIncrInit2(&pMerger->aIter[i], INCRINIT2_NORMAL);
- }
- }
+ rc = vdbeIncrInitMerger(pTask, pIncr->pMerger, eMode);
/* Set up the required files for pIncr */
if( rc==SQLITE_OK ){
}
}
- for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
- rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
- }
-
if( rc==SQLITE_OK && pIncr->bUseThread ){
/* Use the current thread */
assert( eMode==INCRINIT2_ROOT || eMode==INCRINIT2_TASK );
** Populate iterator *pIter so that it may be used to iterate through all
** keys stored in all PMAs created by this sorter.
*/
-static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){
+static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){
SortSubtask *pTask0 = &pSorter->aTask[0];
MergeEngine *pMain = 0;
sqlite3 *db = pTask0->db;
}
if( rc==SQLITE_OK ){
- SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
-
- rc = vdbeSortAllocUnpacked(pLast);
- if( rc==SQLITE_OK ){
- pIter->pIncr = vdbeIncrNew(pLast, pMain);
- if( pIter->pIncr==0 ){
- rc = SQLITE_NOMEM;
+#if SQLITE_MAX_WORKER_THREADS
+ if( pSorter->bUseThreads ){
+ PmaReader *pIter;
+ SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
+ rc = vdbeSortAllocUnpacked(pLast);
+ if( rc==SQLITE_OK ){
+ pIter = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
+ pSorter->pReader = pIter;
}
-#if SQLITE_MAX_WORKER_THREADS>0
- else{
- vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads);
- for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
- IncrMerger *pIncr;
- if( (pIncr = pMain->aIter[iTask].pIncr) ){
- vdbeIncrSetThreads(pIncr, pSorter->bUseThreads);
- assert( pIncr->pTask!=pLast );
- }
+ if( rc==SQLITE_OK ){
+ pIter->pIncr = vdbeIncrNew(pLast, pMain);
+ if( pIter->pIncr==0 ){
+ rc = SQLITE_NOMEM;
}
- if( pSorter->nTask>1 ){
- for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
- PmaReader *p = &pMain->aIter[iTask];
- assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] );
- if( p->pIncr ){ rc = vdbeIncrBgInit2(p); }
+ else{
+ vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads);
+ for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
+ IncrMerger *pIncr;
+ if( (pIncr = pMain->aIter[iTask].pIncr) ){
+ vdbeIncrSetThreads(pIncr, pSorter->bUseThreads);
+ assert( pIncr->pTask!=pLast );
+ }
+ }
+ if( pSorter->nTask>1 ){
+ for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
+ PmaReader *p = &pMain->aIter[iTask];
+ assert( p->pIncr==0 || p->pIncr->pTask==&pSorter->aTask[iTask] );
+ if( p->pIncr ){ rc = vdbeIncrBgInit2(p); }
+ }
}
}
}
+ if( rc==SQLITE_OK ){
+ int eMode = (pSorter->nTask>1 ? INCRINIT2_ROOT : INCRINIT2_NORMAL);
+ rc = vdbeIncrInit2(pIter, eMode);
+ }
+ }else
#endif
+ {
+ pSorter->pMerger = pMain;
+ rc = vdbeIncrInitMerger(pTask0, pMain, INCRINIT2_NORMAL);
}
}
- if( rc==SQLITE_OK ){
- int eMode = (pSorter->nTask>1 ? INCRINIT2_ROOT : INCRINIT2_NORMAL);
- rc = vdbeIncrInit2(pIter, eMode);
- }
sqlite3_free(aMerge);
return rc;
** incrementally read and merge all remaining PMAs. */
assert( pSorter->pReader==0 );
if( rc==SQLITE_OK ){
- PmaReader *pReader;
- pReader = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
- pSorter->pReader = pReader;
- rc = vdbePmaReaderIncrInit(pSorter, pReader);
- assert( rc!=SQLITE_OK || pReader->pFile );
+ rc = vdbePmaReaderIncrInit(pSorter);
*pbEof = 0;
}
VdbeSorter *pSorter = pCsr->pSorter;
int rc; /* Return code */
- if( pSorter->pReader ){
- rc = vdbePmaReaderNext(pSorter->pReader);
- *pbEof = (pSorter->pReader->pFile==0);
+ assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) );
+ if( pSorter->bUsePMA ){
+ assert( pSorter->pReader==0 || pSorter->pMerger==0 );
+ assert( pSorter->bUseThreads==0 || pSorter->pReader );
+ assert( pSorter->bUseThreads==1 || pSorter->pMerger );
+ if( pSorter->bUseThreads ){
+ rc = vdbePmaReaderNext(pSorter->pReader);
+ *pbEof = (pSorter->pReader->pFile==0);
+ }else{
+ rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof);
+ }
}else{
SorterRecord *pFree = pSorter->list.pList;
pSorter->list.pList = pFree->u.pNext;
int *pnKey /* OUT: Size of current key in bytes */
){
void *pKey;
- if( pSorter->pReader ){
- *pnKey = pSorter->pReader->nKey;
- pKey = pSorter->pReader->aKey;
+ if( pSorter->bUsePMA ){
+ PmaReader *pReader = (pSorter->bUseThreads ?
+ pSorter->pReader : &pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]]
+ );
+ *pnKey = pReader->nKey;
+ pKey = pReader->aKey;
}else{
*pnKey = pSorter->list.pList->nVal;
pKey = SRVAL(pSorter->list.pList);