]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Avoid having the sorter merge too many PMAs at a time when incrementally merging...
authordan <dan@noemail.net>
Fri, 11 Apr 2014 19:43:07 +0000 (19:43 +0000)
committerdan <dan@noemail.net>
Fri, 11 Apr 2014 19:43:07 +0000 (19:43 +0000)
FossilOrigin-Name: 98bf0307b121b0776a7170108cc8d3f948a7ebfe

manifest
manifest.uuid
src/shell.c
src/vdbesort.c
test/sort2.test

index 883105388052cd5bdcb501f95da6c6b419edfcbd..f48df9c72d7e7f4c9162be875ab004bc816853d6 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Experimental\smulti-threaded\ssorting\schanges\sto\sallow\sthe\ssorter\sto\sbegin\sreturning\sitems\sto\sthe\sVDBE\sbefore\sall\sdata\sis\ssorted.
-D 2014-04-09T20:04:17.324
+C Avoid\shaving\sthe\ssorter\smerge\stoo\smany\sPMAs\sat\sa\stime\swhen\sincrementally\smerging\sdata\sfollowing\sa\sSorterRewind().
+D 2014-04-11T19:43:07.755
 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
 F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125
 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@@ -218,7 +218,7 @@ F src/random.c d10c1f85b6709ca97278428fd5db5bbb9c74eece
 F src/resolve.c 273d5f47c4e2c05b2d3d2bffeda939551ab59e66
 F src/rowset.c 64655f1a627c9c212d9ab497899e7424a34222e0
 F src/select.c 20055cf917222e660c4222fea306bd13a0623caa
-F src/shell.c afc0b1a5a646d287142ef0c9a2a6e3139d57cba2
+F src/shell.c b44c3f17f0bf41b3431e9cc171706251156ae85f
 F src/sqlite.h.in 81221c50addbf698c3247154d92efd1095bfd885
 F src/sqlite3.rc 11094cc6a157a028b301a9f06b3d03089ea37c3e
 F src/sqlite3ext.h 886f5a34de171002ad46fae8c36a7d8051c190fc
@@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4
 F src/vdbeaux.c d8dc38965507a34b0e150c0d7fc82b02f8cf25ea
 F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa
 F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447
-F src/vdbesort.c 26823b626c3231a52e45f5e78a18cb8681bb1b88
+F src/vdbesort.c 2984e3624383adf9c762558b8f85a17a626c11a7
 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767
 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd
 F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8
@@ -818,7 +818,7 @@ F test/skipscan2.test 5a4db0799c338ddbacb154aaa5589c0254b36a8d
 F test/soak.test 0b5b6375c9f4110c828070b826b3b4b0bb65cd5f
 F test/softheap1.test 40562fe6cac6d9827b7b42b86d45aedf12c15e24
 F test/sort.test 79dc647c4e9b123a64e57b7080b7f9a2df43f87a
-F test/sort2.test bbc2eb244fb862141a900a851056d48705b5997b
+F test/sort2.test 04e99d0d028b469c6cfab2c647c6c28755504063
 F test/sort3.test c3f88d233452a129de519de311d109a0ad0da0af
 F test/speed1.test f2974a91d79f58507ada01864c0e323093065452
 F test/speed1p.explain d841e650a04728b39e6740296b852dccdca9b2cb
@@ -1163,10 +1163,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1
 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01
 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff
-P e54dded2012f0ab486ee138e9bd57c528af33980
-R 803b4ddf4cddf7e21aeddc04109caaf0
-T *branch * threads-experimental
-T *sym-threads-experimental *
-T -sym-threads *
+P f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f
+R f6c598c1c558c5930404cda096730209
 U dan
-Z 3b5c615396ccbaaa23add5a8103bd906
+Z 3e9d4ee1a6e7b343cf831d1b18651067
index 6cf45357c405ee71810279cc876800ba7e3907a1..b1943056c17dae2d7c7cf79902bfaa39dbf981f2 100644 (file)
@@ -1 +1 @@
-f9d5e09afaf64d68a0e461c1c2f38179bcea4b1f
\ No newline at end of file
+98bf0307b121b0776a7170108cc8d3f948a7ebfe
\ No newline at end of file
index e032bd36d2c4346ac03381c0813e8ea4b61dfa70..40ac24093a1bea9c08879c0ff05e438b6714c8c5 100644 (file)
@@ -3535,7 +3535,7 @@ static void main_init(struct callback_data *data) {
   sqlite3_snprintf(sizeof(mainPrompt), mainPrompt,"sqlite> ");
   sqlite3_snprintf(sizeof(continuePrompt), continuePrompt,"   ...> ");
   sqlite3_config(SQLITE_CONFIG_MULTITHREAD);
-  sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 3);
+  sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, 4);
 }
 
 /*
index e558c42f11bc0ce8af556e6e114f7bb78ba770e3..16f6c618c6354229e7c307757d87b721b7179dd8 100644 (file)
@@ -164,7 +164,8 @@ struct SortSubtask {
   u8 *aListMemory;                /* Records memory (or NULL) */
 
   int nPMA;                       /* Number of PMAs currently in file */
-  SorterFile file;
+  SorterFile file;                /* Temp file for level-0 PMAs */
+  SorterFile file2;               /* Space for other PMAs */
 };
 
 
@@ -240,6 +241,11 @@ struct MergeEngine {
 /*
 ** Main sorter structure. A single instance of this is allocated for each 
 ** sorter cursor created by the VDBE.
+**
+** mxKeysize:
+**   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
+**   this variable is updated so as to be set to the size on disk of the
+**   largest record in the sorter.
 */
 struct VdbeSorter {
   int nInMemory;                  /* Current size of pRecord list as PMA */
@@ -249,6 +255,7 @@ struct VdbeSorter {
   int bUseThreads;                /* True if one or more PMAs created */
   SorterRecord *pRecord;          /* Head of in-memory record list */
   PmaReader *pReader;             /* Read data from here after Rewind() */
+  int mxKeysize;                  /* Largest serialized key seen so far */
   UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
   u8 *aMemory;                    /* Block of memory to alloc records from */
   int iMemory;                    /* Offset of first free byte in aMemory */
@@ -277,13 +284,21 @@ struct PmaReader {
   IncrMerger *pIncr;              /* Incremental merger */
 };
 
+/*
+** Normally, a PmaReader object iterates through an existing PMA stored 
+** within a temp file. However, if the PmaReader.pIncr variable points to
+** an object of the following type, it may be used to iterate/merge through
+** multiple PMAs simultaneously.
+*/
 struct IncrMerger {
-  int mxSz;                       /* Maximum size of files */
   SortSubtask *pTask;             /* Task that owns this merger */
+  SQLiteThread *pThread;          /* Thread currently populating aFile[1] */
+  MergeEngine *pMerger;           /* Merge engine thread reads data from */
+  i64 iStartOff;                  /* Offset to start writing file at */
+  int mxSz;                       /* Maximum bytes of data to store */
   int bEof;                       /* Set to true when merge is finished */
+  int bUseThread;                 /* True to use a bg thread for this object */
   SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
-  MergeEngine *pMerger;           /* Merge engine thread reads data from */
-  SQLiteThread *pThread;          /* Thread currently populating aFile[1] */
 };
 
 /*
@@ -506,16 +521,30 @@ static int vdbePmaReaderReinit(PmaReader *pIter){
     sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
     pIter->aMap = 0;
   }
-  pIter->iReadOff = 0;
+  pIter->iReadOff = pIncr->iStartOff;
   pIter->iEof = pIncr->aFile[0].iEof;
   pIter->pFile = pIncr->aFile[0].pFd;
 
   rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
   if( rc==SQLITE_OK ){
-    if( pIter->aMap==0 && pIter->aBuffer==0 ){
-      pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
-      if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
-      pIter->nBuffer = pTask->pgsz;
+    if( pIter->aMap==0 ){
+      /* TODO: Combine this code with similar code in vdbePmaReaderInit() */
+      int iBuf = pIter->iReadOff % pTask->pgsz;
+      if( pIter->aBuffer==0 ){
+        pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
+        if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
+        pIter->nBuffer = pTask->pgsz;
+      }
+      if( iBuf ){
+        int nRead = pTask->pgsz - iBuf;
+        if( (pIter->iReadOff + nRead) > pIter->iEof ){
+          nRead = (int)(pIter->iEof - pIter->iReadOff);
+        }
+        rc = sqlite3OsRead(
+            pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
+        );
+        assert( rc!=SQLITE_IOERR_SHORT_READ );
+      }
     }
   }
 
@@ -577,7 +606,6 @@ static int vdbePmaReaderInit(
 ){
   int rc = SQLITE_OK;
   int nBuf = pTask->pgsz;
-  void *pMap = 0;                 /* Mapping of temp file */
 
   assert( pFile->iEof>iStart );
   assert( pIter->aAlloc==0 );
@@ -589,33 +617,27 @@ static int vdbePmaReaderInit(
   if( pIter->aAlloc ){
     /* Try to xFetch() a mapping of the entire temp file. If this is possible,
     ** the PMA will be read via the mapping. Otherwise, use xRead().  */
-    if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
-      rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iEof, &pMap);
-    }
+    rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
   }else{
     rc = SQLITE_NOMEM;
   }
 
-  if( rc==SQLITE_OK ){
-    if( pMap ){
-      pIter->aMap = (u8*)pMap;
+  if( rc==SQLITE_OK && pIter->aMap==0 ){
+    pIter->nBuffer = nBuf;
+    pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
+    if( !pIter->aBuffer ){
+      rc = SQLITE_NOMEM;
     }else{
-      pIter->nBuffer = nBuf;
-      pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
-      if( !pIter->aBuffer ){
-        rc = SQLITE_NOMEM;
-      }else{
-        int iBuf = iStart % nBuf;
-        if( iBuf ){
-          int nRead = nBuf - iBuf;
-          if( (iStart + nRead) > pFile->iEof ){
-            nRead = (int)(pFile->iEof - iStart);
-          }
-          rc = sqlite3OsRead(
-              pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
-          );
-          assert( rc!=SQLITE_IOERR_SHORT_READ );
+      int iBuf = iStart % nBuf;
+      if( iBuf ){
+        int nRead = nBuf - iBuf;
+        if( (iStart + nRead) > pFile->iEof ){
+          nRead = (int)(pFile->iEof - iStart);
         }
+        rc = sqlite3OsRead(
+            pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
+        );
+        assert( rc!=SQLITE_IOERR_SHORT_READ );
       }
     }
   }
@@ -805,6 +827,11 @@ static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
     pTask->file.pFd = 0;
     pTask->file.iEof = 0;
   }
+  if( pTask->file2.pFd ){
+    sqlite3OsCloseFree(pTask->file2.pFd);
+    pTask->file2.pFd = 0;
+    pTask->file2.iEof = 0;
+  }
 }
 
 /*
@@ -839,7 +866,7 @@ static MergeEngine *vdbeMergeEngineNew(int nIter){
   int nByte;                      /* Total bytes of space to allocate */
   MergeEngine *pNew;              /* Pointer to allocated object to return */
 
-  /* assert( nIter<=SORTER_MAX_MERGE_COUNT ); */
+  assert( nIter<=SORTER_MAX_MERGE_COUNT );
 
   while( N<nIter ) N += N;
   nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
@@ -888,6 +915,7 @@ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
   pSorter->nInMemory = 0;
   pSorter->bUsePMA = 0;
   pSorter->iMemory = 0;
+  pSorter->mxKeysize = 0;
   sqlite3DbFree(db, pSorter->pUnpacked);
   pSorter->pUnpacked = 0;
 }
@@ -1259,11 +1287,35 @@ static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
   sqlite3OsCurrentTimeInt64(db->pVfs, &t);
   fprintf(stderr, "%lld:X %s\n", t, zEvent);
 }
+static void vdbeSorterPopulateDebug(
+  SortSubtask *pTask,
+  const char *zEvent
+){
+  i64 t;
+  int iTask = (pTask - pTask->pSorter->aTask);
+  sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
+  fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
+}
 #else
 # define vdbeSorterWorkDebug(x,y)
 # define vdbeSorterRewindDebug(x,y)
+# define vdbeSorterPopulateDebug(x,y)
 #endif
 
+static int vdbeSortAllocUnpacked(SortSubtask *pTask){
+  if( pTask->pUnpacked==0 ){
+    char *pFree;
+    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
+        pTask->pKeyInfo, 0, 0, &pFree
+    );
+    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
+    if( pFree==0 ) return SQLITE_NOMEM;
+    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
+    pTask->pUnpacked->errCode = 0;
+  }
+  return SQLITE_OK;
+}
+
 /*
 ** The main routine for sorter-thread operations.
 */
@@ -1279,19 +1331,8 @@ static void *vdbeSortSubtaskMain(void *pCtx){
 
   vdbeSorterWorkDebug(pTask, "enter");
 
-  if( pTask->pUnpacked==0 ){
-    char *pFree;
-    pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
-        pTask->pKeyInfo, 0, 0, &pFree
-    );
-    assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
-    if( pFree==0 ){
-      rc = SQLITE_NOMEM;
-      goto thread_out;
-    }
-    pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
-    pTask->pUnpacked->errCode = 0;
-  }
+  rc = vdbeSortAllocUnpacked(pTask);
+  if( rc!=SQLITE_OK ) goto thread_out;
 
   if( pTask->eWork==SORT_SUBTASK_CONS ){
     assert( pTask->pList==0 );
@@ -1533,6 +1574,9 @@ int sqlite3VdbeSorterWrite(
   }
 
   pSorter->nInMemory += nPMA;
+  if( nPMA>pSorter->mxKeysize ){
+    pSorter->mxKeysize = nPMA;
+  }
 
   if( pSorter->aMemory ){
     int nMin = pSorter->iMemory + nReq;
@@ -1591,12 +1635,15 @@ static int vdbeSorterCountPMA(VdbeSorter *pSorter){
 static int vdbeIncrPopulate(IncrMerger *pIncr){
   int rc = SQLITE_OK;
   int rc2;
+  i64 iStart = pIncr->iStartOff;
   SorterFile *pOut = &pIncr->aFile[1];
   MergeEngine *pMerger = pIncr->pMerger;
   PmaWriter writer;
   assert( pIncr->bEof==0 );
 
-  vdbePmaWriterInit(pIncr->aFile[1].pFd, &writer, pIncr->pTask->pgsz, 0);
+  vdbeSorterPopulateDebug(pIncr->pTask, "enter");
+
+  vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, iStart);
   while( rc==SQLITE_OK ){
     int dummy;
     PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ];
@@ -1606,7 +1653,7 @@ static int vdbeIncrPopulate(IncrMerger *pIncr){
     /* Check if the output file is full or if the input has been exhausted.
     ** In either case exit the loop. */
     if( pReader->pFile==0 ) break;
-    if( iEof && (iEof + nKey)>pIncr->mxSz ) break;
+    if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
 
     /* Write the next key to the output. */
     vdbePmaWriteVarint(&writer, nKey);
@@ -1616,6 +1663,7 @@ static int vdbeIncrPopulate(IncrMerger *pIncr){
 
   rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
   if( rc==SQLITE_OK ) rc = rc2;
+  vdbeSorterPopulateDebug(pIncr->pTask, "exit");
   return rc;
 }
 
@@ -1627,34 +1675,50 @@ static void *vdbeIncrPopulateThreadMain(void *pCtx){
 static int vdbeIncrBgPopulate(IncrMerger *pIncr){
   int rc;
   assert( pIncr->pThread==0 );
-  if( pIncr->pTask->pSorter->bUseThreads==0 ){
+  if( pIncr->bUseThread==0 ){
     rc = vdbeIncrPopulate(pIncr);
-  }else{
+  }
+#if SQLITE_MAX_WORKER_THREADS>0
+  else{
     void *pCtx = (void*)pIncr;
     rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx);
   }
+#endif
   return rc;
 }
 
 static int vdbeIncrSwap(IncrMerger *pIncr){
   int rc = SQLITE_OK;
-  
-  if( pIncr->pThread ){
-    void *pRet;
-    rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
-    if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
-    pIncr->pThread = 0;
-  }
 
-  if( rc==SQLITE_OK ){
-    SorterFile f0 = pIncr->aFile[0];
-    pIncr->aFile[0] = pIncr->aFile[1];
-    pIncr->aFile[1] = f0;
+  if( pIncr->bUseThread ){
+#if SQLITE_MAX_WORKER_THREADS>0
+    if( pIncr->pThread ){
+      void *pRet;
+      assert( pIncr->bUseThread );
+      rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
+      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
+      pIncr->pThread = 0;
+    }
+#endif
+
+    if( rc==SQLITE_OK ){
+      SorterFile f0 = pIncr->aFile[0];
+      pIncr->aFile[0] = pIncr->aFile[1];
+      pIncr->aFile[1] = f0;
+    }
 
-    if( pIncr->aFile[0].iEof==0 ){
+    if( rc==SQLITE_OK ){
+      if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
+        pIncr->bEof = 1;
+      }else{
+        rc = vdbeIncrBgPopulate(pIncr);
+      }
+    }
+  }else{
+    rc = vdbeIncrPopulate(pIncr);
+    pIncr->aFile[0] = pIncr->aFile[1];
+    if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
       pIncr->bEof = 1;
-    }else{
-      rc = vdbeIncrBgPopulate(pIncr);
     }
   }
 
@@ -1662,81 +1726,283 @@ static int vdbeIncrSwap(IncrMerger *pIncr){
 }
 
 static void vdbeIncrFree(IncrMerger *pIncr){
-  if( pIncr->pThread ){
-    void *pRet;
-    sqlite3ThreadJoin(pIncr->pThread, &pRet);
-  }
-  if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
-  if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
-  vdbeMergeEngineFree(pIncr->pMerger);
-  sqlite3_free(pIncr);
+  if( pIncr ){
+#if SQLITE_MAX_WORKER_THREADS>0
+    if( pIncr->pThread ){
+      void *pRet;
+      sqlite3ThreadJoin(pIncr->pThread, &pRet);
+    }
+    if( pIncr->bUseThread ){
+      if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
+      if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
+    }
+#endif
+    vdbeMergeEngineFree(pIncr->pMerger);
+    sqlite3_free(pIncr);
+  }
+}
+
+static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
+  IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
+  if( pIncr ){
+    memset(pIncr, 0, sizeof(IncrMerger));
+    pIncr->pMerger = pMerger;
+    pIncr->pTask = pTask;
+    pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
+    pTask->file2.iEof += pIncr->mxSz;
+
+#if 0
+    /* Open the two temp files. */
+    rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
+    if( rc==SQLITE_OK ){
+      rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
+    }
+    if( rc!=SQLITE_OK ){
+      vdbeIncrFree(pIncr);
+      pIncr = 0;
+    }
+#endif
+  }
+  return pIncr;
+}
+
+static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
+  if( bUseThread ){
+    pIncr->bUseThread = 1;
+    pIncr->pTask->file2.iEof -= pIncr->mxSz;
+  }
+}
+
+static int vdbeIncrInit2(PmaReader *pIter){
+  int rc = SQLITE_OK;
+  IncrMerger *pIncr = pIter->pIncr;
+  if( pIncr ){
+    SortSubtask *pTask = pIncr->pTask;
+    int i;
+    MergeEngine *pMerger = pIncr->pMerger;
+
+    for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
+      rc = vdbeIncrInit2(&pMerger->aIter[i]);
+    }
+    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
+      rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
+    }
+
+    /* Set up the required files for pIncr */
+    if( rc==SQLITE_OK ){
+      if( pIncr->bUseThread==0 ){
+        if( pTask->file2.pFd==0 ){
+          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
+          assert( pTask->file2.iEof>0 );
+          if( rc==SQLITE_OK ){
+            vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof);
+            pTask->file2.iEof = 0;
+          }
+        }
+        if( rc==SQLITE_OK ){
+          pIncr->aFile[1].pFd = pTask->file2.pFd;
+          pIncr->iStartOff = pTask->file2.iEof;
+          pTask->file2.iEof += pIncr->mxSz;
+        }
+      }else{
+        rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
+        if( rc==SQLITE_OK ){
+          rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
+        }
+      }
+    }
+
+    if( rc==SQLITE_OK && pIncr->bUseThread ){
+      rc = vdbeIncrBgPopulate(pIncr);
+    }
+
+    if( rc==SQLITE_OK ){
+      rc = vdbePmaReaderNext(pIter);
+    }
+  }
+  return rc;
+}
+
+/*
+** Allocate a new MergeEngine object to merge the contents of nPMA level-0
+** PMAs from pTask->file. If no error occurs, set *ppOut to point to
+** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
+** to NULL and return an SQLite error code.
+**
+** When this function is called, *piOffset is set to the offset of the
+** first PMA to read from pTask->file. Assuming no error occurs, it is 
+** set to the offset immediately following the last byte of the last
+** PMA before returning. If an error does occur, then the final value of
+** *piOffset is undefined.
+*/
+static int vdbeMergeEngineLevel0(
+  SortSubtask *pTask,             /* Sorter task to read from */
+  int nPMA,                       /* Number of PMAs to read */
+  i64 *piOffset,                  /* IN/OUT: Read offset in pTask->file */
+  MergeEngine **ppOut             /* OUT: New merge-engine */
+){
+  MergeEngine *pNew;              /* Merge engine to return */
+  i64 iOff = *piOffset;
+  int i;
+  int rc = SQLITE_OK;
+
+  *ppOut = pNew = vdbeMergeEngineNew(nPMA);
+  if( pNew==0 ) rc = SQLITE_NOMEM;
+
+  for(i=0; i<nPMA && rc==SQLITE_OK; i++){
+    i64 nDummy;
+    PmaReader *pIter = &pNew->aIter[i];
+    rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pIter, &nDummy);
+    iOff = pIter->iEof;
+  }
+
+  if( rc!=SQLITE_OK ){
+    vdbeMergeEngineFree(pNew);
+    *ppOut = 0;
+  }
+  *piOffset = iOff;
+  return rc;
+}
+
+typedef struct IncrBuilder IncrBuilder;
+struct IncrBuilder {
+  int nPMA;                     /* Number of iterators used so far */
+  MergeEngine *pMerger;         /* Merge engine to populate. */
+};
+
+static int vdbeAddToBuilder(
+  SortSubtask *pTask,
+  IncrBuilder *pBuilder, 
+  MergeEngine *pMerger
+){
+  int rc = SQLITE_OK;
+  IncrMerger *pIncr;
+
+  assert( pMerger );
+  if( pBuilder->nPMA==SORTER_MAX_MERGE_COUNT ){
+    rc = vdbeAddToBuilder(pTask, &pBuilder[1], pBuilder->pMerger);
+    pBuilder->pMerger = 0;
+    pBuilder->nPMA = 0;
+  }
+
+  if( rc==SQLITE_OK && pBuilder->pMerger==0 ){
+    pBuilder->pMerger = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
+    if( pBuilder->pMerger==0 ) rc = SQLITE_NOMEM;
+  }
+
+  if( rc==SQLITE_OK ){
+    pIncr = vdbeIncrNew(pTask, pMerger);
+    if( pIncr==0 ) rc = SQLITE_NOMEM;
+    pBuilder->pMerger->aIter[pBuilder->nPMA++].pIncr = pIncr;
+  }
+
+  if( rc!=SQLITE_OK ){
+    vdbeMergeEngineFree(pMerger);
+  }
+
+  return rc;
 }
 
 /*
 ** Populate iterator *pIter so that it may be used to iterate through all 
-** keys stored in subtask pTask using the incremental merge method.
+** keys stored in all PMAs created by this sorter.
 */
 static int vdbePmaReaderIncrInit(VdbeSorter *pSorter, PmaReader *pIter){
   SortSubtask *pTask0 = &pSorter->aTask[0];
+  MergeEngine *pMain = 0;
+  sqlite3 *db = pTask0->db;
   int rc = SQLITE_OK;
-  MergeEngine *pMerger = 0;
-  IncrMerger *pIncr = 0;
-  int i;
-  int nPMA = 0;
+  int iTask;
 
-  for(i=0; i<pSorter->nTask; i++){
-    nPMA += pSorter->aTask[i].nPMA;
+  IncrBuilder *aMerge;
+  const int nMerge = 32;
+  aMerge = sqlite3DbMallocZero(db, sizeof(aMerge[0])*nMerge);
+  if( aMerge==0 ) return SQLITE_NOMEM;
+
+  if( pSorter->nTask>1 ){
+    pMain = vdbeMergeEngineNew(pSorter->nTask);
+    if( pMain==0 ) rc = SQLITE_NOMEM;
   }
-  pMerger = vdbeMergeEngineNew(nPMA);
-  if( pMerger==0 ){
-    rc = SQLITE_NOMEM;
-  }else{
-    int iIter = 0;
+
+  for(iTask=0; iTask<pSorter->nTask && rc==SQLITE_OK; iTask++){
+    MergeEngine *pRoot = 0;
     int iPMA;
-    for(i=0; i<pSorter->nTask; i++){
-      i64 iReadOff = 0;
-      SortSubtask *pTask = &pSorter->aTask[i];
-      for(iPMA=0; iPMA<pTask->nPMA; iPMA++){
-        i64 nDummy = 0;
-        PmaReader *pIter = &pMerger->aIter[iIter++];
-        rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nDummy);
-        iReadOff = pIter->iEof;
+    i64 iReadOff = 0;
+    SortSubtask *pTask = &pSorter->aTask[iTask];
+    if( pTask->nPMA==0 ) continue;
+    for(iPMA=0; iPMA<pTask->nPMA; iPMA += SORTER_MAX_MERGE_COUNT){
+      MergeEngine *pMerger = 0;
+      int nReader = MIN(pTask->nPMA - iPMA, SORTER_MAX_MERGE_COUNT);
+
+      rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
+      if( rc!=SQLITE_OK ) break;
+
+      if( iPMA==0 ){
+        pRoot = pMerger;
+      }else{
+        if( pRoot ){
+          rc = vdbeAddToBuilder(pTask, &aMerge[0], pRoot);
+          pRoot = 0;
+          if( rc!=SQLITE_OK ){
+            vdbeMergeEngineFree(pMerger);
+            break;
+          }
+        }
+        rc = vdbeAddToBuilder(pTask, &aMerge[0], pMerger);
       }
     }
-    for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
-      rc = vdbeSorterDoCompare(pTask0, pMerger, i);
+
+    if( pRoot==0 ){
+      int i;
+      for(i=0; rc==SQLITE_OK && i<nMerge; i++){
+        if( aMerge[i].pMerger ){
+          if( pRoot ){
+            rc = vdbeAddToBuilder(pTask, &aMerge[i], pRoot);
+            if( rc!=SQLITE_OK ) break;
+          }
+          pRoot = aMerge[i].pMerger;
+          aMerge[i].pMerger = 0;
+        }
+      }
     }
-  }
 
-  if( rc==SQLITE_OK ){
-    pIncr = (IncrMerger*)sqlite3_malloc(sizeof(IncrMerger));
-    if( pIncr==0 ){
-      rc = SQLITE_NOMEM;
-    }else{
-      memset(pIncr, 0, sizeof(IncrMerger));
-      pIncr->mxSz = (pSorter->mxPmaSize / 2);
-      pIncr->pMerger = pMerger;
-      pIncr->pTask = pTask0;
+    if( rc==SQLITE_OK ){
+      if( pMain==0 ){
+        pMain = pRoot;
+      }else{
+        IncrMerger *pNew = vdbeIncrNew(pTask, pRoot);
+        pMain->aIter[iTask].pIncr = pNew;
+        if( pNew==0 ) rc = SQLITE_NOMEM;
+      }
+      memset(aMerge, 0, nMerge*sizeof(aMerge[0]));
     }
   }
 
-  /* Open the two temp files. */
   if( rc==SQLITE_OK ){
-    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[0].pFd);
-  }
-  if( rc==SQLITE_OK ){
-    rc = vdbeSorterOpenTempFile(pTask0->db->pVfs, &pIncr->aFile[1].pFd);
-  }
+    SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
 
-  /* Launch a background thread to populate aFile[1]. */
-  if( rc==SQLITE_OK ){
-    rc = vdbeIncrBgPopulate(pIncr);
+    rc = vdbeSortAllocUnpacked(pLast);
+    if( rc==SQLITE_OK ){
+      pIter->pIncr = vdbeIncrNew(pLast, pMain);
+      if( pIter->pIncr==0 ){
+        rc = SQLITE_NOMEM;
+      }else{
+        vdbeIncrSetThreads(pIter->pIncr, pSorter->bUseThreads);
+        for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
+          IncrMerger *pIncr;
+          if( (pIncr = pMain->aIter[iTask].pIncr) ){
+            vdbeIncrSetThreads(pIncr, pSorter->bUseThreads);
+            assert( pIncr->pTask!=pLast );
+          }
+        }
+      }
+    }
   }
-
-  pIter->pIncr = pIncr;
   if( rc==SQLITE_OK ){
-    rc = vdbePmaReaderNext(pIter);
+    rc = vdbeIncrInit2(pIter);
   }
+
+  sqlite3_free(aMerge);
   return rc;
 }
 
index 626630050c84de9a6c6de42035fedefcb19a8f81..4fb6a9462ba56d2100c228873c5da4d17342f211 100644 (file)
@@ -15,47 +15,71 @@ set testdir [file dirname $argv0]
 source $testdir/tester.tcl
 set testprefix sort2
 
-db close
-sqlite3_shutdown
-sqlite3_config_worker_threads 7
-reset_db
-
-do_execsql_test 1 {
-  PRAGMA cache_size = 5;
-  WITH r(x,y) AS (
-    SELECT 1, randomblob(100)
-    UNION ALL
-    SELECT x+1, randomblob(100) FROM r
-    LIMIT 100000
-  )
-  SELECT count(x), length(y) FROM r GROUP BY (x%5)
+foreach {tn script} {
+  1 { }
+  2 {
+    catch { db close }
+    sqlite3_shutdown
+    sqlite3_config_worker_threads 7
+    reset_db
+  }
 } {
-  20000 100 20000 100 20000 100 20000 100 20000 100
-}
 
-do_execsql_test 2.1 {
-  CREATE TABLE t1(a, b);
-  WITH r(x,y) AS (
-    SELECT 1, randomblob(100)
-    UNION ALL
-    SELECT x+1, randomblob(100) FROM r
-    LIMIT 10000
-  ) INSERT INTO t1 SELECT * FROM r;
-}
+  eval $script
 
-do_execsql_test 2.2 {
-  CREATE UNIQUE INDEX i1 ON t1(b, a);
-}
+  do_execsql_test $tn.1 {
+    PRAGMA cache_size = 5;
+    WITH r(x,y) AS (
+      SELECT 1, randomblob(100)
+      UNION ALL
+      SELECT x+1, randomblob(100) FROM r
+      LIMIT 100000
+    )
+    SELECT count(x), length(y) FROM r GROUP BY (x%5)
+  } {
+    20000 100 20000 100 20000 100 20000 100 20000 100
+  }
 
-do_execsql_test 2.3 {
-  CREATE UNIQUE INDEX i2 ON t1(a);
-}
+  do_execsql_test $tn.2.1 {
+    CREATE TABLE t1(a, b);
+    WITH r(x,y) AS (
+      SELECT 1, randomblob(100)
+      UNION ALL
+      SELECT x+1, randomblob(100) FROM r
+      LIMIT 10000
+    ) INSERT INTO t1 SELECT * FROM r;
+  }
+  
+  do_execsql_test $tn.2.2 {
+    CREATE UNIQUE INDEX i1 ON t1(b, a);
+  }
+  
+  do_execsql_test $tn.2.3 {
+    CREATE UNIQUE INDEX i2 ON t1(a);
+  }
+  
+  do_execsql_test $tn.2.4 { PRAGMA integrity_check } {ok}
+  
+  breakpoint
+  do_execsql_test $tn.3 {
+    PRAGMA cache_size = 5;
+    WITH r(x,y) AS (
+      SELECT 1, randomblob(100)
+      UNION ALL
+      SELECT x+1, randomblob(100) FROM r
+      LIMIT 1000000
+    )
+    SELECT count(x), length(y) FROM r GROUP BY (x%5)
+  } {
+    200000 100 200000 100 200000 100 200000 100 200000 100
+  }
+  
+  db close
+  sqlite3_shutdown
+  sqlite3_config_worker_threads 0
+  sqlite3_initialize
 
-do_execsql_test 2.4 { PRAGMA integrity_check } {ok}
+}
 
-db close
-sqlite3_shutdown
-sqlite3_config_worker_threads 0
-sqlite3_initialize
 finish_test