]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Fix many issues with new code.
authordan <dan@noemail.net>
Sat, 12 Apr 2014 19:34:44 +0000 (19:34 +0000)
committerdan <dan@noemail.net>
Sat, 12 Apr 2014 19:34:44 +0000 (19:34 +0000)
FossilOrigin-Name: 62c406a042d7246f6df6b943421182a88483b2e3

manifest
manifest.uuid
src/vdbesort.c

index f48df9c72d7e7f4c9162be875ab004bc816853d6..bca9ab362eb9dc2c927cbe3ce0e2f028c73be2d1 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Avoid\shaving\sthe\ssorter\smerge\stoo\smany\sPMAs\sat\sa\stime\swhen\sincrementally\smerging\sdata\sfollowing\sa\sSorterRewind().
-D 2014-04-11T19:43:07.755
+C Fix\smany\sissues\swith\snew\scode.
+D 2014-04-12T19:34:44.467
 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
 F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125
 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4
 F src/vdbeaux.c d8dc38965507a34b0e150c0d7fc82b02f8cf25ea
 F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa
 F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447
-F src/vdbesort.c 2984e3624383adf9c762558b8f85a17a626c11a7
+F src/vdbesort.c bc0d90e00abcc88997f463d4d41b7ba4a10cfd88
 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767
 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd
 F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8
@@ -1163,7 +1163,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1
 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01
 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff
-P f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f
-R f6c598c1c558c5930404cda096730209
+P 98bf0307b121b0776a7170108cc8d3f948a7ebfe
+R f3107fdb117ba86f9bee4609a5b08bfd
 U dan
-Z 3e9d4ee1a6e7b343cf831d1b18651067
+Z 5a1b16a83fca264558f06f5fb6536949
index b1943056c17dae2d7c7cf79902bfaa39dbf981f2..e7ab0df5c2b0322f133c5dec1d923d4687964276 100644 (file)
@@ -1 +1 @@
-98bf0307b121b0776a7170108cc8d3f948a7ebfe
\ No newline at end of file
+62c406a042d7246f6df6b943421182a88483b2e3
\ No newline at end of file
index 16f6c618c6354229e7c307757d87b721b7179dd8..1889c8fde175a2c2f0102f86bf9acdf79007f0fd 100644 (file)
 #include "sqliteInt.h"
 #include "vdbeInt.h"
 
+/* 
+** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
+** messages to stderr that may be helpful in understanding the performance
+** characteristics of the sorter in multi-threaded mode.
+*/
+#if 0
+# define SQLITE_DEBUG_SORTER_THREADS 1
+#endif
+
 /*
 ** Private objects used by the sorter
 */
@@ -97,19 +106,48 @@ typedef struct PmaWriter PmaWriter;         /* Incrementally write on PMA */
 typedef struct SorterRecord SorterRecord;   /* A record being sorted */
 typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
 typedef struct SorterFile SorterFile;
+typedef struct SorterThread SorterThread;
+typedef struct SorterList SorterList;
 typedef struct IncrMerger IncrMerger;
 
+/*
+** A container for a temp file handle and the current amount of data 
+** stored in the file.
+*/
+struct SorterFile {
+  sqlite3_file *pFd;              /* File handle */
+  i64 iEof;                       /* Bytes of data stored in pFd */
+};
 
 /*
-** Candidate values for SortSubtask.eWork
+** An object of this type is used to store the thread handle for each 
+** background thread launched by the sorter. Before the thread is launched,
+** variable bDone is set to 0. Then, right before it exits, the thread 
+** itself sets bDone to 1.
+**
+** This is then used for two purposes:
+**
+**   1. When flushing the contents of memory to a level-0 PMA on disk, to
+**      attempt to select a SortSubtask for which there is not already an
+**      active background thread (since doing so causes the main thread
+**      to block until it finishes).
+**
+**   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
+**      to sqlite3ThreadJoin() is likely to block.
+**
+** In both cases, the effects of the main thread seeing (bDone==0) even
+** after the thread has finished are not dire. So we don't worry about
+** memory barriers and such here.
 */
-#define SORT_SUBTASK_SORT   1     /* Sort records on pList */
-#define SORT_SUBTASK_TO_PMA 2     /* Xfer pList to Packed-Memory-Array pTemp1 */
-#define SORT_SUBTASK_CONS   3     /* Consolidate multiple PMAs */
+struct SorterThread {
+  SQLiteThread *pThread;
+  int bDone;
+};
 
-struct SorterFile {
-  sqlite3_file *pFd;
-  i64 iEof;
+struct SorterList {
+  SorterRecord *pList;            /* Linked list of records */
+  u8 *aMemory;                    /* If non-NULL, blob of memory for pList */
+  int szPMA;                      /* Size of pList as PMA in bytes */
 };
 
 /*
@@ -148,21 +186,13 @@ struct SorterFile {
 **     remain in temp file SortSubtask.pTemp1.
 */
 struct SortSubtask {
-  SQLiteThread *pThread;          /* Thread handle, or NULL */
-  int bDone;                      /* Set to true by pTask when finished */
-
+  SorterThread thread;
   sqlite3 *db;                    /* Database connection */
   VdbeSorter *pSorter;            /* Sorter */
   KeyInfo *pKeyInfo;              /* How to compare records */
   UnpackedRecord *pUnpacked;      /* Space to unpack a record */
   int pgsz;                       /* Main database page size */
-
-  u8 eWork;                       /* One of the SORT_SUBTASK_* constants */
-  int nConsolidate;               /* For SORT_SUBTASK_CONS, max final PMAs */
-  SorterRecord *pList;            /* List of records for pTask to sort */
-  int nInMemory;                  /* Expected size of PMA based on pList */
-  u8 *aListMemory;                /* Records memory (or NULL) */
-
+  SorterList list;                /* List for thread to write to a PMA */
   int nPMA;                       /* Number of PMAs currently in file */
   SorterFile file;                /* Temp file for level-0 PMAs */
   SorterFile file2;               /* Space for other PMAs */
@@ -248,16 +278,19 @@ struct MergeEngine {
 **   largest record in the sorter.
 */
 struct VdbeSorter {
-  int nInMemory;                  /* Current size of pRecord list as PMA */
   int mnPmaSize;                  /* Minimum PMA size, in bytes */
   int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
   int bUsePMA;                    /* True if one or more PMAs created */
   int bUseThreads;                /* True if one or more PMAs created */
-  SorterRecord *pRecord;          /* Head of in-memory record list */
   PmaReader *pReader;             /* Read data from here after Rewind() */
   int mxKeysize;                  /* Largest serialized key seen so far */
   UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
+#if 0
+  int nInMemory;                  /* Current size of pRecord list as PMA */
+  SorterRecord *pRecord;          /* Head of in-memory record list */
   u8 *aMemory;                    /* Block of memory to alloc records from */
+#endif
+  SorterList list;
   int iMemory;                    /* Offset of first free byte in aMemory */
   int nMemory;                    /* Size of aMemory allocation in bytes */
   int iPrev;                      /* Previous thread used to flush PMA */
@@ -292,7 +325,7 @@ struct PmaReader {
 */
 struct IncrMerger {
   SortSubtask *pTask;             /* Task that owns this merger */
-  SQLiteThread *pThread;          /* Thread currently populating aFile[1] */
+  SorterThread thread;            /* Thread for populating aFile[1] */
   MergeEngine *pMerger;           /* Merge engine thread reads data from */
   i64 iStartOff;                  /* Offset to start writing file at */
   int mxSz;                       /* Maximum bytes of data to store */
@@ -787,8 +820,8 @@ int sqlite3VdbeSorterInit(
       if( sqlite3GlobalConfig.pHeap==0 ){
         assert( pSorter->iMemory==0 );
         pSorter->nMemory = pgsz;
-        pSorter->aMemory = (u8*)sqlite3Malloc(pgsz);
-        if( !pSorter->aMemory ) rc = SQLITE_NOMEM;
+        pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
+        if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM;
       }
     }
   }
@@ -815,13 +848,13 @@ static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
 static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
   sqlite3DbFree(db, pTask->pUnpacked);
   pTask->pUnpacked = 0;
-  if( pTask->aListMemory==0 ){
-    vdbeSorterRecordFree(0, pTask->pList);
+  if( pTask->list.aMemory==0 ){
+    vdbeSorterRecordFree(0, pTask->list.pList);
   }else{
-    sqlite3_free(pTask->aListMemory);
-    pTask->aListMemory = 0;
+    sqlite3_free(pTask->list.aMemory);
+    pTask->list.aMemory = 0;
   }
-  pTask->pList = 0;
+  pTask->list.pList = 0;
   if( pTask->file.pFd ){
     sqlite3OsCloseFree(pTask->file.pFd);
     pTask->file.pFd = 0;
@@ -834,28 +867,96 @@ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
   }
 }
 
+#ifdef SQLITE_DEBUG_SORTER_THREADS
+static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
+  i64 t;
+  int iTask = (pTask - pTask->pSorter->aTask);
+  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
+  fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
+}
+static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
+  i64 t;
+  sqlite3OsCurrentTimeInt64(db->pVfs, &t);
+  fprintf(stderr, "%lld:X %s\n", t, zEvent);
+}
+static void vdbeSorterPopulateDebug(
+  SortSubtask *pTask,
+  const char *zEvent
+){
+  i64 t;
+  int iTask = (pTask - pTask->pSorter->aTask);
+  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
+  fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
+}
+static void vdbeSorterBlockDebug(
+  SortSubtask *pTask,
+  int bBlocked,
+  const char *zEvent
+){
+  if( bBlocked ){
+    i64 t;
+    sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
+    fprintf(stderr, "%lld:main %s\n", t, zEvent);
+  }
+}
+#else
+# define vdbeSorterWorkDebug(x,y)
+# define vdbeSorterRewindDebug(x,y)
+# define vdbeSorterPopulateDebug(x,y)
+# define vdbeSorterBlockDebug(x,y,z)
+#endif
+
+#if SQLITE_MAX_WORKER_THREADS>0
 /*
-** Join all threads.  
+** Join thread p.
+*/
+static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){
+  int rc = SQLITE_OK;
+  if( p->pThread ){
+#ifdef SQLITE_DEBUG_SORTER_THREADS
+    int bDone = p->bDone;
+#endif
+    void *pRet;
+    vdbeSorterBlockDebug(pTask, !bDone, "enter");
+    rc = sqlite3ThreadJoin(p->pThread, &pRet);
+    vdbeSorterBlockDebug(pTask, !bDone, "exit");
+    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
+    assert( p->bDone==1 );
+    p->bDone = 0;
+    p->pThread = 0;
+  }
+  return rc;
+}
+
+/*
+** Launch a background thread to run xTask(pIn).
+*/
+static int vdbeSorterCreateThread(
+  SorterThread *p,                /* Thread object to populate */
+  void *(*xTask)(void*),          /* Routine to run in a separate thread */
+  void *pIn                       /* Argument passed into xTask() */
+){
+  assert( p->pThread==0 && p->bDone==0 );
+  return sqlite3ThreadCreate(&p->pThread, xTask, pIn);
+}
+
+/*
+** Join all outstanding threads launched by SorterWrite() to create 
+** level-0 PMAs.
 */
-#if SQLITE_MAX_WORKER_THREADS>0
 static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
   int rc = rcin;
   int i;
   for(i=0; i<pSorter->nTask; i++){
     SortSubtask *pTask = &pSorter->aTask[i];
-    if( pTask->pThread ){
-      void *pRet;
-      int rc2 = sqlite3ThreadJoin(pTask->pThread, &pRet);
-      pTask->pThread = 0;
-      pTask->bDone = 0;
-      if( rc==SQLITE_OK ) rc = rc2;
-      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
-    }
+    int rc2 = vdbeSorterJoinThread(pTask, &pTask->thread);
+    if( rc==SQLITE_OK ) rc = rc2;
   }
   return rc;
 }
 #else
 # define vdbeSorterJoinAll(x,rcin) (rcin)
+# define vdbeSorterJoinThread(pTask,p) SQLITE_OK
 #endif
 
 /*
@@ -908,11 +1009,11 @@ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
     SortSubtask *pTask = &pSorter->aTask[i];
     vdbeSortSubtaskCleanup(db, pTask);
   }
-  if( pSorter->aMemory==0 ){
-    vdbeSorterRecordFree(0, pSorter->pRecord);
+  if( pSorter->list.aMemory==0 ){
+    vdbeSorterRecordFree(0, pSorter->list.pList);
   }
-  pSorter->pRecord = 0;
-  pSorter->nInMemory = 0;
+  pSorter->list.pList = 0;
+  pSorter->list.szPMA = 0;
   pSorter->bUsePMA = 0;
   pSorter->iMemory = 0;
   pSorter->mxKeysize = 0;
@@ -927,7 +1028,7 @@ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
   VdbeSorter *pSorter = pCsr->pSorter;
   if( pSorter ){
     sqlite3VdbeSorterReset(db, pSorter);
-    sqlite3_free(pSorter->aMemory);
+    sqlite3_free(pSorter->list.aMemory);
     sqlite3DbFree(db, pSorter);
     pCsr->pSorter = 0;
   }
@@ -952,6 +1053,21 @@ static int vdbeSorterOpenTempFile(sqlite3_vfs *pVfs, sqlite3_file **ppFile){
   return rc;
 }
 
+static int vdbeSortAllocUnpacked(SortSubtask *pTask){
+  if( pTask->pUnpacked==0 ){
+    char *pFree;
+    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
+        pTask->pKeyInfo, 0, 0, &pFree
+    );
+    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
+    if( pFree==0 ) return SQLITE_NOMEM;
+    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
+    pTask->pUnpacked->errCode = 0;
+  }
+  return SQLITE_OK;
+}
+
+
 /*
 ** Merge the two sorted lists p1 and p2 into a single list.
 ** Set *ppOut to the head of the new list.
@@ -991,25 +1107,29 @@ static void vdbeSorterMerge(
 ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
 ** an error occurs.
 */
-static int vdbeSorterSort(SortSubtask *pTask){
+static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
   int i;
   SorterRecord **aSlot;
   SorterRecord *p;
+  int rc;
+
+  rc = vdbeSortAllocUnpacked(pTask);
+  if( rc!=SQLITE_OK ) return rc;
 
   aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
   if( !aSlot ){
     return SQLITE_NOMEM;
   }
 
-  p = pTask->pList;
+  p = pList->pList;
   while( p ){
     SorterRecord *pNext;
-    if( pTask->aListMemory ){
-      if( (u8*)p==pTask->aListMemory ){
+    if( pList->aMemory ){
+      if( (u8*)p==pList->aMemory ){
         pNext = 0;
       }else{
-        assert( p->u.iNext<sqlite3MallocSize(pTask->aListMemory) );
-        pNext = (SorterRecord*)&pTask->aListMemory[p->u.iNext];
+        assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
+        pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
       }
     }else{
       pNext = p->u.pNext;
@@ -1028,9 +1148,13 @@ static int vdbeSorterSort(SortSubtask *pTask){
   for(i=0; i<64; i++){
     vdbeSorterMerge(pTask, p, aSlot[i], &p);
   }
-  pTask->pList = p;
+  pList->pList = p;
 
   sqlite3_free(aSlot);
+  if( pTask->pUnpacked->errCode ){
+    assert( pTask->pUnpacked->errCode==SQLITE_NOMEM );
+    return SQLITE_NOMEM;
+  }
   return SQLITE_OK;
 }
 
@@ -1144,8 +1268,9 @@ static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){
 
 
 /*
-** Write the current contents of the in-memory linked-list to a PMA. Return
-** SQLITE_OK if successful, or an SQLite error code otherwise.
+** Write the current contents of in-memory linked-list pList to a level-0
+** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if 
+** successful, or an SQLite error code otherwise.
 **
 ** The format of a PMA is:
 **
@@ -1156,12 +1281,19 @@ static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){
 **       Each record consists of a varint followed by a blob of data (the 
 **       key). The varint is the number of bytes in the blob of data.
 */
-static int vdbeSorterListToPMA(SortSubtask *pTask){
+static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
   int rc = SQLITE_OK;             /* Return code */
   PmaWriter writer;               /* Object used to write to the file */
 
+#ifdef SQLITE_DEBUG
+  /* Set iSz to the expected size of file pTask->file after writing the PMA. 
+  ** This is used by an assert() statement at the end of this function.  */
+  i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
+#endif
+
+  vdbeSorterWorkDebug(pTask, "enter");
   memset(&writer, 0, sizeof(PmaWriter));
-  assert( pTask->nInMemory>0 );
+  assert( pList->szPMA>0 );
 
   /* If the first temporary PMA file has not been opened, open it now. */
   if( pTask->file.pFd==0 ){
@@ -1174,10 +1306,15 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){
   /* Try to get the file to memory map */
   if( rc==SQLITE_OK ){
     vdbeSorterExtendFile(pTask->db, 
-        pTask->file.pFd, pTask->file.iEof + pTask->nInMemory + 9
+        pTask->file.pFd, pTask->file.iEof + pList->szPMA + 9
     );
   }
 
+  /* Sort the list */
+  if( rc==SQLITE_OK ){
+    rc = vdbeSorterSort(pTask, pList);
+  }
+
   if( rc==SQLITE_OK ){
     SorterRecord *p;
     SorterRecord *pNext = 0;
@@ -1185,18 +1322,20 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){
     vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz,
                       pTask->file.iEof);
     pTask->nPMA++;
-    vdbePmaWriteVarint(&writer, pTask->nInMemory);
-    for(p=pTask->pList; p; p=pNext){
+    vdbePmaWriteVarint(&writer, pList->szPMA);
+    for(p=pList->pList; p; p=pNext){
       pNext = p->u.pNext;
       vdbePmaWriteVarint(&writer, p->nVal);
       vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
-      if( pTask->aListMemory==0 ) sqlite3_free(p);
+      if( pList->aMemory==0 ) sqlite3_free(p);
     }
-    pTask->pList = p;
+    pList->pList = p;
     rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
   }
 
-  assert( pTask->pList==0 || rc!=SQLITE_OK );
+  vdbeSorterWorkDebug(pTask, "exit");
+  assert( rc!=SQLITE_OK || pList->pList==0 );
+  assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
   return rc;
 }
 
@@ -1275,249 +1414,84 @@ static int vdbeSorterNext(
   return rc;
 }
 
-#if 0
-static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
-  i64 t;
-  int iTask = (pTask - pTask->pSorter->aTask);
-  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
-  fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
-}
-static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
-  i64 t;
-  sqlite3OsCurrentTimeInt64(db->pVfs, &t);
-  fprintf(stderr, "%lld:X %s\n", t, zEvent);
-}
-static void vdbeSorterPopulateDebug(
-  SortSubtask *pTask,
-  const char *zEvent
-){
-  i64 t;
-  int iTask = (pTask - pTask->pSorter->aTask);
-  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
-  fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
-}
-#else
-# define vdbeSorterWorkDebug(x,y)
-# define vdbeSorterRewindDebug(x,y)
-# define vdbeSorterPopulateDebug(x,y)
-#endif
-
-static int vdbeSortAllocUnpacked(SortSubtask *pTask){
-  if( pTask->pUnpacked==0 ){
-    char *pFree;
-    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
-        pTask->pKeyInfo, 0, 0, &pFree
-    );
-    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
-    if( pFree==0 ) return SQLITE_NOMEM;
-    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
-    pTask->pUnpacked->errCode = 0;
-  }
-  return SQLITE_OK;
-}
-
 /*
 ** The main routine for sorter-thread operations.
 */
-static void *vdbeSortSubtaskMain(void *pCtx){
-  int rc = SQLITE_OK;
+static void *vdbeSorterFlushThread(void *pCtx){
   SortSubtask *pTask = (SortSubtask*)pCtx;
-
-  assert( pTask->eWork==SORT_SUBTASK_SORT
-       || pTask->eWork==SORT_SUBTASK_TO_PMA
-       || pTask->eWork==SORT_SUBTASK_CONS
-  );
-  assert( pTask->bDone==0 );
-
-  vdbeSorterWorkDebug(pTask, "enter");
-
-  rc = vdbeSortAllocUnpacked(pTask);
-  if( rc!=SQLITE_OK ) goto thread_out;
-
-  if( pTask->eWork==SORT_SUBTASK_CONS ){
-    assert( pTask->pList==0 );
-    while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){
-      int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT);
-      sqlite3_file *pTemp2 = 0;     /* Second temp file to use */
-      MergeEngine *pMerger;         /* Object for reading/merging PMA data */
-      i64 iReadOff = 0;             /* Offset in pTemp1 to read from */
-      i64 iWriteOff = 0;            /* Offset in pTemp2 to write to */
-      int i;
-      
-      /* Allocate a merger object to merge PMAs together. */
-      pMerger = vdbeMergeEngineNew(nIter);
-      if( pMerger==0 ){
-        rc = SQLITE_NOMEM;
-        break;
-      }
-
-      /* Open a second temp file to write merged data to */
-      rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2);
-      if( rc==SQLITE_OK ){
-        vdbeSorterExtendFile(pTask->db, pTemp2, pTask->file.iEof);
-      }else{
-        vdbeMergeEngineFree(pMerger);
-        break;
-      }
-
-      /* This loop runs once for each output PMA. Each output PMA is made
-      ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */
-      for(i=0; rc==SQLITE_OK && i<pTask->nPMA; i+=SORTER_MAX_MERGE_COUNT){
-        PmaWriter writer;         /* Object for writing data to pTemp2 */
-        i64 nOut = 0;             /* Bytes of data in output PMA */
-        int bEof = 0;
-        int rc2;
-
-        /* Configure the merger object to read and merge data from the next 
-        ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs,
-        ** if that is fewer). */
-        int iIter;
-        for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
-          PmaReader *pIter = &pMerger->aIter[iIter];
-          rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nOut);
-          iReadOff = pIter->iEof;
-          if( iReadOff>=pTask->file.iEof || rc!=SQLITE_OK ) break;
-        }
-        for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
-          rc = vdbeSorterDoCompare(pTask, pMerger, iIter);
-        }
-
-        vdbePmaWriterInit(pTemp2, &writer, pTask->pgsz, iWriteOff);
-        vdbePmaWriteVarint(&writer, nOut);
-        while( rc==SQLITE_OK && bEof==0 ){
-          PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
-          assert( pIter->pFile!=0 );        /* pIter is not at EOF */
-          vdbePmaWriteVarint(&writer, pIter->nKey);
-          vdbePmaWriteBlob(&writer, pIter->aKey, pIter->nKey);
-          rc = vdbeSorterNext(pTask, pMerger, &bEof);
-        }
-        rc2 = vdbePmaWriterFinish(&writer, &iWriteOff);
-        if( rc==SQLITE_OK ) rc = rc2;
-      }
-
-      vdbeMergeEngineFree(pMerger);
-      sqlite3OsCloseFree(pTask->file.pFd);
-      pTask->file.pFd = pTemp2;
-      pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT);
-      pTask->file.iEof = iWriteOff;
-    }
-  }else{
-    /* Sort the pTask->pList list */
-    rc = vdbeSorterSort(pTask);
-
-    /* If required, write the list out to a PMA. */
-    if( rc==SQLITE_OK && pTask->eWork==SORT_SUBTASK_TO_PMA ){
-#ifdef SQLITE_DEBUG
-      i64 nExpect = pTask->nInMemory
-        + sqlite3VarintLen(pTask->nInMemory)
-        + pTask->file.iEof;
-#endif
-      rc = vdbeSorterListToPMA(pTask);
-      assert( rc!=SQLITE_OK || (nExpect==pTask->file.iEof) );
-    }
-  }
-
- thread_out:
-  pTask->bDone = 1;
-  if( rc==SQLITE_OK && pTask->pUnpacked->errCode ){
-    assert( pTask->pUnpacked->errCode==SQLITE_NOMEM );
-    rc = SQLITE_NOMEM;
-  }
-  vdbeSorterWorkDebug(pTask, "exit");
+  int rc;                         /* Return code */
+  assert( pTask->thread.bDone==0 );
+  rc = vdbeSorterListToPMA(pTask, &pTask->list);
+  pTask->thread.bDone = 1;
   return SQLITE_INT_TO_PTR(rc);
 }
 
 /*
-** Run the activity scheduled by the object passed as the only argument
-** in the current thread.
-*/
-static int vdbeSorterRunTask(SortSubtask *pTask){
-  int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pTask) );
-  assert( pTask->bDone );
-  pTask->bDone = 0;
-  return rc;
-}
-
-/*
-** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly
+** Flush the current contents of VdbeSorter.list to a new PMA, possibly
 ** using a background thread.
-**
-** If argument bFg is non-zero, the operation always uses the calling thread.
 */
-static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
-  VdbeSorter *pSorter = pCsr->pSorter;
+static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
+#if SQLITE_MAX_WORKER_THREADS==0
+  pSorter->bUsePMA = 1;
+  return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
+#else
   int rc = SQLITE_OK;
   int i;
   SortSubtask *pTask = 0;    /* Thread context used to create new PMA */
   int nWorker = (pSorter->nTask-1);
 
+  /* Set the flag to indicate that at least one PMA has been written. 
+  ** Or will be, anyhow.  */
   pSorter->bUsePMA = 1;
+
+  /* Select a sub-task to sort and flush the current list of in-memory
+  ** records to disk. If the sorter is running in multi-threaded mode,
+  ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
+  ** the background thread from a sub-tasks previous turn is still running,
+  ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
+  ** fall back to using the final sub-task. The first (pSorter->nTask-1)
+  ** sub-tasks are prefered as they use background threads - the final 
+  ** sub-task uses the main thread. */
   for(i=0; i<nWorker; i++){
     int iTest = (pSorter->iPrev + i + 1) % nWorker;
     pTask = &pSorter->aTask[iTest];
-#if SQLITE_MAX_WORKER_THREADS>0
-    if( pTask->bDone ){
-      void *pRet;
-      assert( pTask->pThread );
-      rc = sqlite3ThreadJoin(pTask->pThread, &pRet);
-      pTask->pThread = 0;
-      pTask->bDone = 0;
-      if( rc==SQLITE_OK ){
-        rc = SQLITE_PTR_TO_INT(pRet);
-      }
+    if( pTask->thread.bDone ){
+      rc = vdbeSorterJoinThread(pTask, &pTask->thread);
     }
-#endif
-    if( pTask->pThread==0 ) break;
-    pTask = 0;
-  }
-  if( pTask==0 ){
-    pTask = &pSorter->aTask[nWorker];
+    if( pTask->thread.pThread==0 || rc!=SQLITE_OK ) break;
   }
-  pSorter->iPrev = (pTask - pSorter->aTask);
 
   if( rc==SQLITE_OK ){
-    assert( pTask->pThread==0 && pTask->bDone==0 );
-    pTask->eWork = SORT_SUBTASK_TO_PMA;
-    pTask->pList = pSorter->pRecord;
-    pTask->nInMemory = pSorter->nInMemory;
-    pSorter->nInMemory = 0;
-    pSorter->pRecord = 0;
-
-    if( pSorter->aMemory ){
-      u8 *aMem = pTask->aListMemory;
-      pTask->aListMemory = pSorter->aMemory;
-      pSorter->aMemory = aMem;
-    }
-
-#if SQLITE_MAX_WORKER_THREADS>0
-    if( !bFg && pTask!=&pSorter->aTask[nWorker] ){
+    if( i==nWorker ){
+      /* Use the foreground thread for this operation */
+      rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
+    }else{
       /* Launch a background thread for this operation */
+      u8 *aMem = pTask->list.aMemory;
       void *pCtx = (void*)pTask;
-      assert( pSorter->aMemory==0 || pTask->aListMemory!=0 );
-      if( pTask->aListMemory ){
-        if( pSorter->aMemory==0 ){
-          pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
-          if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
-        }else{
-          pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory);
-        }
-      }
-      rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx);
-    }else
-#endif
-    {
-      /* Use the foreground thread for this operation */
-      rc = vdbeSorterRunTask(pTask);
-      if( rc==SQLITE_OK ){
-        u8 *aMem = pTask->aListMemory;
-        pTask->aListMemory = pSorter->aMemory;
-        pSorter->aMemory = aMem;
-        assert( pTask->pList==0 );
+
+      assert( pTask->thread.pThread==0 && pTask->thread.bDone==0 );
+      assert( pTask->list.pList==0 );
+      assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
+
+      pSorter->iPrev = (pTask - pSorter->aTask);
+      pTask->list = pSorter->list;
+      pSorter->list.pList = 0;
+      pSorter->list.szPMA = 0;
+      if( aMem ){
+        pSorter->list.aMemory = aMem;
+        pSorter->nMemory = sqlite3MallocSize(aMem);
+      }else{
+        pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
+        if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
       }
+
+      rc = vdbeSorterCreateThread(&pTask->thread, vdbeSorterFlushThread, pCtx);
     }
   }
 
   return rc;
+#endif
 }
 
 /*
@@ -1557,28 +1531,28 @@ int sqlite3VdbeSorterWrite(
   nReq = pVal->n + sizeof(SorterRecord);
   nPMA = pVal->n + sqlite3VarintLen(pVal->n);
   if( pSorter->mxPmaSize ){
-    if( pSorter->aMemory ){
+    if( pSorter->list.aMemory ){
       bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
     }else{
       bFlush = (
-          (pSorter->nInMemory > pSorter->mxPmaSize)
-       || (pSorter->nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
+          (pSorter->list.szPMA > pSorter->mxPmaSize)
+       || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
       );
     }
     if( bFlush ){
-      rc = vdbeSorterFlushPMA(db, pCsr, 0);
-      pSorter->nInMemory = 0;
+      rc = vdbeSorterFlushPMA(pSorter);
+      pSorter->list.szPMA = 0;
       pSorter->iMemory = 0;
-      assert( rc!=SQLITE_OK || pSorter->pRecord==0 );
+      assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
     }
   }
 
-  pSorter->nInMemory += nPMA;
+  pSorter->list.szPMA += nPMA;
   if( nPMA>pSorter->mxKeysize ){
     pSorter->mxKeysize = nPMA;
   }
 
-  if( pSorter->aMemory ){
+  if( pSorter->list.aMemory ){
     int nMin = pSorter->iMemory + nReq;
 
     if( nMin>pSorter->nMemory ){
@@ -1588,45 +1562,33 @@ int sqlite3VdbeSorterWrite(
       if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
       if( nNew < nMin ) nNew = nMin;
 
-      aNew = sqlite3Realloc(pSorter->aMemory, nNew);
+      aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
       if( !aNew ) return SQLITE_NOMEM;
-      pSorter->pRecord = (SorterRecord*)(
-          aNew + ((u8*)pSorter->pRecord - pSorter->aMemory)
+      pSorter->list.pList = (SorterRecord*)(
+          aNew + ((u8*)pSorter->list.pList - pSorter->list.aMemory)
       );
-      pSorter->aMemory = aNew;
+      pSorter->list.aMemory = aNew;
       pSorter->nMemory = nNew;
     }
 
-    pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory];
+    pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
     pSorter->iMemory += ROUND8(nReq);
-    pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory;
+    pNew->u.iNext = (u8*)(pSorter->list.pList) - pSorter->list.aMemory;
   }else{
     pNew = (SorterRecord *)sqlite3Malloc(nReq);
     if( pNew==0 ){
       return SQLITE_NOMEM;
     }
-    pNew->u.pNext = pSorter->pRecord;
+    pNew->u.pNext = pSorter->list.pList;
   }
 
   memcpy(SRVAL(pNew), pVal->z, pVal->n);
   pNew->nVal = pVal->n;
-  pSorter->pRecord = pNew;
+  pSorter->list.pList = pNew;
 
   return rc;
 }
 
-/*
-** Return the total number of PMAs in all temporary files.
-*/
-static int vdbeSorterCountPMA(VdbeSorter *pSorter){
-  int nPMA = 0;
-  int i;
-  for(i=0; i<pSorter->nTask; i++){
-    nPMA += pSorter->aTask[i].nPMA;
-  }
-  return nPMA;
-}
-
 /*
 ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
 ** of the data stored in aFile[1] is the same as that used by regular PMAs,
@@ -1667,39 +1629,27 @@ static int vdbeIncrPopulate(IncrMerger *pIncr){
   return rc;
 }
 
-static void *vdbeIncrPopulateThreadMain(void *pCtx){
+static void *vdbeIncrPopulateThread(void *pCtx){
   IncrMerger *pIncr = (IncrMerger*)pCtx;
-  return SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
+  void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
+  pIncr->thread.bDone = 1;
+  return pRet;
 }
 
-static int vdbeIncrBgPopulate(IncrMerger *pIncr){
-  int rc;
-  assert( pIncr->pThread==0 );
-  if( pIncr->bUseThread==0 ){
-    rc = vdbeIncrPopulate(pIncr);
-  }
 #if SQLITE_MAX_WORKER_THREADS>0
-  else{
-    void *pCtx = (void*)pIncr;
-    rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx);
-  }
-#endif
-  return rc;
+static int vdbeIncrBgPopulate(IncrMerger *pIncr){
+  void *pCtx = (void*)pIncr;
+  assert( pIncr->bUseThread );
+  return vdbeSorterCreateThread(&pIncr->thread, vdbeIncrPopulateThread, pCtx);
 }
+#endif
 
 static int vdbeIncrSwap(IncrMerger *pIncr){
   int rc = SQLITE_OK;
 
-  if( pIncr->bUseThread ){
 #if SQLITE_MAX_WORKER_THREADS>0
-    if( pIncr->pThread ){
-      void *pRet;
-      assert( pIncr->bUseThread );
-      rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
-      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
-      pIncr->pThread = 0;
-    }
-#endif
+  if( pIncr->bUseThread ){
+    rc = vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
 
     if( rc==SQLITE_OK ){
       SorterFile f0 = pIncr->aFile[0];
@@ -1714,7 +1664,9 @@ static int vdbeIncrSwap(IncrMerger *pIncr){
         rc = vdbeIncrBgPopulate(pIncr);
       }
     }
-  }else{
+  }else
+#endif
+  {
     rc = vdbeIncrPopulate(pIncr);
     pIncr->aFile[0] = pIncr->aFile[1];
     if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
@@ -1728,10 +1680,7 @@ static int vdbeIncrSwap(IncrMerger *pIncr){
 static void vdbeIncrFree(IncrMerger *pIncr){
   if( pIncr ){
 #if SQLITE_MAX_WORKER_THREADS>0
-    if( pIncr->pThread ){
-      void *pRet;
-      sqlite3ThreadJoin(pIncr->pThread, &pRet);
-    }
+    vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
     if( pIncr->bUseThread ){
       if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
       if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
@@ -1750,18 +1699,6 @@ static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
     pIncr->pTask = pTask;
     pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
     pTask->file2.iEof += pIncr->mxSz;
-
-#if 0
-    /* Open the two temp files. */
-    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
-    if( rc==SQLITE_OK ){
-      rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
-    }
-    if( rc!=SQLITE_OK ){
-      vdbeIncrFree(pIncr);
-      pIncr = 0;
-    }
-#endif
   }
   return pIncr;
 }
@@ -1784,9 +1721,6 @@ static int vdbeIncrInit2(PmaReader *pIter){
     for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
       rc = vdbeIncrInit2(&pMerger->aIter[i]);
     }
-    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
-      rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
-    }
 
     /* Set up the required files for pIncr */
     if( rc==SQLITE_OK ){
@@ -1812,6 +1746,10 @@ static int vdbeIncrInit2(PmaReader *pIter){
       }
     }
 
+    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
+      rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
+    }
+
     if( rc==SQLITE_OK && pIncr->bUseThread ){
       rc = vdbeIncrBgPopulate(pIncr);
     }
@@ -1998,9 +1936,7 @@ static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){
       }
     }
   }
-  if( rc==SQLITE_OK ){
-    rc = vdbeIncrInit2(pIter);
-  }
+  if( rc==SQLITE_OK ) rc = vdbeIncrInit2(pIter);
 
   sqlite3_free(aMerge);
   return rc;
@@ -2022,17 +1958,9 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
   ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
   ** from the in-memory list.  */
   if( pSorter->bUsePMA==0 ){
-    if( pSorter->pRecord ){
-      SortSubtask *pTask = &pSorter->aTask[0];
+    if( pSorter->list.pList ){
       *pbEof = 0;
-      pTask->pList = pSorter->pRecord;
-      pTask->eWork = SORT_SUBTASK_SORT;
-      assert( pTask->aListMemory==0 );
-      pTask->aListMemory = pSorter->aMemory;
-      rc = vdbeSorterRunTask(pTask);
-      pTask->aListMemory = 0;
-      pSorter->pRecord = pTask->pList;
-      pTask->pList = 0;
+      rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
     }else{
       *pbEof = 1;
     }
@@ -2040,8 +1968,8 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
   }
 
   /* Write the current in-memory list to a PMA. */
-  if( pSorter->pRecord ){
-    rc = vdbeSorterFlushPMA(db, pCsr, 1);
+  if( pSorter->list.pList ){
+    rc = vdbeSorterFlushPMA(pSorter);
   }
 
   /* Join all threads */
@@ -2076,11 +2004,11 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
     rc = vdbePmaReaderNext(pSorter->pReader);
     *pbEof = (pSorter->pReader->pFile==0);
   }else{
-    SorterRecord *pFree = pSorter->pRecord;
-    pSorter->pRecord = pFree->u.pNext;
+    SorterRecord *pFree = pSorter->list.pList;
+    pSorter->list.pList = pFree->u.pNext;
     pFree->u.pNext = 0;
-    if( pSorter->aMemory==0 ) vdbeSorterRecordFree(db, pFree);
-    *pbEof = !pSorter->pRecord;
+    if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
+    *pbEof = !pSorter->list.pList;
     rc = SQLITE_OK;
   }
   return rc;
@@ -2099,8 +2027,8 @@ static void *vdbeSorterRowkey(
     *pnKey = pSorter->pReader->nKey;
     pKey = pSorter->pReader->aKey;
   }else{
-    *pnKey = pSorter->pRecord->nVal;
-    pKey = SRVAL(pSorter->pRecord);
+    *pnKey = pSorter->list.pList->nVal;
+    pKey = SRVAL(pSorter->list.pList);
   }
   return pKey;
 }