]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Add support for using separate worker threads to speed large sorts.
authordrh <drh@noemail.net>
Mon, 1 Sep 2014 17:36:46 +0000 (17:36 +0000)
committerdrh <drh@noemail.net>
Mon, 1 Sep 2014 17:36:46 +0000 (17:36 +0000)
The SQLITE_MAX_WORKER_THREADS and SQLITE_DEFAULT_WORKER_THREADS compile-time
options and the SQLITE_LIMIT_WORKER_THREADS argument to
sqlite3_limit() and the "PRAGMA threads=N" pragma are added.

FossilOrigin-Name: b1c0f0bc1bd8a3477cd7a7ab510f0442ac88b517

1  2 
manifest
manifest.uuid
src/btree.c
src/vdbesort.c

diff --cc manifest
index 5851cdd221a0c50f934ea6ffa29d0f5c89b622f8,6c7fe672c1896ce89d985cfc1b9d71c1006d9e20..cbe9919f0e57242a81a62266038ca8ef97f4d9cb
+++ b/manifest
@@@ -1,9 -1,9 +1,9 @@@
- C Attempt\sto\smake\sthe\sxDelete\smethod\sof\sthe\sunix\sVFS\smore\srobust\son\sVxWorks.
- D 2014-09-01T13:37:55.088
 -C Disable\sworker\sthreads\swhen\sSQLITE_THREADSAFE=0.\s\sSet\sthe\sdefault\scompile-time\nmaximum\snumber\sof\sworker\sthreads\sto\s8\sand\shonor\sthe\nSQLITE_DEFAULT_WORKER_THREADS\scompile-time\sconstant\s(which\sdefaults\sto\s0).
 -D 2014-08-29T19:06:07.922
++C Add\ssupport\sfor\susing\sseparate\sworker\sthreads\sto\sspeed\slarge\ssorts.\nThe\sSQLITE_MAX_WORKER_THREADS\sand\sSQLITE_DEFAULT_WORKER_THREADS\scompile-time\noptions\sand\sthe\sSQLITE_LIMIT_WORKER_THREADS\sargument\sto\s\nsqlite3_limit()\sand\sthe\s"PRAGMA\sthreads=N"\spragma\sare\sadded.
++D 2014-09-01T17:36:46.754
  F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
- F Makefile.in 5eb79e334a5de69c87740edd56af6527dd219308
+ F Makefile.in cf57f673d77606ab0f2d9627ca52a9ba1464146a
  F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
- F Makefile.msc 5b04e657cf08a9ac7fc47d876c5c8be962c47d6b
+ F Makefile.msc e31dee24038965fb6269d6d61073fd6b7e331dec
  F Makefile.vxworks 034289efa9d591b04b1a73598623119c306cbba0
  F README.md 64f270c43c38c46de749e419c22f0ae2f4499fe8
  F VERSION 53a0b870e7f16d3b06623c31d233a304c163a6af
@@@ -168,10 -168,10 +168,10 @@@ F src/auth.c 523da7fb4979469955d822ff92
  F src/backup.c a31809c65623cc41849b94d368917f8bb66e6a7e
  F src/bitvec.c 19a4ba637bd85f8f63fc8c9bae5ade9fb05ec1cb
  F src/btmutex.c ec9d3f1295dafeb278c3830211cc5584132468f4
- F src/btree.c 4d0427bab54229030fc5b8577d2e5ffdc8129030
 -F src/btree.c a10ceaccf04fbc1670907e1d79013e1a7a89edee
++F src/btree.c 2a483a8045118faa99867a8679da42754b532318
  F src/btree.h a79aa6a71e7f1055f01052b7f821bd1c2dce95c8
  F src/btreeInt.h cf180d86b2e9e418f638d65baa425c4c69c0e0e3
- F src/build.c 058e3aadb1376521ff291735237edf4c10f438fb
+ F src/build.c c26b233dcdb1e2c8f468d49236c266f9f3de96d8
  F src/callback.c b97d0695ffcf6a8710ee445ffe56ee387d4d8a6f
  F src/complete.c dc1d136c0feee03c2f7550bafc0d29075e36deac
  F src/ctime.c 0231df905e2c4abba4483ee18ffc05adc321df2a
@@@ -208,8 -208,8 +208,8 @@@ F src/os.c 1b147e4cf7cc39e618115c14a086
  F src/os.h 60d419395e32a8029fa380a80a3da2e9030f635e
  F src/os_common.h 92815ed65f805560b66166e3583470ff94478f04
  F src/os_setup.h c9d4553b5aaa6f73391448b265b89bed0b890faa
 -F src/os_unix.c bd7df3094a60915c148517504c76df4fca24e542
 +F src/os_unix.c 8525ca79457c5b4673a5fda2774ee39fe155f40f
- F src/os_win.c d067fce558a5032e6e6afe62899e5397bf63cf3e
+ F src/os_win.c 08ce5616a5755da9400931fb39146e4a97801a2a
  F src/os_win.h 09e751b20bbc107ffbd46e13555dc73576d88e21
  F src/pager.c 3e732d2bbdd8d8d95fed0c5ae7e718d73153c4c5
  F src/pager.h ffd5607f7b3e4590b415b007a4382f693334d428
@@@ -223,12 -223,12 +223,12 @@@ F src/printf.c 00986c86ddfffefc2fd3c736
  F src/random.c d10c1f85b6709ca97278428fd5db5bbb9c74eece
  F src/resolve.c 0ea356d32a5e884add23d1b9b4e8736681dd5697
  F src/rowset.c a9c9aae3234b44a6d7c6f5a3cadf90dce1e627be
- F src/select.c 1c4667571f2c9e339b5a5c5b152a9ea7b0bc4163
+ F src/select.c 89e569b263535662f54b537eb9118b2c554ae7aa
 -F src/shell.c 88378cef39aba4b4a1df82793dcb1daf9276bb81
 +F src/shell.c 713cef4d73c05fc8e12f4960072329d767a05d50
- F src/sqlite.h.in ed9d35990c61f0388ca6405706455c4095310553
+ F src/sqlite.h.in 74b42237f0d2b010779cc1b1a00190452b31a2ec
  F src/sqlite3.rc 992c9f5fb8285ae285d6be28240a7e8d3a7f2bad
  F src/sqlite3ext.h 886f5a34de171002ad46fae8c36a7d8051c190fc
- F src/sqliteInt.h d8a9be2aa123a78c90ad4aba09b23e7dd3f8cc9f
+ F src/sqliteInt.h 6244ee9052752e26d1275ab20c9b774385aa57d2
  F src/sqliteLimit.h 164b0e6749d31e0daa1a4589a169d31c0dec7b3d
  F src/status.c 7ac05a5c7017d0b9f0b4bcd701228b784f987158
  F src/table.c 2cd62736f845d82200acfa1287e33feb3c15d62e
@@@ -284,14 -285,14 +285,14 @@@ F src/update.c ea336ce7b8b3fc5e316ba8f0
  F src/utf.c 77abb5e6d27f3d236e50f7c8fff1d00e15262359
  F src/util.c 068dcd26354a3898ccc64ad5c4bdb95a7a15d33a
  F src/vacuum.c 3728d74919d4fb1356f9e9a13e27773db60b7179
- F src/vdbe.c fd193824d1cc4a71c631eb792ce837aab530210a
+ F src/vdbe.c 90db7ad740b6d3f7ab446e6244dbc17ce495cca6
  F src/vdbe.h c63fad052c9e7388d551e556e119c0bcf6bebdf8
- F src/vdbeInt.h 4653bb420abb7acdc215659cdcedd3a59f336191
+ F src/vdbeInt.h cdc8e421f85beb1ac9b4669ec5beadab6faa15e0
  F src/vdbeapi.c 09677a53dd8c71bcd670b0bd073bb9aefa02b441
- F src/vdbeaux.c dba006f67c9fd1b1d07ee7fb0fb38aa1905161d1
+ F src/vdbeaux.c cef5d34a64ae3a65b56d96d3fd663246ec8e1c36
  F src/vdbeblob.c 848238dc73e93e48432991bb5651bf87d865eca4
  F src/vdbemem.c 921d5468a68ac06f369810992e84ca22cc730a62
- F src/vdbesort.c f7f5563bf7d4695ca8f3203f3bf9de96d04ed0b3
 -F src/vdbesort.c f92628f3d5d4432f751b15a5f39bacc3c6a64a03
++F src/vdbesort.c 02646a9f86421776ae5d7594f620f9ed669d3698
  F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767
  F src/vtab.c 019dbfd0406a7447c990e1f7bd1dfcdb8895697f
  F src/wal.c 264df50a1b33124130b23180ded2e2c5663c652a
@@@ -1188,7 -1193,7 +1193,8 @@@ F tool/vdbe_profile.tcl 67746953071a9f8
  F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
  F tool/warnings.sh 0abfd78ceb09b7f7c27c688c8e3fe93268a13b32
  F tool/win/sqlite.vsix deb315d026cc8400325c5863eef847784a219a2f
- P 839c7996eecd5480152c514555b9aa1121a69ce0
- R aa6b1a81952b002633810b3bb14c199e
 -P 2ab4b5adc60b52bf2d2b79968d226b8dd7d2ab3b
 -R 97462c8661f494fef9aea0705c0547b9
++P b0f6b91f36b503d8ba8d5257bb194f8c1afb4833 33fa0410499900dd8beb44b9a8ffbd9f4b68c8d8
++R 91d41afdb912bf253c979082c1abeeae
++T +closed 33fa0410499900dd8beb44b9a8ffbd9f4b68c8d8
  U drh
- Z 8d5a13ba98607c42fffb4d1d92841eac
 -Z 369e082ce9db2aa4e0cc673a60a91a43
++Z 8ac04e78c2df72246f6a97d0d813c221
diff --cc manifest.uuid
index 9f24642c5883666e2ae9056933fb7887ed172ed9,680e153df5c40d813206fcee88914a6c5d7a36a9..754fc696d78ba6032797e959d933e4e557e512ea
@@@ -1,1 -1,1 +1,1 @@@
- b0f6b91f36b503d8ba8d5257bb194f8c1afb4833
 -33fa0410499900dd8beb44b9a8ffbd9f4b68c8d8
++b1c0f0bc1bd8a3477cd7a7ab510f0442ac88b517
diff --cc src/btree.c
Simple merge
diff --cc src/vdbesort.c
index 6a5855f2efef1efa67ce9e9756f601c48bc1ede1,158fa440fb9b835395b88cdddb68bce52ae42274..7318ea409e21b03ea7e8cee33d28ac8cd1e30b93
@@@ -753,17 -1404,177 +1404,177 @@@ static int vdbeSorterListToPMA(SortSubt
      SorterRecord *p;
      SorterRecord *pNext = 0;
  
-     fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff);
-     pSorter->nPMA++;
-     fileWriterWriteVarint(&writer, pSorter->nInMemory);
-     for(p=pSorter->pRecord; p; p=pNext){
-       pNext = p->pNext;
-       fileWriterWriteVarint(&writer, p->nVal);
-       fileWriterWrite(&writer, p->pVal, p->nVal);
-       sqlite3DbFree(db, p);
+     vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
+                       pTask->file.iEof);
+     pTask->nPMA++;
+     vdbePmaWriteVarint(&writer, pList->szPMA);
+     for(p=pList->pList; p; p=pNext){
+       pNext = p->u.pNext;
+       vdbePmaWriteVarint(&writer, p->nVal);
+       vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
+       if( pList->aMemory==0 ) sqlite3_free(p);
+     }
+     pList->pList = p;
+     rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
+   }
+   vdbeSorterWorkDebug(pTask, "exit");
+   assert( rc!=SQLITE_OK || pList->pList==0 );
+   assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
+   return rc;
+ }
+ /*
+ ** Advance the MergeEngine to its next entry.
+ ** Set *pbEof to true there is no next entry because
+ ** the MergeEngine has reached the end of all its inputs.
+ **
+ ** Return SQLITE_OK if successful or an error code if an error occurs.
+ */
+ static int vdbeMergeEngineStep(
+   MergeEngine *pMerger,      /* The merge engine to advance to the next row */
+   int *pbEof                 /* Set TRUE at EOF.  Set false for more content */
+ ){
+   int rc;
+   int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
+   SortSubtask *pTask = pMerger->pTask;
+   /* Advance the current PmaReader */
+   rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
+   /* Update contents of aTree[] */
+   if( rc==SQLITE_OK ){
+     int i;                      /* Index of aTree[] to recalculate */
+     PmaReader *pReadr1;         /* First PmaReader to compare */
+     PmaReader *pReadr2;         /* Second PmaReader to compare */
+     u8 *pKey2;                  /* To pReadr2->aKey, or 0 if record cached */
+     /* Find the first two PmaReaders to compare. The one that was just
+     ** advanced (iPrev) and the one next to it in the array.  */
+     pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
+     pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
+     pKey2 = pReadr2->aKey;
+     for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
+       /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
+       int iRes;
+       if( pReadr1->pFd==0 ){
+         iRes = +1;
+       }else if( pReadr2->pFd==0 ){
+         iRes = -1;
+       }else{
+         iRes = vdbeSorterCompare(pTask, 
+             pReadr1->aKey, pReadr1->nKey, pKey2, pReadr2->nKey
+         );
+       }
+       /* If pReadr1 contained the smaller value, set aTree[i] to its index.
+       ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
+       ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
+       ** pKey2 to point to the record belonging to pReadr2.
+       **
+       ** Alternatively, if pReadr2 contains the smaller of the two values,
+       ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
+       ** was actually called above, then pTask->pUnpacked now contains
+       ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
+       ** vdbeSorterCompare() from decoding pReadr2 again.
+       **
+       ** If the two values were equal, then the value from the oldest
+       ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
+       ** is sorted from oldest to newest, so pReadr1 contains older values
+       ** than pReadr2 iff (pReadr1<pReadr2).  */
+       if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
+         pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
+         pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
+         pKey2 = pReadr2->aKey;
+       }else{
+         if( pReadr1->pFd ) pKey2 = 0;
+         pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
+         pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
+       }
+     }
+     *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
+   }
+   return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
+ }
+ #if SQLITE_MAX_WORKER_THREADS>0
+ /*
+ ** The main routine for background threads that write level-0 PMAs.
+ */
+ static void *vdbeSorterFlushThread(void *pCtx){
+   SortSubtask *pTask = (SortSubtask*)pCtx;
+   int rc;                         /* Return code */
+   assert( pTask->bDone==0 );
+   rc = vdbeSorterListToPMA(pTask, &pTask->list);
+   pTask->bDone = 1;
+   return SQLITE_INT_TO_PTR(rc);
+ }
+ #endif /* SQLITE_MAX_WORKER_THREADS>0 */
+ /*
+ ** Flush the current contents of VdbeSorter.list to a new PMA, possibly
+ ** using a background thread.
+ */
+ static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
+ #if SQLITE_MAX_WORKER_THREADS==0
+   pSorter->bUsePMA = 1;
+   return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
+ #else
+   int rc = SQLITE_OK;
+   int i;
+   SortSubtask *pTask = 0;    /* Thread context used to create new PMA */
+   int nWorker = (pSorter->nTask-1);
+   /* Set the flag to indicate that at least one PMA has been written. 
+   ** Or will be, anyhow.  */
+   pSorter->bUsePMA = 1;
+   /* Select a sub-task to sort and flush the current list of in-memory
+   ** records to disk. If the sorter is running in multi-threaded mode,
+   ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
+   ** the background thread from a sub-tasks previous turn is still running,
+   ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
+   ** fall back to using the final sub-task. The first (pSorter->nTask-1)
+   ** sub-tasks are prefered as they use background threads - the final 
+   ** sub-task uses the main thread. */
+   for(i=0; i<nWorker; i++){
+     int iTest = (pSorter->iPrev + i + 1) % nWorker;
+     pTask = &pSorter->aTask[iTest];
+     if( pTask->bDone ){
+       rc = vdbeSorterJoinThread(pTask);
+     }
+     if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
+   }
+   if( rc==SQLITE_OK ){
+     if( i==nWorker ){
+       /* Use the foreground thread for this operation */
+       rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
+     }else{
+       /* Launch a background thread for this operation */
+       u8 *aMem = pTask->list.aMemory;
+       void *pCtx = (void*)pTask;
+       assert( pTask->pThread==0 && pTask->bDone==0 );
+       assert( pTask->list.pList==0 );
+       assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
 -      pSorter->iPrev = (pTask - pSorter->aTask);
++      pSorter->iPrev = (u8)(pTask - pSorter->aTask);
+       pTask->list = pSorter->list;
+       pSorter->list.pList = 0;
+       pSorter->list.szPMA = 0;
+       if( aMem ){
+         pSorter->list.aMemory = aMem;
+         pSorter->nMemory = sqlite3MallocSize(aMem);
+       }else if( pSorter->list.aMemory ){
+         pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
+         if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
+       }
+       rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
      }
-     pSorter->pRecord = p;
-     rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff);
    }
  
    return rc;
@@@ -822,33 -1790,325 +1790,325 @@@ static int vdbeIncrSwap(IncrMerger *pIn
  }
  
  /*
- ** Helper function for sqlite3VdbeSorterRewind(). 
+ ** Allocate and return a new IncrMerger object to read data from pMerger.
+ **
+ ** If an OOM condition is encountered, return NULL. In this case free the
+ ** pMerger argument before returning.
  */
- static int vdbeSorterInitMerge(
-   sqlite3 *db,                    /* Database handle */
-   const VdbeCursor *pCsr,         /* Cursor handle for this sorter */
-   i64 *pnByte                     /* Sum of bytes in all opened PMAs */
+ static int vdbeIncrMergerNew(
+   SortSubtask *pTask,     /* The thread that will be using the new IncrMerger */
+   MergeEngine *pMerger,   /* The MergeEngine that the IncrMerger will control */
+   IncrMerger **ppOut      /* Write the new IncrMerger here */
+ ){
+   int rc = SQLITE_OK;
+   IncrMerger *pIncr = *ppOut = (IncrMerger*)
+        (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr)));
+   if( pIncr ){
+     pIncr->pMerger = pMerger;
+     pIncr->pTask = pTask;
+     pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
+     pTask->file2.iEof += pIncr->mxSz;
+   }else{
+     vdbeMergeEngineFree(pMerger);
+     rc = SQLITE_NOMEM;
+   }
+   return rc;
+ }
+ #if SQLITE_MAX_WORKER_THREADS>0
+ /*
+ ** Set the "use-threads" flag on object pIncr.
+ */
+ static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
+   pIncr->bUseThread = 1;
+   pIncr->pTask->file2.iEof -= pIncr->mxSz;
+ }
+ #endif /* SQLITE_MAX_WORKER_THREADS>0 */
+ /*
+ ** Recompute pMerger->aTree[iOut] by comparing the next keys on the
+ ** two PmaReaders that feed that entry.  Neither of the PmaReaders
+ ** are advanced.  This routine merely does the comparison.
+ */
+ static void vdbeMergeEngineCompare(
+   MergeEngine *pMerger,  /* Merge engine containing PmaReaders to compare */
+   int iOut               /* Store the result in pMerger->aTree[iOut] */
+ ){
+   int i1;
+   int i2;
+   int iRes;
+   PmaReader *p1;
+   PmaReader *p2;
+   assert( iOut<pMerger->nTree && iOut>0 );
+   if( iOut>=(pMerger->nTree/2) ){
+     i1 = (iOut - pMerger->nTree/2) * 2;
+     i2 = i1 + 1;
+   }else{
+     i1 = pMerger->aTree[iOut*2];
+     i2 = pMerger->aTree[iOut*2+1];
+   }
+   p1 = &pMerger->aReadr[i1];
+   p2 = &pMerger->aReadr[i2];
+   if( p1->pFd==0 ){
+     iRes = i2;
+   }else if( p2->pFd==0 ){
+     iRes = i1;
+   }else{
+     int res;
+     assert( pMerger->pTask->pUnpacked!=0 );  /* from vdbeSortSubtaskMain() */
+     res = vdbeSorterCompare(
+         pMerger->pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey
+     );
+     if( res<=0 ){
+       iRes = i1;
+     }else{
+       iRes = i2;
+     }
+   }
+   pMerger->aTree[iOut] = iRes;
+ }
+ /*
+ ** Allowed values for the eMode parameter to vdbeMergeEngineInit()
+ ** and vdbePmaReaderIncrMergeInit().
+ **
+ ** Only INCRINIT_NORMAL is valid in single-threaded builds (when
+ ** SQLITE_MAX_WORKER_THREADS==0).  The other values are only used
+ ** when there exists one or more separate worker threads.
+ */
+ #define INCRINIT_NORMAL 0
+ #define INCRINIT_TASK   1
+ #define INCRINIT_ROOT   2
+ /* Forward reference.
+ ** The vdbeIncrMergeInit() and vdbePmaReaderIncrMergeInit() routines call each
+ ** other (when building a merge tree).
+ */
+ static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode);
+ /*
+ ** Initialize the MergeEngine object passed as the second argument. Once this
+ ** function returns, the first key of merged data may be read from the 
+ ** MergeEngine object in the usual fashion.
+ **
+ ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
+ ** objects attached to the PmaReader objects that the merger reads from have
+ ** already been populated, but that they have not yet populated aFile[0] and
+ ** set the PmaReader objects up to read from it. In this case all that is
+ ** required is to call vdbePmaReaderNext() on each PmaReader to point it at
+ ** its first key.
+ **
+ ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use 
+ ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data 
+ ** to pMerger.
+ **
+ ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
+ */
+ static int vdbeMergeEngineInit(
+   SortSubtask *pTask,             /* Thread that will run pMerger */
+   MergeEngine *pMerger,           /* MergeEngine to initialize */
+   int eMode                       /* One of the INCRINIT_XXX constants */
  ){
-   VdbeSorter *pSorter = pCsr->pSorter;
    int rc = SQLITE_OK;             /* Return code */
-   int i;                          /* Used to iterator through aIter[] */
-   i64 nByte = 0;                  /* Total bytes in all opened PMAs */
+   int i;                          /* For looping over PmaReader objects */
+   int nTree = pMerger->nTree;
+   /* eMode is always INCRINIT_NORMAL in single-threaded mode */
+   assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
+   /* Verify that the MergeEngine is assigned to a single thread */
 -  assert( pMerger->pTask==0 ); // || pMerger->pTask==pTask );
++  assert( pMerger->pTask==0 );
+   pMerger->pTask = pTask;
+   for(i=0; i<nTree; i++){
+     if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){
+       /* PmaReaders should be normally initialized in order, as if they are
+       ** reading from the same temp file this makes for more linear file IO.
+       ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
+       ** in use it will block the vdbePmaReaderNext() call while it uses
+       ** the main thread to fill its buffer. So calling PmaReaderNext()
+       ** on this PmaReader before any of the multi-threaded PmaReaders takes
+       ** better advantage of multi-processor hardware. */
+       rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
+     }else{
+       rc = vdbePmaReaderIncrMergeInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
+     }
+     if( rc!=SQLITE_OK ) return rc;
+   }
  
-   /* Initialize the iterators. */
-   for(i=0; i<SORTER_MAX_MERGE_COUNT; i++){
-     VdbeSorterIter *pIter = &pSorter->aIter[i];
-     rc = vdbeSorterIterInit(db, pSorter, pSorter->iReadOff, pIter, &nByte);
-     pSorter->iReadOff = pIter->iEof;
-     assert( rc!=SQLITE_OK || pSorter->iReadOff<=pSorter->iWriteOff );
-     if( rc!=SQLITE_OK || pSorter->iReadOff>=pSorter->iWriteOff ) break;
+   for(i=pMerger->nTree-1; i>0; i--){
+     vdbeMergeEngineCompare(pMerger, i);
    }
+   return pTask->pUnpacked->errCode;
+ }
+ /*
+ ** Initialize the IncrMerge field of a PmaReader.
+ **
+ ** If the PmaReader passed as the first argument is not an incremental-reader
+ ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it serves
+ ** to open and/or initialize the temp file related fields of the IncrMerge
+ ** object at (pReadr->pIncr).
+ **
+ ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
+ ** in the sub-tree headed by pReadr are also initialized. Data is then loaded
+ ** into the buffers belonging to pReadr and it is set to
+ ** point to the first key in its range.
+ **
+ ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
+ ** to be a multi-threaded PmaReader and this function is being called in a
+ ** background thread. In this case all PmaReaders in the sub-tree are 
+ ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
+ ** pReadr is populated. However, pReadr itself is not set up to point
+ ** to its first key. A call to vdbePmaReaderNext() is still required to do
+ ** that. 
+ **
+ ** The reason this function does not call vdbePmaReaderNext() immediately 
+ ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
+ ** to block on thread (pTask->thread) before accessing aFile[1]. But, since
+ ** this entire function is being run by thread (pTask->thread), that will
+ ** lead to the current background thread attempting to join itself.
+ **
+ ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
+ ** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all
+ ** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
+ ** In this case vdbePmaReaderNext() is called on all child PmaReaders and
+ ** the current PmaReader set to point to the first key in its range.
+ **
+ ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
+ */
+ static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
+   int rc = SQLITE_OK;
+   IncrMerger *pIncr = pReadr->pIncr;
  
-   /* Initialize the aTree[] array. */
-   for(i=pSorter->nTree-1; rc==SQLITE_OK && i>0; i--){
-     rc = vdbeSorterDoCompare(pCsr, i);
+   /* eMode is always INCRINIT_NORMAL in single-threaded mode */
+   assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
+   if( pIncr ){
+     SortSubtask *pTask = pIncr->pTask;
+     sqlite3 *db = pTask->pSorter->db;
+     rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
+     /* Set up the required files for pIncr. A multi-theaded IncrMerge object
+     ** requires two temp files to itself, whereas a single-threaded object
+     ** only requires a region of pTask->file2. */
+     if( rc==SQLITE_OK ){
+       int mxSz = pIncr->mxSz;
+ #if SQLITE_MAX_WORKER_THREADS>0
+       if( pIncr->bUseThread ){
+         rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
+         if( rc==SQLITE_OK ){
+           rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
+         }
+       }else
+ #endif
+       /*if( !pIncr->bUseThread )*/{
+         if( pTask->file2.pFd==0 ){
+           assert( pTask->file2.iEof>0 );
+           rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
+           pTask->file2.iEof = 0;
+         }
+         if( rc==SQLITE_OK ){
+           pIncr->aFile[1].pFd = pTask->file2.pFd;
+           pIncr->iStartOff = pTask->file2.iEof;
+           pTask->file2.iEof += mxSz;
+         }
+       }
+     }
+ #if SQLITE_MAX_WORKER_THREADS>0
+     if( rc==SQLITE_OK && pIncr->bUseThread ){
+       /* Use the current thread to populate aFile[1], even though this
+       ** PmaReader is multi-threaded. The reason being that this function
+       ** is already running in background thread pIncr->pTask->thread. */
+       assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
+       rc = vdbeIncrPopulate(pIncr);
+     }
+ #endif
+     if( rc==SQLITE_OK
+      && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK)
+     ){
+       rc = vdbePmaReaderNext(pReadr);
+     }
    }
+   return rc;
+ }
  
-   *pnByte = nByte;
+ #if SQLITE_MAX_WORKER_THREADS>0
+ /*
+ ** The main routine for vdbePmaReaderIncrMergeInit() operations run in 
+ ** background threads.
+ */
+ static void *vdbePmaReaderBgInit(void *pCtx){
+   PmaReader *pReader = (PmaReader*)pCtx;
+   void *pRet = SQLITE_INT_TO_PTR(
+                   vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
+                );
+   pReader->pIncr->pTask->bDone = 1;
+   return pRet;
+ }
+ /*
+ ** Use a background thread to invoke vdbePmaReaderIncrMergeInit(INCRINIT_TASK) 
+ ** on the the PmaReader object passed as the first argument.
+ **
+ ** This call will initialize the various fields of the pReadr->pIncr 
+ ** structure and, if it is a multi-threaded IncrMerger, launch a 
+ ** background thread to populate aFile[1].
+ */
+ static int vdbePmaReaderBgIncrInit(PmaReader *pReadr){
+   void *pCtx = (void*)pReadr;
+   return vdbeSorterCreateThread(pReadr->pIncr->pTask, vdbePmaReaderBgInit, pCtx);
+ }
+ #endif
+ /*
+ ** Allocate a new MergeEngine object to merge the contents of nPMA level-0
+ ** PMAs from pTask->file. If no error occurs, set *ppOut to point to
+ ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
+ ** to NULL and return an SQLite error code.
+ **
+ ** When this function is called, *piOffset is set to the offset of the
+ ** first PMA to read from pTask->file. Assuming no error occurs, it is 
+ ** set to the offset immediately following the last byte of the last
+ ** PMA before returning. If an error does occur, then the final value of
+ ** *piOffset is undefined.
+ */
+ static int vdbeMergeEngineLevel0(
+   SortSubtask *pTask,             /* Sorter task to read from */
+   int nPMA,                       /* Number of PMAs to read */
+   i64 *piOffset,                  /* IN/OUT: Readr offset in pTask->file */
+   MergeEngine **ppOut             /* OUT: New merge-engine */
+ ){
+   MergeEngine *pNew;              /* Merge engine to return */
+   i64 iOff = *piOffset;
+   int i;
+   int rc = SQLITE_OK;
+   *ppOut = pNew = vdbeMergeEngineNew(nPMA);
+   if( pNew==0 ) rc = SQLITE_NOMEM;
+   for(i=0; i<nPMA && rc==SQLITE_OK; i++){
+     i64 nDummy;
+     PmaReader *pReadr = &pNew->aReadr[i];
+     rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy);
+     iOff = pReadr->iEof;
+   }
+   if( rc!=SQLITE_OK ){
+     vdbeMergeEngineFree(pNew);
+     *ppOut = 0;
+   }
+   *piOffset = iOff;
    return rc;
  }