/*
** Allocate and return a new IncrMerger object to read data from pMerger.
+**
+** If successful, SQLITE_OK is returned and *ppOut is set to point to the
+** new object. If an OOM condition is encountered, SQLITE_NOMEM is returned
+** and *ppOut is set to NULL. In this case the pMerger argument is freed
+** before returning.
*/
-static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
- IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
+static int vdbeIncrNew(
+ SortSubtask *pTask, /* Task stored in the new object's pTask field */
+ MergeEngine *pMerger, /* Merger the new object reads from */
+ IncrMerger **ppOut /* OUT: New object, or NULL on OOM */
+){
+ int rc = SQLITE_OK; /* Return code */
+ IncrMerger *pIncr = *ppOut = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger));
if( pIncr ){
memset(pIncr, 0, sizeof(IncrMerger));
pIncr->pMerger = pMerger;
pIncr->pTask = pTask;
pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
pTask->file2.iEof += pIncr->mxSz;
+ }else{
+ vdbeMergeEngineFree(pMerger); /* Callers rely on pMerger being released */
+ rc = SQLITE_NOMEM;
}
- return pIncr;
+ return rc;
}
/*
#define INCRINIT2_NORMAL 0
#define INCRINIT2_TASK 1
#define INCRINIT2_ROOT 2
-
static int vdbeIncrInit2(PmaReader *pIter, int eMode);
+/*
+** Initialize the merger argument passed as the second argument. Once this
+** function returns, the first key of merged data may be read from the merger
+** object in the usual fashion.
+**
+** If argument eMode is INCRINIT2_ROOT, then it is assumed that any IncrMerger
+** objects attached to the PmaReader objects that the merger reads from have
+** already been populated, but that they have not yet populated aFile[0] and
+** set the PmaReader objects up to read from it. In this case all that is
+** required is to call vdbePmaReaderNext() on each iterator to point it at
+** its first key.
+**
+** Otherwise, if eMode is any value other than INCRINIT2_ROOT, then use
+** vdbeIncrInit2() to initialize each PmaReader that feeds data to pMerger.
+**
+** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
+*/
static int vdbeIncrInitMerger(
SortSubtask *pTask,
MergeEngine *pMerger,
- int eMode
+ int eMode /* One of the INCRINIT2_XXX constants */
){
- int i;
- int rc = SQLITE_OK;
+ int rc = SQLITE_OK; /* Return code */
+ int i; /* For iterating through PmaReader objects */
for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
if( eMode==INCRINIT2_ROOT ){
}
#if SQLITE_MAX_WORKER_THREADS>0
+/*
+** The main routine for vdbeIncrInit2() operations run in background threads.
+**
+** Argument pCtx is the PmaReader object to initialize. The integer value
+** returned by vdbeIncrInit2(pReader, INCRINIT2_TASK) is converted to a
+** pointer using SQLITE_INT_TO_PTR() and returned.
+*/
static void *vdbeIncrInit2Thread(void *pCtx){
PmaReader *pReader = (PmaReader*)pCtx;
void *pRet = SQLITE_INT_TO_PTR( vdbeIncrInit2(pReader, INCRINIT2_TASK) );
return pRet;
}
+/*
+** Use a background thread to invoke vdbeIncrInit2(INCRINIT2_TASK) on the
+** PmaReader object passed as the first argument.
+**
+** This call will initialize the various fields of the pIter->pIncr
+** structure and, if it is a multi-threaded IncrMerger, launch a
+** background thread to populate aFile[1].
+*/
static int vdbeIncrBgInit2(PmaReader *pIter){
void *pCtx = (void*)pIter;
return vdbeSorterCreateThread(pIter->pIncr->pTask, vdbeIncrInit2Thread, pCtx);
return rc;
}
-typedef struct IncrBuilder IncrBuilder;
-struct IncrBuilder {
- int nPMA; /* Number of iterators used so far */
- MergeEngine *pMerger; /* Merge engine to populate. */
-};
+/*
+** Compute the number of levels of interior nodes required by a merge-tree
+** that reads nPMA PMAs when every node has a fanout of
+** SORTER_MAX_MERGE_COUNT. Leaf nodes are not included in the result.
+**
+** The result is the number of times SORTER_MAX_MERGE_COUNT has to be
+** multiplied by itself before it is no longer smaller than nPMA. For
+** example, assuming SORTER_MAX_MERGE_COUNT is 16:
+**
+** nPMA<=16 -> TreeDepth() == 0
+** nPMA<=256 -> TreeDepth() == 1
+** nPMA<=65536 -> TreeDepth() == 2
+*/
+static int vdbeSorterTreeDepth(int nPMA){
+  i64 nCapacity;                  /* PMAs covered by a tree of nLevel levels */
+  int nLevel = 0;                 /* Value to return */
+  for(nCapacity=SORTER_MAX_MERGE_COUNT; nCapacity<(i64)nPMA; nLevel++){
+    nCapacity *= SORTER_MAX_MERGE_COUNT;
+  }
+  return nLevel;
+}
-static int vdbeAddToBuilder(
- SortSubtask *pTask,
- IncrBuilder *pBuilder,
- MergeEngine *pMerger
+/*
+** pRoot is the root of an incremental merge-tree with depth nDepth (according
+** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
+** tree, counting from zero. This function adds pLeaf to the tree.
+**
+** pLeaf is first wrapped in a new IncrMerger object. The tree is then
+** descended from pRoot, one level per loop iteration, creating any interior
+** MergeEngine/IncrMerger pair that does not exist yet. The descent stops
+** as soon as an error occurs, so that no attempt is made to step into an
+** interior node that could not be allocated.
+**
+** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
+** code is returned and pLeaf is freed.
+*/
+static int vdbeSorterAddToTree(
+ SortSubtask *pTask, /* Task context */
+ int nDepth, /* Depth of tree according to TreeDepth() */
+ int iSeq, /* Sequence number of leaf within tree */
+ MergeEngine *pRoot, /* Root of tree */
+ MergeEngine *pLeaf /* Leaf to add to tree */
){
int rc = SQLITE_OK;
+ int nDiv = 1; /* Divisor used to select the iterator at each level */
+ int i; /* Loop counter */
+ MergeEngine *p = pRoot; /* Current node while descending the tree */
IncrMerger *pIncr;
- assert( pMerger );
- if( pBuilder->nPMA==SORTER_MAX_MERGE_COUNT ){
- rc = vdbeAddToBuilder(pTask, &pBuilder[1], pBuilder->pMerger);
- pBuilder->pMerger = 0;
- pBuilder->nPMA = 0;
- }
+ /* Wrap the leaf in an IncrMerger. If this fails, vdbeIncrNew() frees pLeaf
+ ** and sets pIncr to NULL, and the descent loop below is skipped. */
+ rc = vdbeIncrNew(pTask, pLeaf, &pIncr);
- if( rc==SQLITE_OK && pBuilder->pMerger==0 ){
- pBuilder->pMerger = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
- if( pBuilder->pMerger==0 ) rc = SQLITE_NOMEM;
+ /* Set nDiv to SORTER_MAX_MERGE_COUNT raised to the power (nDepth-1). */
+ for(i=1; i<nDepth; i++){
+ nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
}
- if( rc==SQLITE_OK ){
- pIncr = vdbeIncrNew(pTask, pMerger);
- if( pIncr==0 ) rc = SQLITE_NOMEM;
- pBuilder->pMerger->aIter[pBuilder->nPMA++].pIncr = pIncr;
- }
+ for(i=1; i<nDepth && rc==SQLITE_OK; i++){
+ int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
+ PmaReader *pIter = &p->aIter[iIter];
- if( rc!=SQLITE_OK ){
- vdbeMergeEngineFree(pMerger);
+ if( pIter->pIncr==0 ){
+ MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
+ if( pNew==0 ){
+ rc = SQLITE_NOMEM;
+ }else{
+ rc = vdbeIncrNew(pTask, pNew, &pIter->pIncr);
+ }
+ }
+
+ /* Only descend if the interior node exists. If either allocation above
+ ** failed, pIter->pIncr is NULL and must not be dereferenced. */
+ if( rc==SQLITE_OK ){
+ p = pIter->pIncr->pMerger;
+ nDiv = nDiv / SORTER_MAX_MERGE_COUNT;
+ }
}
+ if( rc==SQLITE_OK ){
+ p->aIter[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr;
+ }else{
+ vdbeIncrFree(pIncr);
+ }
return rc;
}
/*
-** Populate iterator *pIter so that it may be used to iterate through all
-** keys stored in all PMAs created by this sorter.
+** This function is called as part of a SorterRewind() operation on a sorter
+** that has already written two or more level-0 PMAs to one or more temp
+** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that
+** can be used to incrementally merge all PMAs on disk.
+**
+** Tasks for which nPMA is zero contribute nothing to the tree.
+**
+** If successful, SQLITE_OK is returned and *ppOut set to point to the
+** MergeEngine object at the root of the tree before returning. Or, if an
+** error occurs, an SQLite error code is returned, vdbeMergeEngineFree() is
+** invoked on the tree built so far and *ppOut is set to NULL.
*/
-static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){
- SortSubtask *pTask0 = &pSorter->aTask[0];
+static int vdbeSorterMergeTreeBuild(VdbeSorter *pSorter, MergeEngine **ppOut){
MergeEngine *pMain = 0;
- sqlite3 *db = pTask0->pSorter->db;
int rc = SQLITE_OK;
int iTask;
- IncrBuilder *aMerge;
- const int nMerge = 32;
- aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge);
- if( aMerge==0 ) return SQLITE_NOMEM;
-
+ /* If the sorter uses more than one task, then create the top-level
+ ** MergeEngine here. This MergeEngine will read data from exactly
+ ** one PmaReader per sub-task. */
+ assert( pSorter->bUseThreads || pSorter->nTask==1 );
if( pSorter->nTask>1 ){
pMain = vdbeMergeEngineNew(pSorter->nTask);
if( pMain==0 ) rc = SQLITE_NOMEM;
}
for(iTask=0; iTask<pSorter->nTask && rc==SQLITE_OK; iTask++){
- MergeEngine *pRoot = 0;
- int iPMA;
- i64 iReadOff = 0;
SortSubtask *pTask = &pSorter->aTask[iTask];
- if( pTask->nPMA==0 ) continue;
- for(iPMA=0; iPMA<pTask->nPMA; iPMA += SORTER_MAX_MERGE_COUNT){
- MergeEngine *pMerger = 0;
- int nReader = MIN(pTask->nPMA - iPMA, SORTER_MAX_MERGE_COUNT);
-
- rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
- if( rc!=SQLITE_OK ) break;
+ if( pTask->nPMA ){
+ MergeEngine *pRoot = 0; /* Root node of tree for this task */
+ int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
+ i64 iReadOff = 0; /* Passed to each vdbeMergeEngineLevel0() call */
- if( iPMA==0 ){
- pRoot = pMerger;
+ if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){ /* One level-0 merger only */
+ rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
}else{
- if( pRoot ){
- rc = vdbeAddToBuilder(pTask, &aMerge[0], pRoot);
- pRoot = 0;
- if( rc!=SQLITE_OK ){
- vdbeMergeEngineFree(pMerger);
- break;
+ int i; /* Count of PMAs covered by earlier iterations */
+ int iSeq = 0; /* Sequence number of next leaf added to pRoot */
+ pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
+ if( pRoot==0 ) rc = SQLITE_NOMEM;
+ for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
+ MergeEngine *pMerger = 0; /* New level-0 PMA merger */
+ int nReader; /* Number of level-0 PMAs to merge */
+
+ nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
+ rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
}
}
- rc = vdbeAddToBuilder(pTask, &aMerge[0], pMerger);
- if( rc!=SQLITE_OK ) break;
}
- }
- if( pRoot==0 ){
- int i;
- for(i=0; rc==SQLITE_OK && i<nMerge; i++){
- if( aMerge[i].pMerger ){
- if( pRoot ){
- rc = vdbeAddToBuilder(pTask, &aMerge[i], pRoot);
- if( rc!=SQLITE_OK ) break;
- }
- pRoot = aMerge[i].pMerger;
- aMerge[i].pMerger = 0;
+ if( rc==SQLITE_OK ){
+ if( pMain==0 ){
+ pMain = pRoot; /* No top-level MergeEngine: pRoot is the result */
+ }else{
+ rc = vdbeIncrNew(pTask, pRoot, &pMain->aIter[iTask].pIncr);
}
- }
- }
-
- if( rc==SQLITE_OK ){
- if( pMain==0 ){
- pMain = pRoot;
}else{
- IncrMerger *pNew = vdbeIncrNew(pTask, pRoot);
- pMain->aIter[iTask].pIncr = pNew;
- if( pNew==0 ) rc = SQLITE_NOMEM;
+ vdbeMergeEngineFree(pRoot);
}
- memset(aMerge, 0, nMerge*sizeof(aMerge[0]));
}
+ }
- if( rc!=SQLITE_OK ){
- vdbeMergeEngineFree(pRoot);
- }
+ if( rc!=SQLITE_OK ){
+ vdbeMergeEngineFree(pMain);
+ pMain = 0; /* Ensures that *ppOut is NULL on error */
}
+ *ppOut = pMain;
+ return rc;
+}
+
+/*
+** Populate iterator *pIter so that it may be used to iterate through all
+** keys stored in all PMAs created by this sorter.
+**
+** The tree of merge objects is built by vdbeSorterMergeTreeBuild().
+** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
+*/
+static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){
+ int rc; /* Return code */
+ SortSubtask *pTask0 = &pSorter->aTask[0];
+ MergeEngine *pMain = 0; /* Root of the tree; set to 0 once handed on */
+ sqlite3 *db = pTask0->pSorter->db;
+ int iTask;
+ rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
if( rc==SQLITE_OK ){
#if SQLITE_MAX_WORKER_THREADS
if( pSorter->bUseThreads ){
pSorter->pReader = pIter;
}
if( rc==SQLITE_OK ){
- pIter->pIncr = vdbeIncrNew(pLast, pMain);
- if( pIter->pIncr==0 ){
- rc = SQLITE_NOMEM;
- }
- else{
+ rc = vdbeIncrNew(pLast, pMain, &pIter->pIncr); /* Frees pMain on OOM */
+ if( rc==SQLITE_OK ){
vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads);
for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
IncrMerger *pIncr;
if( p->pIncr ){ rc = vdbeIncrBgInit2(p); }
}
}
- pMain = 0;
}
+ pMain = 0; /* NOTE(review): vdbeIncrNew() either attached pMain to the
+ ** new IncrMerger or freed it - confirm that pMain is never
+ ** still owned here when the block above was skipped */
}
if( rc==SQLITE_OK ){
int eMode = (pSorter->nTask>1 ? INCRINIT2_ROOT : INCRINIT2_NORMAL);
}else
#endif
{
- pSorter->pMerger = pMain;
rc = vdbeIncrInitMerger(pTask0, pMain, INCRINIT2_NORMAL);
+ pSorter->pMerger = pMain; /* Stored in the sorter; pMain cleared below */
pMain = 0;
}
}
if( rc!=SQLITE_OK ){
- int i;
- for(i=0; rc==SQLITE_OK && i<nMerge; i++){
- vdbeMergeEngineFree(aMerge[i].pMerger);
- }
vdbeMergeEngineFree(pMain);
}
- sqlite3_free(aMerge);
return rc;
}