typedef struct SorterRecord SorterRecord; /* A record being sorted */
typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */
+typedef struct SortList SortList;
+typedef struct SortFile SortFile;
+typedef struct SortLevel SortLevel;
+
/*
-** Candidate values for SortSubtask.eWork
+** A file containing zero or more PMAs.
*/
-#define SORT_SUBTASK_SORT 1 /* Sort records on pList */
-#define SORT_SUBTASK_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pTemp1 */
-#define SORT_SUBTASK_CONS 3 /* Consolidate multiple PMAs */
+struct SortFile {
+ sqlite3_file *pFd; /* File handle */
+ i64 iOff; /* Current write offset */
+ i64 nByte; /* Actual size of file */
+ int nPMA; /* Number of PMA currently in file */
+};
+
+/*
+** A list of records.
+*/
+struct SortList {
+ SorterRecord *pRecord; /* List of records for pTask to sort */
+ int nInMemory; /* Expected size of PMA based on pList */
+ u8 *aMemory; /* Records memory (or NULL) */
+};
+
+struct SortLevel {
+ SortSubtask *pTask; /* Sorter task this level is a part of */
+ SQLiteThread *pThread; /* Thread handle, or NULL */
+ int bDone; /* Set to true by pThread when finished */
+ union {
+ SortFile f; /* Input for level 1 and greater */
+ SortList l; /* Input for level 0 */
+ } in;
+ SortFile out; /* Level storage */
+ SortLevel *pNext; /* Next level (containing larger PMAs) */
+ UnpackedRecord *pUnpacked; /* Space to unpack a record */
+};
/*
** Sorting is divided up into smaller subtasks. Each subtask is controlled
** remain in temp file SortSubtask.pTemp1.
*/
struct SortSubtask {
- SQLiteThread *pThread; /* Thread handle, or NULL */
- int bDone; /* Set to true by pTask when finished */
-
+ int iId; /* Sub-task id */
sqlite3 *db; /* Database connection */
+ VdbeSorter *pSorter; /* Sorter that owns this object */
KeyInfo *pKeyInfo; /* How to compare records */
- UnpackedRecord *pUnpacked; /* Space to unpack a record */
int pgsz; /* Main database page size */
-
- u8 eWork; /* One of the SORT_SUBTASK_* constants */
- int nConsolidate; /* For SORT_SUBTASK_CONS, max final PMAs */
- SorterRecord *pList; /* List of records for pTask to sort */
- int nInMemory; /* Expected size of PMA based on pList */
- u8 *aListMemory; /* Records memory (or NULL) */
-
- int nPMA; /* Number of PMAs currently in pTemp1 */
- i64 iTemp1Off; /* Offset to write to in pTemp1 */
- sqlite3_file *pTemp1; /* File to write PMAs to, or NULL */
+ int nConsolidate; /* For consolidation, max final PMAs */
+ SortLevel *pLevel; /* PMA level 0 */
};
** sorter cursor created by the VDBE.
*/
struct VdbeSorter {
- int nInMemory; /* Current size of pRecord list as PMA */
int mnPmaSize; /* Minimum PMA size, in bytes */
int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
int bUsePMA; /* True if one or more PMAs created */
- SorterRecord *pRecord; /* Head of in-memory record list */
MergeEngine *pMerger; /* For final merge of PMAs (by caller) */
- u8 *aMemory; /* Block of memory to alloc records from */
- int iMemory; /* Offset of first free byte in aMemory */
- int nMemory; /* Size of aMemory allocation in bytes */
- int iPrev; /* Previous thread used to flush PMA */
+ UnpackedRecord *pUnpacked; /* Used by sqlite3VdbeSorterCompare */
+ int iMemory; /* Offset of free byte in list.aMemory */
+ int nMemory; /* Size of list.aMemory allocation in bytes */
+ SortList list; /* In memory records */
+ int iPrev; /* Previous PMA flushed via task iPrev */
int nTask; /* Size of aTask[] array */
SortSubtask aTask[1]; /* One or more subtasks */
};
*/
static int vdbePmaReaderInit(
SortSubtask *pTask, /* Thread context */
+ SortFile *pFile, /* File to read from */
i64 iStart, /* Start offset in pTask->pTemp1 */
PmaReader *pIter, /* Iterator to populate */
i64 *pnByte /* IN/OUT: Increment this value by PMA size */
int nBuf = pTask->pgsz;
void *pMap = 0; /* Mapping of temp file */
- assert( pTask->iTemp1Off>iStart );
+ assert( pFile->iOff>iStart );
assert( pIter->aAlloc==0 );
assert( pIter->aBuffer==0 );
- pIter->pFile = pTask->pTemp1;
+ pIter->pFile = pFile->pFd;
pIter->iReadOff = iStart;
pIter->nAlloc = 128;
pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
if( pIter->aAlloc ){
/* Try to xFetch() a mapping of the entire temp file. If this is possible,
** the PMA will be read via the mapping. Otherwise, use xRead(). */
- if( pTask->iTemp1Off<=(i64)(pTask->db->nMaxSorterMmap) ){
- rc = sqlite3OsFetch(pIter->pFile, 0, pTask->iTemp1Off, &pMap);
+ if( pFile->iOff<=(i64)(pTask->db->nMaxSorterMmap) ){
+ rc = sqlite3OsFetch(pIter->pFile, 0, pFile->iOff, &pMap);
}
}else{
rc = SQLITE_NOMEM;
int iBuf = iStart % nBuf;
if( iBuf ){
int nRead = nBuf - iBuf;
- if( (iStart + nRead) > pTask->iTemp1Off ){
- nRead = (int)(pTask->iTemp1Off - iStart);
+ if( (iStart + nRead) > pFile->iOff ){
+ nRead = (int)(pFile->iOff - iStart);
}
rc = sqlite3OsRead(
- pTask->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
- );
+ pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
+ );
assert( rc!=SQLITE_IOERR_SHORT_READ );
}
}
if( rc==SQLITE_OK ){
u64 nByte; /* Size of PMA in bytes */
- pIter->iEof = pTask->iTemp1Off;
+ pIter->iEof = pFile->iOff;
rc = vdbePmaReadVarint(pIter, &nByte);
pIter->iEof = pIter->iReadOff + nByte;
*pnByte += nByte;
/*
** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2,
-** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
+** size nKey2 bytes). Use pKeyInfo for the collation sequences
** used by the comparison. Return the result of the comparison.
**
-** Before returning, object (pTask->pUnpacked) is populated with the
+** Before returning, object pUnpacked is populated with the
** unpacked version of key2. Or, if pKey2 is passed a NULL pointer, then it
-** is assumed that the (pTask->pUnpacked) structure already contains the
+** is assumed that the pUnpacked structure already contains the
** unpacked key to use as key2.
**
-** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
+** If an OOM error is encountered, (pUnpacked->error_rc) is set
** to SQLITE_NOMEM.
*/
static int vdbeSorterCompare(
- SortSubtask *pTask, /* Subtask context (for pKeyInfo) */
+ KeyInfo *pKeyInfo,
+ UnpackedRecord *r2,
const void *pKey1, int nKey1, /* Left side of comparison */
const void *pKey2, int nKey2 /* Right side of comparison */
){
- UnpackedRecord *r2 = pTask->pUnpacked;
if( pKey2 ){
- sqlite3VdbeRecordUnpack(pTask->pKeyInfo, nKey2, pKey2, r2);
+ sqlite3VdbeRecordUnpack(pKeyInfo, nKey2, pKey2, r2);
}
return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
}
** value to recalculate.
*/
static int vdbeSorterDoCompare(
- SortSubtask *pTask,
+ SortLevel *pLvl,
MergeEngine *pMerger,
int iOut
){
iRes = i1;
}else{
int res;
- assert( pTask->pUnpacked!=0 ); /* allocated in vdbeSortSubtaskMain() */
- res = vdbeSorterCompare(
- pTask, p1->aKey, p1->nKey, p2->aKey, p2->nKey
+ assert( pLvl->pUnpacked!=0 ); /* allocated in vdbeSorterThread() */
+ res = vdbeSorterCompare(pLvl->pTask->pKeyInfo, pLvl->pUnpacked,
+ p1->aKey, p1->nKey, p2->aKey, p2->nKey
);
if( res<=0 ){
iRes = i1;
pTask->pKeyInfo = pKeyInfo;
pTask->pgsz = pgsz;
pTask->db = db;
+ pTask->pSorter = pSorter;
}
if( !sqlite3TempInMemory(db) ){
if( sqlite3GlobalConfig.pHeap==0 ){
assert( pSorter->iMemory==0 );
pSorter->nMemory = pgsz;
- pSorter->aMemory = (u8*)sqlite3Malloc(pgsz);
- if( !pSorter->aMemory ) rc = SQLITE_NOMEM;
+ pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
+ if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM;
}
}
}
}
/*
-** Free all resources owned by the object indicated by argument pTask. All
-** fields of *pTask are zeroed before returning.
+** Free all resources owned by the object indicated by argument pTask.
+** This does not include joining any outstanding threads. All fields of
+** *pTask are zeroed before returning.
*/
static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
- sqlite3DbFree(db, pTask->pUnpacked);
- pTask->pUnpacked = 0;
- if( pTask->aListMemory==0 ){
- vdbeSorterRecordFree(0, pTask->pList);
- }else{
- sqlite3_free(pTask->aListMemory);
- pTask->aListMemory = 0;
- }
- pTask->pList = 0;
- if( pTask->pTemp1 ){
- sqlite3OsCloseFree(pTask->pTemp1);
- pTask->pTemp1 = 0;
+ SortLevel *pLvl;
+ SortLevel *pNext;
+ for(pLvl=pTask->pLevel; pLvl; pLvl=pNext){
+ pNext = pLvl->pNext;
+ assert( pLvl->pThread==0 );
+ if( pLvl==pTask->pLevel ){
+ if( pLvl->in.l.aMemory==0 ){
+ vdbeSorterRecordFree(0, pLvl->in.l.pRecord);
+ }else{
+ sqlite3_free(pLvl->in.l.aMemory);
+ }
+ }else{
+ if( pLvl->in.f.pFd ) sqlite3OsCloseFree(pLvl->in.f.pFd);
+ }
+ if( pLvl->out.pFd ) sqlite3OsCloseFree(pLvl->out.pFd);
+ sqlite3DbFree(db, pLvl->pUnpacked);
+ sqlite3_free(pLvl);
}
+ pTask->pLevel = 0;
+ pTask->nConsolidate = 0;
}
/*
int i;
for(i=0; i<pSorter->nTask; i++){
SortSubtask *pTask = &pSorter->aTask[i];
- if( pTask->pThread ){
- void *pRet;
- int rc2 = sqlite3ThreadJoin(pTask->pThread, &pRet);
- pTask->pThread = 0;
- pTask->bDone = 0;
- if( rc==SQLITE_OK ) rc = rc2;
- if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
+ SortLevel *pLvl;
+ for(pLvl=pTask->pLevel; pLvl; pLvl=pLvl->pNext){
+ if( pLvl->pThread ){
+ void *pRet;
+ int rc2 = sqlite3ThreadJoin(pLvl->pThread, &pRet);
+ pLvl->pThread = 0;
+ pLvl->bDone = 0;
+ if( rc==SQLITE_OK ) rc = rc2;
+ if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
+ }
}
}
return rc;
SortSubtask *pTask = &pSorter->aTask[i];
vdbeSortSubtaskCleanup(db, pTask);
}
- if( pSorter->aMemory==0 ){
- vdbeSorterRecordFree(0, pSorter->pRecord);
+ if( pSorter->list.aMemory==0 ){
+ vdbeSorterRecordFree(0, pSorter->list.pRecord);
}
- pSorter->pRecord = 0;
- pSorter->nInMemory = 0;
+ pSorter->list.pRecord = 0;
+ pSorter->list.nInMemory = 0;
pSorter->bUsePMA = 0;
pSorter->iMemory = 0;
}
if( pSorter ){
sqlite3VdbeSorterReset(db, pSorter);
vdbeMergeEngineFree(pSorter->pMerger);
- sqlite3_free(pSorter->aMemory);
+ sqlite3_free(pSorter->list.aMemory);
+ sqlite3DbFree(db, pSorter->pUnpacked);
sqlite3DbFree(db, pSorter);
pCsr->pSorter = 0;
}
** Set *ppOut to the head of the new list.
*/
static void vdbeSorterMerge(
- SortSubtask *pTask, /* Calling thread context */
+ KeyInfo *pKeyInfo,
+ UnpackedRecord *r2,
SorterRecord *p1, /* First list to merge */
SorterRecord *p2, /* Second list to merge */
SorterRecord **ppOut /* OUT: Head of merged list */
while( p1 && p2 ){
int res;
- res = vdbeSorterCompare(pTask, SRVAL(p1), p1->nVal, pVal2, p2->nVal);
+ res = vdbeSorterCompare(pKeyInfo, r2, SRVAL(p1), p1->nVal, pVal2, p2->nVal);
if( res<=0 ){
*pp = p1;
pp = &p1->u.pNext;
}
/*
-** Sort the linked list of records headed at pTask->pList. Return
+** Sort the linked list of records headed at pLvl->in.l.pRecord. Return
** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if
** an error occurs.
*/
-static int vdbeSorterSort(SortSubtask *pTask){
+static int vdbeSorterSort(
+ SortList *pList,
+ KeyInfo *pKeyInfo,
+ UnpackedRecord *pUnpacked
+){
int i;
SorterRecord **aSlot;
SorterRecord *p;
return SQLITE_NOMEM;
}
- p = pTask->pList;
+ p = pList->pRecord;
while( p ){
SorterRecord *pNext;
- if( pTask->aListMemory ){
- if( (u8*)p==pTask->aListMemory ){
+ if( pList->aMemory ){
+ if( (u8*)p==pList->aMemory ){
pNext = 0;
}else{
- assert( p->u.iNext<sqlite3MallocSize(pTask->aListMemory) );
- pNext = (SorterRecord*)&pTask->aListMemory[p->u.iNext];
+ assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
+ pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
}
}else{
pNext = p->u.pNext;
p->u.pNext = 0;
for(i=0; aSlot[i]; i++){
- vdbeSorterMerge(pTask, p, aSlot[i], &p);
+ vdbeSorterMerge(pKeyInfo, pUnpacked, p, aSlot[i], &p);
aSlot[i] = 0;
}
aSlot[i] = p;
p = 0;
for(i=0; i<64; i++){
- vdbeSorterMerge(pTask, p, aSlot[i], &p);
+ vdbeSorterMerge(pKeyInfo, pUnpacked, p, aSlot[i], &p);
}
- pTask->pList = p;
+ pList->pRecord = p;
sqlite3_free(aSlot);
return SQLITE_OK;
** Whether or not the file does end up memory mapped of course depends on
** the specific VFS implementation.
*/
-static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){
- if( nByte<=(i64)(db->nMaxSorterMmap) ){
- int rc = sqlite3OsTruncate(pFile, nByte);
+static void vdbeSorterExtendFile(
+ sqlite3 *db,
+ SortFile *pFile,
+ i64 nByte
+){
+ if( nByte<=(i64)(db->nMaxSorterMmap) && nByte>pFile->nByte ){
+ sqlite3_file *pFd = pFile->pFd;
+ int rc = sqlite3OsTruncate(pFd, nByte);
if( rc==SQLITE_OK ){
void *p = 0;
- sqlite3OsFetch(pFile, 0, nByte, &p);
- sqlite3OsUnfetch(pFile, 0, p);
+ sqlite3OsFetch(pFd, 0, nByte, &p);
+ sqlite3OsUnfetch(pFd, 0, p);
+ pFile->nByte = nByte;
}
}
}
# define vdbeSorterExtendFile(x,y,z) SQLITE_OK
#endif
-
-/*
-** Write the current contents of the in-memory linked-list to a PMA. Return
-** SQLITE_OK if successful, or an SQLite error code otherwise.
-**
-** The format of a PMA is:
-**
-** * A varint. This varint contains the total number of bytes of content
-** in the PMA (not including the varint itself).
-**
-** * One or more records packed end-to-end in order of ascending keys.
-** Each record consists of a varint followed by a blob of data (the
-** key). The varint is the number of bytes in the blob of data.
-*/
-static int vdbeSorterListToPMA(SortSubtask *pTask){
- int rc = SQLITE_OK; /* Return code */
- PmaWriter writer; /* Object used to write to the file */
-
- memset(&writer, 0, sizeof(PmaWriter));
- assert( pTask->nInMemory>0 );
-
- /* If the first temporary PMA file has not been opened, open it now. */
- if( pTask->pTemp1==0 ){
- rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->pTemp1);
- assert( rc!=SQLITE_OK || pTask->pTemp1 );
- assert( pTask->iTemp1Off==0 );
- assert( pTask->nPMA==0 );
- }
-
- /* Try to get the file to memory map */
- if( rc==SQLITE_OK ){
- vdbeSorterExtendFile(pTask->db,
- pTask->pTemp1, pTask->iTemp1Off + pTask->nInMemory + 9
- );
- }
-
- if( rc==SQLITE_OK ){
- SorterRecord *p;
- SorterRecord *pNext = 0;
-
- vdbePmaWriterInit(pTask->pTemp1, &writer, pTask->pgsz,
- pTask->iTemp1Off);
- pTask->nPMA++;
- vdbePmaWriteVarint(&writer, pTask->nInMemory);
- for(p=pTask->pList; p; p=pNext){
- pNext = p->u.pNext;
- vdbePmaWriteVarint(&writer, p->nVal);
- vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
- if( pTask->aListMemory==0 ) sqlite3_free(p);
- }
- pTask->pList = p;
- rc = vdbePmaWriterFinish(&writer, &pTask->iTemp1Off);
- }
-
- assert( pTask->pList==0 || rc!=SQLITE_OK );
- return rc;
-}
-
/*
** Advance the MergeEngine iterator passed as the second argument to
** the next entry. Set *pbEof to true if this means the iterator has
** Return SQLITE_OK if successful or an error code if an error occurs.
*/
static int vdbeSorterNext(
- SortSubtask *pTask,
- MergeEngine *pMerger,
+ SortLevel *pLvl,
+ MergeEngine *pMerger,
int *pbEof
){
int rc;
int iPrev = pMerger->aTree[1];/* Index of iterator to advance */
+ KeyInfo *pKeyInfo = pLvl->pTask->pKeyInfo;
+ UnpackedRecord *r2 = pLvl->pUnpacked;
/* Advance the current iterator */
rc = vdbePmaReaderNext(&pMerger->aIter[iPrev]);
}else if( pIter2->pFile==0 ){
iRes = -1;
}else{
- iRes = vdbeSorterCompare(pTask,
+ iRes = vdbeSorterCompare(pKeyInfo, r2,
pIter1->aKey, pIter1->nKey, pKey2, pIter2->nKey
);
}
/* If pIter1 contained the smaller value, set aTree[i] to its index.
** Then set pIter2 to the next iterator to compare to pIter1. In this
- ** case there is no cache of pIter2 in pTask->pUnpacked, so set
+ ** case there is no cache of pIter2 in pLvl->pUnpacked, so set
** pKey2 to point to the record belonging to pIter2.
**
** Alternatively, if pIter2 contains the smaller of the two values,
** set aTree[i] to its index and update pIter1. If vdbeSorterCompare()
- ** was actually called above, then pTask->pUnpacked now contains
+ ** was actually called above, then pLvl->pUnpacked now contains
** a value equivalent to pIter2. So set pKey2 to NULL to prevent
** vdbeSorterCompare() from decoding pIter2 again.
**
return rc;
}
-/*
-** The main routine for sorter-thread operations.
-*/
-static void *vdbeSortSubtaskMain(void *pCtx){
- int rc = SQLITE_OK;
- SortSubtask *pTask = (SortSubtask*)pCtx;
+static UnpackedRecord *vdbeSorterAllocUnpackedRecord(KeyInfo *pKeyInfo){
+ char *pFree;
+ UnpackedRecord *pRet;
+ pRet = sqlite3VdbeAllocUnpackedRecord(pKeyInfo, 0, 0, &pFree);
+ assert( pRet==(UnpackedRecord*)pFree );
+ if( pRet ){
+ pRet->nField = pKeyInfo->nField;
+ pRet->errCode = 0;
+ }
+ return pRet;
+}
- assert( pTask->eWork==SORT_SUBTASK_SORT
- || pTask->eWork==SORT_SUBTASK_TO_PMA
- || pTask->eWork==SORT_SUBTASK_CONS
- );
- assert( pTask->bDone==0 );
+#if 0
+static void vdbeSorterWorkDebug(SortLevel *pLvl, const char *zEvent){
+ i64 t;
+ SortLevel *p;
+ SortSubtask *pTask = pLvl->pTask;
+ int iTask = (pTask - pTask->pSorter->aTask);
+ int iLvl = 0;
+ for(p=pTask->pLevel; p!=pLvl; p=p->pNext) iLvl++;
+ sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
+ fprintf(stderr, "%lld:%d.%d %s\n", t, iTask, iLvl, zEvent);
+}
+static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
+ i64 t;
+ sqlite3OsCurrentTimeInt64(db->pVfs, &t);
+ fprintf(stderr, "%lld:X %s\n", t, zEvent);
+}
+#else
+# define vdbeSorterWorkDebug(x,y)
+# define vdbeSorterRewindDebug(x,y)
+#endif
- if( pTask->pUnpacked==0 ){
- char *pFree;
- pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
- pTask->pKeyInfo, 0, 0, &pFree
- );
- assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
- if( pFree==0 ){
+/*
+** Merge the data currently stored in (pLevel->in), if any, into a new PMA
+** stored within (pLevel->out).
+*/
+static int vdbeSorterWorkLevel(SortLevel *pLvl){
+ int rc = SQLITE_OK; /* Return code */
+ SortSubtask *pTask = pLvl->pTask;
+ MergeEngine *pMerger = 0;
+ SortFile *pOut = &pLvl->out; /* Write new PMA here */
+ i64 nOut = 0; /* Expected size of new PMA */
+ PmaWriter writer; /* Used to write new PMA to pOut */
+ int bEof = 0;
+
+ vdbeSorterWorkDebug(pLvl, "enter");
+
+ if( pLvl->pUnpacked==0 ){
+ pLvl->pUnpacked = vdbeSorterAllocUnpackedRecord(pTask->pKeyInfo);
+ if( pLvl->pUnpacked==0 ){
rc = SQLITE_NOMEM;
- goto thread_out;
+ goto work_level_out;
}
- pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
- pTask->pUnpacked->errCode = 0;
}
- if( pTask->eWork==SORT_SUBTASK_CONS ){
- assert( pTask->pList==0 );
- while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){
- int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT);
- sqlite3_file *pTemp2 = 0; /* Second temp file to use */
- MergeEngine *pMerger; /* Object for reading/merging PMA data */
- i64 iReadOff = 0; /* Offset in pTemp1 to read from */
- i64 iWriteOff = 0; /* Offset in pTemp2 to write to */
- int i;
-
- /* Allocate a merger object to merge PMAs together. */
- pMerger = vdbeMergeEngineNew(nIter);
+ if( pLvl->out.pFd==0 ){
+ assert( pLvl->out.iOff==0 );
+ assert( pLvl->out.nByte==0 );
+ assert( pLvl->out.nPMA==0 );
+ rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pLvl->out.pFd);
+ if( rc!=SQLITE_OK ) goto work_level_out;
+ }
+
+ if( pLvl==pTask->pLevel ){
+ if( pLvl->in.l.pRecord==0 ){
+ bEof = 1;
+ }else{
+ rc = vdbeSorterSort(&pLvl->in.l, pTask->pKeyInfo, pLvl->pUnpacked);
+ nOut = pLvl->in.l.nInMemory;
+ }
+ }else{
+ int nPMA = pLvl->in.f.nPMA;
+ if( nPMA==0 ){
+ bEof = 1;
+ }else{
+ pMerger = vdbeMergeEngineNew(nPMA);
if( pMerger==0 ){
rc = SQLITE_NOMEM;
- break;
- }
-
- /* Open a second temp file to write merged data to */
- rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2);
- if( rc==SQLITE_OK ){
- vdbeSorterExtendFile(pTask->db, pTemp2, pTask->iTemp1Off);
}else{
- vdbeMergeEngineFree(pMerger);
- break;
- }
-
- /* This loop runs once for each output PMA. Each output PMA is made
- ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */
- for(i=0; rc==SQLITE_OK && i<pTask->nPMA; i+=SORTER_MAX_MERGE_COUNT){
- PmaWriter writer; /* Object for writing data to pTemp2 */
- i64 nOut = 0; /* Bytes of data in output PMA */
- int bEof = 0;
- int rc2;
-
- /* Configure the merger object to read and merge data from the next
- ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs,
- ** if that is fewer). */
+ /* Configure the merger object to read and merge data from all
+ ** PMAs at pLvl. */
int iIter;
- for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
+ i64 iReadOff = 0;
+ for(iIter=0; iIter<nPMA && rc==SQLITE_OK; iIter++){
PmaReader *pIter = &pMerger->aIter[iIter];
- rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nOut);
+ rc = vdbePmaReaderInit(pTask, &pLvl->in.f, iReadOff, pIter, &nOut);
iReadOff = pIter->iEof;
- if( iReadOff>=pTask->iTemp1Off || rc!=SQLITE_OK ) break;
}
+
for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
- rc = vdbeSorterDoCompare(pTask, pMerger, iIter);
+ rc = vdbeSorterDoCompare(pLvl, pMerger, iIter);
}
+ }
+ }
+ }
+ if( rc!=SQLITE_OK ) goto work_level_out;
- vdbePmaWriterInit(pTemp2, &writer, pTask->pgsz, iWriteOff);
- vdbePmaWriteVarint(&writer, nOut);
- while( rc==SQLITE_OK && bEof==0 ){
- PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
- assert( pIter->pFile!=0 ); /* pIter is not at EOF */
- vdbePmaWriteVarint(&writer, pIter->nKey);
- vdbePmaWriteBlob(&writer, pIter->aKey, pIter->nKey);
- rc = vdbeSorterNext(pTask, pMerger, &bEof);
- }
- rc2 = vdbePmaWriterFinish(&writer, &iWriteOff);
- if( rc==SQLITE_OK ) rc = rc2;
+ /* If mmap is to be used, pre-extend and map the temp file. */
+ vdbeSorterExtendFile(pTask->db, &pLvl->out, pLvl->out.iOff + nOut + 9);
+
+ if( bEof==0 ){
+ vdbePmaWriterInit(pOut->pFd, &writer, pTask->pgsz, pOut->iOff);
+ vdbePmaWriteVarint(&writer, nOut);
+
+ while( rc==SQLITE_OK && bEof==0 ){
+ u8 *aKey; /* Next key to write to output */
+ int nKey; /* Size of aKey[] in bytes */
+ if( pMerger==0 ){
+ aKey = SRVAL(pLvl->in.l.pRecord);
+ nKey = pLvl->in.l.pRecord->nVal;
+ }else{
+ PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
+ assert( pIter->pFile ); /* pIter is not at EOF */
+ aKey = pIter->aKey;
+ nKey = pIter->nKey;
}
- vdbeMergeEngineFree(pMerger);
- sqlite3OsCloseFree(pTask->pTemp1);
- pTask->pTemp1 = pTemp2;
- pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT);
- pTask->iTemp1Off = iWriteOff;
+ vdbePmaWriteVarint(&writer, nKey);
+ vdbePmaWriteBlob(&writer, aKey, nKey);
+
+ if( pMerger==0 ){
+ SorterRecord *pNext = pLvl->in.l.pRecord->u.pNext;
+ if( pLvl->in.l.aMemory==0 ) sqlite3_free(pLvl->in.l.pRecord);
+ pLvl->in.l.pRecord = pNext;
+ bEof = (pNext==0);
+ }else{
+ rc = vdbeSorterNext(pLvl, pMerger, &bEof);
+ }
}
- }else{
- /* Sort the pTask->pList list */
- rc = vdbeSorterSort(pTask);
-
- /* If required, write the list out to a PMA. */
- if( rc==SQLITE_OK && pTask->eWork==SORT_SUBTASK_TO_PMA ){
-#ifdef SQLITE_DEBUG
- i64 nExpect = pTask->nInMemory
- + sqlite3VarintLen(pTask->nInMemory)
- + pTask->iTemp1Off;
-#endif
- rc = vdbeSorterListToPMA(pTask);
- assert( rc!=SQLITE_OK || (nExpect==pTask->iTemp1Off) );
+ rc = vdbePmaWriterFinish(&writer, &pOut->iOff);
+ pOut->nPMA++;
+
+ if( rc==SQLITE_OK && pMerger ){
+ sqlite3OsCloseFree(pLvl->in.f.pFd);
+ pLvl->in.f.pFd = 0;
}
+ vdbeMergeEngineFree(pMerger);
}
- thread_out:
- pTask->bDone = 1;
- if( rc==SQLITE_OK && pTask->pUnpacked->errCode ){
- assert( pTask->pUnpacked->errCode==SQLITE_NOMEM );
+ if( rc==SQLITE_OK && (
+ (pOut->nPMA>=SORTER_MAX_MERGE_COUNT)
+ || (pTask->nConsolidate && pLvl->pNext)
+ || (pTask->nConsolidate && pTask->nConsolidate<pOut->nPMA)
+ )){
+ SortLevel *pNext = pLvl->pNext;
+ if( pNext==0 ){
+ pNext = (SortLevel*)sqlite3_malloc(sizeof(SortLevel));
+ if( pNext==0 ){
+ rc = SQLITE_NOMEM;
+ goto work_level_out;
+ }
+ memset(pNext, 0, sizeof(SortLevel));
+ pLvl->pNext = pNext;
+ pNext->pTask = pTask;
+ }
+
+ /* If there is a thread running on the next level, block on it. */
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pNext->pThread ){
+ void *pRet;
+ rc = sqlite3ThreadJoin(pNext->pThread, &pRet);
+ pNext->pThread = 0;
+ pNext->bDone = 0;
+ if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
+ if( rc!=SQLITE_OK ) goto work_level_out;
+ }
+#endif
+
+ pNext->in.f = pLvl->out;
+ memset(&pLvl->out, 0, sizeof(pLvl->out));
+ }
+
+ work_level_out:
+ vdbeSorterWorkDebug(pLvl, "exit");
+ if( rc==SQLITE_OK && pLvl->pUnpacked->errCode ){
+ assert( pLvl->pUnpacked->errCode==SQLITE_NOMEM );
rc = SQLITE_NOMEM;
}
- return SQLITE_INT_TO_PTR(rc);
+ return rc;
}
/*
** Run the activity scheduled by the object passed as the only argument
** in the current thread.
*/
-static int vdbeSorterRunTask(SortSubtask *pTask){
- int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pTask) );
- assert( pTask->bDone );
- pTask->bDone = 0;
+static int vdbeSorterRun(SortLevel *pLvl){
+ int rc;
+
+ assert( pLvl->bDone==0 );
+ assert( pLvl->pThread==0 );
+ while( 1 ){
+ rc = vdbeSorterWorkLevel(pLvl);
+ if( rc==SQLITE_OK && pLvl->pTask->pLevel==pLvl && pLvl->in.l.aMemory ){
+ assert( pLvl->pTask->pSorter->list.aMemory==0 );
+ assert( pLvl->in.l.pRecord==0 );
+ pLvl->pTask->pSorter->list.aMemory = pLvl->in.l.aMemory;
+ pLvl->in.l.aMemory = 0;
+ }
+
+ if( rc!=SQLITE_OK || pLvl->out.nPMA>0 ) break;
+ pLvl = pLvl->pNext;
+ assert( pLvl->bDone==0 );
+ assert( pLvl->pThread==0 );
+ }
+
+ pLvl->bDone = 0;
return rc;
}
+#if SQLITE_MAX_WORKER_THREADS>0
+static void *vdbeSorterThread(void *pCtx){
+ int rc;
+ SortLevel *pLvl = (SortLevel*)pCtx;
+
+ rc = vdbeSorterWorkLevel(pLvl);
+ if( rc==SQLITE_OK && pLvl->out.nPMA==0 ){
+ SortLevel *pNext = pLvl->pNext;
+ void *pCtx = (void*)pNext;
+ assert( pNext->pThread==0 );
+ rc = sqlite3ThreadCreate(&pNext->pThread, vdbeSorterThread, pCtx);
+ }
+
+ pLvl->bDone = 1;
+ return SQLITE_INT_TO_PTR(rc);
+}
+#endif
+
/*
** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly
** using a background thread.
VdbeSorter *pSorter = pCsr->pSorter;
int rc = SQLITE_OK;
int i;
- SortSubtask *pTask = 0; /* Thread context used to create new PMA */
- int nWorker = (pSorter->nTask-1);
+ SortSubtask *pTask = 0; /* Sub-task new PMA is written to */
+ SortLevel *pLevel; /* Level to write to */
+ /* Set the use-temp-files flag. */
pSorter->bUsePMA = 1;
- for(i=0; i<nWorker; i++){
- int iTest = (pSorter->iPrev + i + 1) % nWorker;
- pTask = &pSorter->aTask[iTest];
-#if SQLITE_MAX_WORKER_THREADS>0
- if( pTask->bDone ){
- void *pRet;
- assert( pTask->pThread );
- rc = sqlite3ThreadJoin(pTask->pThread, &pRet);
- pTask->pThread = 0;
- pTask->bDone = 0;
- if( rc==SQLITE_OK ){
- rc = SQLITE_PTR_TO_INT(pRet);
- }
+
+ /* Select one of the sub-tasks to flush this PMA. In single threaded
+ ** mode (pSorter->nTask==1), this is always aTask[0]. In multi-threaded mode,
+ ** it may be any of the pSorter->nTask sub-tasks. */
+ for(i=0; i<pSorter->nTask; i++){
+ pTask = &pSorter->aTask[i];
+ if( pTask->pLevel==0
+ || pTask->pLevel->pThread==0
+ || pTask->pLevel->bDone
+ ){
+ break;
}
-#endif
- if( pTask->pThread==0 ) break;
- pTask = 0;
}
- if( pTask==0 ){
- pTask = &pSorter->aTask[nWorker];
- }
- pSorter->iPrev = (pTask - pSorter->aTask);
- if( rc==SQLITE_OK ){
- assert( pTask->pThread==0 && pTask->bDone==0 );
- pTask->eWork = SORT_SUBTASK_TO_PMA;
- pTask->pList = pSorter->pRecord;
- pTask->nInMemory = pSorter->nInMemory;
- pSorter->nInMemory = 0;
- pSorter->pRecord = 0;
-
- if( pSorter->aMemory ){
- u8 *aMem = pTask->aListMemory;
- pTask->aListMemory = pSorter->aMemory;
- pSorter->aMemory = aMem;
+ /* If the first level for this task has not been allocated, allocate it. */
+ if( pTask->pLevel==0 ){
+ SortLevel *pNew = (SortLevel*)sqlite3_malloc(sizeof(SortLevel));
+ if( pNew==0 ){
+ rc = SQLITE_NOMEM;
+ }else{
+ memset(pNew, 0, sizeof(SortLevel));
+ pNew->pTask = pTask;
+ pTask->pLevel = pNew;
}
+ }
+ pLevel = pTask->pLevel;
+ /* If there is a background thread using the selected task, wait for
+ ** it to finish. */
#if SQLITE_MAX_WORKER_THREADS>0
- if( !bFg && pTask!=&pSorter->aTask[nWorker] ){
- /* Launch a background thread for this operation */
- void *pCtx = (void*)pTask;
- assert( pSorter->aMemory==0 || pTask->aListMemory!=0 );
- if( pTask->aListMemory ){
- if( pSorter->aMemory==0 ){
- pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
- if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
- }else{
- pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory);
- }
+ if( rc==SQLITE_OK && pLevel->pThread ){
+ void *pRet = 0;
+ rc = sqlite3ThreadJoin(pLevel->pThread, &pRet);
+ pLevel->pThread = 0;
+ pLevel->bDone = 0;
+ if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
+ }
+#endif
+
+ if( rc==SQLITE_OK ){
+ u8 *aNewMem = 0;
+ if( pSorter->list.aMemory && pSorter->nTask>1 ){
+ aNewMem = pLevel->in.l.aMemory;
+ if( aNewMem==0 ){
+ aNewMem = sqlite3_malloc(pSorter->mxPmaSize);
+ if( aNewMem==0 ) rc = SQLITE_NOMEM;
}
- rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx);
- }else
+ }
+ assert( pLevel->in.l.pRecord==0 );
+ pLevel->in.l = pSorter->list;
+ pSorter->list.pRecord = 0;
+ pSorter->list.nInMemory = 0;
+ pSorter->list.aMemory = aNewMem;
+ if( rc==SQLITE_OK ){
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pSorter->nTask>1 ){
+ void *pCtx = (void*)pLevel;
+ rc = sqlite3ThreadCreate(&pLevel->pThread, vdbeSorterThread, pCtx);
+ pSorter->nMemory = aNewMem ? sqlite3MallocSize(aNewMem) : 0;
+ }else
#endif
- {
- /* Use the foreground thread for this operation */
- rc = vdbeSorterRunTask(pTask);
- if( rc==SQLITE_OK ){
- u8 *aMem = pTask->aListMemory;
- pTask->aListMemory = pSorter->aMemory;
- pSorter->aMemory = aMem;
- assert( pTask->pList==0 );
+ {
+ rc = vdbeSorterRun(pLevel);
}
}
}
nReq = pVal->n + sizeof(SorterRecord);
nPMA = pVal->n + sqlite3VarintLen(pVal->n);
if( pSorter->mxPmaSize ){
- if( pSorter->aMemory ){
+ if( pSorter->list.aMemory ){
bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
}else{
+ int nInMemory = pSorter->list.nInMemory;
bFlush = (
- (pSorter->nInMemory > pSorter->mxPmaSize)
- || (pSorter->nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
+ (nInMemory > pSorter->mxPmaSize)
+ || (nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
);
}
if( bFlush ){
rc = vdbeSorterFlushPMA(db, pCsr, 0);
- pSorter->nInMemory = 0;
+ pSorter->list.nInMemory = 0;
pSorter->iMemory = 0;
- assert( rc!=SQLITE_OK || pSorter->pRecord==0 );
+ assert( rc!=SQLITE_OK || pSorter->list.pRecord==0 );
}
}
- pSorter->nInMemory += nPMA;
+ pSorter->list.nInMemory += nPMA;
- if( pSorter->aMemory ){
+ if( pSorter->list.aMemory ){
int nMin = pSorter->iMemory + nReq;
if( nMin>pSorter->nMemory ){
if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
if( nNew < nMin ) nNew = nMin;
- aNew = sqlite3Realloc(pSorter->aMemory, nNew);
+ aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
if( !aNew ) return SQLITE_NOMEM;
- pSorter->pRecord = (SorterRecord*)(
- aNew + ((u8*)pSorter->pRecord - pSorter->aMemory)
+ pSorter->list.pRecord = (SorterRecord*)(
+ aNew + ((u8*)pSorter->list.pRecord - pSorter->list.aMemory)
);
- pSorter->aMemory = aNew;
+ pSorter->list.aMemory = aNew;
pSorter->nMemory = nNew;
}
- pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory];
+ pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
pSorter->iMemory += ROUND8(nReq);
- pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory;
+ pNew->u.iNext = (u8*)(pSorter->list.pRecord) - pSorter->list.aMemory;
}else{
pNew = (SorterRecord *)sqlite3Malloc(nReq);
if( pNew==0 ){
return SQLITE_NOMEM;
}
- pNew->u.pNext = pSorter->pRecord;
+ pNew->u.pNext = pSorter->list.pRecord;
}
memcpy(SRVAL(pNew), pVal->z, pVal->n);
pNew->nVal = pVal->n;
- pSorter->pRecord = pNew;
+ pSorter->list.pRecord = pNew;
return rc;
}
-/*
-** Return the total number of PMAs in all temporary files.
-*/
-static int vdbeSorterCountPMA(VdbeSorter *pSorter){
- int nPMA = 0;
- int i;
- for(i=0; i<pSorter->nTask; i++){
- nPMA += pSorter->aTask[i].nPMA;
- }
- return nPMA;
-}
/*
** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
VdbeSorter *pSorter = pCsr->pSorter;
int rc = SQLITE_OK; /* Return code */
+ int nTask = 0;
+ int i;
assert( pSorter );
/* If no data has been written to disk, then do not do so now. Instead,
- ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
- ** from the in-memory list. */
+ ** sort the VdbeSorter.list.pRecord list. The vdbe layer will read data
+ ** directly from the in-memory list. */
+ *pbEof = 0;
if( pSorter->bUsePMA==0 ){
- if( pSorter->pRecord ){
+ if( pSorter->list.pRecord ){
SortSubtask *pTask = &pSorter->aTask[0];
- *pbEof = 0;
- pTask->pList = pSorter->pRecord;
- pTask->eWork = SORT_SUBTASK_SORT;
- assert( pTask->aListMemory==0 );
- pTask->aListMemory = pSorter->aMemory;
- rc = vdbeSorterRunTask(pTask);
- pTask->aListMemory = 0;
- pSorter->pRecord = pTask->pList;
- pTask->pList = 0;
+ UnpackedRecord *pUnpack = vdbeSorterAllocUnpackedRecord(pTask->pKeyInfo);
+ if( pUnpack==0 ) return SQLITE_NOMEM;
+ rc = vdbeSorterSort(&pSorter->list, pTask->pKeyInfo, pUnpack);
+ sqlite3DbFree(db, pUnpack);
}else{
*pbEof = 1;
}
}
/* Write the current in-memory list to a PMA. */
- if( pSorter->pRecord ){
- rc = vdbeSorterFlushPMA(db, pCsr, 1);
+ if( pSorter->list.pRecord ){
+ rc = vdbeSorterFlushPMA(db, pCsr, 0);
}
/* Join all threads */
rc = vdbeSorterJoinAll(pSorter, rc);
- /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
- ** some of them together so that this is no longer the case. */
- if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
- int i;
- for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){
- SortSubtask *pTask = &pSorter->aTask[i];
- if( pTask->pTemp1 ){
- pTask->nConsolidate = SORTER_MAX_MERGE_COUNT / pSorter->nTask;
- pTask->eWork = SORT_SUBTASK_CONS;
+ vdbeSorterRewindDebug(db, "rewind");
+
+ for(i=0; i<pSorter->nTask; i++){
+ if( pSorter->aTask[i].pLevel ) nTask++;
+ }
+ for(i=0; rc==SQLITE_OK && i<pSorter->nTask; i++){
+ SortSubtask *pTask = &pSorter->aTask[i];
+ if( pTask->pLevel ){
+ SortLevel *pLvl = pTask->pLevel;
+ pTask->nConsolidate = (SORTER_MAX_MERGE_COUNT / nTask);
#if SQLITE_MAX_WORKER_THREADS>0
- if( i<(pSorter->nTask-1) ){
- void *pCtx = (void*)pTask;
- rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx);
- }else
+ if( i<(pSorter->nTask-1) ){
+ void *pCtx = (void*)pLvl;
+ rc = sqlite3ThreadCreate(&pLvl->pThread, vdbeSorterThread, pCtx);
+ }else
#endif
- {
- rc = vdbeSorterRunTask(pTask);
- }
+ {
+ assert( pLvl->pThread==0 );
+ rc = vdbeSorterRun(pLvl);
}
}
}
int i;
MergeEngine *pMerger;
for(i=0; i<pSorter->nTask; i++){
- nIter += pSorter->aTask[i].nPMA;
+ SortSubtask *pTask = &pSorter->aTask[i];
+ if( pTask->pLevel ){
+ SortLevel *pLvl;
+ for(pLvl=pTask->pLevel; pLvl->pNext; pLvl=pLvl->pNext){
+ assert( pLvl->out.nPMA==0 );
+ }
+ nIter += pLvl->out.nPMA;
+ }
}
pSorter->pMerger = pMerger = vdbeMergeEngineNew(nIter);
rc = SQLITE_NOMEM;
}else{
int iIter = 0;
- int iThread = 0;
- for(iThread=0; iThread<pSorter->nTask; iThread++){
- int iPMA;
- i64 iReadOff = 0;
- SortSubtask *pTask = &pSorter->aTask[iThread];
- for(iPMA=0; iPMA<pTask->nPMA && rc==SQLITE_OK; iPMA++){
- i64 nDummy = 0;
- PmaReader *pIter = &pMerger->aIter[iIter++];
- rc = vdbePmaReaderInit(pTask, iReadOff, pIter, &nDummy);
- iReadOff = pIter->iEof;
+ for(i=0; i<pSorter->nTask; i++){
+ SortSubtask *pTask = &pSorter->aTask[i];
+ if( pTask->pLevel ){
+ int iPMA;
+ i64 iReadOff = 0;
+ SortLevel *pLvl;
+ for(pLvl=pTask->pLevel; pLvl->pNext; pLvl=pLvl->pNext);
+
+ for(iPMA=0; iPMA<pLvl->out.nPMA && rc==SQLITE_OK; iPMA++){
+ i64 nDummy = 0;
+ PmaReader *pIter = &pMerger->aIter[iIter++];
+ rc = vdbePmaReaderInit(pTask, &pLvl->out, iReadOff, pIter, &nDummy);
+ iReadOff = pIter->iEof;
+ }
}
}
for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
- rc = vdbeSorterDoCompare(&pSorter->aTask[0], pMerger, i);
+ rc = vdbeSorterDoCompare(pSorter->aTask[0].pLevel, pMerger, i);
}
}
}
if( rc==SQLITE_OK ){
*pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0);
}
+ vdbeSorterRewindDebug(db, "rewinddone");
return rc;
}
int rc; /* Return code */
if( pSorter->pMerger ){
- rc = vdbeSorterNext(&pSorter->aTask[0], pSorter->pMerger, pbEof);
+ rc = vdbeSorterNext(pSorter->aTask[0].pLevel, pSorter->pMerger, pbEof);
}else{
- SorterRecord *pFree = pSorter->pRecord;
- pSorter->pRecord = pFree->u.pNext;
+ SorterRecord *pFree = pSorter->list.pRecord;
+ pSorter->list.pRecord = pFree->u.pNext;
pFree->u.pNext = 0;
- if( pSorter->aMemory==0 ) vdbeSorterRecordFree(db, pFree);
- *pbEof = !pSorter->pRecord;
+ if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
+ *pbEof = !pSorter->list.pRecord;
rc = SQLITE_OK;
}
return rc;
*pnKey = pIter->nKey;
pKey = pIter->aKey;
}else{
- *pnKey = pSorter->pRecord->nVal;
- pKey = SRVAL(pSorter->pRecord);
+ *pnKey = pSorter->list.pRecord->nVal;
+ pKey = SRVAL(pSorter->list.pRecord);
}
return pKey;
}
int *pRes /* OUT: Result of comparison */
){
VdbeSorter *pSorter = pCsr->pSorter;
- UnpackedRecord *r2 = pSorter->aTask[0].pUnpacked;
+ UnpackedRecord *r2 = pSorter->pUnpacked;
KeyInfo *pKeyInfo = pCsr->pKeyInfo;
int i;
void *pKey; int nKey; /* Sorter key to compare pVal with */
- assert( r2->nField>=pKeyInfo->nField-nIgnore );
- r2->nField = pKeyInfo->nField-nIgnore;
+ if( r2==0 ){
+ r2 = vdbeSorterAllocUnpackedRecord(pSorter->aTask[0].pKeyInfo);
+ if( r2==0 ) return SQLITE_NOMEM;
+ pSorter->pUnpacked = r2;
+ assert( r2->nField>=pKeyInfo->nField-nIgnore );
+ r2->nField = pKeyInfo->nField-nIgnore;
+ }
+ assert( r2->nField==pKeyInfo->nField-nIgnore );
pKey = vdbeSorterRowkey(pSorter, &nKey);
sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);