** The aIter[] array contains an iterator for each of the PMAs being merged.
** An aIter[] iterator either points to a valid key or else is at EOF. For
** the purposes of the paragraphs below, we assume that the array is actually
-** N elements in size, where N is the smallest power of 2 greater to or equal
+** N elements in size, where N is the smallest power of 2 greater to or equal
** to the number of iterators being merged. The extra aIter[] elements are
** treated as if they are empty (always at EOF).
**
VdbeSorterIter *aIter; /* Array of iterators to merge */
int *aTree; /* Current state of incremental merge */
sqlite3_file *pTemp1; /* PMA file 1 */
- SorterRecord *pRecord; /* Head of in-memory record list */
+ SorterRecord *aRec[9]; /* Nine different types of records */
+ SorterRecord **aLastRec[9]; /* Locations to write the next pointers to */
UnpackedRecord *pUnpacked; /* Used to unpack keys */
};
+#define SORTER_NULL 0
+#define SORTER_INT_NEG 1
+#define SORTER_INT_ZERO 2
+#define SORTER_INT_ONE 3
+#define SORTER_INT_POS 4
+#define SORTER_DOUBLE 5
+#define SORTER_TEXT 6
+#define SORTER_BLOB 7
+#define SORTER_LARGE 8
+
/*
** The following type is an iterator for a PMA. It caches the current key in
** variables nKey/aKey. If the iterator is at EOF, pFile==0.
return rc;
}
-
/*
** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
** size nKey2 bytes). Argument pKeyInfo supplies the collation functions
** Otherwise, return SQLITE_OK and set *pRes to a negative, zero or positive
** value, depending on whether key1 is smaller, equal to or larger than key2.
**
-** If the bOmitRowid argument is non-zero, assume both keys end in a rowid
-** field. For the purposes of the comparison, ignore it. Also, if bOmitRowid
-** is true and key1 contains even a single NULL value, it is considered to
-** be less than key2. Even if key2 also contains NULL values.
-**
** If pKey2 is passed a NULL pointer, then it is assumed that the pCsr->aSpace
** has been allocated and contains an unpacked record that is used as key2.
*/
-static void vdbeSorterCompare(
- const VdbeCursor *pCsr, /* Cursor object (for pKeyInfo) */
- int bOmitRowid, /* Ignore rowid field at end of keys */
+static int vdbeSorterCompareRec(
+ void *p, /* VdbeCursor object */
const void *pKey1, int nKey1, /* Left side of comparison */
- const void *pKey2, int nKey2, /* Right side of comparison */
- int *pRes /* OUT: Result of comparison */
+ const void *pKey2, int nKey2 /* Right side of comparison */
){
+ const VdbeCursor *pCsr = (VdbeCursor *)p;
KeyInfo *pKeyInfo = pCsr->pKeyInfo;
- VdbeSorter *pSorter = pCsr->pSorter;
- UnpackedRecord *r2 = pSorter->pUnpacked;
- int i;
+ UnpackedRecord *r2 = pCsr->pSorter->pUnpacked;
if( pKey2 ){
sqlite3VdbeRecordUnpack(pKeyInfo, nKey2, pKey2, r2);
}
+ return sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
+}
- if( bOmitRowid ){
- r2->nField = pKeyInfo->nField;
- assert( r2->nField>0 );
- for(i=0; i<r2->nField; i++){
- if( r2->aMem[i].flags & MEM_Null ){
- *pRes = -1;
- return;
- }
- }
- r2->flags |= UNPACKED_PREFIX_MATCH;
+/*
+** Buffers pKey1 and pKey2 both contain encoded records. The first elements
+** of each are both either negative integers (if p!=0) or positive integers
+** greater than 1 (if p==0). Return a values less than, equal to or greater
+** than zero if the first field in pKey1 is less than, equal to or greater
+** than the first field in pKey2, respectively.
+*/
+static int vdbeSorterCompareInt(
+ void *p,
+ const void *pKey1, int nKey1, /* Left side of comparison */
+ const void *pKey2, int nKey2 /* Right side of comparison */
+){
+ static const int aLen[] = {0, 1, 2, 3, 4, 6, 8};
+ const u8 *aKey1 = (u8 *)pKey1;
+ const u8 *aKey2 = (u8 *)pKey2;
+ int res = (int)aKey1[1] - (int)aKey2[1];
+
+ if( res==0 ){
+ res = memcmp(&aKey1[aKey1[0]], &aKey2[aKey2[0]], aLen[aKey1[1]]);
+ }else if( aKey1[aKey1[0]] & 0x80 ){
+ res = res * -1;
}
+ return res;
+}
- *pRes = sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
+/*
+** Buffers pKey1 and pKey2 both contain encoded records. The first elements
+** of each are both either text or blob values.
+**
+** Argument p points to a CollSeq structure. If the pKey1 and pKey2 buffers
+** contain blobs, then this is always the BINARY collation sequence. Either
+** way, compare the contents of the two buffers and return an integer less
+** than, equal to or greater than zero if the value in pKey1 is less than,
+** equal to or greater than that in pKey2, respectively.
+*/
+static int vdbeSorterCompareString(
+ void *p, /* Pointer to CollSeq object */
+ const void *pKey1, int nKey1, /* Left side of comparison */
+ const void *pKey2, int nKey2 /* Right side of comparison */
+){
+ CollSeq *pColl = (CollSeq *)p;
+ const u8 *aKey1 = (u8 *)pKey1;
+ const u8 *aKey2 = (u8 *)pKey2;
+ int n1, n2;
+ int res;
+
+ n1 = (aKey1[1] - 12) / 2;
+ n2 = (aKey2[1] - 12) / 2;
+ res = pColl->xCmp(pColl->pUser, n1, &aKey1[aKey1[0]], n2, &aKey2[aKey2[0]]);
+ return res;
}
/*
}else{
int res;
assert( pCsr->pSorter->pUnpacked!=0 ); /* allocated in vdbeSorterMerge() */
- vdbeSorterCompare(
- pCsr, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
+ res = vdbeSorterCompareRec(
+ (void *)pCsr, p1->aKey, p1->nKey, p2->aKey, p2->nKey
);
if( res<=0 ){
iRes = i1;
return SQLITE_OK;
}
+
+/*
+** Set each entry of the aLastRec[] array to point to the corresponding entry
+** in the aRec[] array.
+*/
+static void vdbeSorterSetLastRec(VdbeSorter *pSorter){
+ int i;
+ for(i=0; i<ArraySize(pSorter->aLastRec); i++){
+ pSorter->aLastRec[i] = &pSorter->aRec[i];
+ }
+}
+
/*
** Initialize the temporary index cursor just opened as a sorter cursor.
*/
pSorter->mxPmaSize = mxCache * pgsz;
}
+ vdbeSorterSetLastRec(pSorter);
return SQLITE_OK;
}
void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
VdbeSorter *pSorter = pCsr->pSorter;
if( pSorter ){
+ int i;
if( pSorter->aIter ){
- int i;
for(i=0; i<pSorter->nTree; i++){
vdbeSorterIterZero(db, &pSorter->aIter[i]);
}
if( pSorter->pTemp1 ){
sqlite3OsCloseFree(pSorter->pTemp1);
}
- vdbeSorterRecordFree(db, pSorter->pRecord);
+
+ for(i=0; i<ArraySize(pSorter->aRec); i++){
+ vdbeSorterRecordFree(db, pSorter->aRec[i]);
+ }
+
sqlite3DbFree(db, pSorter->pUnpacked);
sqlite3DbFree(db, pSorter);
pCsr->pSorter = 0;
*/
static void vdbeSorterMerge(
const VdbeCursor *pCsr, /* For pKeyInfo */
+ int (*xCmp)(void*,const void*,int,const void*,int),
+ void *pCtx, /* First argument to pass to xCmp() */
SorterRecord *p1, /* First list to merge */
SorterRecord *p2, /* Second list to merge */
SorterRecord **ppOut /* OUT: Head of merged list */
while( p1 && p2 ){
int res;
- vdbeSorterCompare(pCsr, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res);
+ res = xCmp(pCtx, p1->pVal, p1->nVal, pVal2, p2->nVal);
+ if( xCmp!=vdbeSorterCompareRec && pCsr->pKeyInfo->aSortOrder[0] ){
+ res = res * -1;
+ }
if( res<=0 ){
*pp = p1;
pp = &p1->pNext;
p1 = p1->pNext;
- pVal2 = 0;
+ if( xCmp==vdbeSorterCompareRec ) pVal2 = 0;
}else{
*pp = p2;
pp = &p2->pNext;
*ppOut = pFinal;
}
+/*
+** Concatenate the linked lists headed at elements iStart through iEnd
+** (inclusive) of the pSorter->aRec[] array. Store the result in
+** pSorter->aRec[iEnd]. Set entries iStart through iEnd-1 to zero.
+**
+** If parameter bReverse is false, the lists are concatenated so that
+** all the elements of list iStart occur before those of iStart+1, and
+** so on. Or, if bReverse is true, the original content of iEnd is at
+** the start of the result, followed by the content of iEnd-1, etc.
+*/
+static void vdbeSorterConcatLists(
+ VdbeSorter *pSorter,
+ int bReverse, /* True to concenate in reverse order */
+ int iStart,
+ int iEnd
+){
+ int i;
+ for(i=iStart; i<iEnd; i++){
+ if( bReverse ){
+ *pSorter->aLastRec[i] = pSorter->aRec[iEnd];
+ pSorter->aRec[iEnd] = pSorter->aRec[i];
+ }else{
+ *pSorter->aLastRec[iEnd] = pSorter->aRec[i];
+ if( pSorter->aRec[i] ) pSorter->aLastRec[iEnd] = pSorter->aLastRec[i];
+ }
+ pSorter->aRec[i] = 0;
+ pSorter->aLastRec[i] = &pSorter->aRec[i];
+ }
+}
+
/*
** Sort the linked list of records headed at pCsr->pRecord. Return SQLITE_OK
** if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if an error
*/
static int vdbeSorterSort(const VdbeCursor *pCsr){
int i;
+ int iRec;
SorterRecord **aSlot;
SorterRecord *p;
+ KeyInfo *pKeyInfo = pCsr->pKeyInfo;
VdbeSorter *pSorter = pCsr->pSorter;
+ int bReverse = pCsr->pKeyInfo->aSortOrder[0];
aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
if( !aSlot ){
return SQLITE_NOMEM;
}
- p = pSorter->pRecord;
- while( p ){
- SorterRecord *pNext = p->pNext;
- p->pNext = 0;
- for(i=0; aSlot[i]; i++){
- vdbeSorterMerge(pCsr, p, aSlot[i], &p);
- aSlot[i] = 0;
- }
- aSlot[i] = p;
- p = pNext;
+ /* If there are one or more SORTER_LARGE records, or if there are
+ ** SORTER_TEXT records that must be converted to a different encoding
+ ** before they can be compared, move everything to the SORTER_LARGE slot. */
+ if( pSorter->aRec[SORTER_LARGE]
+ || (pSorter->aRec[SORTER_TEXT] && pKeyInfo->enc!=pKeyInfo->aColl[0]->enc)
+ ){
+ vdbeSorterConcatLists(pSorter, 0, 0, SORTER_LARGE);
+ }
+
+ /* If there are one or more SORTER_DOUBLE records, move all numeric
+ ** records to the SORTER_DOUBLE slot. */
+ if( pSorter->aRec[SORTER_DOUBLE] ){
+ vdbeSorterConcatLists(pSorter, 0, SORTER_INT_NEG, SORTER_DOUBLE);
}
- p = 0;
- for(i=0; i<64; i++){
- vdbeSorterMerge(pCsr, p, aSlot[i], &p);
+ for(iRec=0; iRec<ArraySize(pSorter->aRec); iRec++){
+ void *pCtx = 0;
+ int (*xCmp)(void*,const void*,int,const void*,int);
+ switch( iRec ){
+ case SORTER_NULL:
+ case SORTER_INT_ZERO:
+ case SORTER_INT_ONE:
+ xCmp = 0;
+ break;
+
+ case SORTER_INT_NEG:
+ case SORTER_INT_POS:
+ xCmp = vdbeSorterCompareInt;
+ break;
+
+ case SORTER_BLOB:
+ pCtx = (void *)(pCsr->pKeyInfo->db->pDfltColl);
+ xCmp = vdbeSorterCompareString;
+ break;
+
+ case SORTER_TEXT:
+ pCtx = (void *)(pCsr->pKeyInfo->aColl[0]);
+ xCmp = vdbeSorterCompareString;
+ break;
+
+ default:
+ pCtx = (void *)pCsr;
+ xCmp = vdbeSorterCompareRec;
+ break;
+ }
+ if( !xCmp ) continue;
+
+ p = pSorter->aRec[iRec];
+ while( p ){
+ SorterRecord *pNext = p->pNext;
+ p->pNext = 0;
+ for(i=0; aSlot[i]; i++){
+ vdbeSorterMerge(pCsr, xCmp, pCtx, aSlot[i], p, &p);
+ aSlot[i] = 0;
+ }
+ aSlot[i] = p;
+ p = pNext;
+ }
+ p = 0;
+ for(i=0; i<64; i++){
+ vdbeSorterMerge(pCsr, xCmp, pCtx, aSlot[i], p, &p);
+ aSlot[i] = 0;
+ }
+ pSorter->aRec[iRec] = p;
+ if( p ){
+ SorterRecord *pRec;
+ for(pRec=pSorter->aRec[iRec]; pRec->pNext; pRec=pRec->pNext);
+ pSorter->aLastRec[iRec] = &pRec->pNext;
+ }
}
- pSorter->pRecord = p;
+ vdbeSorterConcatLists(pSorter, bReverse, 0, SORTER_LARGE);
+ vdbeSorterSetLastRec(pSorter);
sqlite3_free(aSlot);
return SQLITE_OK;
}
** key). The varint is the number of bytes in the blob of data.
*/
static int vdbeSorterListToPMA(sqlite3 *db, const VdbeCursor *pCsr){
- int rc = SQLITE_OK; /* Return code */
+ int rc; /* Return code */
VdbeSorter *pSorter = pCsr->pSorter;
FileWriter writer;
memset(&writer, 0, sizeof(FileWriter));
if( pSorter->nInMemory==0 ){
- assert( pSorter->pRecord==0 );
- return rc;
+ return SQLITE_OK;
}
rc = vdbeSorterSort(pCsr);
fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff);
pSorter->nPMA++;
fileWriterWriteVarint(&writer, pSorter->nInMemory);
- for(p=pSorter->pRecord; p; p=pNext){
+ for(p=pSorter->aRec[SORTER_LARGE]; rc==SQLITE_OK && p; p=pNext){
pNext = p->pNext;
fileWriterWriteVarint(&writer, p->nVal);
fileWriterWrite(&writer, p->pVal, p->nVal);
sqlite3DbFree(db, p);
}
- pSorter->pRecord = p;
+ pSorter->aRec[SORTER_LARGE] = p;
rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff);
}
if( pNew==0 ){
rc = SQLITE_NOMEM;
}else{
- pNew->pVal = (void *)&pNew[1];
+ int iSlot;
+ u8 *aVal = pNew->pVal = (void *)&pNew[1];
memcpy(pNew->pVal, pVal->z, pVal->n);
pNew->nVal = pVal->n;
- pNew->pNext = pSorter->pRecord;
- pSorter->pRecord = pNew;
+
+ u8 n = aVal[0];
+ u8 t = aVal[1];
+
+ if( pCsr->pKeyInfo->nField!=1 || (t & 0x80) || (n & 0x80) ){
+ iSlot = SORTER_LARGE;
+ }else{
+ u8 t = aVal[1];
+ switch( t ){
+ case 0:
+ iSlot = SORTER_NULL;
+ break;
+
+ case 1: case 2: case 3: case 4: case 5: case 6:
+ iSlot = (aVal[n] & 0x80) ? SORTER_INT_NEG : SORTER_INT_POS;
+ break;
+
+ case 7:
+ iSlot = SORTER_DOUBLE;
+ break;
+
+ case 8:
+ iSlot = SORTER_INT_ZERO;
+ break;
+
+ case 9:
+ iSlot = SORTER_INT_ONE;
+ break;
+
+ default:
+ iSlot = SORTER_BLOB - (t & 0x01);
+ break;
+ }
+ }
+
+ pNew->pNext = 0;
+ *pSorter->aLastRec[iSlot] = pNew;
+ pSorter->aLastRec[iSlot] = &(pNew->pNext);
}
/* See if the contents of the sorter should now be written out. They
** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
** from the in-memory list. */
if( pSorter->nPMA==0 ){
- *pbEof = !pSorter->pRecord;
+ *pbEof = (pSorter->nInMemory==0);
assert( pSorter->aTree==0 );
return vdbeSorterSort(pCsr);
}
*pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
}else{
- SorterRecord *pFree = pSorter->pRecord;
- pSorter->pRecord = pFree->pNext;
+ SorterRecord *pFree = pSorter->aRec[SORTER_LARGE];
+ pSorter->aRec[SORTER_LARGE] = pFree->pNext;
pFree->pNext = 0;
vdbeSorterRecordFree(db, pFree);
- *pbEof = !pSorter->pRecord;
+ *pbEof = !pSorter->aRec[SORTER_LARGE];
rc = SQLITE_OK;
}
return rc;
*pnKey = pIter->nKey;
pKey = pIter->aKey;
}else{
- *pnKey = pSorter->pRecord->nVal;
- pKey = pSorter->pRecord->pVal;
+ *pnKey = pSorter->aRec[SORTER_LARGE]->nVal;
+ pKey = pSorter->aRec[SORTER_LARGE]->pVal;
}
return pKey;
}
pOut->n = nKey;
MemSetTypeFlag(pOut, MEM_Blob);
memcpy(pOut->z, pKey, nKey);
-
return SQLITE_OK;
}
*/
int sqlite3VdbeSorterCompare(
const VdbeCursor *pCsr, /* Sorter cursor */
- Mem *pVal, /* Value to compare to current sorter key */
- int *pRes /* OUT: Result of comparison */
+ Mem *pVal /* Value to compare to current sorter key */
){
+ KeyInfo *pKeyInfo = pCsr->pKeyInfo;
VdbeSorter *pSorter = pCsr->pSorter;
+ UnpackedRecord *r2 = pSorter->pUnpacked;
+ int i;
void *pKey; int nKey; /* Sorter key to compare pVal with */
pKey = vdbeSorterRowkey(pSorter, &nKey);
- vdbeSorterCompare(pCsr, 1, pVal->z, pVal->n, pKey, nKey, pRes);
- return SQLITE_OK;
+ assert( pKey && pVal->z );
+ sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
+
+ r2->nField = pKeyInfo->nField;
+ assert( r2->nField>0 );
+ for(i=0; i<r2->nField; i++){
+ if( r2->aMem[i].flags & MEM_Null ) return -1;
+ }
+ r2->flags |= UNPACKED_PREFIX_MATCH;
+
+ return sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2);
}
#endif /* #ifndef SQLITE_OMIT_MERGE_SORT */