]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Add an experimental multi-threaded capability to vdbesorter.c.
authordan <dan@noemail.net>
Mon, 17 Mar 2014 15:43:05 +0000 (15:43 +0000)
committerdan <dan@noemail.net>
Mon, 17 Mar 2014 15:43:05 +0000 (15:43 +0000)
FossilOrigin-Name: ff0b5c851ba7d04d1836d7c6a3222713e7d8d357

manifest
manifest.uuid
src/threads.c
src/vdbesort.c

index 4f0c8a0be4cc716c637fe32d3ef3459292c08b1f..2de90048c0178d21edcb6fe82e88163170244b61 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,11 +1,11 @@
-C Merge\slatest\strunk\schanges\sinto\sthis\sbranch.
-D 2014-03-13T15:41:09.146
+C Add\san\sexperimental\smulti-threaded\scapability\sto\svdbesorter.c.
+D 2014-03-17T15:43:05.543
 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
 F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125
 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
 F Makefile.msc 153eb9b9725bc7b8e4dbe963219298e0c4a644b0
 F Makefile.vxworks db21ed42a01d5740e656b16f92cb5d8d5e5dd315
-F README.md 64f270c43c38c46de749e419c22f0ae2f4499fe8 w README
+F README.md 64f270c43c38c46de749e419c22f0ae2f4499fe8
 F VERSION c3b0d47c3c5cf25c5bd4ff9e6f3af2f9d7934ea6
 F aclocal.m4 a5c22d164aff7ed549d53a90fa56d56955281f50
 F addopcodes.awk 9eb448a552d5c0185cf62c463f9c173cedae3811
@@ -108,17 +108,17 @@ F ext/icu/icu.c d415ccf984defeb9df2c0e1afcfaa2f6dc05eacb
 F ext/icu/sqliteicu.h 728867a802baa5a96de7495e9689a8e01715ef37
 F ext/misc/amatch.c 678056a4bfcd83c4e82dea81d37543cd1d6dbee1
 F ext/misc/closure.c 636024302cde41b2bf0c542f81c40c624cfb7012
-F ext/misc/fuzzer.c 136533c53cfce0957f0b48fa11dba27e21c5c01d w src/test_fuzzer.c
+F ext/misc/fuzzer.c 136533c53cfce0957f0b48fa11dba27e21c5c01d
 F ext/misc/ieee754.c b0362167289170627659e84173f5d2e8fee8566e
 F ext/misc/nextchar.c 35c8b8baacb96d92abbb34a83a997b797075b342
 F ext/misc/percentile.c bcbee3c061b884eccb80e21651daaae8e1e43c63
 F ext/misc/regexp.c af92cdaa5058fcec1451e49becc7ba44dba023dc
 F ext/misc/rot13.c 1ac6f95f99b575907b9b09c81a349114cf9be45a
-F ext/misc/spellfix.c 93f3961074cebe63c31fcefe62ca2a032ee8dfed w src/test_spellfix.c
+F ext/misc/spellfix.c 93f3961074cebe63c31fcefe62ca2a032ee8dfed
 F ext/misc/totype.c 4a167594e791abeed95e0a8db028822b5e8fe512
 F ext/misc/vfslog.c fe40fab5c077a40477f7e5eba994309ecac6cc95
 F ext/misc/vtshim.c babb0dc2bf116029e3e7c9a618b8a1377045303e
-F ext/misc/wholenumber.c 784b12543d60702ebdd47da936e278aa03076212 w src/test_wholenumber.c
+F ext/misc/wholenumber.c 784b12543d60702ebdd47da936e278aa03076212
 F ext/rtree/README 6315c0d73ebf0ec40dedb5aa0e942bc8b54e3761
 F ext/rtree/rtree.c 2d9f95da404d850474e628c720c5ce15d29b47de
 F ext/rtree/rtree.h 834dbcb82dc85b2481cde6a07cdadfddc99e9b9e
@@ -272,7 +272,7 @@ F src/test_thread.c 1e133a40b50e9c035b00174035b846e7eef481cb
 F src/test_vfs.c e72f555ef7a59080f898fcf1a233deb9eb704ea9
 F src/test_vfstrace.c 3a0ab304682fecbceb689e7d9b904211fde11d78
 F src/test_wsd.c 41cadfd9d97fe8e3e4e44f61a4a8ccd6f7ca8fe9
-F src/threads.c 2b918d1f4f0b0831e8f41c49bcaa097f01490120
+F src/threads.c cde9d885fd562b5427f89a42a8829085f88b17df
 F src/tokenize.c 6da2de6e12218ccb0aea5184b56727d011f4bee7
 F src/trigger.c 66f3470b03b52b395e839155786966e3e037fddb
 F src/update.c 5b3e74a03b3811e586b4f2b4cbd7c49f01c93115
@@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4
 F src/vdbeaux.c e45e3f9daf38c5be3fd39e9aacc1c9066af57a06
 F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa
 F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447
-F src/vdbesort.c 46801acb342e5e4c07ba1777fe58880c143abb59
+F src/vdbesort.c 1d973fcd0e00c77836e9d505e7c45df02601ffc2
 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767
 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd
 F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8
@@ -1157,7 +1157,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1
 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01
 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff
-P c92b0fe1371e7c20a5fbdf5fa96e30da14c40880 6504aa47a8ebb13827be017c4cb4b2dc3c4c55f4
-R c6806fc7f1eff740ed4af1293ffc55d0
+P d17231b63d48c1f9c4dee109c90cec112e2f0fd4
+R 3c79acd4eecb6926efd889b756a27898
 U dan
-Z b3b09b0f0acb184b570781ded35d947c
+Z 226b72b7faaae8e6d7b6586f32d5c5a0
index c9d158cd107be8a06f983acca132e42875993e31..bf397f3d56acd86231fb817deb4c281360098bbf 100644 (file)
@@ -1 +1 @@
-d17231b63d48c1f9c4dee109c90cec112e2f0fd4
\ No newline at end of file
+ff0b5c851ba7d04d1836d7c6a3222713e7d8d357
\ No newline at end of file
index 33781a7aace67cb58f17f5997ba98ae3947e3dad..7cc964250d8774ac48975d5898511eda17a0ad34 100644 (file)
@@ -47,7 +47,6 @@ int sqlite3ThreadCreate(
   void *pIn                 /* Argument passed into xTask() */
 ){
   SQLiteThread *p;
-  int rc;
 
   assert( ppThread!=0 );
   assert( xTask!=0 );
index b9ed97e8b3a4ba75b38f16e0f047fea2553e4572..e84a33fccc215ff816407431f409ebb6fab1367d 100644 (file)
 
 
 typedef struct VdbeSorterIter VdbeSorterIter;
+typedef struct SorterThread SorterThread;
 typedef struct SorterRecord SorterRecord;
+typedef struct SorterMerger SorterMerger;
 typedef struct FileWriter FileWriter;
 
+
+/*
+** Maximum number of threads to use. Setting this value to 1 forces all
+** operations to be single-threaded.
+*/
+#ifndef SQLITE_MAX_SORTER_THREAD
+# define SQLITE_MAX_SORTER_THREAD 1
+#endif
+
+/*
+** Candidate values for SorterThread.eWork
+*/
+#define SORTER_THREAD_SORT   1
+#define SORTER_THREAD_TO_PMA 2
+#define SORTER_THREAD_CONS   3
+
+/*
+** Much of the work performed in this module to sort the list of records is 
+** broken down into smaller units that may be peformed in parallel. In order
+** to perform such a unit of work, an instance of the following structure
+** is configured and passed to vdbeSorterThreadMain() - either directly by 
+** the main thread or via a background thread.
+**
+** Exactly SQLITE_MAX_SORTER_THREAD instances of this structure are allocated
+** as part of each VdbeSorter object. Instances are never allocated any other
+** way.
+**
+** When a background thread is launched to perform work, SorterThread.bDone
+** is set to 0 and the SorterThread.pThread variable set to point to the
+** thread handle. SorterThread.bDone is set to 1 (to indicate to the main
+** thread that joining SorterThread.pThread will not block) before the thread
+** exits. SorterThread.pThread and bDone are always cleared after the 
+** background thread has been joined.
+**
+** One object (specifically, VdbeSorter.aThread[SQLITE_MAX_SORTER_THREAD-1])
+** is reserved for the foreground thread.
+**
+** The nature of the work performed is determined by SorterThread.eWork,
+** as follows:
+**
+**   SORTER_THREAD_SORT:
+**     Sort the linked list of records at SorterThread.pList.
+**
+**   SORTER_THREAD_TO_PMA:
+**     Sort the linked list of records at SorterThread.pList, and write
+**     the results to a new PMA in temp file SorterThread.pTemp1. Open
+**     the temp file if it is not already open.
+**
+**   SORTER_THREAD_CONS:
+**     Merge existing PMAs until SorterThread.nConsolidate or fewer
+**     remain in temp file SorterThread.pTemp1.
+*/
+struct SorterThread {
+  SQLiteThread *pThread;          /* Thread handle, or NULL */
+  int bDone;                      /* Set to true by pThread when finished */
+
+  sqlite3_vfs *pVfs;              /* VFS used to open temporary files */
+  KeyInfo *pKeyInfo;              /* How to compare records */
+  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
+  int pgsz;                       /* Main database page size */
+
+  u8 eWork;                       /* One of the SORTER_THREAD_* constants */
+  int nConsolidate;               /* For THREAD_CONS, max final PMAs */
+  SorterRecord *pList;            /* List of records for pThread to sort */
+  int nInMemory;                  /* Expected size of PMA based on pList */
+
+  int nPMA;                       /* Number of PMAs currently in pTemp1 */
+  i64 iTemp1Off;                  /* Offset to write to in pTemp1 */
+  sqlite3_file *pTemp1;           /* File to write PMAs to, or NULL */
+};
+
+
 /*
 ** NOTES ON DATA STRUCTURE USED FOR N-WAY MERGES:
 **
@@ -92,19 +166,24 @@ typedef struct FileWriter FileWriter;
 ** key comparison operations are required, where N is the number of segments
 ** being merged (rounded up to the next power of 2).
 */
+struct SorterMerger {
+  int nTree;                      /* Used size of aTree/aIter (power of 2) */
+  int *aTree;                     /* Current state of incremental merge */
+  VdbeSorterIter *aIter;          /* Array of iterators to merge data from */
+};
+
+/*
+** Main sorter structure. A single instance of this is allocated for each 
+** sorter cursor created by the VDBE.
+*/
 struct VdbeSorter {
-  i64 iWriteOff;                  /* Current write offset within file pTemp1 */
-  i64 iReadOff;                   /* Current read offset within file pTemp1 */
   int nInMemory;                  /* Current size of pRecord list as PMA */
-  int nTree;                      /* Used size of aTree/aIter (power of 2) */
-  int nPMA;                       /* Number of PMAs stored in pTemp1 */
   int mnPmaSize;                  /* Minimum PMA size, in bytes */
   int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
-  VdbeSorterIter *aIter;          /* Array of iterators to merge */
-  int *aTree;                     /* Current state of incremental merge */
-  sqlite3_file *pTemp1;           /* PMA file 1 */
+  int bUsePMA;                    /* True if one or more PMAs created */
   SorterRecord *pRecord;          /* Head of in-memory record list */
-  UnpackedRecord *pUnpacked;      /* Used to unpack keys */
+  SorterMerger *pMerger;          /* For final merge of PMAs (by caller) */ 
+  SorterThread aThread[SQLITE_MAX_SORTER_THREAD];
 };
 
 /*
@@ -150,7 +229,8 @@ struct SorterRecord {
   SorterRecord *pNext;
 };
 
-/* Minimum allowable value for the VdbeSorter.nWorking variable */
+/* The minimum PMA size is set to this value multiplied by the database
+** page size in bytes.  */
 #define SORTER_MIN_WORKING 10
 
 /* Maximum number of segments to merge in a single pass. */
@@ -160,9 +240,9 @@ struct SorterRecord {
 ** Free all memory belonging to the VdbeSorterIter object passed as the second
 ** argument. All structure fields are set to zero before returning.
 */
-static void vdbeSorterIterZero(sqlite3 *db, VdbeSorterIter *pIter){
-  sqlite3DbFree(db, pIter->aAlloc);
-  sqlite3DbFree(db, pIter->aBuffer);
+static void vdbeSorterIterZero(VdbeSorterIter *pIter){
+  sqlite3_free(pIter->aAlloc);
+  sqlite3_free(pIter->aBuffer);
   memset(pIter, 0, sizeof(VdbeSorterIter));
 }
 
@@ -176,7 +256,6 @@ static void vdbeSorterIterZero(sqlite3 *db, VdbeSorterIter *pIter){
 ** next call to this function.
 */
 static int vdbeSorterIterRead(
-  sqlite3 *db,                    /* Database handle (for malloc) */
   VdbeSorterIter *p,              /* Iterator */
   int nByte,                      /* Bytes of data to read */
   u8 **ppOut                      /* OUT: Pointer to buffer containing data */
@@ -222,11 +301,13 @@ static int vdbeSorterIterRead(
 
     /* Extend the p->aAlloc[] allocation if required. */
     if( p->nAlloc<nByte ){
+      u8 *aNew;
       int nNew = p->nAlloc*2;
       while( nByte>nNew ) nNew = nNew*2;
-      p->aAlloc = sqlite3DbReallocOrFree(db, p->aAlloc, nNew);
-      if( !p->aAlloc ) return SQLITE_NOMEM;
+      aNew = sqlite3Realloc(p->aAlloc, nNew);
+      if( !aNew ) return SQLITE_NOMEM;
       p->nAlloc = nNew;
+      p->aAlloc = aNew;
     }
 
     /* Copy as much data as is available in the buffer into the start of
@@ -244,7 +325,7 @@ static int vdbeSorterIterRead(
 
       nCopy = nRem;
       if( nRem>p->nBuffer ) nCopy = p->nBuffer;
-      rc = vdbeSorterIterRead(db, p, nCopy, &aNext);
+      rc = vdbeSorterIterRead(p, nCopy, &aNext);
       if( rc!=SQLITE_OK ) return rc;
       assert( aNext!=p->aAlloc );
       memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
@@ -261,7 +342,7 @@ static int vdbeSorterIterRead(
 ** Read a varint from the stream of data accessed by p. Set *pnOut to
 ** the value read.
 */
-static int vdbeSorterIterVarint(sqlite3 *db, VdbeSorterIter *p, u64 *pnOut){
+static int vdbeSorterIterVarint(VdbeSorterIter *p, u64 *pnOut){
   int iBuf;
 
   iBuf = p->iReadOff % p->nBuffer;
@@ -271,7 +352,7 @@ static int vdbeSorterIterVarint(sqlite3 *db, VdbeSorterIter *p, u64 *pnOut){
     u8 aVarint[16], *a;
     int i = 0, rc;
     do{
-      rc = vdbeSorterIterRead(db, p, 1, &a);
+      rc = vdbeSorterIterRead(p, 1, &a);
       if( rc ) return rc;
       aVarint[(i++)&0xf] = a[0];
     }while( (a[0]&0x80)!=0 );
@@ -286,23 +367,20 @@ static int vdbeSorterIterVarint(sqlite3 *db, VdbeSorterIter *p, u64 *pnOut){
 ** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
 ** no error occurs, or an SQLite error code if one does.
 */
-static int vdbeSorterIterNext(
-  sqlite3 *db,                    /* Database handle (for sqlite3DbMalloc() ) */
-  VdbeSorterIter *pIter           /* Iterator to advance */
-){
+static int vdbeSorterIterNext(VdbeSorterIter *pIter){
   int rc;                         /* Return Code */
   u64 nRec = 0;                   /* Size of record in bytes */
 
   if( pIter->iReadOff>=pIter->iEof ){
     /* This is an EOF condition */
-    vdbeSorterIterZero(db, pIter);
+    vdbeSorterIterZero(pIter);
     return SQLITE_OK;
   }
 
-  rc = vdbeSorterIterVarint(db, pIter, &nRec);
+  rc = vdbeSorterIterVarint(pIter, &nRec);
   if( rc==SQLITE_OK ){
     pIter->nKey = (int)nRec;
-    rc = vdbeSorterIterRead(db, pIter, (int)nRec, &pIter->aKey);
+    rc = vdbeSorterIterRead(pIter, (int)nRec, &pIter->aKey);
   }
 
   return rc;
@@ -315,26 +393,23 @@ static int vdbeSorterIterNext(
 ** PMA is empty).
 */
 static int vdbeSorterIterInit(
-  sqlite3 *db,                    /* Database handle */
-  const VdbeSorter *pSorter,      /* Sorter object */
-  i64 iStart,                     /* Start offset in pFile */
+  SorterThread *pThread,          /* Thread context */
+  i64 iStart,                     /* Start offset in pThread->pTemp1 */
   VdbeSorterIter *pIter,          /* Iterator to populate */
   i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
 ){
   int rc = SQLITE_OK;
-  int nBuf;
+  int nBuf = pThread->pgsz;
 
-  nBuf = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
-
-  assert( pSorter->iWriteOff>iStart );
+  assert( pThread->iTemp1Off>iStart );
   assert( pIter->aAlloc==0 );
   assert( pIter->aBuffer==0 );
-  pIter->pFile = pSorter->pTemp1;
+  pIter->pFile = pThread->pTemp1;
   pIter->iReadOff = iStart;
   pIter->nAlloc = 128;
-  pIter->aAlloc = (u8 *)sqlite3DbMallocRaw(db, pIter->nAlloc);
+  pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
   pIter->nBuffer = nBuf;
-  pIter->aBuffer = (u8 *)sqlite3DbMallocRaw(db, nBuf);
+  pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
 
   if( !pIter->aBuffer ){
     rc = SQLITE_NOMEM;
@@ -344,26 +419,26 @@ static int vdbeSorterIterInit(
     iBuf = iStart % nBuf;
     if( iBuf ){
       int nRead = nBuf - iBuf;
-      if( (iStart + nRead) > pSorter->iWriteOff ){
-        nRead = (int)(pSorter->iWriteOff - iStart);
+      if( (iStart + nRead) > pThread->iTemp1Off ){
+        nRead = (int)(pThread->iTemp1Off - iStart);
       }
       rc = sqlite3OsRead(
-          pSorter->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
+          pThread->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
       );
       assert( rc!=SQLITE_IOERR_SHORT_READ );
     }
 
     if( rc==SQLITE_OK ){
-      u64 nByte;                       /* Size of PMA in bytes */
-      pIter->iEof = pSorter->iWriteOff;
-      rc = vdbeSorterIterVarint(db, pIter, &nByte);
+      u64 nByte;
+      pIter->iEof = pThread->iTemp1Off;
+      rc = vdbeSorterIterVarint(pIter, &nByte);
       pIter->iEof = pIter->iReadOff + nByte;
       *pnByte += nByte;
     }
   }
 
   if( rc==SQLITE_OK ){
-    rc = vdbeSorterIterNext(db, pIter);
+    rc = vdbeSorterIterNext(pIter);
   }
   return rc;
 }
@@ -385,15 +460,14 @@ static int vdbeSorterIterInit(
 ** has been allocated and contains an unpacked record that is used as key2.
 */
 static void vdbeSorterCompare(
-  const VdbeCursor *pCsr,         /* Cursor object (for pKeyInfo) */
+  SorterThread *pThread,          /* Thread context (for pKeyInfo) */
   int nIgnore,                    /* Ignore the last nIgnore fields */
   const void *pKey1, int nKey1,   /* Left side of comparison */
   const void *pKey2, int nKey2,   /* Right side of comparison */
   int *pRes                       /* OUT: Result of comparison */
 ){
-  KeyInfo *pKeyInfo = pCsr->pKeyInfo;
-  VdbeSorter *pSorter = pCsr->pSorter;
-  UnpackedRecord *r2 = pSorter->pUnpacked;
+  KeyInfo *pKeyInfo = pThread->pKeyInfo;
+  UnpackedRecord *r2 = pThread->pUnpacked;
   int i;
 
   if( pKey2 ){
@@ -420,26 +494,29 @@ static void vdbeSorterCompare(
 ** multiple b-tree segments. Parameter iOut is the index of the aTree[] 
 ** value to recalculate.
 */
-static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){
-  VdbeSorter *pSorter = pCsr->pSorter;
+static int vdbeSorterDoCompare(
+  SorterThread *pThread, 
+  SorterMerger *pMerger, 
+  int iOut
+){
   int i1;
   int i2;
   int iRes;
   VdbeSorterIter *p1;
   VdbeSorterIter *p2;
 
-  assert( iOut<pSorter->nTree && iOut>0 );
+  assert( iOut<pMerger->nTree && iOut>0 );
 
-  if( iOut>=(pSorter->nTree/2) ){
-    i1 = (iOut - pSorter->nTree/2) * 2;
+  if( iOut>=(pMerger->nTree/2) ){
+    i1 = (iOut - pMerger->nTree/2) * 2;
     i2 = i1 + 1;
   }else{
-    i1 = pSorter->aTree[iOut*2];
-    i2 = pSorter->aTree[iOut*2+1];
+    i1 = pMerger->aTree[iOut*2];
+    i2 = pMerger->aTree[iOut*2+1];
   }
 
-  p1 = &pSorter->aIter[i1];
-  p2 = &pSorter->aIter[i2];
+  p1 = &pMerger->aIter[i1];
+  p2 = &pMerger->aIter[i2];
 
   if( p1->pFile==0 ){
     iRes = i2;
@@ -447,9 +524,9 @@ static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){
     iRes = i1;
   }else{
     int res;
-    assert( pCsr->pSorter->pUnpacked!=0 );  /* allocated in vdbeSorterMerge() */
+    assert( pThread->pUnpacked!=0 );  /* allocated in vdbeSorterMerge() */
     vdbeSorterCompare(
-        pCsr, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
+        pThread, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
     );
     if( res<=0 ){
       iRes = i1;
@@ -458,7 +535,7 @@ static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){
     }
   }
 
-  pSorter->aTree[iOut] = iRes;
+  pMerger->aTree[iOut] = iRes;
   return SQLITE_OK;
 }
 
@@ -467,22 +544,32 @@ static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){
 */
 int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){
   int pgsz;                       /* Page size of main database */
+  int i;                          /* Used to iterate through aThread[] */
   int mxCache;                    /* Cache size */
   VdbeSorter *pSorter;            /* The new sorter */
-  char *d;                        /* Dummy */
+  KeyInfo *pKeyInfo;              /* Copy of pCsr->pKeyInfo with db==0 */
+  int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */
 
   assert( pCsr->pKeyInfo && pCsr->pBt==0 );
-  pCsr->pSorter = pSorter = sqlite3DbMallocZero(db, sizeof(VdbeSorter));
+  szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
+  pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sizeof(VdbeSorter)+szKeyInfo);
+  pCsr->pSorter = pSorter;
   if( pSorter==0 ){
     return SQLITE_NOMEM;
   }
-  
-  pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pCsr->pKeyInfo, 0, 0, &d);
-  if( pSorter->pUnpacked==0 ) return SQLITE_NOMEM;
-  assert( pSorter->pUnpacked==(UnpackedRecord *)d );
+  pKeyInfo = (KeyInfo*)&pSorter[1];
+  memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
+  pKeyInfo->db = 0;
+  pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
+
+  for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
+    SorterThread *pThread = &pSorter->aThread[i];
+    pThread->pKeyInfo = pKeyInfo;
+    pThread->pVfs = db->pVfs;
+    pThread->pgsz = pgsz;
+  }
 
   if( !sqlite3TempInMemory(db) ){
-    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
     pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
     mxCache = db->aDb[0].pSchema->cache_size;
     if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
@@ -504,24 +591,89 @@ static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
   }
 }
 
+/*
+** Free all resources owned by the object indicated by argument pThread. All 
+** fields of *pThread are zeroed before returning.
+*/
+static void vdbeSorterThreadCleanup(sqlite3 *db, SorterThread *pThread){
+  sqlite3DbFree(db, pThread->pUnpacked);
+  vdbeSorterRecordFree(0, pThread->pList);
+  if( pThread->pTemp1 ){
+    sqlite3OsCloseFree(pThread->pTemp1);
+  }
+  memset(pThread, 0, sizeof(SorterThread));
+}
+
+/*
+** Join all threads.  
+*/
+static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
+  int rc = rcin;
+  int i;
+  for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
+    SorterThread *pThread = &pSorter->aThread[i];
+    if( pThread->pThread ){
+      void *pRet;
+      int rc2 = sqlite3ThreadJoin(pThread->pThread, &pRet);
+      pThread->pThread = 0;
+      pThread->bDone = 0;
+      if( rc==SQLITE_OK ) rc = rc2;
+      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
+    }
+  }
+  return rc;
+}
+
+/*
+** Allocate a new SorterMerger object with space for nIter iterators.
+*/
+static SorterMerger *vdbeSorterMergerNew(int nIter){
+  int N = 2;                      /* Smallest power of two >= nIter */
+  int nByte;                      /* Total bytes of space to allocate */
+  SorterMerger *pNew;             /* Pointer to allocated object to return */
+
+  assert( nIter<=SORTER_MAX_MERGE_COUNT );
+  while( N<nIter ) N += N;
+  nByte = sizeof(SorterMerger) + N * (sizeof(int) + sizeof(VdbeSorterIter));
+
+  pNew = (SorterMerger*)sqlite3MallocZero(nByte);
+  if( pNew ){
+    pNew->nTree = N;
+    pNew->aIter = (VdbeSorterIter*)&pNew[1];
+    pNew->aTree = (int*)&pNew->aIter[N];
+  }
+
+  return pNew;
+}
+
+/*
+** Free the SorterMerger object passed as the only argument.
+*/
+static void vdbeSorterMergerFree(SorterMerger *pMerger){
+  if( pMerger ){
+    int i;
+    for(i=0; i<pMerger->nTree; i++){
+      vdbeSorterIterZero(&pMerger->aIter[i]);
+    }
+    sqlite3_free(pMerger);
+  }
+}
+
 /*
 ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
 */
 void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
   VdbeSorter *pSorter = pCsr->pSorter;
   if( pSorter ){
-    if( pSorter->aIter ){
-      int i;
-      for(i=0; i<pSorter->nTree; i++){
-        vdbeSorterIterZero(db, &pSorter->aIter[i]);
-      }
-      sqlite3DbFree(db, pSorter->aIter);
-    }
-    if( pSorter->pTemp1 ){
-      sqlite3OsCloseFree(pSorter->pTemp1);
+    int i;
+    vdbeSorterJoinAll(pSorter, SQLITE_OK);
+    for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
+      SorterThread *pThread = &pSorter->aThread[i];
+      vdbeSorterThreadCleanup(db, pThread);
     }
-    vdbeSorterRecordFree(db, pSorter->pRecord);
-    sqlite3DbFree(db, pSorter->pUnpacked);
+
+    vdbeSorterRecordFree(0, pSorter->pRecord);
+    vdbeSorterMergerFree(pSorter->pMerger);
     sqlite3DbFree(db, pSorter);
     pCsr->pSorter = 0;
   }
@@ -532,9 +684,9 @@ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
 ** set *ppFile to point to the malloc'd file-handle and return SQLITE_OK.
 ** Otherwise, set *ppFile to 0 and return an SQLite error code.
 */
-static int vdbeSorterOpenTempFile(sqlite3 *db, sqlite3_file **ppFile){
+static int vdbeSorterOpenTempFile(sqlite3_vfs *pVfs, sqlite3_file **ppFile){
   int dummy;
-  return sqlite3OsOpenMalloc(db->pVfs, 0, ppFile,
+  return sqlite3OsOpenMalloc(pVfs, 0, ppFile,
       SQLITE_OPEN_TEMP_JOURNAL |
       SQLITE_OPEN_READWRITE    | SQLITE_OPEN_CREATE |
       SQLITE_OPEN_EXCLUSIVE    | SQLITE_OPEN_DELETEONCLOSE, &dummy
@@ -546,7 +698,7 @@ static int vdbeSorterOpenTempFile(sqlite3 *db, sqlite3_file **ppFile){
 ** Set *ppOut to the head of the new list.
 */
 static void vdbeSorterMerge(
-  const VdbeCursor *pCsr,         /* For pKeyInfo */
+  SorterThread *pThread,          /* Calling thread context */
   SorterRecord *p1,               /* First list to merge */
   SorterRecord *p2,               /* Second list to merge */
   SorterRecord **ppOut            /* OUT: Head of merged list */
@@ -557,7 +709,7 @@ static void vdbeSorterMerge(
 
   while( p1 && p2 ){
     int res;
-    vdbeSorterCompare(pCsr, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res);
+    vdbeSorterCompare(pThread, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res);
     if( res<=0 ){
       *pp = p1;
       pp = &p1->pNext;
@@ -576,27 +728,26 @@ static void vdbeSorterMerge(
 }
 
 /*
-** Sort the linked list of records headed at pCsr->pRecord. Return SQLITE_OK
-** if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if an error
-** occurs.
+** Sort the linked list of records headed at pThread->pList. Return 
+** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
+** an error occurs.
 */
-static int vdbeSorterSort(const VdbeCursor *pCsr){
+static int vdbeSorterSort(SorterThread *pThread){
   int i;
   SorterRecord **aSlot;
   SorterRecord *p;
-  VdbeSorter *pSorter = pCsr->pSorter;
 
   aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
   if( !aSlot ){
     return SQLITE_NOMEM;
   }
 
-  p = pSorter->pRecord;
+  p = pThread->pList;
   while( p ){
     SorterRecord *pNext = p->pNext;
     p->pNext = 0;
     for(i=0; aSlot[i]; i++){
-      vdbeSorterMerge(pCsr, p, aSlot[i], &p);
+      vdbeSorterMerge(pThread, p, aSlot[i], &p);
       aSlot[i] = 0;
     }
     aSlot[i] = p;
@@ -605,9 +756,9 @@ static int vdbeSorterSort(const VdbeCursor *pCsr){
 
   p = 0;
   for(i=0; i<64; i++){
-    vdbeSorterMerge(pCsr, p, aSlot[i], &p);
+    vdbeSorterMerge(pThread, p, aSlot[i], &p);
   }
-  pSorter->pRecord = p;
+  pThread->pList = p;
 
   sqlite3_free(aSlot);
   return SQLITE_OK;
@@ -617,15 +768,13 @@ static int vdbeSorterSort(const VdbeCursor *pCsr){
 ** Initialize a file-writer object.
 */
 static void fileWriterInit(
-  sqlite3 *db,                    /* Database (for malloc) */
   sqlite3_file *pFile,            /* File to write to */
   FileWriter *p,                  /* Object to populate */
+  int nBuf,                       /* Buffer size */
   i64 iStart                      /* Offset of pFile to begin writing at */
 ){
-  int nBuf = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
-
   memset(p, 0, sizeof(FileWriter));
-  p->aBuffer = (u8 *)sqlite3DbMallocRaw(db, nBuf);
+  p->aBuffer = (u8*)sqlite3Malloc(nBuf);
   if( !p->aBuffer ){
     p->eFWErr = SQLITE_NOMEM;
   }else{
@@ -673,7 +822,7 @@ static void fileWriterWrite(FileWriter *p, u8 *pData, int nData){
 ** Before returning, set *piEof to the offset immediately following the
 ** last byte written to the file.
 */
-static int fileWriterFinish(sqlite3 *db, FileWriter *p, i64 *piEof){
+static int fileWriterFinish(FileWriter *p, i64 *piEof){
   int rc;
   if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
     p->eFWErr = sqlite3OsWrite(p->pFile, 
@@ -682,7 +831,7 @@ static int fileWriterFinish(sqlite3 *db, FileWriter *p, i64 *piEof){
     );
   }
   *piEof = (p->iWriteOff + p->iBufEnd);
-  sqlite3DbFree(db, p->aBuffer);
+  sqlite3_free(p->aBuffer);
   rc = p->eFWErr;
   memset(p, 0, sizeof(FileWriter));
   return rc;
@@ -712,43 +861,234 @@ static void fileWriterWriteVarint(FileWriter *p, u64 iVal){
 **       Each record consists of a varint followed by a blob of data (the 
 **       key). The varint is the number of bytes in the blob of data.
 */
-static int vdbeSorterListToPMA(sqlite3 *db, const VdbeCursor *pCsr){
+static int vdbeSorterListToPMA(SorterThread *pThread){
   int rc = SQLITE_OK;             /* Return code */
-  VdbeSorter *pSorter = pCsr->pSorter;
-  FileWriter writer;
+  FileWriter writer;              /* Object used to write to the file */
 
   memset(&writer, 0, sizeof(FileWriter));
-
-  if( pSorter->nInMemory==0 ){
-    assert( pSorter->pRecord==0 );
-    return rc;
-  }
-
-  rc = vdbeSorterSort(pCsr);
+  assert( pThread->nInMemory>0 );
 
   /* If the first temporary PMA file has not been opened, open it now. */
-  if( rc==SQLITE_OK && pSorter->pTemp1==0 ){
-    rc = vdbeSorterOpenTempFile(db, &pSorter->pTemp1);
-    assert( rc!=SQLITE_OK || pSorter->pTemp1 );
-    assert( pSorter->iWriteOff==0 );
-    assert( pSorter->nPMA==0 );
+  if( pThread->pTemp1==0 ){
+    rc = vdbeSorterOpenTempFile(pThread->pVfs, &pThread->pTemp1);
+    assert( rc!=SQLITE_OK || pThread->pTemp1 );
+    assert( pThread->iTemp1Off==0 );
+    assert( pThread->nPMA==0 );
   }
 
   if( rc==SQLITE_OK ){
     SorterRecord *p;
     SorterRecord *pNext = 0;
 
-    fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff);
-    pSorter->nPMA++;
-    fileWriterWriteVarint(&writer, pSorter->nInMemory);
-    for(p=pSorter->pRecord; p; p=pNext){
+    fileWriterInit(pThread->pTemp1, &writer, pThread->pgsz, pThread->iTemp1Off);
+    pThread->nPMA++;
+    fileWriterWriteVarint(&writer, pThread->nInMemory);
+    for(p=pThread->pList; p; p=pNext){
       pNext = p->pNext;
       fileWriterWriteVarint(&writer, p->nVal);
       fileWriterWrite(&writer, p->pVal, p->nVal);
-      sqlite3DbFree(db, p);
+      sqlite3_free(p);
+    }
+    pThread->pList = p;
+    rc = fileWriterFinish(&writer, &pThread->iTemp1Off);
+  }
+
+  return rc;
+}
+
+/*
+** Advance the SorterMerger iterator passed as the second argument to
+** the next entry. Set *pbEof to true if this means the iterator has 
+** reached EOF.
+**
+** Return SQLITE_OK if successful or an error code if an error occurs.
+*/
+static int vdbeSorterNext(
+  SorterThread *pThread, 
+  SorterMerger *pMerger, 
+  int *pbEof
+){
+  int rc;
+  int iPrev = pMerger->aTree[1];/* Index of iterator to advance */
+  int i;                        /* Index of aTree[] to recalculate */
+
+  /* Advance the current iterator */
+  rc = vdbeSorterIterNext(&pMerger->aIter[iPrev]);
+
+  /* Update contents of aTree[] */
+  for(i=(pMerger->nTree+iPrev)/2; rc==SQLITE_OK && i>0; i=i/2){
+    rc = vdbeSorterDoCompare(pThread, pMerger, i);
+  }
+
+  *pbEof = (pMerger->aIter[pMerger->aTree[1]].pFile==0);
+  return rc;
+}
+
+/*
+** The main routine for sorter-thread operations.
+*/
+static void *vdbeSorterThreadMain(void *pCtx){
+  int rc = SQLITE_OK;
+  SorterThread *pThread = (SorterThread*)pCtx;
+
+  assert( pThread->eWork==SORTER_THREAD_SORT
+       || pThread->eWork==SORTER_THREAD_TO_PMA
+       || pThread->eWork==SORTER_THREAD_CONS
+  );
+  assert( pThread->bDone==0 );
+
+  if( pThread->pUnpacked==0 ){
+    char *pFree;
+    pThread->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
+        pThread->pKeyInfo, 0, 0, &pFree
+    );
+    assert( pThread->pUnpacked==(UnpackedRecord*)pFree );
+    if( pFree==0 ){
+      rc = SQLITE_NOMEM;
+      goto thread_out;
+    }
+  }
+
+  if( pThread->eWork==SORTER_THREAD_CONS ){
+    assert( pThread->pList==0 );
+    while( pThread->nPMA>pThread->nConsolidate && rc==SQLITE_OK ){
+      int nIter = MIN(pThread->nPMA, SORTER_MAX_MERGE_COUNT);
+      sqlite3_file *pTemp2 = 0;     /* Second temp file to use */
+      SorterMerger *pMerger;        /* Object for reading/merging PMA data */
+      i64 iReadOff = 0;             /* Offset in pTemp1 to read from */
+      i64 iWriteOff = 0;            /* Offset in pTemp2 to write to */
+      int i;
+      
+      /* Allocate a merger object to merge PMAs together. */
+      pMerger = vdbeSorterMergerNew(nIter);
+      if( pMerger==0 ){
+        rc = SQLITE_NOMEM;
+        break;
+      }
+
+      /* Open a second temp file to write merged data to */
+      rc = vdbeSorterOpenTempFile(pThread->pVfs, &pTemp2);
+      if( rc!=SQLITE_OK ){
+        vdbeSorterMergerFree(pMerger);
+        break;
+      }
+
+      /* This loop runs once for each output PMA. Each output PMA is made
+      ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */
+      for(i=0; i<pThread->nPMA; i+=SORTER_MAX_MERGE_COUNT){
+        FileWriter writer;        /* Object for writing data to pTemp2 */
+        i64 nOut = 0;             /* Bytes of data in output PMA */
+        int bEof = 0;
+        int rc2;
+
+        /* Configure the merger object to read and merge data from the next 
+        ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs,
+        ** if that is fewer). */
+        int iIter;
+        for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
+          VdbeSorterIter *pIter = &pMerger->aIter[iIter];
+          rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nOut);
+          iReadOff = pIter->iEof;
+          if( iReadOff>=pThread->iTemp1Off || rc!=SQLITE_OK ) break;
+        }
+        for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
+          rc = vdbeSorterDoCompare(pThread, pMerger, iIter);
+        }
+
+        fileWriterInit(pTemp2, &writer, pThread->pgsz, iWriteOff);
+        fileWriterWriteVarint(&writer, nOut);
+        while( rc==SQLITE_OK && bEof==0 ){
+          VdbeSorterIter *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
+          assert( pIter->pFile!=0 );        /* pIter is not at EOF */
+          fileWriterWriteVarint(&writer, pIter->nKey);
+          fileWriterWrite(&writer, pIter->aKey, pIter->nKey);
+          rc = vdbeSorterNext(pThread, pMerger, &bEof);
+        }
+        rc2 = fileWriterFinish(&writer, &iWriteOff);
+        if( rc==SQLITE_OK ) rc = rc2;
+      }
+
+      vdbeSorterMergerFree(pMerger);
+      sqlite3OsCloseFree(pThread->pTemp1);
+      pThread->pTemp1 = pTemp2;
+      pThread->nPMA = (i / SORTER_MAX_MERGE_COUNT);
+      pThread->iTemp1Off = iWriteOff;
+    }
+  }else{
+    /* Sort the pThread->pList list */
+    rc = vdbeSorterSort(pThread);
+
+    /* If required, write the list out to a PMA. */
+    if( rc==SQLITE_OK && pThread->eWork==SORTER_THREAD_TO_PMA ){
+#ifdef SQLITE_DEBUG
+      i64 nExpect = pThread->nInMemory
+        + sqlite3VarintLen(pThread->nInMemory)
+        + pThread->iTemp1Off;
+#endif
+      rc = vdbeSorterListToPMA(pThread);
+      assert( rc!=SQLITE_OK || (nExpect==pThread->iTemp1Off) );
+    }
+  }
+
+ thread_out:
+  pThread->bDone = 1;
+  return SQLITE_INT_TO_PTR(rc);
+}
+
+/*
+** Run the activity scheduled by the object passed as the only argument
+** in the current thread.
+*/
+static int vdbeSorterRunThread(SorterThread *pThread){
+  int rc = SQLITE_PTR_TO_INT( vdbeSorterThreadMain((void*)pThread) );
+  assert( pThread->bDone );
+  pThread->bDone = 0;
+  return rc;
+}
+
+/*
+** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly
+** using a background thread.
+**
+** If argument bFg is non-zero, the operation always uses the calling thread.
+*/
+static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
+  VdbeSorter *pSorter = pCsr->pSorter;
+  int rc = SQLITE_OK;
+  int i;
+  SorterThread *pThread;        /* Thread context used to create new PMA */
+
+  pSorter->bUsePMA = 1;
+  for(i=0; ALWAYS( i<SQLITE_MAX_SORTER_THREAD ); i++){
+    pThread = &pSorter->aThread[i];
+    if( pThread->bDone ){
+      void *pRet;
+      assert( pThread->pThread );
+      rc = sqlite3ThreadJoin(pThread->pThread, &pRet);
+      pThread->pThread = 0;
+      pThread->bDone = 0;
+      if( rc==SQLITE_OK ){
+        rc = SQLITE_PTR_TO_INT(pRet);
+      }
+    }
+    if( pThread->pThread==0 ) break;
+  }
+
+  if( rc==SQLITE_OK ){
+    assert( pThread->pThread==0 && pThread->bDone==0 );
+    pThread->eWork = SORTER_THREAD_TO_PMA;
+    pThread->pList = pSorter->pRecord;
+    pThread->nInMemory = pSorter->nInMemory;
+    pSorter->nInMemory = 0;
+    pSorter->pRecord = 0;
+
+    if( bFg || i<(SQLITE_MAX_SORTER_THREAD-1) ){
+      void *pCtx = (void*)pThread;
+      rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSorterThreadMain, pCtx);
+    }else{
+      /* Use the foreground thread for this operation */
+      rc = vdbeSorterRunThread(pThread);
     }
-    pSorter->pRecord = p;
-    rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff);
   }
 
   return rc;
@@ -769,7 +1109,7 @@ int sqlite3VdbeSorterWrite(
   assert( pSorter );
   pSorter->nInMemory += sqlite3VarintLen(pVal->n) + pVal->n;
 
-  pNew = (SorterRecord *)sqlite3DbMallocRaw(db, pVal->n + sizeof(SorterRecord));
+  pNew = (SorterRecord *)sqlite3Malloc(pVal->n + sizeof(SorterRecord));
   if( pNew==0 ){
     rc = SQLITE_NOMEM;
   }else{
@@ -793,48 +1133,22 @@ int sqlite3VdbeSorterWrite(
         (pSorter->nInMemory>pSorter->mxPmaSize)
      || (pSorter->nInMemory>pSorter->mnPmaSize && sqlite3HeapNearlyFull())
   )){
-#ifdef SQLITE_DEBUG
-    i64 nExpect = pSorter->iWriteOff
-                + sqlite3VarintLen(pSorter->nInMemory)
-                + pSorter->nInMemory;
-#endif
-    rc = vdbeSorterListToPMA(db, pCsr);
-    pSorter->nInMemory = 0;
-    assert( rc!=SQLITE_OK || (nExpect==pSorter->iWriteOff) );
+    rc = vdbeSorterFlushPMA(db, pCsr, 0);
   }
 
   return rc;
 }
 
 /*
-** Helper function for sqlite3VdbeSorterRewind(). 
+** Return the total number of PMAs in all temporary files.
 */
-static int vdbeSorterInitMerge(
-  sqlite3 *db,                    /* Database handle */
-  const VdbeCursor *pCsr,         /* Cursor handle for this sorter */
-  i64 *pnByte                     /* Sum of bytes in all opened PMAs */
-){
-  VdbeSorter *pSorter = pCsr->pSorter;
-  int rc = SQLITE_OK;             /* Return code */
-  int i;                          /* Used to iterator through aIter[] */
-  i64 nByte = 0;                  /* Total bytes in all opened PMAs */
-
-  /* Initialize the iterators. */
-  for(i=0; i<SORTER_MAX_MERGE_COUNT; i++){
-    VdbeSorterIter *pIter = &pSorter->aIter[i];
-    rc = vdbeSorterIterInit(db, pSorter, pSorter->iReadOff, pIter, &nByte);
-    pSorter->iReadOff = pIter->iEof;
-    assert( rc!=SQLITE_OK || pSorter->iReadOff<=pSorter->iWriteOff );
-    if( rc!=SQLITE_OK || pSorter->iReadOff>=pSorter->iWriteOff ) break;
-  }
-
-  /* Initialize the aTree[] array. */
-  for(i=pSorter->nTree-1; rc==SQLITE_OK && i>0; i--){
-    rc = vdbeSorterDoCompare(pCsr, i);
+static int vdbeSorterCountPMA(VdbeSorter *pSorter){
+  int nPMA = 0;
+  int i;
+  for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
+    nPMA += pSorter->aThread[i].nPMA;
   }
-
-  *pnByte = nByte;
-  return rc;
+  return nPMA;
 }
 
 /*
@@ -843,107 +1157,98 @@ static int vdbeSorterInitMerge(
 */
 int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
   VdbeSorter *pSorter = pCsr->pSorter;
-  int rc;                         /* Return code */
-  sqlite3_file *pTemp2 = 0;       /* Second temp file to use */
-  i64 iWrite2 = 0;                /* Write offset for pTemp2 */
-  int nIter;                      /* Number of iterators used */
-  int nByte;                      /* Bytes of space required for aIter/aTree */
-  int N = 2;                      /* Power of 2 >= nIter */
+  int rc = SQLITE_OK;             /* Return code */
 
   assert( pSorter );
 
   /* If no data has been written to disk, then do not do so now. Instead,
   ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
   ** from the in-memory list.  */
-  if( pSorter->nPMA==0 ){
-    *pbEof = !pSorter->pRecord;
-    assert( pSorter->aTree==0 );
-    return vdbeSorterSort(pCsr);
+  if( pSorter->bUsePMA==0 ){
+    if( pSorter->pRecord ){
+      SorterThread *pThread = &pSorter->aThread[0];
+      *pbEof = 0;
+      pThread->pList = pSorter->pRecord;
+      pThread->eWork = SORTER_THREAD_SORT;
+      rc = vdbeSorterRunThread(pThread);
+      pSorter->pRecord = pThread->pList;
+      pThread->pList = 0;
+    }else{
+      *pbEof = 1;
+    }
+    return rc;
   }
 
   /* Write the current in-memory list to a PMA. */
-  rc = vdbeSorterListToPMA(db, pCsr);
-  if( rc!=SQLITE_OK ) return rc;
+  if( pSorter->pRecord ){
+    rc = vdbeSorterFlushPMA(db, pCsr, 1);
+  }
 
-  /* Allocate space for aIter[] and aTree[]. */
-  nIter = pSorter->nPMA;
-  if( nIter>SORTER_MAX_MERGE_COUNT ) nIter = SORTER_MAX_MERGE_COUNT;
-  assert( nIter>0 );
-  while( N<nIter ) N += N;
-  nByte = N * (sizeof(int) + sizeof(VdbeSorterIter));
-  pSorter->aIter = (VdbeSorterIter *)sqlite3DbMallocZero(db, nByte);
-  if( !pSorter->aIter ) return SQLITE_NOMEM;
-  pSorter->aTree = (int *)&pSorter->aIter[N];
-  pSorter->nTree = N;
-
-  do {
-    int iNew;                     /* Index of new, merged, PMA */
-
-    for(iNew=0; 
-        rc==SQLITE_OK && iNew*SORTER_MAX_MERGE_COUNT<pSorter->nPMA; 
-        iNew++
-    ){
-      int rc2;                    /* Return code from fileWriterFinish() */
-      FileWriter writer;          /* Object used to write to disk */
-      i64 nWrite;                 /* Number of bytes in new PMA */
-
-      memset(&writer, 0, sizeof(FileWriter));
-
-      /* If there are SORTER_MAX_MERGE_COUNT or less PMAs in file pTemp1,
-      ** initialize an iterator for each of them and break out of the loop.
-      ** These iterators will be incrementally merged as the VDBE layer calls
-      ** sqlite3VdbeSorterNext().
-      **
-      ** Otherwise, if pTemp1 contains more than SORTER_MAX_MERGE_COUNT PMAs,
-      ** initialize interators for SORTER_MAX_MERGE_COUNT of them. These PMAs
-      ** are merged into a single PMA that is written to file pTemp2.
-      */
-      rc = vdbeSorterInitMerge(db, pCsr, &nWrite);
-      assert( rc!=SQLITE_OK || pSorter->aIter[ pSorter->aTree[1] ].pFile );
-      if( rc!=SQLITE_OK || pSorter->nPMA<=SORTER_MAX_MERGE_COUNT ){
-        break;
+  /* Join all threads */
+  rc = vdbeSorterJoinAll(pSorter, rc);
+
+  /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
+  ** some of them together so that this is no longer the case. */
+  assert( SORTER_MAX_MERGE_COUNT>=SQLITE_MAX_SORTER_THREAD );
+  if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
+    int i;
+    for(i=0; rc==SQLITE_OK && i<SQLITE_MAX_SORTER_THREAD; i++){
+      SorterThread *pThread = &pSorter->aThread[i];
+      if( pThread->pTemp1 ){
+        pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/SQLITE_MAX_SORTER_THREAD;
+        pThread->eWork = SORTER_THREAD_CONS;
+
+        if( i<(SQLITE_MAX_SORTER_THREAD-1) ){
+          void *pCtx = (void*)pThread;
+          rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx);
+        }else{
+          rc = vdbeSorterRunThread(pThread);
+        }
       }
+    }
+  }
 
-      /* Open the second temp file, if it is not already open. */
-      if( pTemp2==0 ){
-        assert( iWrite2==0 );
-        rc = vdbeSorterOpenTempFile(db, &pTemp2);
-      }
+  /* Join all threads */
+  rc = vdbeSorterJoinAll(pSorter, rc);
 
-      if( rc==SQLITE_OK ){
-        int bEof = 0;
-        fileWriterInit(db, pTemp2, &writer, iWrite2);
-        fileWriterWriteVarint(&writer, nWrite);
-        while( rc==SQLITE_OK && bEof==0 ){
-          VdbeSorterIter *pIter = &pSorter->aIter[ pSorter->aTree[1] ];
-          assert( pIter->pFile );
+  /* Assuming no errors have occurred, set up a merger structure to read
+  ** and merge all remaining PMAs.  */
+  assert( pSorter->pMerger==0 );
+  if( rc==SQLITE_OK ){
+    int nIter = 0;                /* Number of iterators used */
+    int i;
+    SorterMerger *pMerger;
+    for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
+      nIter += pSorter->aThread[i].nPMA;
+    }
 
-          fileWriterWriteVarint(&writer, pIter->nKey);
-          fileWriterWrite(&writer, pIter->aKey, pIter->nKey);
-          rc = sqlite3VdbeSorterNext(db, pCsr, &bEof);
+    pSorter->pMerger = pMerger = vdbeSorterMergerNew(nIter);
+    if( pMerger==0 ){
+      rc = SQLITE_NOMEM;
+    }else{
+      int iIter = 0;
+      int iThread = 0;
+      for(iThread=0; iThread<SQLITE_MAX_SORTER_THREAD; iThread++){
+        int iPMA;
+        i64 iReadOff = 0;
+        SorterThread *pThread = &pSorter->aThread[iThread];
+        for(iPMA=0; iPMA<pThread->nPMA && rc==SQLITE_OK; iPMA++){
+          i64 nDummy = 0;
+          VdbeSorterIter *pIter = &pMerger->aIter[iIter++];
+          rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nDummy);
+          iReadOff = pIter->iEof;
         }
-        rc2 = fileWriterFinish(db, &writer, &iWrite2);
-        if( rc==SQLITE_OK ) rc = rc2;
       }
-    }
 
-    if( pSorter->nPMA<=SORTER_MAX_MERGE_COUNT ){
-      break;
-    }else{
-      sqlite3_file *pTmp = pSorter->pTemp1;
-      pSorter->nPMA = iNew;
-      pSorter->pTemp1 = pTemp2;
-      pTemp2 = pTmp;
-      pSorter->iWriteOff = iWrite2;
-      pSorter->iReadOff = 0;
-      iWrite2 = 0;
+      for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
+        rc = vdbeSorterDoCompare(&pSorter->aThread[0], pMerger, i);
+      }
     }
-  }while( rc==SQLITE_OK );
+  }
 
-  if( pTemp2 ){
-    sqlite3OsCloseFree(pTemp2);
+  if( rc==SQLITE_OK ){
+    *pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0);
   }
-  *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
   return rc;
 }
 
@@ -954,16 +1259,8 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
   VdbeSorter *pSorter = pCsr->pSorter;
   int rc;                         /* Return code */
 
-  if( pSorter->aTree ){
-    int iPrev = pSorter->aTree[1];/* Index of iterator to advance */
-    int i;                        /* Index of aTree[] to recalculate */
-
-    rc = vdbeSorterIterNext(db, &pSorter->aIter[iPrev]);
-    for(i=(pSorter->nTree+iPrev)/2; rc==SQLITE_OK && i>0; i=i/2){
-      rc = vdbeSorterDoCompare(pCsr, i);
-    }
-
-    *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
+  if( pSorter->pMerger ){
+    rc = vdbeSorterNext(&pSorter->aThread[0], pSorter->pMerger, pbEof);
   }else{
     SorterRecord *pFree = pSorter->pRecord;
     pSorter->pRecord = pFree->pNext;
@@ -984,9 +1281,9 @@ static void *vdbeSorterRowkey(
   int *pnKey                      /* OUT: Size of current key in bytes */
 ){
   void *pKey;
-  if( pSorter->aTree ){
+  if( pSorter->pMerger ){
     VdbeSorterIter *pIter;
-    pIter = &pSorter->aIter[ pSorter->aTree[1] ];
+    pIter = &pSorter->pMerger->aIter[ pSorter->pMerger->aTree[1] ];
     *pnKey = pIter->nKey;
     pKey = pIter->aKey;
   }else{
@@ -1031,9 +1328,10 @@ int sqlite3VdbeSorterCompare(
   int *pRes                       /* OUT: Result of comparison */
 ){
   VdbeSorter *pSorter = pCsr->pSorter;
+  SorterThread *pMain = &pSorter->aThread[0];
   void *pKey; int nKey;           /* Sorter key to compare pVal with */
 
   pKey = vdbeSorterRowkey(pSorter, &nKey);
-  vdbeSorterCompare(pCsr, nIgnore, pVal->z, pVal->n, pKey, nKey, pRes);
+  vdbeSorterCompare(pMain, nIgnore, pVal->z, pVal->n, pKey, nKey, pRes);
   return SQLITE_OK;
 }