git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Experimental multi-threaded sorting changes to allow the sorter to begin returning items to the VDBE before all data is sorted.
author dan <dan@noemail.net>
Wed, 9 Apr 2014 20:04:17 +0000 (20:04 +0000)
committer dan <dan@noemail.net>
Wed, 9 Apr 2014 20:04:17 +0000 (20:04 +0000)
FossilOrigin-Name: f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f

manifest
manifest.uuid
src/vdbesort.c
test/sort2.test

index 2eb2b1da22d25d563bf99262f242479e0bc76880..883105388052cd5bdcb501f95da6c6b419edfcbd 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Fix\sharmless\scompiler\swarnings.
-D 2014-04-04T22:44:59.018
+C Experimental\smulti-threaded\ssorting\schanges\sto\sallow\sthe\ssorter\sto\sbegin\sreturning\sitems\sto\sthe\sVDBE\sbefore\sall\sdata\sis\ssorted.
+D 2014-04-09T20:04:17.324
 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
 F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125
 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4
 F src/vdbeaux.c d8dc38965507a34b0e150c0d7fc82b02f8cf25ea
 F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa
 F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447
-F src/vdbesort.c 8da916fc74e78edd5bc95653206942e01710ac09
+F src/vdbesort.c 26823b626c3231a52e45f5e78a18cb8681bb1b88
 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767
 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd
 F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8
@@ -818,7 +818,7 @@ F test/skipscan2.test 5a4db0799c338ddbacb154aaa5589c0254b36a8d
 F test/soak.test 0b5b6375c9f4110c828070b826b3b4b0bb65cd5f
 F test/softheap1.test 40562fe6cac6d9827b7b42b86d45aedf12c15e24
 F test/sort.test 79dc647c4e9b123a64e57b7080b7f9a2df43f87a
-F test/sort2.test 21cd865e31adecdc8fc81c8d95431e629676a8d8
+F test/sort2.test bbc2eb244fb862141a900a851056d48705b5997b
 F test/sort3.test c3f88d233452a129de519de311d109a0ad0da0af
 F test/speed1.test f2974a91d79f58507ada01864c0e323093065452
 F test/speed1p.explain d841e650a04728b39e6740296b852dccdca9b2cb
@@ -1163,7 +1163,10 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1
 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01
 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff
-P 5e3dfa27c71a666e122e3cf64897038ff8424800
-R bbcde0d30a2bb025a3c15f4a10b2b404
-U drh
-Z 9376e61a8443d421f4f6f69d3d5500ac
+P e54dded2012f0ab486ee138e9bd57c528af33980
+R 803b4ddf4cddf7e21aeddc04109caaf0
+T *branch * threads-experimental
+T *sym-threads-experimental *
+T -sym-threads *
+U dan
+Z 3b5c615396ccbaaa23add5a8103bd906
index 44109c73580015fdc519d35b67623f341b36e475..6cf45357c405ee71810279cc876800ba7e3907a1 100644 (file)
@@ -1 +1 @@
-e54dded2012f0ab486ee138e9bd57c528af33980
\ No newline at end of file
+f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f
\ No newline at end of file
index f59e8f51f503f00d6492128dc0c050c32075b09c..e558c42f11bc0ce8af556e6e114f7bb78ba770e3 100644 (file)
@@ -96,6 +96,8 @@ typedef struct PmaReader PmaReader;         /* Incrementally read one PMA */
 typedef struct PmaWriter PmaWriter;         /* Incrementally write on PMA */
 typedef struct SorterRecord SorterRecord;   /* A record being sorted */
 typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
+typedef struct SorterFile SorterFile;       /* A file handle and its end offset */
+typedef struct IncrMerger IncrMerger;       /* Incremental merger state */
 
 
 /*
@@ -105,6 +107,11 @@ typedef struct SortSubtask SortSubtask;     /* A sub-task in the sort process */
 #define SORT_SUBTASK_TO_PMA 2     /* Xfer pList to Packed-Memory-Array pTemp1 */
 #define SORT_SUBTASK_CONS   3     /* Consolidate multiple PMAs */
 
+struct SorterFile {
+  sqlite3_file *pFd;              /* File handle, or 0 if not yet opened */
+  i64 iEof;                       /* Offset of end of data; next write starts here */
+};
+
 /*
 ** Sorting is divided up into smaller subtasks.  Each subtask is controlled
 ** by an instance of this object. A Subtask might run in either the main thread
@@ -145,6 +152,7 @@ struct SortSubtask {
   int bDone;                      /* Set to true by pTask when finished */
 
   sqlite3 *db;                    /* Database connection */
+  VdbeSorter *pSorter;            /* Sorter */
   KeyInfo *pKeyInfo;              /* How to compare records */
   UnpackedRecord *pUnpacked;      /* Space to unpack a record */
   int pgsz;                       /* Main database page size */
@@ -155,9 +163,8 @@ struct SortSubtask {
   int nInMemory;                  /* Expected size of PMA based on pList */
   u8 *aListMemory;                /* Records memory (or NULL) */
 
-  int nPMA;                       /* Number of PMAs currently in pTemp1 */
-  i64 iTemp1Off;                  /* Offset to write to in pTemp1 */
-  sqlite3_file *pTemp1;           /* File to write PMAs to, or NULL */
+  int nPMA;                       /* Number of PMAs currently in file */
+  SorterFile file;
 };
 
 
@@ -239,8 +246,10 @@ struct VdbeSorter {
   int mnPmaSize;                  /* Minimum PMA size, in bytes */
   int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
   int bUsePMA;                    /* True if one or more PMAs created */
+  int bUseThreads;                /* True if worker threads in use (nTask>1) */
   SorterRecord *pRecord;          /* Head of in-memory record list */
-  MergeEngine *pMerger;           /* For final merge of PMAs (by caller) */ 
+  PmaReader *pReader;             /* Read data from here after Rewind() */
+  UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
   u8 *aMemory;                    /* Block of memory to alloc records from */
   int iMemory;                    /* Offset of first free byte in aMemory */
   int nMemory;                    /* Size of aMemory allocation in bytes */
@@ -265,6 +274,16 @@ struct PmaReader {
   u8 *aBuffer;                    /* Current read buffer */
   int nBuffer;                    /* Size of read buffer in bytes */
   u8 *aMap;                       /* Pointer to mapping of entire file */
+  IncrMerger *pIncr;              /* Incremental merger */
+};
+
+struct IncrMerger {
+  int mxSz;                       /* Maximum size of each file, in bytes */
+  SortSubtask *pTask;             /* Task that owns this merger */
+  int bEof;                       /* Set to true when merge is finished */
+  SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
+  MergeEngine *pMerger;           /* Engine vdbeIncrPopulate() reads keys from */
+  SQLiteThread *pThread;          /* Thread populating aFile[1], or 0 if none */
 };
 
 /*
@@ -326,6 +345,9 @@ struct SorterRecord {
 /* Maximum number of PMAs that a single MergeEngine can merge */
 #define SORTER_MAX_MERGE_COUNT 16
 
+static int vdbeIncrSwap(IncrMerger*);
+static void vdbeIncrFree(IncrMerger*);
+
 /*
 ** Free all memory belonging to the PmaReader object passed as the second
 ** argument. All structure fields are set to zero before returning.
@@ -334,6 +356,7 @@ static void vdbePmaReaderClear(PmaReader *pIter){
   sqlite3_free(pIter->aAlloc);
   sqlite3_free(pIter->aBuffer);
   if( pIter->aMap ) sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
+  if( pIter->pIncr ) vdbeIncrFree(pIter->pIncr);
   memset(pIter, 0, sizeof(PmaReader));
 }
 
@@ -400,7 +423,7 @@ static int vdbePmaReadBlob(
     /* Extend the p->aAlloc[] allocation if required. */
     if( p->nAlloc<nByte ){
       u8 *aNew;
-      int nNew = p->nAlloc*2;
+      int nNew = MAX(128, p->nAlloc*2);
       while( nByte>nNew ) nNew = nNew*2;
       aNew = sqlite3Realloc(p->aAlloc, nNew);
       if( !aNew ) return SQLITE_NOMEM;
@@ -464,22 +487,70 @@ static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
   return SQLITE_OK;
 }
 
+static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
+  int rc = SQLITE_OK;             /* Stays SQLITE_OK if no mapping is attempted */
+  if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){ /* Within mmap limit? */
+    rc = sqlite3OsFetch(pFile->pFd, 0, pFile->iEof, (void**)pp); /* [0,iEof) */
+  }
+  return rc;                      /* *pp is not written if the fetch was skipped */
+}
+
+static int vdbePmaReaderReinit(PmaReader *pIter){ /* Restart pIter on aFile[0] */
+  IncrMerger *pIncr = pIter->pIncr;   /* Dereferenced below: must be non-NULL */
+  SortSubtask *pTask = pIncr->pTask;
+  int rc = SQLITE_OK;
+
+  assert( pIncr->bEof==0 );           /* Not used once the merge has finished */
+
+  if( pIter->aMap ){
+    sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap); /* Uses the old pFile */
+    pIter->aMap = 0;
+  }
+  pIter->iReadOff = 0;                /* Read aFile[0] from its first byte... */
+  pIter->iEof = pIncr->aFile[0].iEof; /* ...up to the end of its data */
+  pIter->pFile = pIncr->aFile[0].pFd;
+
+  rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
+  if( rc==SQLITE_OK ){
+    if( pIter->aMap==0 && pIter->aBuffer==0 ){ /* No mapping, no buffer yet */
+      pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
+      if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
+      pIter->nBuffer = pTask->pgsz;   /* NOTE(review): set even if malloc failed */
+    }
+  }
+
+  return rc;
+}
+
 
 /*
 ** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
 ** no error occurs, or an SQLite error code if one does.
 */
 static int vdbePmaReaderNext(PmaReader *pIter){
-  int rc;                         /* Return Code */
+  int rc = SQLITE_OK;             /* Return Code */
   u64 nRec = 0;                   /* Size of record in bytes */
 
   if( pIter->iReadOff>=pIter->iEof ){
-    /* This is an EOF condition */
-    vdbePmaReaderClear(pIter);
-    return SQLITE_OK;
+    int bEof = 1;
+    if( pIter->pIncr ){
+      rc = vdbeIncrSwap(pIter->pIncr);
+      if( rc==SQLITE_OK && pIter->pIncr->bEof==0 ){
+        rc = vdbePmaReaderReinit(pIter);
+        bEof = 0;
+      }
+    }
+
+    if( bEof ){
+      /* This is an EOF condition */
+      vdbePmaReaderClear(pIter);
+      return rc;
+    }
   }
 
-  rc = vdbePmaReadVarint(pIter, &nRec);
+  if( rc==SQLITE_OK ){
+    rc = vdbePmaReadVarint(pIter, &nRec);
+  }
   if( rc==SQLITE_OK ){
     pIter->nKey = (int)nRec;
     rc = vdbePmaReadBlob(pIter, (int)nRec, &pIter->aKey);
@@ -493,10 +564,14 @@ static int vdbePmaReaderNext(PmaReader *pIter){
 ** starting at offset iStart and ending at offset iEof-1. This function 
 ** leaves the iterator pointing to the first key in the PMA (or EOF if the 
 ** PMA is empty).
+**
+** If the pnByte parameter is NULL, then it is assumed that the file 
+** contains a single PMA, and that that PMA omits the initial length varint.
 */
 static int vdbePmaReaderInit(
-  SortSubtask *pTask,             /* Thread context */
-  i64 iStart,                     /* Start offset in pTask->pTemp1 */
+  SortSubtask *pTask,             /* Task context */
+  SorterFile *pFile,              /* Sorter file to read from */
+  i64 iStart,                     /* Start offset in pFile */
   PmaReader *pIter,               /* Iterator to populate */
   i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
 ){
@@ -504,18 +579,18 @@ static int vdbePmaReaderInit(
   int nBuf = pTask->pgsz;
   void *pMap = 0;                 /* Mapping of temp file */
 
-  assert( pTask->iTemp1Off>iStart );
+  assert( pFile->iEof>iStart );
   assert( pIter->aAlloc==0 );
   assert( pIter->aBuffer==0 );
-  pIter->pFile = pTask->pTemp1;
+  pIter->pFile = pFile->pFd;
   pIter->iReadOff = iStart;
   pIter->nAlloc = 128;
   pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
   if( pIter->aAlloc ){
     /* Try to xFetch() a mapping of the entire temp file. If this is possible,
     ** the PMA will be read via the mapping. Otherwise, use xRead().  */
-    if( pTask->iTemp1Off<=(i64)(pTask->db->nMaxSorterMmap) ){
-      rc = sqlite3OsFetch(pIter->pFile, 0, pTask->iTemp1Off, &pMap);
+    if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
+      rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iEof, &pMap);
     }
   }else{
     rc = SQLITE_NOMEM;
@@ -533,12 +608,12 @@ static int vdbePmaReaderInit(
         int iBuf = iStart % nBuf;
         if( iBuf ){
           int nRead = nBuf - iBuf;
-          if( (iStart + nRead) > pTask->iTemp1Off ){
-            nRead = (int)(pTask->iTemp1Off - iStart);
+          if( (iStart + nRead) > pFile->iEof ){
+            nRead = (int)(pFile->iEof - iStart);
           }
           rc = sqlite3OsRead(
-              pTask->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
-              );
+              pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
+          );
           assert( rc!=SQLITE_IOERR_SHORT_READ );
         }
       }
@@ -547,7 +622,7 @@ static int vdbePmaReaderInit(
 
   if( rc==SQLITE_OK ){
     u64 nByte;                    /* Size of PMA in bytes */
-    pIter->iEof = pTask->iTemp1Off;
+    pIter->iEof = pFile->iEof;
     rc = vdbePmaReadVarint(pIter, &nByte);
     pIter->iEof = pIter->iReadOff + nByte;
     *pnByte += nByte;
@@ -669,11 +744,13 @@ int sqlite3VdbeSorterInit(
     pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
 
     pSorter->nTask = nWorker + 1;
+    pSorter->bUseThreads = (pSorter->nTask>1);
     for(i=0; i<pSorter->nTask; i++){
       SortSubtask *pTask = &pSorter->aTask[i];
       pTask->pKeyInfo = pKeyInfo;
       pTask->pgsz = pgsz;
       pTask->db = db;
+      pTask->pSorter = pSorter;
     }
 
     if( !sqlite3TempInMemory(db) ){
@@ -723,9 +800,10 @@ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
     pTask->aListMemory = 0;
   }
   pTask->pList = 0;
-  if( pTask->pTemp1 ){
-    sqlite3OsCloseFree(pTask->pTemp1);
-    pTask->pTemp1 = 0;
+  if( pTask->file.pFd ){
+    sqlite3OsCloseFree(pTask->file.pFd);
+    pTask->file.pFd = 0;
+    pTask->file.iEof = 0;
   }
 }
 
@@ -761,7 +839,8 @@ static MergeEngine *vdbeMergeEngineNew(int nIter){
   int nByte;                      /* Total bytes of space to allocate */
   MergeEngine *pNew;              /* Pointer to allocated object to return */
 
-  assert( nIter<=SORTER_MAX_MERGE_COUNT );
+  /* assert( nIter<=SORTER_MAX_MERGE_COUNT ); */
+
   while( N<nIter ) N += N;
   nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
 
@@ -793,8 +872,11 @@ static void vdbeMergeEngineFree(MergeEngine *pMerger){
 void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
   int i;
   (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
-  vdbeMergeEngineFree(pSorter->pMerger);
-  pSorter->pMerger = 0;
+  if( pSorter->pReader ){
+    vdbePmaReaderClear(pSorter->pReader);
+    sqlite3DbFree(db, pSorter->pReader);
+    pSorter->pReader = 0;
+  }
   for(i=0; i<pSorter->nTask; i++){
     SortSubtask *pTask = &pSorter->aTask[i];
     vdbeSortSubtaskCleanup(db, pTask);
@@ -806,6 +888,8 @@ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
   pSorter->nInMemory = 0;
   pSorter->bUsePMA = 0;
   pSorter->iMemory = 0;
+  sqlite3DbFree(db, pSorter->pUnpacked);
+  pSorter->pUnpacked = 0;
 }
 
 /*
@@ -815,7 +899,6 @@ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
   VdbeSorter *pSorter = pCsr->pSorter;
   if( pSorter ){
     sqlite3VdbeSorterReset(db, pSorter);
-    vdbeMergeEngineFree(pSorter->pMerger);
     sqlite3_free(pSorter->aMemory);
     sqlite3DbFree(db, pSorter);
     pCsr->pSorter = 0;
@@ -1053,17 +1136,17 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){
   assert( pTask->nInMemory>0 );
 
   /* If the first temporary PMA file has not been opened, open it now. */
-  if( pTask->pTemp1==0 ){
-    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->pTemp1);
-    assert( rc!=SQLITE_OK || pTask->pTemp1 );
-    assert( pTask->iTemp1Off==0 );
+  if( pTask->file.pFd==0 ){
+    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file.pFd);
+    assert( rc!=SQLITE_OK || pTask->file.pFd );
+    assert( pTask->file.iEof==0 );
     assert( pTask->nPMA==0 );
   }
 
   /* Try to get the file to memory map */
   if( rc==SQLITE_OK ){
     vdbeSorterExtendFile(pTask->db, 
-        pTask->pTemp1, pTask->iTemp1Off + pTask->nInMemory + 9
+        pTask->file.pFd, pTask->file.iEof + pTask->nInMemory + 9
     );
   }
 
@@ -1071,8 +1154,8 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){
     SorterRecord *p;
     SorterRecord *pNext = 0;
 
-    vdbePmaWriterInit(pTask->pTemp1, &writer, pTask->pgsz,
-                      pTask->iTemp1Off);
+    vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz,
+                      pTask->file.iEof);
     pTask->nPMA++;
     vdbePmaWriteVarint(&writer, pTask->nInMemory);
     for(p=pTask->pList; p; p=pNext){
@@ -1082,7 +1165,7 @@ static int vdbeSorterListToPMA(SortSubtask *pTask){
       if( pTask->aListMemory==0 ) sqlite3_free(p);
     }
     pTask->pList = p;
-    rc = vdbePmaWriterFinish(&writer, &pTask->iTemp1Off);
+    rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
   }
 
   assert( pTask->pList==0 || rc!=SQLITE_OK );
@@ -1164,6 +1247,23 @@ static int vdbeSorterNext(
   return rc;
 }
 
+#if 0  /* Change to 1 to print timestamped sorter events to stderr */
+static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
+  i64 t;                          /* Time reported by the VFS */
+  int iTask = (pTask - pTask->pSorter->aTask);  /* Index of pTask in aTask[] */
+  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
+  fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
+}
+static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
+  i64 t;                          /* Time reported by the VFS */
+  sqlite3OsCurrentTimeInt64(db->pVfs, &t);
+  fprintf(stderr, "%lld:X %s\n", t, zEvent);    /* "X" in place of a task index */
+}
+#else  /* Tracing disabled: both calls expand to nothing */
+# define vdbeSorterWorkDebug(x,y)
+# define vdbeSorterRewindDebug(x,y)
+#endif
+
 /*
 ** The main routine for sorter-thread operations.
 */
@@ -1177,6 +1277,8 @@ static void *vdbeSortSubtaskMain(void *pCtx){
   );
   assert( pTask->bDone==0 );
 
+  vdbeSorterWorkDebug(pTask, "enter");
+
   if( pTask->pUnpacked==0 ){
     char *pFree;
     pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
@@ -1211,7 +1313,7 @@ static void *vdbeSortSubtaskMain(void *pCtx){
       /* Open a second temp file to write merged data to */
       rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2);
       if( rc==SQLITE_OK ){
-        vdbeSorterExtendFile(pTask->db, pTemp2, pTask->iTemp1Off);
+        vdbeSorterExtendFile(pTask->db, pTemp2, pTask->file.iEof);
       }else{
         vdbeMergeEngineFree(pMerger);
         break;
@@ -1231,9 +1333,9 @@ static void *vdbeSortSubtaskMain(void *pCtx){
         int iIter;
         for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
           PmaReader *pIter = &pMerger->aIter[iIter];
-          rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nOut);
+          rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nOut);
           iReadOff = pIter->iEof;
-          if( iReadOff>=pTask->iTemp1Off || rc!=SQLITE_OK ) break;
+          if( iReadOff>=pTask->file.iEof || rc!=SQLITE_OK ) break;
         }
         for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
           rc = vdbeSorterDoCompare(pTask, pMerger, iIter);
@@ -1253,10 +1355,10 @@ static void *vdbeSortSubtaskMain(void *pCtx){
       }
 
       vdbeMergeEngineFree(pMerger);
-      sqlite3OsCloseFree(pTask->pTemp1);
-      pTask->pTemp1 = pTemp2;
+      sqlite3OsCloseFree(pTask->file.pFd);
+      pTask->file.pFd = pTemp2;
       pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT);
-      pTask->iTemp1Off = iWriteOff;
+      pTask->file.iEof = iWriteOff;
     }
   }else{
     /* Sort the pTask->pList list */
@@ -1267,10 +1369,10 @@ static void *vdbeSortSubtaskMain(void *pCtx){
 #ifdef SQLITE_DEBUG
       i64 nExpect = pTask->nInMemory
         + sqlite3VarintLen(pTask->nInMemory)
-        + pTask->iTemp1Off;
+        + pTask->file.iEof;
 #endif
       rc = vdbeSorterListToPMA(pTask);
-      assert( rc!=SQLITE_OK || (nExpect==pTask->iTemp1Off) );
+      assert( rc!=SQLITE_OK || (nExpect==pTask->file.iEof) );
     }
   }
 
@@ -1280,6 +1382,7 @@ static void *vdbeSortSubtaskMain(void *pCtx){
     assert( pTask->pUnpacked->errCode==SQLITE_NOMEM );
     rc = SQLITE_NOMEM;
   }
+  vdbeSorterWorkDebug(pTask, "exit");
   return SQLITE_INT_TO_PTR(rc);
 }
 
@@ -1480,6 +1583,164 @@ static int vdbeSorterCountPMA(VdbeSorter *pSorter){
   return nPMA;
 }
 
+/*
+** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
+** of the data stored in aFile[1] is the same as that used by regular PMAs,
+** except that the number-of-bytes varint is omitted from the start.
+*/
+static int vdbeIncrPopulate(IncrMerger *pIncr){
+  int rc = SQLITE_OK;             /* Return code */
+  int rc2;                        /* Result of vdbePmaWriterFinish() */
+  SorterFile *pOut = &pIncr->aFile[1];    /* File being (re)written */
+  MergeEngine *pMerger = pIncr->pMerger;
+  PmaWriter writer;               /* Writer for pOut->pFd */
+  assert( pIncr->bEof==0 );
+
+  vdbePmaWriterInit(pIncr->aFile[1].pFd, &writer, pIncr->pTask->pgsz, 0);
+  while( rc==SQLITE_OK ){
+    int dummy;                    /* EOF flag from vdbeSorterNext(); unused */
+    PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ];
+    int nKey = pReader->nKey;     /* NOTE(review): read before the EOF test below */
+    i64 iEof = writer.iWriteOff + writer.iBufEnd; /* presumably bytes written so far - TODO confirm */
+
+    /* Check if the output file is full or if the input has been exhausted.
+    ** In either case exit the loop. */
+    if( pReader->pFile==0 ) break;
+    if( iEof && (iEof + nKey)>pIncr->mxSz ) break; /* iEof==0: never break here */
+
+    /* Write the next key to the output. */
+    vdbePmaWriteVarint(&writer, nKey);
+    vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
+    rc = vdbeSorterNext(pIncr->pTask, pIncr->pMerger, &dummy);
+  }
+
+  rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof); /* Runs even if the loop failed */
+  if( rc==SQLITE_OK ) rc = rc2;   /* An error from the loop takes precedence */
+  return rc;
+}
+
+static void *vdbeIncrPopulateThreadMain(void *pCtx){ /* Thread entry point */
+  IncrMerger *pIncr = (IncrMerger*)pCtx;   /* Context passed to ThreadCreate */
+  return SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) ); /* rc carried as pointer */
+}
+
+static int vdbeIncrBgPopulate(IncrMerger *pIncr){ /* Start refilling aFile[1] */
+  int rc;                         /* Return code */
+  assert( pIncr->pThread==0 );    /* No populate thread may be outstanding */
+  if( pIncr->pTask->pSorter->bUseThreads==0 ){
+    rc = vdbeIncrPopulate(pIncr); /* No threads: populate synchronously */
+  }else{
+    void *pCtx = (void*)pIncr;
+    rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx);
+  }
+  return rc;                      /* In the threaded case: result of ThreadCreate */
+}
+
+static int vdbeIncrSwap(IncrMerger *pIncr){ /* Exchange aFile[0] and aFile[1] */
+  int rc = SQLITE_OK;             /* Return code */
+  
+  if( pIncr->pThread ){           /* Wait for any outstanding populate thread */
+    void *pRet;
+    rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
+    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); /* Thread's own rc */
+    pIncr->pThread = 0;
+  }
+
+  if( rc==SQLITE_OK ){
+    SorterFile f0 = pIncr->aFile[0];
+    pIncr->aFile[0] = pIncr->aFile[1];    /* Just-written file becomes readable */
+    pIncr->aFile[1] = f0;                 /* Old read file becomes the output */
+
+    if( pIncr->aFile[0].iEof==0 ){        /* Nothing was written: merge is done */
+      pIncr->bEof = 1;
+    }else{
+      rc = vdbeIncrBgPopulate(pIncr);     /* Begin filling the new aFile[1] */
+    }
+  }
+
+  return rc;
+}
+
+static void vdbeIncrFree(IncrMerger *pIncr){ /* Release pIncr and all it owns */
+  if( pIncr->pThread ){
+    void *pRet;                   /* Thread result; discarded */
+    sqlite3ThreadJoin(pIncr->pThread, &pRet);  /* Join before closing files */
+  }
+  if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
+  if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
+  vdbeMergeEngineFree(pIncr->pMerger);
+  sqlite3_free(pIncr);
+}
+
+/*
+** Populate iterator *pIter so that it may be used to iterate through all 
+** keys stored in subtask pTask using the incremental merge method.
+*/
+static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){
+  SortSubtask *pTask0 = &pSorter->aTask[0];  /* Task used for compares and I/O */
+  int rc = SQLITE_OK;             /* Return code */
+  MergeEngine *pMerger = 0;       /* Merges every PMA of every task */
+  IncrMerger *pIncr = 0;          /* New incremental merger for pIter */
+  int i;
+  int nPMA = 0;                   /* Total PMAs across all tasks */
+
+  for(i=0; i<pSorter->nTask; i++){
+    nPMA += pSorter->aTask[i].nPMA;
+  }
+  pMerger = vdbeMergeEngineNew(nPMA);
+  if( pMerger==0 ){
+    rc = SQLITE_NOMEM;
+  }else{
+    int iIter = 0;                /* Next unused slot in pMerger->aIter[] */
+    int iPMA;
+    for(i=0; i<pSorter->nTask; i++){
+      i64 iReadOff = 0;           /* Start offset of next PMA in this task's file */
+      SortSubtask *pTask = &pSorter->aTask[i];
+      for(iPMA=0; iPMA<pTask->nPMA; iPMA++){ /* NOTE(review): rc is not tested here */
+        i64 nDummy = 0;
+        PmaReader *pIter = &pMerger->aIter[iIter++]; /* Shadows the pIter parameter */
+        rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nDummy);
+        iReadOff = pIter->iEof;   /* Next PMA starts where this one ends */
+      }
+    }
+    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
+      rc = vdbeSorterDoCompare(pTask0, pMerger, i);
+    }
+  }
+
+  if( rc==SQLITE_OK ){
+    pIncr = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger));
+    if( pIncr==0 ){
+      rc = SQLITE_NOMEM;          /* NOTE(review): pMerger is not freed on this path */
+    }else{
+      memset(pIncr, 0, sizeof(IncrMerger));
+      pIncr->mxSz = (pSorter->mxPmaSize / 2);  /* Half the maximum PMA size */
+      pIncr->pMerger = pMerger;   /* pIncr now owns pMerger (see vdbeIncrFree) */
+      pIncr->pTask = pTask0;
+    }
+  }
+
+  /* Open the two temp files. */
+  if( rc==SQLITE_OK ){
+    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[0].pFd);
+  }
+  if( rc==SQLITE_OK ){
+    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[1].pFd);
+  }
+
+  /* Launch a background thread to populate aFile[1]. */
+  if( rc==SQLITE_OK ){
+    rc = vdbeIncrBgPopulate(pIncr);  /* Runs synchronously if bUseThreads==0 */
+  }
+
+  pIter->pIncr = pIncr;           /* Assigned even on error; pIncr may be 0 */
+  if( rc==SQLITE_OK ){
+    rc = vdbePmaReaderNext(pIter); /* assumes caller zeroed pIter, so this swaps in the first file - TODO confirm */
+  }
+  return rc;
+}
+
+
 /*
 ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
 ** this function is called to prepare for iterating through the records
@@ -1520,70 +1781,21 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
   /* Join all threads */
   rc = vdbeSorterJoinAll(pSorter, rc);
 
-  /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
-  ** some of them together so that this is no longer the case. */
-  if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
-    int i;
-    for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){
-      SortSubtask *pTask = &pSorter->aTask[i];
-      if( pTask->pTemp1 ){
-        pTask->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nTask;
-        pTask->eWork = SORT_SUBTASK_CONS;
+  vdbeSorterRewindDebug(db, "rewind");
 
-#if SQLITE_MAX_WORKER_THREADS>0
-        if( i<(pSorter->nTask-1) ){
-          void *pCtx = (void*)pTask;
-          rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx);
-        }else
-#endif
-        {
-          rc = vdbeSorterRunTask(pTask);
-        }
-      }
-    }
-  }
-
-  /* Join all threads */
-  rc = vdbeSorterJoinAll(pSorter, rc);
-
-  /* Assuming no errors have occurred, set up a merger structure to read
-  ** and merge all remaining PMAs.  */
-  assert( pSorter->pMerger==0 );
+  /* Assuming no errors have occurred, set up a merger structure to 
+  ** incrementally read and merge all remaining PMAs.  */
+  assert( pSorter->pReader==0 );
   if( rc==SQLITE_OK ){
-    int nIter = 0;                /* Number of iterators used */
-    int i;
-    MergeEngine *pMerger;
-    for(i=0; i<pSorter->nTask; i++){
-      nIter += pSorter->aTask[i].nPMA;
-    }
-
-    pSorter->pMerger = pMerger = vdbeMergeEngineNew(nIter);
-    if( pMerger==0 ){
-      rc = SQLITE_NOMEM;
-    }else{
-      int iIter = 0;
-      int iThread = 0;
-      for(iThread=0; iThread<pSorter->nTask; iThread++){
-        int iPMA;
-        i64 iReadOff = 0;
-        SortSubtask *pTask = &pSorter->aTask[iThread];
-        for(iPMA=0; iPMA<pTask->nPMA && rc==SQLITE_OK; iPMA++){
-          i64 nDummy = 0;
-          PmaReader *pIter = &pMerger->aIter[iIter++];
-          rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nDummy);
-          iReadOff = pIter->iEof;
-        }
-      }
-
-      for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
-        rc = vdbeSorterDoCompare(&pSorter->aTask[0], pMerger, i);
-      }
-    }
+    PmaReader *pReader;
+    pReader = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
+    pSorter->pReader = pReader;
+    rc = vdbePmaReaderIncrInit(pSorter, pReader);
+    assert( rc!=SQLITE_OK || pReader->pFile );
+    *pbEof = 0;
   }
 
-  if( rc==SQLITE_OK ){
-    *pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0);
-  }
+  vdbeSorterRewindDebug(db, "rewinddone");
   return rc;
 }
 
@@ -1594,8 +1806,9 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
   VdbeSorter *pSorter = pCsr->pSorter;
   int rc;                         /* Return code */
 
-  if( pSorter->pMerger ){
-    rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof);
+  if( pSorter->pReader ){
+    rc = vdbePmaReaderNext(pSorter->pReader);
+    *pbEof = (pSorter->pReader->pFile==0);
   }else{
     SorterRecord *pFree = pSorter->pRecord;
     pSorter->pRecord = pFree->u.pNext;
@@ -1616,11 +1829,9 @@ static void *vdbeSorterRowkey(
   int *pnKey                      /* OUT: Size of current key in bytes */
 ){
   void *pKey;
-  if( pSorter->pMerger ){
-    PmaReader *pIter;
-    pIter = &pSorter->pMerger->aIter[ pSorter->pMerger->aTree[1] ];
-    *pnKey = pIter->nKey;
-    pKey = pIter->aKey;
+  if( pSorter->pReader ){
+    *pnKey = pSorter->pReader->nKey;
+    pKey = pSorter->pReader->aKey;
   }else{
     *pnKey = pSorter->pRecord->nVal;
     pKey = SRVAL(pSorter->pRecord);
@@ -1669,13 +1880,19 @@ int sqlite3VdbeSorterCompare(
   int *pRes                       /* OUT: Result of comparison */
 ){
   VdbeSorter *pSorter = pCsr->pSorter;
-  UnpackedRecord *r2 = pSorter->aTask[0].pUnpacked;
+  UnpackedRecord *r2 = pSorter->pUnpacked;
   KeyInfo *pKeyInfo = pCsr->pKeyInfo;
   int i;
   void *pKey; int nKey;           /* Sorter key to compare pVal with */
 
+  if( r2==0 ){
+    char *p;
+    r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo,0,0,&p);
+    assert( pSorter->pUnpacked==(UnpackedRecord*)p );
+    if( r2==0 ) return SQLITE_NOMEM;
+    r2->nField = pKeyInfo->nField-nIgnore;
+  }
   assert( r2->nField>=pKeyInfo->nField-nIgnore );
-  r2->nField = pKeyInfo->nField-nIgnore;
 
   pKey = vdbeSorterRowkey(pSorter, &nKey);
   sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
index f8bfb0fe51256920849bd62aad2cdd0f7923228e..626630050c84de9a6c6de42035fedefcb19a8f81 100644 (file)
@@ -47,6 +47,12 @@ do_execsql_test 2.2 {
   CREATE UNIQUE INDEX i1 ON t1(b, a);
 }
 
+do_execsql_test 2.3 {
+  CREATE UNIQUE INDEX i2 ON t1(a);
+}
+
+do_execsql_test 2.4 { PRAGMA integrity_check } {ok}
+
 db close
 sqlite3_shutdown
 sqlite3_config_worker_threads 0