]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Fix further code and documentation issues in vdbesort.c.
authordan <dan@noemail.net>
Tue, 15 Apr 2014 19:52:34 +0000 (19:52 +0000)
committerdan <dan@noemail.net>
Tue, 15 Apr 2014 19:52:34 +0000 (19:52 +0000)
FossilOrigin-Name: d03f5b8622d304f029f73c7cd0bee3182a81d081

manifest
manifest.uuid
src/vdbesort.c

index f04882a0e81e8e3cdee9140ff35e5cb9b88cce97..0ca4cbc3455d48e551babfea9bbcacac2dda9b6c 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Allow\sthe\ssorter\sto\sbegin\sreturning\sdata\sto\sthe\sVDBE\sas\ssoon\sas\sit\sis\savailable,\sinstead\sof\swaiting\suntil\sall\skeys\shave\sbeen\ssorted.
-D 2014-04-14T19:23:18.139
+C Fix\sfurther\scode\sand\sdocumentation\sissues\sin\svdbesort.c.
+D 2014-04-15T19:52:34.797
 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
 F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125
 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4
 F src/vdbeaux.c d8dc38965507a34b0e150c0d7fc82b02f8cf25ea
 F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa
 F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447
-F src/vdbesort.c 15f59dc56c7c4509b4783ad20fb25479ac63d267
+F src/vdbesort.c ceb8e16055327d0c52bdd2087fcdd7d132fe314f
 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767
 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd
 F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8
@@ -1163,7 +1163,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1
 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01
 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff
-P e54dded2012f0ab486ee138e9bd57c528af33980 02610cd9b77caa2c181210056088beb3ad6ce30f
-R b74f7a073d8c77e32a2dce332520aa10
+P cb0ab20c48962cdee03115efa93d7d501780ac73
+R 44c59b06ba4f012b63c2d0e26d30a7e6
 U dan
-Z ebd968da8bc859eb7bfdd863767ac91c
+Z 2cc8bdff8b0228a6efe530348d9564c8
index 1714edb86bf3e80c67505c24fec3eca1a24e2331..b7e5cbc645021fd725a8f84653c073aa01abaa1b 100644 (file)
@@ -1 +1 @@
-cb0ab20c48962cdee03115efa93d7d501780ac73
\ No newline at end of file
+d03f5b8622d304f029f73c7cd0bee3182a81d081
\ No newline at end of file
index 857b7ba6901f32100eb505430e6df20f2a6a9419..8b6cce625917af83cd8c760dd9dfdd88bfa9c91c 100644 (file)
@@ -10,7 +10,7 @@
 **
 *************************************************************************
 ** This file contains code for the VdbeSorter object, used in concert with
-** a VdbeCursor to sort large numbers of keys for CREATE TABLE statements
+** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
 ** or by SELECT statements with ORDER BY clauses that cannot be satisfied
 ** using indexes and without LIMIT clauses.
 **
 **
 ** The interfaces above must be called in a particular order.  Write() can 
 ** only occur in between Init()/Reset() and Rewind().  Next(), Rowkey(), and
-** Compare() can only occur in between Rewind() and Close()/Reset().
+** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
+**
+**   Init()
+**   for each record: Write()
+**   Rewind()
+**     Rowkey()/Compare()
+**   Next() 
+**   Close()
 **
 ** Algorithm:
 **
-** Records to be sorted are initially held in memory, in the order in
-** which they arrive from Write().  When the amount of memory needed exceeds
-** a threshold, all in-memory records are sorted and then appended to
-** a temporary file as a "Packed-Memory-Array" or "PMA" and the memory is
-** reset.  There is a single temporary file used for all PMAs.  The PMAs
-** are packed one after another in the file.  The VdbeSorter object keeps
-** track of the number of PMAs written.
-**
-** When the Rewind() is seen, any records still held in memory are sorted.
-** If no PMAs have been written (if all records are still held in memory)
-** then subsequent Rowkey(), Next(), and Compare() operations work directly
-** from memory.  But if PMAs have been written things get a little more
-** complicated.
-**
-** When Rewind() is seen after PMAs have been written, any records still
-** in memory are sorted and written as a final PMA.  Then all the PMAs
-** are merged together into a single massive PMA that Next(), Rowkey(),
-** and Compare() walk to extract the records in sorted order.
-**
-** If SQLITE_MAX_WORKER_THREADS is non-zero, various steps of the above
-** algorithm might be performed in parallel by separate threads.  Threads
-** are only used when one or more PMA spill to disk.  If the sort is small
-** enough to fit entirely in memory, everything happens on the main thread.
+** Records passed to the sorter via calls to Write() are initially held 
+** unsorted in main memory. Assuming the amount of memory used never exceeds
+** a threshold, when Rewind() is called the set of records is sorted using
+** an in-memory merge sort. In this case, no temporary files are required
+** and subsequent calls to Rowkey(), Next() and Compare() read records 
+** directly from main memory.
+**
+** If the amount of space used to store records in main memory exceeds the
+** threshold, then the set of records currently in memory are sorted and
+** written to a temporary file in "Packed Memory Array" (PMA) format.
+** A PMA created at this point is known as a "level-0 PMA". Higher levels
+** of PMAs may be created by merging existing PMAs together - for example
+** merging two or more level-0 PMAs together creates a level-1 PMA.
+**
+** The threshold for the amount of main memory to use before flushing 
+** records to a PMA is roughly the same as the limit configured for the
+** page-cache of the main database. Specifically, the threshold is set to 
+** the value returned multiplied by "PRAGMA main.page_size" multipled by 
+** that returned by "PRAGMA main.cache_size", in bytes.
+**
+** If the sorter is running in single-threaded mode, then all PMAs generated
+** are appended to a single temporary file. Or, if the sorter is running in
+** multi-threaded mode then up to (N+1) temporary files may be opened, where
+** N is the configured number of worker threads. In this case, instead of
+** sorting the records and writing the PMA to a temporary file itself, the
+** calling thread usually launches a worker thread to do so. Except, if
+** there are already N worker threads running, the main thread does the work
+** itself.
+**
+** The sorter is running in multi-threaded mode if (a) the library was built
+** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
+** than zero, and (b) worker threads have been enabled at runtime by calling
+** sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, ...).
+**
+** When Rewind() is called, any data remaining in memory is flushed to a 
+** final PMA. So at this point the data is stored in some number of sorted
+** PMAs within temporary files on disk. Within a single file sorter is 
+** running in single threaded mode, or distributed between one or more files
+** for multi-threaded sorters.
+**
+** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
+** sorter is running in single-threaded mode, then these PMAs are merged
+** incrementally as keys are retreived from the sorter by the VDBE. See
+** comments above object MergeEngine below for details.
+**
+** Or, if running in multi-threaded mode, then a background thread is
+** launched to merge the existing PMAs. Once the background thread has
+** merged T bytes of data into a single sorted PMA, the main thread 
+** begins reading keys from that PMA while the background thread proceeds
+** with merging the next T bytes of data. And so on.
+**
+** Parameter T is set to half the value of the memory threshold used 
+** by Write() above to determine when to create a new PMA.
+**
+** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when 
+** Rewind() is called, then a hierarchy of incremental-merges is used. 
+** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on 
+** disk are merged together. Then T bytes of data from the second set, and
+** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
+** PMAs at a time. This done is to improve locality.
+**
+** If running in multi-threaded mode and there are more than
+** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
+** than one background thread may be created. Specifically, there may be
+** one background thread for each temporary file on disk, and one background
+** thread to merge the output of each of the others to a single PMA for
+** the main thread to read from.
 */
 #include "sqliteInt.h"
 #include "vdbeInt.h"
 */
 typedef struct MergeEngine MergeEngine;     /* Merge PMAs together */
 typedef struct PmaReader PmaReader;         /* Incrementally read one PMA */
-typedef struct PmaWriter PmaWriter;         /* Incrementally write on PMA */
+typedef struct PmaWriter PmaWriter;         /* Incrementally write one PMA */
 typedef struct SorterRecord SorterRecord;   /* A record being sorted */
 typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
-typedef struct SorterFile SorterFile;
-typedef struct SorterThread SorterThread;
-typedef struct SorterList SorterList;
+typedef struct SorterFile SorterFile;       /* Temporary file object wrapper */
+typedef struct SorterList SorterList;       /* In-memory list of records */
 typedef struct IncrMerger IncrMerger;
 
 /*
@@ -120,85 +170,14 @@ struct SorterFile {
 };
 
 /*
-** An object of this type is used to store the thread handle for each 
-** background thread launched by the sorter. Before the thread is launched,
-** variable bDone is set to 0. Then, right before it exits, the thread 
-** itself sets bDone to 1.
-**
-** This is then used for two purposes:
-**
-**   1. When flushing the contents of memory to a level-0 PMA on disk, to
-**      attempt to select a SortSubtask for which there is not already an
-**      active background thread (since doing so causes the main thread
-**      to block until it finishes).
-**
-**   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
-**      to sqlite3ThreadJoin() is likely to block.
-**
-** In both cases, the effects of the main thread seeing (bDone==0) even
-** after the thread has finished are not dire. So we don't worry about
-** memory barriers and such here.
+** In memory linked list of records.
 */
-struct SorterThread {
-  SQLiteThread *pThread;
-  int bDone;
-};
-
 struct SorterList {
   SorterRecord *pList;            /* Linked list of records */
   u8 *aMemory;                    /* If non-NULL, blob of memory for pList */
   int szPMA;                      /* Size of pList as PMA in bytes */
 };
 
-/*
-** Sorting is divided up into smaller subtasks.  Each subtask is controlled
-** by an instance of this object. A Subtask might run in either the main thread
-** or in a background thread.
-**
-** Exactly VdbeSorter.nTask instances of this object are allocated
-** as part of each VdbeSorter object. Instances are never allocated any other
-** way. VdbeSorter.nTask is set to the number of worker threads allowed
-** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
-**
-** When a background thread is launched to perform work, SortSubtask.bDone
-** is set to 0 and the SortSubtask.pTask variable set to point to the
-** thread handle. SortSubtask.bDone is set to 1 (to indicate to the main
-** thread that joining SortSubtask.pTask will not block) before the thread
-** exits. SortSubtask.pTask and bDone are always cleared after the 
-** background thread has been joined.
-**
-** One object (specifically, VdbeSorter.aTask[VdbeSorter.nTask-1])
-** is reserved for the foreground thread.
-**
-** The nature of the work performed is determined by SortSubtask.eWork,
-** as follows:
-**
-**   SORT_SUBTASK_SORT:
-**     Sort the linked list of records at SortSubtask.pList.
-**
-**   SORT_SUBTASK_TO_PMA:
-**     Sort the linked list of records at SortSubtask.pList, and write
-**     the results to a new PMA in temp file SortSubtask.pTemp1. Open
-**     the temp file if it is not already open.
-**
-**   SORT_SUBTASK_CONS:
-**     Merge existing PMAs until SortSubtask.nConsolidate or fewer
-**     remain in temp file SortSubtask.pTemp1.
-*/
-struct SortSubtask {
-  SorterThread thread;
-  sqlite3 *db;                    /* Database connection */
-  VdbeSorter *pSorter;            /* Sorter */
-  KeyInfo *pKeyInfo;              /* How to compare records */
-  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
-  int pgsz;                       /* Main database page size */
-  SorterList list;                /* List for thread to write to a PMA */
-  int nPMA;                       /* Number of PMAs currently in file */
-  SorterFile file;                /* Temp file for level-0 PMAs */
-  SorterFile file2;               /* Space for other PMAs */
-};
-
-
 /*
 ** The MergeEngine object is used to combine two or more smaller PMAs into
 ** one big PMA using a merge operation.  Separate PMAs all need to be
@@ -268,6 +247,45 @@ struct MergeEngine {
   PmaReader *aIter;          /* Array of iterators to merge data from */
 };
 
+/*
+** Exactly VdbeSorter.nTask instances of this object are allocated
+** as part of each VdbeSorter object. Instances are never allocated any
+** other way. VdbeSorter.nTask is set to the number of worker threads allowed
+** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
+**
+** Essentially, this structure contains all those fields of the VdbeSorter
+** structure for which each thread requires a separate instance. For example,
+** each thread requries its own UnpackedRecord object to unpack records in
+** as part of comparison operations.
+**
+** Before a background thread is launched, variable bDone is set to 0. Then, 
+** right before it exits, the thread itself sets bDone to 1. This is used for 
+** two purposes:
+**
+**   1. When flushing the contents of memory to a level-0 PMA on disk, to
+**      attempt to select a SortSubtask for which there is not already an
+**      active background thread (since doing so causes the main thread
+**      to block until it finishes).
+**
+**   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
+**      to sqlite3ThreadJoin() is likely to block. Cases that are likely to
+**      block provoke debugging output.
+**
+** In both cases, the effects of the main thread seeing (bDone==0) even
+** after the thread has finished are not dire. So we don't worry about
+** memory barriers and such here.
+*/
+struct SortSubtask {
+  SQLiteThread *pThread;          /* Background thread, if any */
+  int bDone;                      /* Set if thread is finished but not joined */
+  VdbeSorter *pSorter;            /* Sorter that owns this sub-task */
+  UnpackedRecord *pUnpacked;      /* Space to unpack a record */
+  SorterList list;                /* List for thread to write to a PMA */
+  int nPMA;                       /* Number of PMAs currently in file */
+  SorterFile file;                /* Temp file for level-0 PMAs */
+  SorterFile file2;               /* Space for other PMAs */
+};
+
 /*
 ** Main sorter structure. A single instance of this is allocated for each 
 ** sorter cursor created by the VDBE.
@@ -280,9 +298,12 @@ struct MergeEngine {
 struct VdbeSorter {
   int mnPmaSize;                  /* Minimum PMA size, in bytes */
   int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
+  int mxKeysize;                  /* Largest serialized key seen so far */
+  int pgsz;                       /* Main database page size */
   PmaReader *pReader;             /* Read data from here after Rewind() */
   MergeEngine *pMerger;           /* Or here, if bUseThreads==0 */
-  int mxKeysize;                  /* Largest serialized key seen so far */
+  sqlite3 *db;                    /* Database connection */
+  KeyInfo *pKeyInfo;              /* How to compare records */
   UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
   SorterList list;                /* List of in-memory records */
   int iMemory;                    /* Offset of free space in list.aMemory */
@@ -318,10 +339,35 @@ struct PmaReader {
 ** within a temp file. However, if the PmaReader.pIncr variable points to
 ** an object of the following type, it may be used to iterate/merge through
 ** multiple PMAs simultaneously.
+**
+** There are two types of IncrMerger object - single (bUseThread==0) and 
+** multi-threaded (bUseThread==1). 
+**
+** A multi-threaded IncrMerger object uses two temporary files - aFile[0] 
+** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in 
+** size. When the IncrMerger is initialized, it reads enough data from 
+** pMerger to populate aFile[0]. It then sets variables within the 
+** corresponding PmaReader object to read from that file and kicks off 
+** a background thread to populate aFile[1] with the next mxSz bytes of 
+** sorted record data from pMerger. 
+**
+** When the PmaReader reaches the end of aFile[0], it blocks until the
+** background thread has finished populating aFile[1]. It then exchanges
+** the contents of the aFile[0] and aFile[1] variables within this structure,
+** sets the PmaReader fields to read from the new aFile[0] and kicks off
+** another background thread to populate the new aFile[1]. And so on, until
+** the contents of pMerger are exhausted.
+**
+** A single-threaded IncrMerger does not open any temporary files of its
+** own. Instead, it has exclusive access to mxSz bytes of space beginning
+** at offset iStartOff of file pTask->file2. And instead of using a 
+** background thread to prepare data for the PmaReader, with a single
+** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with
+** keys from pMerger by the calling thread whenever the PmaReader runs out
+** of data.
 */
 struct IncrMerger {
   SortSubtask *pTask;             /* Task that owns this merger */
-  SorterThread thread;            /* Thread for populating aFile[1] */
   MergeEngine *pMerger;           /* Merge engine thread reads data from */
   i64 iStartOff;                  /* Offset to start writing file at */
   int mxSz;                       /* Maximum bytes of data to store */
@@ -354,10 +400,10 @@ struct PmaWriter {
 **
 ** How the linked list is connected depends on how memory is being managed
 ** by this module. If using a separate allocation for each in-memory record
-** (VdbeSorter.aMemory==0), then the list is always connected using the
+** (VdbeSorter.list.aMemory==0), then the list is always connected using the
 ** SorterRecord.u.pNext pointers.
 **
-** Or, if using the single large allocation method (VdbeSorter.aMemory!=0),
+** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
 ** then while records are being accumulated the list is linked using the
 ** SorterRecord.u.iNext offset. This is because the aMemory[] array may
 ** be sqlite3Realloc()ed while records are being accumulated. Once the VM
@@ -390,7 +436,7 @@ struct SorterRecord {
 #define SORTER_MAX_MERGE_COUNT 16
 
 static int vdbeIncrSwap(IncrMerger*);
-static void vdbeIncrFree(IncrMerger*);
+static void vdbeIncrFree(IncrMerger *);
 
 /*
 ** Free all memory belonging to the PmaReader object passed as the second
@@ -531,56 +577,69 @@ static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
   return SQLITE_OK;
 }
 
+/*
+** Attempt to memory map file pFile. If successful, set *pp to point to the
+** new mapping and return SQLITE_OK. If the mapping is not attempted 
+** (because the file is too large or the VFS layer is configured not to use
+** mmap), return SQLITE_OK and set *pp to NULL.
+**
+** Or, if an error occurs, return an SQLite error code. The final value of
+** *pp is undefined in this case.
+*/
 static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
   int rc = SQLITE_OK;
-  if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
+  if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
     rc = sqlite3OsFetch(pFile->pFd, 0, pFile->iEof, (void**)pp);
   }
   return rc;
 }
 
-static int vdbePmaReaderReinit(PmaReader *pIter){
-  IncrMerger *pIncr = pIter->pIncr;
-  SortSubtask *pTask = pIncr->pTask;
+/*
+** Seek iterator pIter to offset iOff within file pFile. Return SQLITE_OK 
+** if successful, or an SQLite error code if an error occurs.
+*/
+static int vdbePmaReaderSeek(
+  SortSubtask *pTask,             /* Task context */
+  PmaReader *pIter,               /* Iterate to populate */
+  SorterFile *pFile,              /* Sorter file to read from */
+  i64 iOff                        /* Offset in pFile */
+){
   int rc = SQLITE_OK;
 
-  assert( pIncr->bEof==0 );
+  assert( pIter->pIncr==0 || pIter->pIncr->bEof==0 );
 
   if( pIter->aMap ){
     sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
     pIter->aMap = 0;
   }
-  pIter->iReadOff = pIncr->iStartOff;
-  pIter->iEof = pIncr->aFile[0].iEof;
-  pIter->pFile = pIncr->aFile[0].pFd;
+  pIter->iReadOff = iOff;
+  pIter->iEof = pFile->iEof;
+  pIter->pFile = pFile->pFd;
 
-  rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
-  if( rc==SQLITE_OK ){
-    if( pIter->aMap==0 ){
-      /* TODO: Combine this code with similar code in vdbePmaReaderInit() */
-      int iBuf = pIter->iReadOff % pTask->pgsz;
-      if( pIter->aBuffer==0 ){
-        pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
-        if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
-        pIter->nBuffer = pTask->pgsz;
-      }
-      if( iBuf ){
-        int nRead = pTask->pgsz - iBuf;
-        if( (pIter->iReadOff + nRead) > pIter->iEof ){
-          nRead = (int)(pIter->iEof - pIter->iReadOff);
-        }
-        rc = sqlite3OsRead(
-            pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
-        );
-        assert( rc!=SQLITE_IOERR_SHORT_READ );
+  rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
+  if( rc==SQLITE_OK && pIter->aMap==0 ){
+    int pgsz = pTask->pSorter->pgsz;
+    int iBuf = pIter->iReadOff % pgsz;
+    if( pIter->aBuffer==0 ){
+      pIter->aBuffer = (u8*)sqlite3Malloc(pgsz);
+      if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
+      pIter->nBuffer = pgsz;
+    }
+    if( iBuf ){
+      int nRead = pgsz - iBuf;
+      if( (pIter->iReadOff + nRead) > pIter->iEof ){
+        nRead = (int)(pIter->iEof - pIter->iReadOff);
       }
+      rc = sqlite3OsRead(
+          pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
+      );
+      assert( rc!=SQLITE_IOERR_SHORT_READ );
     }
   }
 
   return rc;
 }
 
-
 /*
 ** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
 ** no error occurs, or an SQLite error code if one does.
@@ -590,11 +649,14 @@ static int vdbePmaReaderNext(PmaReader *pIter){
   u64 nRec = 0;                   /* Size of record in bytes */
 
   if( pIter->iReadOff>=pIter->iEof ){
+    IncrMerger *pIncr = pIter->pIncr;
     int bEof = 1;
-    if( pIter->pIncr ){
-      rc = vdbeIncrSwap(pIter->pIncr);
-      if( rc==SQLITE_OK && pIter->pIncr->bEof==0 ){
-        rc = vdbePmaReaderReinit(pIter);
+    if( pIncr ){
+      rc = vdbeIncrSwap(pIncr);
+      if( rc==SQLITE_OK && pIncr->bEof==0 ){
+        rc = vdbePmaReaderSeek(
+            pIncr->pTask, pIter, &pIncr->aFile[0], pIncr->iStartOff
+        );
         bEof = 0;
       }
     }
@@ -633,47 +695,16 @@ static int vdbePmaReaderInit(
   PmaReader *pIter,               /* Iterator to populate */
   i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
 ){
-  int rc = SQLITE_OK;
-  int nBuf = pTask->pgsz;
+  int rc;
 
   assert( pFile->iEof>iStart );
-  assert( pIter->aAlloc==0 );
+  assert( pIter->aAlloc==0 && pIter->nAlloc==0 );
   assert( pIter->aBuffer==0 );
-  pIter->pFile = pFile->pFd;
-  pIter->iReadOff = iStart;
-  pIter->nAlloc = 128;
-  pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
-  if( pIter->aAlloc ){
-    /* Try to xFetch() a mapping of the entire temp file. If this is possible,
-    ** the PMA will be read via the mapping. Otherwise, use xRead().  */
-    rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
-  }else{
-    rc = SQLITE_NOMEM;
-  }
-
-  if( rc==SQLITE_OK && pIter->aMap==0 ){
-    pIter->nBuffer = nBuf;
-    pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
-    if( !pIter->aBuffer ){
-      rc = SQLITE_NOMEM;
-    }else{
-      int iBuf = iStart % nBuf;
-      if( iBuf ){
-        int nRead = nBuf - iBuf;
-        if( (iStart + nRead) > pFile->iEof ){
-          nRead = (int)(pFile->iEof - iStart);
-        }
-        rc = sqlite3OsRead(
-            pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
-        );
-        assert( rc!=SQLITE_IOERR_SHORT_READ );
-      }
-    }
-  }
+  assert( pIter->aMap==0 );
 
+  rc = vdbePmaReaderSeek(pTask, pIter, pFile, iStart);
   if( rc==SQLITE_OK ){
     u64 nByte;                    /* Size of PMA in bytes */
-    pIter->iEof = pFile->iEof;
     rc = vdbePmaReadVarint(pIter, &nByte);
     pIter->iEof = pIter->iReadOff + nByte;
     *pnByte += nByte;
@@ -706,7 +737,7 @@ static int vdbeSorterCompare(
 ){
   UnpackedRecord *r2 = pTask->pUnpacked;
   if( pKey2 ){
-    sqlite3VdbeRecordUnpack(pTask->pKeyInfo, nKey2, pKey2, r2);
+    sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
   }
   return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
 }
@@ -788,19 +819,16 @@ int sqlite3VdbeSorterInit(
   if( pSorter==0 ){
     rc = SQLITE_NOMEM;
   }else{
-    pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
+    pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
     memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
     pKeyInfo->db = 0;
     if( nField && nWorker==0 ) pKeyInfo->nField = nField;
-    pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
-
+    pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
     pSorter->nTask = nWorker + 1;
     pSorter->bUseThreads = (pSorter->nTask>1);
+    pSorter->db = db;
     for(i=0; i<pSorter->nTask; i++){
       SortSubtask *pTask = &pSorter->aTask[i];
-      pTask->pKeyInfo = pKeyInfo;
-      pTask->pgsz = pgsz;
-      pTask->db = db;
       pTask->pSorter = pSorter;
     }
 
@@ -904,22 +932,22 @@ static void vdbeSorterBlockDebug(
 
 #if SQLITE_MAX_WORKER_THREADS>0
 /*
-** Join thread p.
+** Join thread pTask->thread.
 */
-static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){
+static int vdbeSorterJoinThread(SortSubtask *pTask){
   int rc = SQLITE_OK;
-  if( p->pThread ){
+  if( pTask->pThread ){
 #ifdef SQLITE_DEBUG_SORTER_THREADS
-    int bDone = p->bDone;
+    int bDone = pTask->bDone;
 #endif
     void *pRet;
     vdbeSorterBlockDebug(pTask, !bDone, "enter");
-    rc = sqlite3ThreadJoin(p->pThread, &pRet);
+    rc = sqlite3ThreadJoin(pTask->pThread, &pRet);
     vdbeSorterBlockDebug(pTask, !bDone, "exit");
     if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
-    assert( p->bDone==1 );
-    p->bDone = 0;
-    p->pThread = 0;
+    assert( pTask->bDone==1 );
+    pTask->bDone = 0;
+    pTask->pThread = 0;
   }
   return rc;
 }
@@ -928,12 +956,12 @@ static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){
 ** Launch a background thread to run xTask(pIn).
 */
 static int vdbeSorterCreateThread(
-  SorterThread *p,                /* Thread object to populate */
+  SortSubtask *pTask,             /* Thread will use this task object */
   void *(*xTask)(void*),          /* Routine to run in a separate thread */
   void *pIn                       /* Argument passed into xTask() */
 ){
-  assert( p->pThread==0 && p->bDone==0 );
-  return sqlite3ThreadCreate(&p->pThread, xTask, pIn);
+  assert( pTask->pThread==0 && pTask->bDone==0 );
+  return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
 }
 
 /*
@@ -945,14 +973,14 @@ static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
   int i;
   for(i=0; i<pSorter->nTask; i++){
     SortSubtask *pTask = &pSorter->aTask[i];
-    int rc2 = vdbeSorterJoinThread(pTask, &pTask->thread);
+    int rc2 = vdbeSorterJoinThread(pTask);
     if( rc==SQLITE_OK ) rc = rc2;
   }
   return rc;
 }
 #else
 # define vdbeSorterJoinAll(x,rcin) (rcin)
-# define vdbeSorterJoinThread(pTask,p) SQLITE_OK
+# define vdbeSorterJoinThread(pTask) SQLITE_OK
 #endif
 
 /*
@@ -990,6 +1018,24 @@ static void vdbeMergeEngineFree(MergeEngine *pMerger){
   sqlite3_free(pMerger);
 }
 
+/*
+** Free all resources associated with the IncrMerger object indicated by
+** the first argument.
+*/
+static void vdbeIncrFree(IncrMerger *pIncr){
+  if( pIncr ){
+#if SQLITE_MAX_WORKER_THREADS>0
+    if( pIncr->bUseThread ){
+      vdbeSorterJoinThread(pIncr->pTask);
+      if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
+      if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
+    }
+#endif
+    vdbeMergeEngineFree(pIncr->pMerger);
+    sqlite3_free(pIncr);
+  }
+}
+
 /*
 ** Reset a sorting cursor back to its original empty state.
 */
@@ -1051,15 +1097,20 @@ static int vdbeSorterOpenTempFile(sqlite3_vfs *pVfs, sqlite3_file **ppFile){
   return rc;
 }
 
+/*
+** If it has not already been allocated, allocate the UnpackedRecord 
+** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or 
+** if no allocation was required), or SQLITE_NOMEM otherwise.
+*/
 static int vdbeSortAllocUnpacked(SortSubtask *pTask){
   if( pTask->pUnpacked==0 ){
     char *pFree;
     pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
-        pTask->pKeyInfo, 0, 0, &pFree
+        pTask->pSorter->pKeyInfo, 0, 0, &pFree
     );
     assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
     if( pFree==0 ) return SQLITE_NOMEM;
-    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
+    pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField;
     pTask->pUnpacked->errCode = 0;
   }
   return SQLITE_OK;
@@ -1280,6 +1331,7 @@ static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){
 **       key). The varint is the number of bytes in the blob of data.
 */
 static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
+  sqlite3 *db = pTask->pSorter->db;
   int rc = SQLITE_OK;             /* Return code */
   PmaWriter writer;               /* Object used to write to the file */
 
@@ -1295,7 +1347,7 @@ static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
 
   /* If the first temporary PMA file has not been opened, open it now. */
   if( pTask->file.pFd==0 ){
-    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file.pFd);
+    rc = vdbeSorterOpenTempFile(db->pVfs, &pTask->file.pFd);
     assert( rc!=SQLITE_OK || pTask->file.pFd );
     assert( pTask->file.iEof==0 );
     assert( pTask->nPMA==0 );
@@ -1303,9 +1355,7 @@ static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
 
   /* Try to get the file to memory map */
   if( rc==SQLITE_OK ){
-    vdbeSorterExtendFile(pTask->db, 
-        pTask->file.pFd, pTask->file.iEof + pList->szPMA + 9
-    );
+    vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
   }
 
   /* Sort the list */
@@ -1317,7 +1367,7 @@ static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
     SorterRecord *p;
     SorterRecord *pNext = 0;
 
-    vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz,
+    vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
                       pTask->file.iEof);
     pTask->nPMA++;
     vdbePmaWriteVarint(&writer, pList->szPMA);
@@ -1413,14 +1463,14 @@ static int vdbeSorterNext(
 }
 
 /*
-** The main routine for sorter-thread operations.
+** The main routine for background threads that write level-0 PMAs.
 */
 static void *vdbeSorterFlushThread(void *pCtx){
   SortSubtask *pTask = (SortSubtask*)pCtx;
   int rc;                         /* Return code */
-  assert( pTask->thread.bDone==0 );
+  assert( pTask->bDone==0 );
   rc = vdbeSorterListToPMA(pTask, &pTask->list);
-  pTask->thread.bDone = 1;
+  pTask->bDone = 1;
   return SQLITE_INT_TO_PTR(rc);
 }
 
@@ -1453,10 +1503,10 @@ static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
   for(i=0; i<nWorker; i++){
     int iTest = (pSorter->iPrev + i + 1) % nWorker;
     pTask = &pSorter->aTask[iTest];
-    if( pTask->thread.bDone ){
-      rc = vdbeSorterJoinThread(pTask, &pTask->thread);
+    if( pTask->bDone ){
+      rc = vdbeSorterJoinThread(pTask);
     }
-    if( pTask->thread.pThread==0 || rc!=SQLITE_OK ) break;
+    if( pTask->pThread==0 || rc!=SQLITE_OK ) break;
   }
 
   if( rc==SQLITE_OK ){
@@ -1468,7 +1518,7 @@ static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
       u8 *aMem = pTask->list.aMemory;
       void *pCtx = (void*)pTask;
 
-      assert( pTask->thread.pThread==0 && pTask->thread.bDone==0 );
+      assert( pTask->pThread==0 && pTask->bDone==0 );
       assert( pTask->list.pList==0 );
       assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
 
@@ -1484,7 +1534,7 @@ static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
         if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
       }
 
-      rc = vdbeSorterCreateThread(&pTask->thread, vdbeSorterFlushThread, pCtx);
+      rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
     }
   }
 
@@ -1597,13 +1647,14 @@ static int vdbeIncrPopulate(IncrMerger *pIncr){
   int rc2;
   i64 iStart = pIncr->iStartOff;
   SorterFile *pOut = &pIncr->aFile[1];
+  SortSubtask *pTask = pIncr->pTask;
   MergeEngine *pMerger = pIncr->pMerger;
   PmaWriter writer;
   assert( pIncr->bEof==0 );
 
-  vdbeSorterPopulateDebug(pIncr->pTask, "enter");
+  vdbeSorterPopulateDebug(pTask, "enter");
 
-  vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, iStart);
+  vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
   while( rc==SQLITE_OK ){
     int dummy;
     PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ];
@@ -1618,36 +1669,60 @@ static int vdbeIncrPopulate(IncrMerger *pIncr){
     /* Write the next key to the output. */
     vdbePmaWriteVarint(&writer, nKey);
     vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
-    rc = vdbeSorterNext(pIncr->pTask, pIncr->pMerger, &dummy);
+    rc = vdbeSorterNext(pTask, pIncr->pMerger, &dummy);
   }
 
   rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
   if( rc==SQLITE_OK ) rc = rc2;
-  vdbeSorterPopulateDebug(pIncr->pTask, "exit");
+  vdbeSorterPopulateDebug(pTask, "exit");
   return rc;
 }
 
+#if SQLITE_MAX_WORKER_THREADS>0
+/*
+** The main routine for background threads that populate aFile[1] of
+** multi-threaded IncrMerger objects.
+*/
 static void *vdbeIncrPopulateThread(void *pCtx){
   IncrMerger *pIncr = (IncrMerger*)pCtx;
   void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
-  pIncr->thread.bDone = 1;
+  pIncr->pTask->bDone = 1;
   return pRet;
 }
 
-#if SQLITE_MAX_WORKER_THREADS>0
+/*
+** Launch a background thread to populate aFile[1] of pIncr.
+*/
 static int vdbeIncrBgPopulate(IncrMerger *pIncr){
-  void *pCtx = (void*)pIncr;
+  void *p = (void*)pIncr;
   assert( pIncr->bUseThread );
-  return vdbeSorterCreateThread(&pIncr->thread, vdbeIncrPopulateThread, pCtx);
+  return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
 }
 #endif
 
+/*
+** This function is called when the PmaReader corresponding to pIncr has
+** finished reading the contents of aFile[0]. Its purpose is to "refill"
+** aFile[0] such that the iterator should start rereading it from the
+** beginning.
+**
+** For single-threaded objects, this is accomplished by literally reading 
+** keys from pIncr->pMerger and repopulating aFile[0]. 
+**
+** For multi-threaded objects, all that is required is to wait until the 
+** background thread is finished (if it is not already) and then swap 
+** aFile[0] and aFile[1] in place. If the contents of pMerger have not
+** been exhausted, this function also launches a new background thread
+** to populate the new aFile[1].
+**
+** SQLITE_OK is returned on success, or an SQLite error code otherwise.
+*/
 static int vdbeIncrSwap(IncrMerger *pIncr){
   int rc = SQLITE_OK;
 
 #if SQLITE_MAX_WORKER_THREADS>0
   if( pIncr->bUseThread ){
-    rc = vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
+    rc = vdbeSorterJoinThread(pIncr->pTask);
 
     if( rc==SQLITE_OK ){
       SorterFile f0 = pIncr->aFile[0];
@@ -1675,20 +1750,9 @@ static int vdbeIncrSwap(IncrMerger *pIncr){
   return rc;
 }
 
-static void vdbeIncrFree(IncrMerger *pIncr){
-  if( pIncr ){
-#if SQLITE_MAX_WORKER_THREADS>0
-    vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
-    if( pIncr->bUseThread ){
-      if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
-      if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
-    }
-#endif
-    vdbeMergeEngineFree(pIncr->pMerger);
-    sqlite3_free(pIncr);
-  }
-}
-
+/*
+** Allocate and return a new IncrMerger object to read data from pMerger.
+*/
 static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
   IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
   if( pIncr ){
@@ -1701,6 +1765,9 @@ static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
   return pIncr;
 }
 
+/*
+** Set the "use-threads" flag on object pIncr.
+*/
 static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
   if( bUseThread ){
     pIncr->bUseThread = 1;
@@ -1742,6 +1809,7 @@ static int vdbeIncrInit2(PmaReader *pIter, int eMode){
   IncrMerger *pIncr = pIter->pIncr;
   if( pIncr ){
     SortSubtask *pTask = pIncr->pTask;
+    sqlite3 *db = pTask->pSorter->db;
 
     rc = vdbeIncrInitMerger(pTask, pIncr->pMerger, eMode);
 
@@ -1749,10 +1817,10 @@ static int vdbeIncrInit2(PmaReader *pIter, int eMode){
     if( rc==SQLITE_OK ){
       if( pIncr->bUseThread==0 ){
         if( pTask->file2.pFd==0 ){
-          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
+          rc = vdbeSorterOpenTempFile(db->pVfs, &pTask->file2.pFd);
           assert( pTask->file2.iEof>0 );
           if( rc==SQLITE_OK ){
-            vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof);
+            vdbeSorterExtendFile(db, pTask->file2.pFd, pTask->file2.iEof);
             pTask->file2.iEof = 0;
           }
         }
@@ -1762,9 +1830,9 @@ static int vdbeIncrInit2(PmaReader *pIter, int eMode){
           pTask->file2.iEof += pIncr->mxSz;
         }
       }else{
-        rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
+        rc = vdbeSorterOpenTempFile(db->pVfs, &pIncr->aFile[0].pFd);
         if( rc==SQLITE_OK ){
-          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
+          rc = vdbeSorterOpenTempFile(db->pVfs, &pIncr->aFile[1].pFd);
         }
       }
     }
@@ -1786,15 +1854,13 @@ static int vdbeIncrInit2(PmaReader *pIter, int eMode){
 static void *vdbeIncrInit2Thread(void *pCtx){
   PmaReader *pReader = (PmaReader*)pCtx;
   void *pRet = SQLITE_INT_TO_PTR( vdbeIncrInit2(pReader, INCRINIT2_TASK) );
-  pReader->pIncr->thread.bDone = 1;
+  pReader->pIncr->pTask->bDone = 1;
   return pRet;
 }
 
 static int vdbeIncrBgInit2(PmaReader *pIter){
   void *pCtx = (void*)pIter;
-  return vdbeSorterCreateThread(
-      &pIter->pIncr->thread, vdbeIncrInit2Thread, pCtx
-  );
+  return vdbeSorterCreateThread(pIter->pIncr->pTask, vdbeIncrInit2Thread, pCtx);
 }
 #endif
 
@@ -1885,7 +1951,7 @@ static int vdbeAddToBuilder(
 static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){
   SortSubtask *pTask0 = &pSorter->aTask[0];
   MergeEngine *pMain = 0;
-  sqlite3 *db = pTask0->db;
+  sqlite3 *db = pTask0->pSorter->db;
   int rc = SQLITE_OK;
   int iTask;