]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Changes to make the multi-threaded sorter sort stably. threads-closed
authordan <dan@noemail.net>
Sat, 29 Mar 2014 19:48:15 +0000 (19:48 +0000)
committerdan <dan@noemail.net>
Sat, 29 Mar 2014 19:48:15 +0000 (19:48 +0000)
FossilOrigin-Name: 83a105c864247a02b723c84069055765bf373703

manifest
manifest.uuid
src/vdbesort.c

index 0b2bd63f5067dbe01c38782a70cd820e92969001..5d63a5079ad531891532b9aa468dbc9e4485571b 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Fix\sa\sbroken\sassert()\sin\svdbesort.c.
-D 2014-03-29T10:01:58.802
+C Changes\sto\smake\sthe\smulti-threaded\ssorter\ssort\sstably.
+D 2014-03-29T19:48:15.410
 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
 F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125
 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4
 F src/vdbeaux.c 1153175fb57a8454e1c8cf79b59b7bf92b26779d
 F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa
 F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447
-F src/vdbesort.c 2881297f4acdba5908078c5d7f00635288a1ca08
+F src/vdbesort.c d42a32a4fff1a1bac6033abc4252f93e6ef3282b
 F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767
 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd
 F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8
@@ -1160,7 +1160,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1
 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01
 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff
-P a683c05f6250389e84b980b16559e162ba1a27c2
-R d8e63408790a442d33eec2a57272c54d
+P 18d1b402f2dbe78f1a1113bb356b710e348365ef
+R 9fe7387082ff9660756870dd3cb64af4
 U dan
-Z 6c5e71d99f167a1ded5f09353b3e0513
+Z 65bd6f7e332292b10d887f31007d4273
index 1bf7c269addf294bfeca49fada0d704847988e19..005734c1d53a79b5d50e46dd2058afdd0aea589b 100644 (file)
@@ -1 +1 @@
-18d1b402f2dbe78f1a1113bb356b710e348365ef
\ No newline at end of file
+83a105c864247a02b723c84069055765bf373703
\ No newline at end of file
index c6927f87a7ebad685fc7952c5c16c93293fa845e..08887e0208ede2bf422ef99542253b7ae6026645 100644 (file)
@@ -91,8 +91,10 @@ struct SorterThread {
   SorterRecord *pList;            /* List of records for pThread to sort */
   int nInMemory;                  /* Expected size of PMA based on pList */
   u8 *aListMemory;                /* Records memory (or NULL) */
+  u32 iSeq;                       /* Sequence number for PMA */
 
   int nPMA;                       /* Number of PMAs currently in pTemp1 */
+  int bEmbeddedSeq;               /* True if pTemp1 contains embedded seq. */
   i64 iTemp1Off;                  /* Offset to write to in pTemp1 */
   sqlite3_file *pTemp1;           /* File to write PMAs to, or NULL */
 };
@@ -187,6 +189,7 @@ struct VdbeSorter {
   u8 *aMemory;                    /* Block of memory to alloc records from */
   int iMemory;                    /* Offset of first free byte in aMemory */
   int nMemory;                    /* Size of aMemory allocation in bytes */
+  u32 iNextSeq;                   /* Sequence number for next PMA */
   SorterThread aThread[SQLITE_MAX_SORTER_THREAD];
 };
 
@@ -197,6 +200,8 @@ struct VdbeSorter {
 struct VdbeSorterIter {
   i64 iReadOff;                   /* Current read offset */
   i64 iEof;                       /* 1 byte past EOF for this iterator */
+  int bEmbeddedSeq;               /* True if records have sequence values */
+  int iSeq;                       /* Current sequence value */
   int nAlloc;                     /* Bytes of space at aAlloc */
   int nKey;                       /* Number of bytes in key */
   sqlite3_file *pFile;            /* File iterator is reading from */
@@ -417,6 +422,11 @@ static int vdbeSorterIterNext(VdbeSorterIter *pIter){
   }
 
   rc = vdbeSorterIterVarint(pIter, &nRec);
+  if( rc==SQLITE_OK && pIter->bEmbeddedSeq ){
+    u64 iSeq;
+    rc = vdbeSorterIterVarint(pIter, &iSeq);
+    pIter->iSeq = (int)iSeq;
+  }
   if( rc==SQLITE_OK ){
     pIter->nKey = (int)nRec;
     rc = vdbeSorterIterRead(pIter, (int)nRec, &pIter->aKey);
@@ -448,6 +458,7 @@ static int vdbeSorterIterInit(
   pIter->iReadOff = iStart;
   pIter->nAlloc = 128;
   pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
+  pIter->bEmbeddedSeq = pThread->bEmbeddedSeq;
 
   /* Try to xFetch() a mapping of the entire temp file. If this is possible,
   ** the PMA will be read via the mapping. Otherwise, use xRead().  */
@@ -470,7 +481,7 @@ static int vdbeSorterIterInit(
           }
           rc = sqlite3OsRead(
               pThread->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
-              );
+          );
           assert( rc!=SQLITE_IOERR_SHORT_READ );
         }
       }
@@ -480,9 +491,20 @@ static int vdbeSorterIterInit(
   if( rc==SQLITE_OK ){
     u64 nByte;                    /* Size of PMA in bytes */
     pIter->iEof = pThread->iTemp1Off;
-    rc = vdbeSorterIterVarint(pIter, &nByte);
-    pIter->iEof = pIter->iReadOff + nByte;
-    *pnByte += nByte;
+    if( pIter->bEmbeddedSeq==0 ){
+      u64 iSeq, nElem;
+      rc = vdbeSorterIterVarint(pIter, &iSeq);
+      pIter->iSeq = (int)iSeq;
+      if( rc==SQLITE_OK ){
+        rc = vdbeSorterIterVarint(pIter, &nElem);
+        *pnByte += (nElem * sqlite3VarintLen(iSeq));
+      }
+    }
+    if( rc==SQLITE_OK ){
+      rc = vdbeSorterIterVarint(pIter, &nByte);
+      pIter->iEof = pIter->iReadOff + nByte;
+      *pnByte += nByte;
+    }
   }
 
   if( rc==SQLITE_OK ){
@@ -751,6 +773,7 @@ void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
   pSorter->nInMemory = 0;
   pSorter->bUsePMA = 0;
   pSorter->iMemory = 0;
+  pSorter->iNextSeq = 0;
 }
 
 /*
@@ -824,11 +847,15 @@ static void vdbeSorterMerge(
 ** Sort the linked list of records headed at pThread->pList. Return 
 ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
 ** an error occurs.
+**
+** If the pnElem argument is not NULL and no error occurs, set *pnElem to
+** the total number of elements in the list.
 */
-static int vdbeSorterSort(SorterThread *pThread){
+static int vdbeSorterSort(SorterThread *pThread, i64 *pnElem){
   int i;
   SorterRecord **aSlot;
   SorterRecord *p;
+  i64 nElem = 0;
 
   aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
   if( !aSlot ){
@@ -856,6 +883,7 @@ static int vdbeSorterSort(SorterThread *pThread){
     }
     aSlot[i] = p;
     p = pNext;
+    nElem++;
   }
 
   p = 0;
@@ -864,6 +892,7 @@ static int vdbeSorterSort(SorterThread *pThread){
   }
   pThread->pList = p;
 
+  *pnElem = nElem;
   sqlite3_free(aSlot);
   return SQLITE_OK;
 }
@@ -989,7 +1018,7 @@ static int vdbeSorterExtendFile(sqlite3_file *pFile, i64 nByte){
 **       Each record consists of a varint followed by a blob of data (the 
 **       key). The varint is the number of bytes in the blob of data.
 */
-static int vdbeSorterListToPMA(SorterThread *pThread){
+static int vdbeSorterListToPMA(SorterThread *pThread, i64 nElem){
   int rc = SQLITE_OK;             /* Return code */
   FileWriter writer;              /* Object used to write to the file */
 
@@ -1002,12 +1031,13 @@ static int vdbeSorterListToPMA(SorterThread *pThread){
     assert( rc!=SQLITE_OK || pThread->pTemp1 );
     assert( pThread->iTemp1Off==0 );
     assert( pThread->nPMA==0 );
+    assert( pThread->bEmbeddedSeq==0 );
   }
 
   /* Try to get the file to memory map */
   if( rc==SQLITE_OK ){
     rc = vdbeSorterExtendFile(
-        pThread->pTemp1, pThread->iTemp1Off + pThread->nInMemory + 9
+        pThread->pTemp1, pThread->iTemp1Off + 9 + 9 + 9 + pThread->nInMemory
     );
   }
 
@@ -1017,6 +1047,8 @@ static int vdbeSorterListToPMA(SorterThread *pThread){
 
     fileWriterInit(pThread->pTemp1, &writer, pThread->pgsz, pThread->iTemp1Off);
     pThread->nPMA++;
+    fileWriterWriteVarint(&writer, (u64)pThread->iSeq);
+    fileWriterWriteVarint(&writer, (u64)nElem);
     fileWriterWriteVarint(&writer, pThread->nInMemory);
     for(p=pThread->pList; p; p=pNext){
       pNext = p->u.pNext;
@@ -1091,7 +1123,7 @@ static int vdbeSorterNext(
       ** PMA should be considered smaller. The VdbeSorter.aIter[] array
       ** is sorted from oldest to newest, so pIter1 contains older values
       ** than pIter2 iff (pIter1<pIter2).  */
-      if( iRes<0 || (iRes==0 && pIter1<pIter2) ){
+      if( iRes<0 || (iRes==0 && pIter1->iSeq < pIter2->iSeq) ){
         pMerger->aTree[i] = (int)(pIter1 - pMerger->aIter);
         pIter2 = &pMerger->aIter[ pMerger->aTree[i ^ 0x0001] ];
         pKey2 = pIter2->aKey;
@@ -1188,6 +1220,7 @@ static void *vdbeSorterThreadMain(void *pCtx){
           VdbeSorterIter *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
           assert( pIter->pFile!=0 );        /* pIter is not at EOF */
           fileWriterWriteVarint(&writer, pIter->nKey);
+          fileWriterWriteVarint(&writer, (u64)pIter->iSeq);
           fileWriterWrite(&writer, pIter->aKey, pIter->nKey);
           rc = vdbeSorterNext(pThread, pMerger, &bEof);
         }
@@ -1200,19 +1233,25 @@ static void *vdbeSorterThreadMain(void *pCtx){
       pThread->pTemp1 = pTemp2;
       pThread->nPMA = (i / SORTER_MAX_MERGE_COUNT);
       pThread->iTemp1Off = iWriteOff;
+      pThread->bEmbeddedSeq = 1;
+      sqlite3OsUnfetch(pTemp2, 0, 0);
     }
   }else{
+    i64 nElem;
+
     /* Sort the pThread->pList list */
-    rc = vdbeSorterSort(pThread);
+    rc = vdbeSorterSort(pThread, &nElem);
 
     /* If required, write the list out to a PMA. */
     if( rc==SQLITE_OK && pThread->eWork==SORTER_THREAD_TO_PMA ){
 #ifdef SQLITE_DEBUG
       i64 nExpect = pThread->nInMemory
         + sqlite3VarintLen(pThread->nInMemory)
+        + sqlite3VarintLen(pThread->iSeq)
+        + sqlite3VarintLen(nElem)
         + pThread->iTemp1Off;
 #endif
-      rc = vdbeSorterListToPMA(pThread);
+      rc = vdbeSorterListToPMA(pThread, nElem);
       assert( rc!=SQLITE_OK || (nExpect==pThread->iTemp1Off) );
     }
   }
@@ -1268,6 +1307,7 @@ static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
     pThread->eWork = SORTER_THREAD_TO_PMA;
     pThread->pList = pSorter->pRecord;
     pThread->nInMemory = pSorter->nInMemory;
+    pThread->iSeq = pSorter->iNextSeq++;
     pSorter->nInMemory = 0;
     pSorter->pRecord = 0;