]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Experimental change to speed up ORDER BY clauses that sort based on a single expression. sorter-exp
authordan <dan@noemail.net>
Wed, 15 Aug 2012 15:57:30 +0000 (15:57 +0000)
committerdan <dan@noemail.net>
Wed, 15 Aug 2012 15:57:30 +0000 (15:57 +0000)
FossilOrigin-Name: 2bb8c4926136a1fcad107b09c8718d36d7101a51

manifest
manifest.uuid
src/vdbe.c
src/vdbeInt.h
src/vdbesort.c
test/sort.test

index fa8c06418bb56233967f666e209c659beeff9d6b..a2c361e87982ea0dfc9461db40cacaf4d8c33b07 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Silence\sthree\sharmless\scompiler\swarnings\sin\svdbesort.c.
-D 2012-08-14T19:04:27.492
+C Experimental\schange\sto\sspeed\sup\sORDER\sBY\sclauses\sthat\ssort\sbased\son\sa\ssingle\sexpression.
+D 2012-08-15T15:57:30.655
 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
 F Makefile.in abd5c10d21d1395f140d9e50ea999df8fa4d6376
 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@@ -237,14 +237,14 @@ F src/update.c d3076782c887c10e882996550345da9c4c9f9dea
 F src/utf.c 890c67dcfcc7a74623c95baac7535aadfe265e84
 F src/util.c 0af2e515dc0dabacec931bca39525f6c3f1c5455
 F src/vacuum.c 587a52bb8833d7ac15af8916f25437e2575028bd
-F src/vdbe.c 75da79cdcd58481825a06f045bc2f5ea3966eeae
+F src/vdbe.c 93b560a418a64c4a4dd6aadb30e79fad5c7eb0bc
 F src/vdbe.h 18f581cac1f4339ec3299f3e0cc6e11aec654cdb
-F src/vdbeInt.h 986b6b11a13c517337355009e5438703ba5b0a40
+F src/vdbeInt.h 4554c3a3c7f061c0b6b64366b97f8c3949f4dfe9
 F src/vdbeapi.c 88ea823bbcb4320f5a6607f39cd7c2d3cc4c26b1
 F src/vdbeaux.c dce80038c3c41f2680e5ab4dd0f7e0d8b7ff9071
 F src/vdbeblob.c 32f2a4899d67f69634ea4dd93e3f651936d732cb
 F src/vdbemem.c cb55e84b8e2c15704968ee05f0fae25883299b74
-F src/vdbesort.c 0dc1b274dcb4d4c8e71b0b2b15261f286caba39b
+F src/vdbesort.c e4aa1c16532854ae312cc91fe9e373630aaa6463
 F src/vdbetrace.c 8bd5da325fc90f28464335e4cc4ad1407fe30835
 F src/vtab.c bb8ea3a26608bb1357538a5d2fc72beba6638998
 F src/wal.c 9294df6f96aae5909ae1a9b733fd1e1b4736978b
@@ -709,7 +709,7 @@ F test/shrink.test 8c70f62b6e8eb4d54533de6d65bd06b1b9a17868
 F test/sidedelete.test f0ad71abe6233e3b153100f3b8d679b19a488329
 F test/soak.test 0b5b6375c9f4110c828070b826b3b4b0bb65cd5f
 F test/softheap1.test c16709a16ad79fa43b32929b2e623d1d117ccf53
-F test/sort.test 0e4456e729e5a92a625907c63dcdedfbe72c5dc5
+F test/sort.test 7adab2fcb2bd4ee8ee024cc8eab5a9192762fd75
 F test/speed1.test f2974a91d79f58507ada01864c0e323093065452
 F test/speed1p.explain d841e650a04728b39e6740296b852dccdca9b2cb
 F test/speed1p.test c4a469f29f135f4d76c55b1f2a52f36e209466cc
@@ -1010,7 +1010,10 @@ F tool/vdbe-compress.tcl d70ea6d8a19e3571d7ab8c9b75cba86d1173ff0f
 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
 F tool/warnings.sh fbc018d67fd7395f440c28f33ef0f94420226381
 F tool/win/sqlite.vsix 67d8a99aceb56384a81b3f30d6c71743146d2cc9
-P 6730579cf5c6c74cb293e7237d896d3a3a36b691
-R 7ea1e8d09a42f7daed61f69d5f68ea70
-U drh
-Z 6a9fd56c83b0e144f530abdc3bddd4ea
+P a5431c86df442c6e6dfaeae8e8aa62b56d204e97
+R 2b92d91d7f760dd74de397b9ffc908eb
+T *branch * sorter-exp
+T *sym-sorter-exp *
+T -sym-trunk *
+U dan
+Z 040bce45e18c929d5310b23a13be7d4e
index 4dc7f489ce8f81c0566d18c3f3c8ebf14e4eb0ce..41835e0162507af27140f76952c93dc8df5a53a5 100644 (file)
@@ -1 +1 @@
-a5431c86df442c6e6dfaeae8e8aa62b56d204e97
\ No newline at end of file
+2bb8c4926136a1fcad107b09c8718d36d7101a51
\ No newline at end of file
index 12e73251434f89c58b6416bec1c8ed3c66c2c406..96d1e2fb2fdc791b9618c5eaa2cc800cdb7bff35 100644 (file)
@@ -4150,13 +4150,11 @@ case OP_ResetCount: {
 */
 case OP_SorterCompare: {
   VdbeCursor *pC;
-  int res;
 
   pC = p->apCsr[pOp->p1];
   assert( isSorter(pC) );
   pIn3 = &aMem[pOp->p3];
-  rc = sqlite3VdbeSorterCompare(pC, pIn3, &res);
-  if( res ){
+  if( sqlite3VdbeSorterCompare(pC, pIn3) ){
     pc = pOp->p2-1;
   }
   break;
index 1f5694a595f55c155c58b2575f563145d2a9710b..e50f32702f73663f8653a3e6af4dd14c40fb3c5f 100644 (file)
@@ -427,7 +427,7 @@ int sqlite3VdbeTransferError(Vdbe *p);
 # define sqlite3VdbeSorterRowkey(Y,Z)    SQLITE_OK
 # define sqlite3VdbeSorterRewind(X,Y,Z)  SQLITE_OK
 # define sqlite3VdbeSorterNext(X,Y,Z)    SQLITE_OK
-# define sqlite3VdbeSorterCompare(X,Y,Z) SQLITE_OK
+# define sqlite3VdbeSorterCompare(X,Y)   0
 #else
 int sqlite3VdbeSorterInit(sqlite3 *, VdbeCursor *);
 void sqlite3VdbeSorterClose(sqlite3 *, VdbeCursor *);
@@ -435,7 +435,7 @@ int sqlite3VdbeSorterRowkey(const VdbeCursor *, Mem *);
 int sqlite3VdbeSorterNext(sqlite3 *, const VdbeCursor *, int *);
 int sqlite3VdbeSorterRewind(sqlite3 *, const VdbeCursor *, int *);
 int sqlite3VdbeSorterWrite(sqlite3 *, const VdbeCursor *, Mem *);
-int sqlite3VdbeSorterCompare(const VdbeCursor *, Mem *, int *);
+int sqlite3VdbeSorterCompare(const VdbeCursor *, Mem *);
 #endif
 
 #if !defined(SQLITE_OMIT_SHARED_CACHE) && SQLITE_THREADSAFE>0
index ba1e9f0f23311e57c6dbe185a94e45ec5db50c63..57999e4a52b58a8e4f33a68100fa2c2d837f2216 100644 (file)
@@ -39,7 +39,7 @@ typedef struct FileWriter FileWriter;
 ** The aIter[] array contains an iterator for each of the PMAs being merged.
 ** An aIter[] iterator either points to a valid key or else is at EOF. For 
 ** the purposes of the paragraphs below, we assume that the array is actually 
-** N elements in size, where N is the smallest power of 2 greater to or equal 
+** N elements in size, where N is the smallest power of 2 greater to or equal
 ** to the number of iterators being merged. The extra aIter[] elements are 
 ** treated as if they are empty (always at EOF).
 **
@@ -104,10 +104,21 @@ struct VdbeSorter {
   VdbeSorterIter *aIter;          /* Array of iterators to merge */
   int *aTree;                     /* Current state of incremental merge */
   sqlite3_file *pTemp1;           /* PMA file 1 */
-  SorterRecord *pRecord;          /* Head of in-memory record list */
+  SorterRecord *aRec[9];          /* Nine different types of records */
+  SorterRecord **aLastRec[9];     /* Locations to write the next pointers to */
   UnpackedRecord *pUnpacked;      /* Used to unpack keys */
 };
 
+#define SORTER_NULL     0
+#define SORTER_INT_NEG  1
+#define SORTER_INT_ZERO 2
+#define SORTER_INT_ONE  3
+#define SORTER_INT_POS  4
+#define SORTER_DOUBLE   5
+#define SORTER_TEXT     6
+#define SORTER_BLOB     7
+#define SORTER_LARGE    8
+
 /*
 ** The following type is an iterator for a PMA. It caches the current key in 
 ** variables nKey/aKey. If the iterator is at EOF, pFile==0.
@@ -366,7 +377,6 @@ static int vdbeSorterIterInit(
   return rc;
 }
 
-
 /*
 ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, 
 ** size nKey2 bytes).  Argument pKeyInfo supplies the collation functions
@@ -374,43 +384,74 @@ static int vdbeSorterIterInit(
 ** Otherwise, return SQLITE_OK and set *pRes to a negative, zero or positive
 ** value, depending on whether key1 is smaller, equal to or larger than key2.
 **
-** If the bOmitRowid argument is non-zero, assume both keys end in a rowid
-** field. For the purposes of the comparison, ignore it. Also, if bOmitRowid
-** is true and key1 contains even a single NULL value, it is considered to
-** be less than key2. Even if key2 also contains NULL values.
-**
 ** If pKey2 is passed a NULL pointer, then it is assumed that the pCsr->aSpace
 ** has been allocated and contains an unpacked record that is used as key2.
 */
-static void vdbeSorterCompare(
-  const VdbeCursor *pCsr,         /* Cursor object (for pKeyInfo) */
-  int bOmitRowid,                 /* Ignore rowid field at end of keys */
+static int vdbeSorterCompareRec(
+  void *p,                        /* VdbeCursor object */
   const void *pKey1, int nKey1,   /* Left side of comparison */
-  const void *pKey2, int nKey2,   /* Right side of comparison */
-  int *pRes                       /* OUT: Result of comparison */
+  const void *pKey2, int nKey2    /* Right side of comparison */
 ){
+  const VdbeCursor *pCsr = (VdbeCursor *)p;
   KeyInfo *pKeyInfo = pCsr->pKeyInfo;
-  VdbeSorter *pSorter = pCsr->pSorter;
-  UnpackedRecord *r2 = pSorter->pUnpacked;
-  int i;
+  UnpackedRecord *r2 = pCsr->pSorter->pUnpacked;
 
   if( pKey2 ){
     sqlite3VdbeRecordUnpack(pKeyInfo, nKey2, pKey2, r2);
   }
+  return sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
+}
 
-  if( bOmitRowid ){
-    r2->nField = pKeyInfo->nField;
-    assert( r2->nField>0 );
-    for(i=0; i<r2->nField; i++){
-      if( r2->aMem[i].flags & MEM_Null ){
-        *pRes = -1;
-        return;
-      }
-    }
-    r2->flags |= UNPACKED_PREFIX_MATCH;
+/*
+** Buffers pKey1 and pKey2 both contain encoded records. The first elements
+** of each are both either negative integers (if p!=0) or positive integers
+** greater than 1 (if p==0). Return a values less than, equal to or greater
+** than zero if the first field in pKey1 is less than, equal to or greater
+** than the first field in pKey2, respectively.
+*/
+static int vdbeSorterCompareInt(
+  void *p,
+  const void *pKey1, int nKey1,   /* Left side of comparison */
+  const void *pKey2, int nKey2    /* Right side of comparison */
+){
+  static const int aLen[] = {0, 1, 2, 3, 4, 6, 8};
+  const u8 *aKey1 = (u8 *)pKey1;
+  const u8 *aKey2 = (u8 *)pKey2;
+  int res = (int)aKey1[1] - (int)aKey2[1];
+
+  if( res==0 ){
+    res = memcmp(&aKey1[aKey1[0]], &aKey2[aKey2[0]], aLen[aKey1[1]]);
+  }else if( aKey1[aKey1[0]] & 0x80 ){
+    res = res * -1;
   }
+  return res;
+}
 
-  *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
+/*
+** Buffers pKey1 and pKey2 both contain encoded records. The first elements
+** of each are both either text or blob values. 
+**
+** Argument p points to a CollSeq structure. If the pKey1 and pKey2 buffers
+** contain blobs, then this is always the BINARY collation sequence. Either
+** way, compare the contents of the two buffers and return an integer less
+** than, equal to or greater than zero if the value in pKey1 is less than,
+** equal to or greater than that in pKey2, respectively.
+*/
+static int vdbeSorterCompareString(
+  void *p,                        /* Pointer to CollSeq object */
+  const void *pKey1, int nKey1,   /* Left side of comparison */
+  const void *pKey2, int nKey2    /* Right side of comparison */
+){
+  CollSeq *pColl = (CollSeq *)p;
+  const u8 *aKey1 = (u8 *)pKey1;
+  const u8 *aKey2 = (u8 *)pKey2;
+  int n1, n2;
+  int res;
+
+  n1 = (aKey1[1] - 12) / 2;
+  n2 = (aKey2[1] - 12) / 2;
+  res = pColl->xCmp(pColl->pUser, n1, &aKey1[aKey1[0]], n2, &aKey2[aKey2[0]]);
+  return res;
 }
 
 /*
@@ -446,8 +487,8 @@ static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){
   }else{
     int res;
     assert( pCsr->pSorter->pUnpacked!=0 );  /* allocated in vdbeSorterMerge() */
-    vdbeSorterCompare(
-        pCsr, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
+    res = vdbeSorterCompareRec(
+        (void *)pCsr, p1->aKey, p1->nKey, p2->aKey, p2->nKey
     );
     if( res<=0 ){
       iRes = i1;
@@ -460,6 +501,18 @@ static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){
   return SQLITE_OK;
 }
 
+
+/*
+** Set each entry of the aLastRec[] array to point to the corresponding entry
+** in the aRec[] array.
+*/
+static void vdbeSorterSetLastRec(VdbeSorter *pSorter){
+  int i;
+  for(i=0; i<ArraySize(pSorter->aLastRec); i++){
+    pSorter->aLastRec[i] = &pSorter->aRec[i];
+  }
+}
+
 /*
 ** Initialize the temporary index cursor just opened as a sorter cursor.
 */
@@ -487,6 +540,7 @@ int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){
     pSorter->mxPmaSize = mxCache * pgsz;
   }
 
+  vdbeSorterSetLastRec(pSorter);
   return SQLITE_OK;
 }
 
@@ -508,8 +562,8 @@ static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
 void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
   VdbeSorter *pSorter = pCsr->pSorter;
   if( pSorter ){
+    int i;
     if( pSorter->aIter ){
-      int i;
       for(i=0; i<pSorter->nTree; i++){
         vdbeSorterIterZero(db, &pSorter->aIter[i]);
       }
@@ -518,7 +572,11 @@ void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
     if( pSorter->pTemp1 ){
       sqlite3OsCloseFree(pSorter->pTemp1);
     }
-    vdbeSorterRecordFree(db, pSorter->pRecord);
+
+    for(i=0; i<ArraySize(pSorter->aRec); i++){
+      vdbeSorterRecordFree(db, pSorter->aRec[i]);
+    }
+
     sqlite3DbFree(db, pSorter->pUnpacked);
     sqlite3DbFree(db, pSorter);
     pCsr->pSorter = 0;
@@ -545,6 +603,8 @@ static int vdbeSorterOpenTempFile(sqlite3 *db, sqlite3_file **ppFile){
 */
 static void vdbeSorterMerge(
   const VdbeCursor *pCsr,         /* For pKeyInfo */
+  int (*xCmp)(void*,const void*,int,const void*,int),
+  void *pCtx,                     /* First argument to pass to xCmp() */
   SorterRecord *p1,               /* First list to merge */
   SorterRecord *p2,               /* Second list to merge */
   SorterRecord **ppOut            /* OUT: Head of merged list */
@@ -555,12 +615,15 @@ static void vdbeSorterMerge(
 
   while( p1 && p2 ){
     int res;
-    vdbeSorterCompare(pCsr, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res);
+    res = xCmp(pCtx, p1->pVal, p1->nVal, pVal2, p2->nVal);
+    if( xCmp!=vdbeSorterCompareRec && pCsr->pKeyInfo->aSortOrder[0] ){
+      res = res * -1;
+    }
     if( res<=0 ){
       *pp = p1;
       pp = &p1->pNext;
       p1 = p1->pNext;
-      pVal2 = 0;
+      if( xCmp==vdbeSorterCompareRec ) pVal2 = 0;
     }else{
       *pp = p2;
        pp = &p2->pNext;
@@ -573,6 +636,36 @@ static void vdbeSorterMerge(
   *ppOut = pFinal;
 }
 
+/*
+** Concatenate the linked lists headed at elements iStart through iEnd
+** (inclusive) of the pSorter->aRec[] array. Store the result in
+** pSorter->aRec[iEnd]. Set entries iStart through iEnd-1 to zero.
+**
+** If parameter bReverse is false, the lists are concatenated so that
+** all the elements of list iStart occur before those of iStart+1, and
+** so on. Or, if bReverse is true, the original content of iEnd is at
+** the start of the result, followed by the content of iEnd-1, etc.
+*/
+static void vdbeSorterConcatLists(
+  VdbeSorter *pSorter,
+  int bReverse,                   /* True to concenate in reverse order */
+  int iStart,
+  int iEnd
+){
+  int i;
+  for(i=iStart; i<iEnd; i++){
+    if( bReverse ){
+      *pSorter->aLastRec[i] = pSorter->aRec[iEnd];
+      pSorter->aRec[iEnd] = pSorter->aRec[i];
+    }else{
+      *pSorter->aLastRec[iEnd] = pSorter->aRec[i];
+      if( pSorter->aRec[i] ) pSorter->aLastRec[iEnd] = pSorter->aLastRec[i];
+    }
+    pSorter->aRec[i] = 0;
+    pSorter->aLastRec[i] = &pSorter->aRec[i];
+  }
+}
+
 /*
 ** Sort the linked list of records headed at pCsr->pRecord. Return SQLITE_OK
 ** if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if an error
@@ -580,33 +673,91 @@ static void vdbeSorterMerge(
 */
 static int vdbeSorterSort(const VdbeCursor *pCsr){
   int i;
+  int iRec;
   SorterRecord **aSlot;
   SorterRecord *p;
+  KeyInfo *pKeyInfo = pCsr->pKeyInfo;
   VdbeSorter *pSorter = pCsr->pSorter;
+  int bReverse = pCsr->pKeyInfo->aSortOrder[0];
 
   aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
   if( !aSlot ){
     return SQLITE_NOMEM;
   }
 
-  p = pSorter->pRecord;
-  while( p ){
-    SorterRecord *pNext = p->pNext;
-    p->pNext = 0;
-    for(i=0; aSlot[i]; i++){
-      vdbeSorterMerge(pCsr, p, aSlot[i], &p);
-      aSlot[i] = 0;
-    }
-    aSlot[i] = p;
-    p = pNext;
+  /* If there are one or more SORTER_LARGE records, or if there are 
+  ** SORTER_TEXT records that must be converted to a different encoding
+  ** before they can be compared, move everything to the SORTER_LARGE slot. */
+  if( pSorter->aRec[SORTER_LARGE] 
+   || (pSorter->aRec[SORTER_TEXT] && pKeyInfo->enc!=pKeyInfo->aColl[0]->enc)
+  ){
+    vdbeSorterConcatLists(pSorter, 0, 0, SORTER_LARGE);
+  }
+
+  /* If there are one or more SORTER_DOUBLE records, move all numeric
+  ** records to the SORTER_DOUBLE slot.  */
+  if( pSorter->aRec[SORTER_DOUBLE] ){
+    vdbeSorterConcatLists(pSorter, 0, SORTER_INT_NEG, SORTER_DOUBLE);
   }
 
-  p = 0;
-  for(i=0; i<64; i++){
-    vdbeSorterMerge(pCsr, p, aSlot[i], &p);
+  for(iRec=0; iRec<ArraySize(pSorter->aRec); iRec++){
+    void *pCtx = 0;
+    int (*xCmp)(void*,const void*,int,const void*,int);
+    switch( iRec ){
+      case SORTER_NULL: 
+      case SORTER_INT_ZERO: 
+      case SORTER_INT_ONE: 
+        xCmp = 0;
+        break;
+
+      case SORTER_INT_NEG: 
+      case SORTER_INT_POS:
+        xCmp = vdbeSorterCompareInt;
+        break;
+
+      case SORTER_BLOB: 
+        pCtx = (void *)(pCsr->pKeyInfo->db->pDfltColl);
+        xCmp = vdbeSorterCompareString;
+        break;
+
+      case SORTER_TEXT: 
+        pCtx = (void *)(pCsr->pKeyInfo->aColl[0]);
+        xCmp = vdbeSorterCompareString;
+        break;
+
+      default:
+        pCtx = (void *)pCsr;
+        xCmp = vdbeSorterCompareRec;
+        break;
+    }
+    if( !xCmp ) continue;
+
+    p = pSorter->aRec[iRec];
+    while( p ){
+      SorterRecord *pNext = p->pNext;
+      p->pNext = 0;
+      for(i=0; aSlot[i]; i++){
+        vdbeSorterMerge(pCsr, xCmp, pCtx, aSlot[i], p, &p);
+        aSlot[i] = 0;
+      }
+      aSlot[i] = p;
+      p = pNext;
+    }
+    p = 0;
+    for(i=0; i<64; i++){
+      vdbeSorterMerge(pCsr, xCmp, pCtx, aSlot[i], p, &p);
+      aSlot[i] = 0;
+    }
+    pSorter->aRec[iRec] = p;
+    if( p ){
+      SorterRecord *pRec;
+      for(pRec=pSorter->aRec[iRec]; pRec->pNext; pRec=pRec->pNext);
+      pSorter->aLastRec[iRec] = &pRec->pNext;
+    }
   }
-  pSorter->pRecord = p;
 
+  vdbeSorterConcatLists(pSorter, bReverse, 0, SORTER_LARGE);
+  vdbeSorterSetLastRec(pSorter);
   sqlite3_free(aSlot);
   return SQLITE_OK;
 }
@@ -711,15 +862,14 @@ static void fileWriterWriteVarint(FileWriter *p, u64 iVal){
 **       key). The varint is the number of bytes in the blob of data.
 */
 static int vdbeSorterListToPMA(sqlite3 *db, const VdbeCursor *pCsr){
-  int rc = SQLITE_OK;             /* Return code */
+  int rc;                         /* Return code */
   VdbeSorter *pSorter = pCsr->pSorter;
   FileWriter writer;
 
   memset(&writer, 0, sizeof(FileWriter));
 
   if( pSorter->nInMemory==0 ){
-    assert( pSorter->pRecord==0 );
-    return rc;
+    return SQLITE_OK;
   }
 
   rc = vdbeSorterSort(pCsr);
@@ -739,13 +889,13 @@ static int vdbeSorterListToPMA(sqlite3 *db, const VdbeCursor *pCsr){
     fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff);
     pSorter->nPMA++;
     fileWriterWriteVarint(&writer, pSorter->nInMemory);
-    for(p=pSorter->pRecord; p; p=pNext){
+    for(p=pSorter->aRec[SORTER_LARGE]; rc==SQLITE_OK && p; p=pNext){
       pNext = p->pNext;
       fileWriterWriteVarint(&writer, p->nVal);
       fileWriterWrite(&writer, p->pVal, p->nVal);
       sqlite3DbFree(db, p);
     }
-    pSorter->pRecord = p;
+    pSorter->aRec[SORTER_LARGE] = p;
     rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff);
   }
 
@@ -771,11 +921,48 @@ int sqlite3VdbeSorterWrite(
   if( pNew==0 ){
     rc = SQLITE_NOMEM;
   }else{
-    pNew->pVal = (void *)&pNew[1];
+    int iSlot;
+    u8 *aVal = pNew->pVal = (void *)&pNew[1];
     memcpy(pNew->pVal, pVal->z, pVal->n);
     pNew->nVal = pVal->n;
-    pNew->pNext = pSorter->pRecord;
-    pSorter->pRecord = pNew;
+
+    u8 n = aVal[0];
+    u8 t = aVal[1];
+
+    if( pCsr->pKeyInfo->nField!=1 || (t & 0x80) || (n & 0x80) ){
+      iSlot = SORTER_LARGE;
+    }else{
+      u8 t = aVal[1];
+      switch( t ){
+        case 0: 
+          iSlot = SORTER_NULL; 
+          break;
+
+        case 1: case 2: case 3: case 4: case 5: case 6:
+          iSlot = (aVal[n] & 0x80) ? SORTER_INT_NEG : SORTER_INT_POS; 
+          break;
+
+        case 7:
+          iSlot = SORTER_DOUBLE; 
+          break;
+
+        case 8:
+          iSlot = SORTER_INT_ZERO;
+          break;
+
+        case 9:
+          iSlot = SORTER_INT_ONE;
+          break;
+
+        default:
+          iSlot = SORTER_BLOB - (t & 0x01);
+          break;
+      }
+    }
+
+    pNew->pNext = 0;
+    *pSorter->aLastRec[iSlot] = pNew;
+    pSorter->aLastRec[iSlot] = &(pNew->pNext);
   }
 
   /* See if the contents of the sorter should now be written out. They
@@ -854,7 +1041,7 @@ int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
   ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
   ** from the in-memory list.  */
   if( pSorter->nPMA==0 ){
-    *pbEof = !pSorter->pRecord;
+    *pbEof = (pSorter->nInMemory==0);
     assert( pSorter->aTree==0 );
     return vdbeSorterSort(pCsr);
   }
@@ -963,11 +1150,11 @@ int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
 
     *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
   }else{
-    SorterRecord *pFree = pSorter->pRecord;
-    pSorter->pRecord = pFree->pNext;
+    SorterRecord *pFree = pSorter->aRec[SORTER_LARGE];
+    pSorter->aRec[SORTER_LARGE] = pFree->pNext;
     pFree->pNext = 0;
     vdbeSorterRecordFree(db, pFree);
-    *pbEof = !pSorter->pRecord;
+    *pbEof = !pSorter->aRec[SORTER_LARGE];
     rc = SQLITE_OK;
   }
   return rc;
@@ -988,8 +1175,8 @@ static void *vdbeSorterRowkey(
     *pnKey = pIter->nKey;
     pKey = pIter->aKey;
   }else{
-    *pnKey = pSorter->pRecord->nVal;
-    pKey = pSorter->pRecord->pVal;
+    *pnKey = pSorter->aRec[SORTER_LARGE]->nVal;
+    pKey = pSorter->aRec[SORTER_LARGE]->pVal;
   }
   return pKey;
 }
@@ -1008,7 +1195,6 @@ int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
   pOut->n = nKey;
   MemSetTypeFlag(pOut, MEM_Blob);
   memcpy(pOut->z, pKey, nKey);
-
   return SQLITE_OK;
 }
 
@@ -1024,15 +1210,26 @@ int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
 */
 int sqlite3VdbeSorterCompare(
   const VdbeCursor *pCsr,         /* Sorter cursor */
-  Mem *pVal,                      /* Value to compare to current sorter key */
-  int *pRes                       /* OUT: Result of comparison */
+  Mem *pVal                       /* Value to compare to current sorter key */
 ){
+  KeyInfo *pKeyInfo = pCsr->pKeyInfo;
   VdbeSorter *pSorter = pCsr->pSorter;
+  UnpackedRecord *r2 = pSorter->pUnpacked;
+  int i;
   void *pKey; int nKey;           /* Sorter key to compare pVal with */
 
   pKey = vdbeSorterRowkey(pSorter, &nKey);
-  vdbeSorterCompare(pCsr, 1, pVal->z, pVal->n, pKey, nKey, pRes);
-  return SQLITE_OK;
+  assert( pKey && pVal->z );
+  sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
+
+  r2->nField = pKeyInfo->nField;
+  assert( r2->nField>0 );
+  for(i=0; i<r2->nField; i++){
+    if( r2->aMem[i].flags & MEM_Null ) return -1;
+  }
+  r2->flags |= UNPACKED_PREFIX_MATCH;
+
+  return sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2);
 }
 
 #endif /* #ifndef SQLITE_OMIT_MERGE_SORT */
index 08d496b2596a0380455034930d974f4e20a816c2..84793c84d7429c962ad45fbdefe8e20f57156bd7 100644 (file)
@@ -464,4 +464,19 @@ do_test sort-12.1 {
   }
 } {1 2 xxx 1 3 yyy 1 1 zzz}
 
+foreach {tn shuffle} { 
+  1 {1 2 3 4}       2 {1 3 2 4} 
+  3 {4 3 2 1}       4 {1 3 2 4} 
+} {
+  do_test sort-13.$tn {
+    execsql { DROP TABLE IF EXISTS w1 }
+    execsql { CREATE TABLE w1(x) }
+    foreach i $shuffle {
+      execsql { INSERT INTO w1 VALUES($i) }
+    }
+    execsql { SELECT x FROM w1 ORDER BY x }
+  } {1 2 3 4}
+}
+
 finish_test
+