typedef struct VdbeSorterIter VdbeSorterIter;
+typedef struct SorterThread SorterThread;
typedef struct SorterRecord SorterRecord;
+typedef struct SorterMerger SorterMerger;
typedef struct FileWriter FileWriter;
+
+/*
+** Maximum number of threads to use. Setting this value to 1 forces all
+** operations to be single-threaded.
+*/
+#ifndef SQLITE_MAX_SORTER_THREAD
+# define SQLITE_MAX_SORTER_THREAD 1
+#endif
+
+/*
+** Candidate values for SorterThread.eWork
+*/
+#define SORTER_THREAD_SORT 1
+#define SORTER_THREAD_TO_PMA 2
+#define SORTER_THREAD_CONS 3
+
+/*
+** Much of the work performed in this module to sort the list of records is
+** broken down into smaller units that may be performed in parallel. In order
+** to perform such a unit of work, an instance of the following structure
+** is configured and passed to vdbeSorterThreadMain() - either directly by
+** the main thread or via a background thread.
+**
+** Exactly SQLITE_MAX_SORTER_THREAD instances of this structure are allocated
+** as part of each VdbeSorter object. Instances are never allocated any other
+** way.
+**
+** When a background thread is launched to perform work, SorterThread.bDone
+** is set to 0 and the SorterThread.pThread variable set to point to the
+** thread handle. SorterThread.bDone is set to 1 (to indicate to the main
+** thread that joining SorterThread.pThread will not block) before the thread
+** exits. SorterThread.pThread and bDone are always cleared after the
+** background thread has been joined.
+**
+** One object (specifically, VdbeSorter.aThread[SQLITE_MAX_SORTER_THREAD-1])
+** is reserved for the foreground thread.
+**
+** The nature of the work performed is determined by SorterThread.eWork,
+** as follows:
+**
+** SORTER_THREAD_SORT:
+** Sort the linked list of records at SorterThread.pList.
+**
+** SORTER_THREAD_TO_PMA:
+** Sort the linked list of records at SorterThread.pList, and write
+** the results to a new PMA in temp file SorterThread.pTemp1. Open
+** the temp file if it is not already open.
+**
+** SORTER_THREAD_CONS:
+** Merge existing PMAs until SorterThread.nConsolidate or fewer
+** remain in temp file SorterThread.pTemp1.
+*/
+struct SorterThread {
+ /* Thread bookkeeping. pThread is filled in by sqlite3ThreadCreate() and
+ ** reset to NULL once the thread has been joined. bDone is set to 1 as the
+ ** final step of vdbeSorterThreadMain() and reset to 0 after the join, or
+ ** by vdbeSorterRunThread() when the work ran in the calling thread.
+ ** NOTE(review): bDone is written by the worker and read by the main
+ ** thread with no synchronization visible here - confirm this is safe. */
+ SQLiteThread *pThread; /* Thread handle, or NULL */
+ int bDone; /* Set to true by pThread when finished */
+
+ /* Configuration. pVfs, pKeyInfo and pgsz are set for every aThread[] entry
+ ** by sqlite3VdbeSorterInit(). pUnpacked is allocated on first use by
+ ** vdbeSorterThreadMain() and released by vdbeSorterThreadCleanup(). */
+ sqlite3_vfs *pVfs; /* VFS used to open temporary files */
+ KeyInfo *pKeyInfo; /* How to compare records */
+ UnpackedRecord *pUnpacked; /* Space to unpack a record */
+ int pgsz; /* Main database page size */
+
+ /* Description of the unit of work to perform. These fields are set by the
+ ** caller before vdbeSorterThreadMain() is invoked. */
+ u8 eWork; /* One of the SORTER_THREAD_* constants */
+ int nConsolidate; /* For THREAD_CONS, max final PMAs */
+ SorterRecord *pList; /* List of records for pThread to sort */
+ int nInMemory; /* Expected size of PMA based on pList */
+
+ /* State of the temp file owned by this object. nPMA and iTemp1Off are
+ ** updated each time a PMA is appended to pTemp1, and all three fields are
+ ** replaced when a SORTER_THREAD_CONS pass swaps in a new temp file. */
+ int nPMA; /* Number of PMAs currently in pTemp1 */
+ i64 iTemp1Off; /* Offset to write to in pTemp1 */
+ sqlite3_file *pTemp1; /* File to write PMAs to, or NULL */
+};
+
+
/*
** NOTES ON DATA STRUCTURE USED FOR N-WAY MERGES:
**
** key comparison operations are required, where N is the number of segments
** being merged (rounded up to the next power of 2).
*/
+struct SorterMerger {
+ /* State of an incremental N-way merge. aIter[] and aTree[] each have
+ ** nTree entries, and aTree[1] holds the index within aIter[] of the
+ ** iterator currently supplying the merge output. vdbeSorterMergerNew()
+ ** places both arrays in the same allocation as this structure, directly
+ ** after it, so vdbeSorterMergerFree() releases everything with a single
+ ** sqlite3_free() once the iterators have been zeroed. */
+ int nTree; /* Used size of aTree/aIter (power of 2) */
+ int *aTree; /* Current state of incremental merge */
+ VdbeSorterIter *aIter; /* Array of iterators to merge data from */
+};
+
+/*
+** Main sorter structure. A single instance of this is allocated for each
+** sorter cursor created by the VDBE.
+*/
struct VdbeSorter {
- i64 iWriteOff; /* Current write offset within file pTemp1 */
- i64 iReadOff; /* Current read offset within file pTemp1 */
int nInMemory; /* Current size of pRecord list as PMA */
- int nTree; /* Used size of aTree/aIter (power of 2) */
- int nPMA; /* Number of PMAs stored in pTemp1 */
int mnPmaSize; /* Minimum PMA size, in bytes */
int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
- VdbeSorterIter *aIter; /* Array of iterators to merge */
- int *aTree; /* Current state of incremental merge */
- sqlite3_file *pTemp1; /* PMA file 1 */
+ int bUsePMA; /* True if one or more PMAs created */
SorterRecord *pRecord; /* Head of in-memory record list */
- UnpackedRecord *pUnpacked; /* Used to unpack keys */
+ SorterMerger *pMerger; /* For final merge of PMAs (by caller) */
+ SorterThread aThread[SQLITE_MAX_SORTER_THREAD];
};
/*
SorterRecord *pNext;
};
-/* Minimum allowable value for the VdbeSorter.nWorking variable */
+/* The minimum PMA size is set to this value multiplied by the database
+** page size in bytes. */
#define SORTER_MIN_WORKING 10
/* Maximum number of segments to merge in a single pass. */
** Free all memory belonging to the VdbeSorterIter object passed as the second
** argument. All structure fields are set to zero before returning.
*/
-static void vdbeSorterIterZero(sqlite3 *db, VdbeSorterIter *pIter){
- sqlite3DbFree(db, pIter->aAlloc);
- sqlite3DbFree(db, pIter->aBuffer);
+static void vdbeSorterIterZero(VdbeSorterIter *pIter){
+ sqlite3_free(pIter->aAlloc);
+ sqlite3_free(pIter->aBuffer);
memset(pIter, 0, sizeof(VdbeSorterIter));
}
** next call to this function.
*/
static int vdbeSorterIterRead(
- sqlite3 *db, /* Database handle (for malloc) */
VdbeSorterIter *p, /* Iterator */
int nByte, /* Bytes of data to read */
u8 **ppOut /* OUT: Pointer to buffer containing data */
/* Extend the p->aAlloc[] allocation if required. */
if( p->nAlloc<nByte ){
+ u8 *aNew;
int nNew = p->nAlloc*2;
while( nByte>nNew ) nNew = nNew*2;
- p->aAlloc = sqlite3DbReallocOrFree(db, p->aAlloc, nNew);
- if( !p->aAlloc ) return SQLITE_NOMEM;
+ aNew = sqlite3Realloc(p->aAlloc, nNew);
+ if( !aNew ) return SQLITE_NOMEM;
p->nAlloc = nNew;
+ p->aAlloc = aNew;
}
/* Copy as much data as is available in the buffer into the start of
nCopy = nRem;
if( nRem>p->nBuffer ) nCopy = p->nBuffer;
- rc = vdbeSorterIterRead(db, p, nCopy, &aNext);
+ rc = vdbeSorterIterRead(p, nCopy, &aNext);
if( rc!=SQLITE_OK ) return rc;
assert( aNext!=p->aAlloc );
memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
** Read a varint from the stream of data accessed by p. Set *pnOut to
** the value read.
*/
-static int vdbeSorterIterVarint(sqlite3 *db, VdbeSorterIter *p, u64 *pnOut){
+static int vdbeSorterIterVarint(VdbeSorterIter *p, u64 *pnOut){
int iBuf;
iBuf = p->iReadOff % p->nBuffer;
u8 aVarint[16], *a;
int i = 0, rc;
do{
- rc = vdbeSorterIterRead(db, p, 1, &a);
+ rc = vdbeSorterIterRead(p, 1, &a);
if( rc ) return rc;
aVarint[(i++)&0xf] = a[0];
}while( (a[0]&0x80)!=0 );
** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
** no error occurs, or an SQLite error code if one does.
*/
-static int vdbeSorterIterNext(
- sqlite3 *db, /* Database handle (for sqlite3DbMalloc() ) */
- VdbeSorterIter *pIter /* Iterator to advance */
-){
+static int vdbeSorterIterNext(VdbeSorterIter *pIter){
int rc; /* Return Code */
u64 nRec = 0; /* Size of record in bytes */
if( pIter->iReadOff>=pIter->iEof ){
/* This is an EOF condition */
- vdbeSorterIterZero(db, pIter);
+ vdbeSorterIterZero(pIter);
return SQLITE_OK;
}
- rc = vdbeSorterIterVarint(db, pIter, &nRec);
+ rc = vdbeSorterIterVarint(pIter, &nRec);
if( rc==SQLITE_OK ){
pIter->nKey = (int)nRec;
- rc = vdbeSorterIterRead(db, pIter, (int)nRec, &pIter->aKey);
+ rc = vdbeSorterIterRead(pIter, (int)nRec, &pIter->aKey);
}
return rc;
** PMA is empty).
*/
static int vdbeSorterIterInit(
- sqlite3 *db, /* Database handle */
- const VdbeSorter *pSorter, /* Sorter object */
- i64 iStart, /* Start offset in pFile */
+ SorterThread *pThread, /* Thread context */
+ i64 iStart, /* Start offset in pThread->pTemp1 */
VdbeSorterIter *pIter, /* Iterator to populate */
i64 *pnByte /* IN/OUT: Increment this value by PMA size */
){
int rc = SQLITE_OK;
- int nBuf;
+ int nBuf = pThread->pgsz;
- nBuf = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
-
- assert( pSorter->iWriteOff>iStart );
+ assert( pThread->iTemp1Off>iStart );
assert( pIter->aAlloc==0 );
assert( pIter->aBuffer==0 );
- pIter->pFile = pSorter->pTemp1;
+ pIter->pFile = pThread->pTemp1;
pIter->iReadOff = iStart;
pIter->nAlloc = 128;
- pIter->aAlloc = (u8 *)sqlite3DbMallocRaw(db, pIter->nAlloc);
+ pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
pIter->nBuffer = nBuf;
- pIter->aBuffer = (u8 *)sqlite3DbMallocRaw(db, nBuf);
+ pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
if( !pIter->aBuffer ){
rc = SQLITE_NOMEM;
iBuf = iStart % nBuf;
if( iBuf ){
int nRead = nBuf - iBuf;
- if( (iStart + nRead) > pSorter->iWriteOff ){
- nRead = (int)(pSorter->iWriteOff - iStart);
+ if( (iStart + nRead) > pThread->iTemp1Off ){
+ nRead = (int)(pThread->iTemp1Off - iStart);
}
rc = sqlite3OsRead(
- pSorter->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
+ pThread->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
);
assert( rc!=SQLITE_IOERR_SHORT_READ );
}
if( rc==SQLITE_OK ){
- u64 nByte; /* Size of PMA in bytes */
- pIter->iEof = pSorter->iWriteOff;
- rc = vdbeSorterIterVarint(db, pIter, &nByte);
+ u64 nByte;
+ pIter->iEof = pThread->iTemp1Off;
+ rc = vdbeSorterIterVarint(pIter, &nByte);
pIter->iEof = pIter->iReadOff + nByte;
*pnByte += nByte;
}
}
if( rc==SQLITE_OK ){
- rc = vdbeSorterIterNext(db, pIter);
+ rc = vdbeSorterIterNext(pIter);
}
return rc;
}
** has been allocated and contains an unpacked record that is used as key2.
*/
static void vdbeSorterCompare(
- const VdbeCursor *pCsr, /* Cursor object (for pKeyInfo) */
+ SorterThread *pThread, /* Thread context (for pKeyInfo) */
int nIgnore, /* Ignore the last nIgnore fields */
const void *pKey1, int nKey1, /* Left side of comparison */
const void *pKey2, int nKey2, /* Right side of comparison */
int *pRes /* OUT: Result of comparison */
){
- KeyInfo *pKeyInfo = pCsr->pKeyInfo;
- VdbeSorter *pSorter = pCsr->pSorter;
- UnpackedRecord *r2 = pSorter->pUnpacked;
+ KeyInfo *pKeyInfo = pThread->pKeyInfo;
+ UnpackedRecord *r2 = pThread->pUnpacked;
int i;
if( pKey2 ){
** multiple b-tree segments. Parameter iOut is the index of the aTree[]
** value to recalculate.
*/
-static int vdbeSorterDoCompare(const VdbeCursor *pCsr, int iOut){
- VdbeSorter *pSorter = pCsr->pSorter;
+static int vdbeSorterDoCompare(
+ SorterThread *pThread,
+ SorterMerger *pMerger,
+ int iOut
+){
int i1;
int i2;
int iRes;
VdbeSorterIter *p1;
VdbeSorterIter *p2;
- assert( iOut<pSorter->nTree && iOut>0 );
+ assert( iOut<pMerger->nTree && iOut>0 );
- if( iOut>=(pSorter->nTree/2) ){
- i1 = (iOut - pSorter->nTree/2) * 2;
+ if( iOut>=(pMerger->nTree/2) ){
+ i1 = (iOut - pMerger->nTree/2) * 2;
i2 = i1 + 1;
}else{
- i1 = pSorter->aTree[iOut*2];
- i2 = pSorter->aTree[iOut*2+1];
+ i1 = pMerger->aTree[iOut*2];
+ i2 = pMerger->aTree[iOut*2+1];
}
- p1 = &pSorter->aIter[i1];
- p2 = &pSorter->aIter[i2];
+ p1 = &pMerger->aIter[i1];
+ p2 = &pMerger->aIter[i2];
if( p1->pFile==0 ){
iRes = i2;
iRes = i1;
}else{
int res;
- assert( pCsr->pSorter->pUnpacked!=0 ); /* allocated in vdbeSorterMerge() */
+ assert( pThread->pUnpacked!=0 ); /* allocated in vdbeSorterMerge() */
vdbeSorterCompare(
- pCsr, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
+ pThread, 0, p1->aKey, p1->nKey, p2->aKey, p2->nKey, &res
);
if( res<=0 ){
iRes = i1;
}
}
- pSorter->aTree[iOut] = iRes;
+ pMerger->aTree[iOut] = iRes;
return SQLITE_OK;
}
*/
int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){
int pgsz; /* Page size of main database */
+ int i; /* Used to iterate through aThread[] */
int mxCache; /* Cache size */
VdbeSorter *pSorter; /* The new sorter */
- char *d; /* Dummy */
+ KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */
+ int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */
assert( pCsr->pKeyInfo && pCsr->pBt==0 );
- pCsr->pSorter = pSorter = sqlite3DbMallocZero(db, sizeof(VdbeSorter));
+ szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
+ pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sizeof(VdbeSorter)+szKeyInfo);
+ pCsr->pSorter = pSorter;
if( pSorter==0 ){
return SQLITE_NOMEM;
}
-
- pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pCsr->pKeyInfo, 0, 0, &d);
- if( pSorter->pUnpacked==0 ) return SQLITE_NOMEM;
- assert( pSorter->pUnpacked==(UnpackedRecord *)d );
+ pKeyInfo = (KeyInfo*)&pSorter[1];
+ memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
+ pKeyInfo->db = 0;
+ pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
+
+ for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
+ SorterThread *pThread = &pSorter->aThread[i];
+ pThread->pKeyInfo = pKeyInfo;
+ pThread->pVfs = db->pVfs;
+ pThread->pgsz = pgsz;
+ }
if( !sqlite3TempInMemory(db) ){
- pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
mxCache = db->aDb[0].pSchema->cache_size;
if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
}
}
+/*
+** Release everything held by *pThread: the unpacked-record scratch space,
+** any records still linked from pThread->pList, and the temp file if one
+** is open. The whole structure is left zeroed.
+*/
+static void vdbeSorterThreadCleanup(sqlite3 *db, SorterThread *pThread){
+  sqlite3_file *pFd = pThread->pTemp1;
+
+  sqlite3DbFree(db, pThread->pUnpacked);
+  vdbeSorterRecordFree(0, pThread->pList);
+  if( pFd!=0 ) sqlite3OsCloseFree(pFd);
+
+  memset(pThread, 0, sizeof(*pThread));
+}
+
+/*
+** Wait for every background thread still attached to pSorter and detach
+** it (pThread and bDone are cleared for each joined slot).
+**
+** If rcin is an error code it is returned unchanged. Otherwise the return
+** value is the first error reported either by sqlite3ThreadJoin() or by a
+** joined thread's main routine, or SQLITE_OK if there was none.
+*/
+static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
+  int rc = rcin;
+  SorterThread *p = pSorter->aThread;
+  SorterThread *pEnd = &pSorter->aThread[SQLITE_MAX_SORTER_THREAD];
+
+  for(; p<pEnd; p++){
+    void *pRet;
+    int rcJoin;
+
+    if( p->pThread==0 ) continue;        /* Nothing running in this slot */
+
+    rcJoin = sqlite3ThreadJoin(p->pThread, &pRet);
+    p->pThread = 0;
+    p->bDone = 0;
+
+    /* Only the first error is retained. */
+    if( rc!=SQLITE_OK ) continue;
+    rc = (rcJoin!=SQLITE_OK) ? rcJoin : SQLITE_PTR_TO_INT(pRet);
+  }
+
+  return rc;
+}
+
+/*
+** Allocate and return a zeroed SorterMerger able to merge nIter iterators.
+** The tree size is nIter rounded up to a power of two (minimum 2). The
+** aIter[] and aTree[] arrays live in the same allocation, immediately after
+** the structure itself. Returns NULL if the allocation fails.
+*/
+static SorterMerger *vdbeSorterMergerNew(int nIter){
+  SorterMerger *pMerger;          /* Object to return */
+  int nSlot;                      /* Smallest power of two >= nIter */
+  int nByte;                      /* Size of the allocation in bytes */
+
+  assert( nIter<=SORTER_MAX_MERGE_COUNT );
+  for(nSlot=2; nSlot<nIter; nSlot*=2){}
+  nByte = sizeof(SorterMerger) + nSlot * (sizeof(int) + sizeof(VdbeSorterIter));
+
+  pMerger = (SorterMerger*)sqlite3MallocZero(nByte);
+  if( pMerger==0 ) return 0;
+
+  pMerger->nTree = nSlot;
+  pMerger->aIter = (VdbeSorterIter*)&pMerger[1];
+  pMerger->aTree = (int*)&pMerger->aIter[nSlot];
+  return pMerger;
+}
+
+/*
+** Release a SorterMerger: zero each of its nTree iterators (which frees
+** their buffers) and then free the object itself. A NULL argument is a
+** no-op.
+*/
+static void vdbeSorterMergerFree(SorterMerger *pMerger){
+  VdbeSorterIter *pIter;
+  VdbeSorterIter *pEnd;
+
+  if( pMerger==0 ) return;
+
+  pEnd = &pMerger->aIter[pMerger->nTree];
+  for(pIter=pMerger->aIter; pIter<pEnd; pIter++){
+    vdbeSorterIterZero(pIter);
+  }
+  sqlite3_free(pMerger);
+}
+
/*
** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
*/
void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
VdbeSorter *pSorter = pCsr->pSorter;
if( pSorter ){
- if( pSorter->aIter ){
- int i;
- for(i=0; i<pSorter->nTree; i++){
- vdbeSorterIterZero(db, &pSorter->aIter[i]);
- }
- sqlite3DbFree(db, pSorter->aIter);
- }
- if( pSorter->pTemp1 ){
- sqlite3OsCloseFree(pSorter->pTemp1);
+ int i;
+ vdbeSorterJoinAll(pSorter, SQLITE_OK);
+ for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
+ SorterThread *pThread = &pSorter->aThread[i];
+ vdbeSorterThreadCleanup(db, pThread);
}
- vdbeSorterRecordFree(db, pSorter->pRecord);
- sqlite3DbFree(db, pSorter->pUnpacked);
+
+ vdbeSorterRecordFree(0, pSorter->pRecord);
+ vdbeSorterMergerFree(pSorter->pMerger);
sqlite3DbFree(db, pSorter);
pCsr->pSorter = 0;
}
** set *ppFile to point to the malloc'd file-handle and return SQLITE_OK.
** Otherwise, set *ppFile to 0 and return an SQLite error code.
*/
-static int vdbeSorterOpenTempFile(sqlite3 *db, sqlite3_file **ppFile){
+static int vdbeSorterOpenTempFile(sqlite3_vfs *pVfs, sqlite3_file **ppFile){
int dummy;
- return sqlite3OsOpenMalloc(db->pVfs, 0, ppFile,
+ return sqlite3OsOpenMalloc(pVfs, 0, ppFile,
SQLITE_OPEN_TEMP_JOURNAL |
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &dummy
** Set *ppOut to the head of the new list.
*/
static void vdbeSorterMerge(
- const VdbeCursor *pCsr, /* For pKeyInfo */
+ SorterThread *pThread, /* Calling thread context */
SorterRecord *p1, /* First list to merge */
SorterRecord *p2, /* Second list to merge */
SorterRecord **ppOut /* OUT: Head of merged list */
while( p1 && p2 ){
int res;
- vdbeSorterCompare(pCsr, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res);
+ vdbeSorterCompare(pThread, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res);
if( res<=0 ){
*pp = p1;
pp = &p1->pNext;
}
/*
-** Sort the linked list of records headed at pCsr->pRecord. Return SQLITE_OK
-** if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if an error
-** occurs.
+** Sort the linked list of records headed at pThread->pList. Return
+** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if
+** an error occurs.
*/
-static int vdbeSorterSort(const VdbeCursor *pCsr){
+static int vdbeSorterSort(SorterThread *pThread){
int i;
SorterRecord **aSlot;
SorterRecord *p;
- VdbeSorter *pSorter = pCsr->pSorter;
aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
if( !aSlot ){
return SQLITE_NOMEM;
}
- p = pSorter->pRecord;
+ p = pThread->pList;
while( p ){
SorterRecord *pNext = p->pNext;
p->pNext = 0;
for(i=0; aSlot[i]; i++){
- vdbeSorterMerge(pCsr, p, aSlot[i], &p);
+ vdbeSorterMerge(pThread, p, aSlot[i], &p);
aSlot[i] = 0;
}
aSlot[i] = p;
p = 0;
for(i=0; i<64; i++){
- vdbeSorterMerge(pCsr, p, aSlot[i], &p);
+ vdbeSorterMerge(pThread, p, aSlot[i], &p);
}
- pSorter->pRecord = p;
+ pThread->pList = p;
sqlite3_free(aSlot);
return SQLITE_OK;
** Initialize a file-writer object.
*/
static void fileWriterInit(
- sqlite3 *db, /* Database (for malloc) */
sqlite3_file *pFile, /* File to write to */
FileWriter *p, /* Object to populate */
+ int nBuf, /* Buffer size */
i64 iStart /* Offset of pFile to begin writing at */
){
- int nBuf = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
-
memset(p, 0, sizeof(FileWriter));
- p->aBuffer = (u8 *)sqlite3DbMallocRaw(db, nBuf);
+ p->aBuffer = (u8*)sqlite3Malloc(nBuf);
if( !p->aBuffer ){
p->eFWErr = SQLITE_NOMEM;
}else{
** Before returning, set *piEof to the offset immediately following the
** last byte written to the file.
*/
-static int fileWriterFinish(sqlite3 *db, FileWriter *p, i64 *piEof){
+static int fileWriterFinish(FileWriter *p, i64 *piEof){
int rc;
if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
p->eFWErr = sqlite3OsWrite(p->pFile,
);
}
*piEof = (p->iWriteOff + p->iBufEnd);
- sqlite3DbFree(db, p->aBuffer);
+ sqlite3_free(p->aBuffer);
rc = p->eFWErr;
memset(p, 0, sizeof(FileWriter));
return rc;
** Each record consists of a varint followed by a blob of data (the
** key). The varint is the number of bytes in the blob of data.
*/
-static int vdbeSorterListToPMA(sqlite3 *db, const VdbeCursor *pCsr){
+static int vdbeSorterListToPMA(SorterThread *pThread){
int rc = SQLITE_OK; /* Return code */
- VdbeSorter *pSorter = pCsr->pSorter;
- FileWriter writer;
+ FileWriter writer; /* Object used to write to the file */
memset(&writer, 0, sizeof(FileWriter));
-
- if( pSorter->nInMemory==0 ){
- assert( pSorter->pRecord==0 );
- return rc;
- }
-
- rc = vdbeSorterSort(pCsr);
+ assert( pThread->nInMemory>0 );
/* If the first temporary PMA file has not been opened, open it now. */
- if( rc==SQLITE_OK && pSorter->pTemp1==0 ){
- rc = vdbeSorterOpenTempFile(db, &pSorter->pTemp1);
- assert( rc!=SQLITE_OK || pSorter->pTemp1 );
- assert( pSorter->iWriteOff==0 );
- assert( pSorter->nPMA==0 );
+ if( pThread->pTemp1==0 ){
+ rc = vdbeSorterOpenTempFile(pThread->pVfs, &pThread->pTemp1);
+ assert( rc!=SQLITE_OK || pThread->pTemp1 );
+ assert( pThread->iTemp1Off==0 );
+ assert( pThread->nPMA==0 );
}
if( rc==SQLITE_OK ){
SorterRecord *p;
SorterRecord *pNext = 0;
- fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff);
- pSorter->nPMA++;
- fileWriterWriteVarint(&writer, pSorter->nInMemory);
- for(p=pSorter->pRecord; p; p=pNext){
+ fileWriterInit(pThread->pTemp1, &writer, pThread->pgsz, pThread->iTemp1Off);
+ pThread->nPMA++;
+ fileWriterWriteVarint(&writer, pThread->nInMemory);
+ for(p=pThread->pList; p; p=pNext){
pNext = p->pNext;
fileWriterWriteVarint(&writer, p->nVal);
fileWriterWrite(&writer, p->pVal, p->nVal);
- sqlite3DbFree(db, p);
+ sqlite3_free(p);
+ }
+ pThread->pList = p;
+ rc = fileWriterFinish(&writer, &pThread->iTemp1Off);
+ }
+
+ return rc;
+}
+
+/*
+** Step the merge described by pMerger forward by one entry: advance the
+** iterator that currently heads the tree, then recompute the aTree[]
+** entries on the path from that iterator's leaf up to the root.
+**
+** *pbEof is set to true if the merge has no further entries. It is
+** written even when an error is returned.
+**
+** Return SQLITE_OK if successful or an error code if an error occurs.
+*/
+static int vdbeSorterNext(
+  SorterThread *pThread,
+  SorterMerger *pMerger,
+  int *pbEof
+){
+  int iAdv = pMerger->aTree[1];   /* Iterator at the head of the merge */
+  int iNode;                      /* aTree[] slot being recalculated */
+  int rc = vdbeSorterIterNext(&pMerger->aIter[iAdv]);
+
+  iNode = (pMerger->nTree + iAdv) / 2;
+  while( rc==SQLITE_OK && iNode>0 ){
+    rc = vdbeSorterDoCompare(pThread, pMerger, iNode);
+    iNode = iNode/2;
+  }
+
+  *pbEof = (pMerger->aIter[pMerger->aTree[1]].pFile==0);
+  return rc;
+}
+
+/*
+** The main routine for sorter-thread operations. pCtx points to the
+** SorterThread object describing the work. The same routine is used whether
+** the work runs in a background thread or directly in the calling thread.
+**
+** pThread->pUnpacked is allocated here on first use. Then, depending on
+** pThread->eWork:
+**
+**   SORTER_THREAD_SORT:    Sort pThread->pList.
+**   SORTER_THREAD_TO_PMA:  Sort pThread->pList and write it to pTemp1 as
+**                          a new PMA.
+**   SORTER_THREAD_CONS:    Repeatedly merge groups of up to
+**                          SORTER_MAX_MERGE_COUNT PMAs from pTemp1 into a
+**                          new temp file, which then replaces pTemp1,
+**                          until nConsolidate or fewer PMAs remain.
+**
+** pThread->bDone is set to 1 immediately before returning. The return
+** value is the SQLite result code converted to a pointer with
+** SQLITE_INT_TO_PTR().
+*/
+static void *vdbeSorterThreadMain(void *pCtx){
+  int rc = SQLITE_OK;
+  SorterThread *pThread = (SorterThread*)pCtx;
+
+  assert( pThread->eWork==SORTER_THREAD_SORT
+       || pThread->eWork==SORTER_THREAD_TO_PMA
+       || pThread->eWork==SORTER_THREAD_CONS
+  );
+  assert( pThread->bDone==0 );
+
+  /* Allocate the record used as the right-hand side of key comparisons. */
+  if( pThread->pUnpacked==0 ){
+    char *pFree;
+    pThread->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
+        pThread->pKeyInfo, 0, 0, &pFree
+    );
+    assert( pThread->pUnpacked==(UnpackedRecord*)pFree );
+    if( pFree==0 ){
+      rc = SQLITE_NOMEM;
+      goto thread_out;
+    }
+  }
+
+  if( pThread->eWork==SORTER_THREAD_CONS ){
+    assert( pThread->pList==0 );
+    while( pThread->nPMA>pThread->nConsolidate && rc==SQLITE_OK ){
+      int nIter = MIN(pThread->nPMA, SORTER_MAX_MERGE_COUNT);
+      sqlite3_file *pTemp2 = 0;     /* Second temp file to use */
+      SorterMerger *pMerger;        /* Object for reading/merging PMA data */
+      i64 iReadOff = 0;             /* Offset in pTemp1 to read from */
+      i64 iWriteOff = 0;            /* Offset in pTemp2 to write to */
+      int i;
+
+      /* Allocate a merger object to merge PMAs together. */
+      pMerger = vdbeSorterMergerNew(nIter);
+      if( pMerger==0 ){
+        rc = SQLITE_NOMEM;
+        break;
+      }
+
+      /* Open a second temp file to write merged data to */
+      rc = vdbeSorterOpenTempFile(pThread->pVfs, &pTemp2);
+      if( rc!=SQLITE_OK ){
+        vdbeSorterMergerFree(pMerger);
+        break;
+      }
+
+      /* This loop runs once for each output PMA. Each output PMA is made
+      ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs.
+      **
+      ** The loop must stop as soon as an error occurs. Otherwise the next
+      ** iteration would overwrite rc (possibly hiding the error) and would
+      ** re-initialize iterators that still own buffers from the failed
+      ** pass. */
+      for(i=0; rc==SQLITE_OK && i<pThread->nPMA; i+=SORTER_MAX_MERGE_COUNT){
+        FileWriter writer;          /* Object for writing data to pTemp2 */
+        i64 nOut = 0;               /* Bytes of data in output PMA */
+        int bEof = 0;
+        int rc2;
+
+        /* Configure the merger object to read and merge data from the next
+        ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs,
+        ** if that is fewer). */
+        int iIter;
+        for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
+          VdbeSorterIter *pIter = &pMerger->aIter[iIter];
+          rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nOut);
+          iReadOff = pIter->iEof;
+          if( iReadOff>=pThread->iTemp1Off || rc!=SQLITE_OK ) break;
+        }
+        for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
+          rc = vdbeSorterDoCompare(pThread, pMerger, iIter);
+        }
+
+        /* Write the merged records out as a single PMA: a varint holding
+        ** the total size followed by each (varint length, key) pair. */
+        fileWriterInit(pTemp2, &writer, pThread->pgsz, iWriteOff);
+        fileWriterWriteVarint(&writer, nOut);
+        while( rc==SQLITE_OK && bEof==0 ){
+          VdbeSorterIter *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
+          assert( pIter->pFile!=0 );        /* pIter is not at EOF */
+          fileWriterWriteVarint(&writer, pIter->nKey);
+          fileWriterWrite(&writer, pIter->aKey, pIter->nKey);
+          rc = vdbeSorterNext(pThread, pMerger, &bEof);
+        }
+        rc2 = fileWriterFinish(&writer, &iWriteOff);
+        if( rc==SQLITE_OK ) rc = rc2;
+      }
+
+      /* The new file replaces the old one, whether or not an error occurred;
+      ** on error the caller abandons the sort. */
+      vdbeSorterMergerFree(pMerger);
+      sqlite3OsCloseFree(pThread->pTemp1);
+      pThread->pTemp1 = pTemp2;
+      pThread->nPMA = (i / SORTER_MAX_MERGE_COUNT);
+      pThread->iTemp1Off = iWriteOff;
+    }
+  }else{
+    /* Sort the pThread->pList list */
+    rc = vdbeSorterSort(pThread);
+
+    /* If required, write the list out to a PMA. */
+    if( rc==SQLITE_OK && pThread->eWork==SORTER_THREAD_TO_PMA ){
+#ifdef SQLITE_DEBUG
+      i64 nExpect = pThread->nInMemory
+        + sqlite3VarintLen(pThread->nInMemory)
+        + pThread->iTemp1Off;
+#endif
+      rc = vdbeSorterListToPMA(pThread);
+      assert( rc!=SQLITE_OK || (nExpect==pThread->iTemp1Off) );
+    }
+  }
+
+ thread_out:
+  pThread->bDone = 1;
+  return SQLITE_INT_TO_PTR(rc);
+}
+
+/*
+** Perform the work configured in *pThread synchronously, in the calling
+** thread. The bDone flag that vdbeSorterThreadMain() sets is cleared again
+** before returning, since there is no thread to join. Returns the SQLite
+** result code produced by the work.
+*/
+static int vdbeSorterRunThread(SorterThread *pThread){
+  void *pRet = vdbeSorterThreadMain((void*)pThread);
+  assert( pThread->bDone );
+  pThread->bDone = 0;
+  return SQLITE_PTR_TO_INT(pRet);
+}
+
+/*
+** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly
+** using a background thread.
+**
+** If argument bFg is non-zero, the operation always uses the calling thread.
+** Otherwise a background thread is launched, unless the slot selected is
+** the last entry of aThread[] (the one reserved for the foreground thread),
+** in which case the calling thread does the work.
+**
+** Ownership of the record list passes from pSorter->pRecord to the selected
+** SorterThread object, and pSorter->nInMemory is reset to zero.
+*/
+static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
+  VdbeSorter *pSorter = pCsr->pSorter;
+  int rc = SQLITE_OK;
+  int i;
+  SorterThread *pThread;          /* Thread context used to create new PMA */
+
+  pSorter->bUsePMA = 1;
+
+  /* Select the first slot with no thread attached, joining any thread that
+  ** has already flagged itself as finished along the way. */
+  for(i=0; ALWAYS( i<SQLITE_MAX_SORTER_THREAD ); i++){
+    pThread = &pSorter->aThread[i];
+    if( pThread->bDone ){
+      void *pRet;
+      assert( pThread->pThread );
+      rc = sqlite3ThreadJoin(pThread->pThread, &pRet);
+      pThread->pThread = 0;
+      pThread->bDone = 0;
+      if( rc==SQLITE_OK ){
+        rc = SQLITE_PTR_TO_INT(pRet);
+      }
+    }
+    if( pThread->pThread==0 ) break;
+  }
+
+  if( rc==SQLITE_OK ){
+    assert( pThread->pThread==0 && pThread->bDone==0 );
+    pThread->eWork = SORTER_THREAD_TO_PMA;
+    pThread->pList = pSorter->pRecord;
+    pThread->nInMemory = pSorter->nInMemory;
+    pSorter->nInMemory = 0;
+    pSorter->pRecord = 0;
+
+    /* Launch a background thread only if the caller did not ask for
+    ** foreground operation and the slot is not the reserved one. The
+    ** original test (bFg || ...) did the opposite of the documented
+    ** contract and spawned a thread whenever bFg was set. */
+    if( bFg==0 && i<(SQLITE_MAX_SORTER_THREAD-1) ){
+      void *pCtx = (void*)pThread;
+      rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSorterThreadMain, pCtx);
+    }else{
+      /* Use the foreground thread for this operation */
+      rc = vdbeSorterRunThread(pThread);
}
- pSorter->pRecord = p;
- rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff);
}
return rc;
assert( pSorter );
pSorter->nInMemory += sqlite3VarintLen(pVal->n) + pVal->n;
- pNew = (SorterRecord *)sqlite3DbMallocRaw(db, pVal->n + sizeof(SorterRecord));
+ pNew = (SorterRecord *)sqlite3Malloc(pVal->n + sizeof(SorterRecord));
if( pNew==0 ){
rc = SQLITE_NOMEM;
}else{
(pSorter->nInMemory>pSorter->mxPmaSize)
|| (pSorter->nInMemory>pSorter->mnPmaSize && sqlite3HeapNearlyFull())
)){
-#ifdef SQLITE_DEBUG
- i64 nExpect = pSorter->iWriteOff
- + sqlite3VarintLen(pSorter->nInMemory)
- + pSorter->nInMemory;
-#endif
- rc = vdbeSorterListToPMA(db, pCsr);
- pSorter->nInMemory = 0;
- assert( rc!=SQLITE_OK || (nExpect==pSorter->iWriteOff) );
+ rc = vdbeSorterFlushPMA(db, pCsr, 0);
}
return rc;
}
/*
-** Helper function for sqlite3VdbeSorterRewind().
+** Return the total number of PMAs in all temporary files.
*/
-static int vdbeSorterInitMerge(
- sqlite3 *db, /* Database handle */
- const VdbeCursor *pCsr, /* Cursor handle for this sorter */
- i64 *pnByte /* Sum of bytes in all opened PMAs */
-){
- VdbeSorter *pSorter = pCsr->pSorter;
- int rc = SQLITE_OK; /* Return code */
- int i; /* Used to iterator through aIter[] */
- i64 nByte = 0; /* Total bytes in all opened PMAs */
-
- /* Initialize the iterators. */
- for(i=0; i<SORTER_MAX_MERGE_COUNT; i++){
- VdbeSorterIter *pIter = &pSorter->aIter[i];
- rc = vdbeSorterIterInit(db, pSorter, pSorter->iReadOff, pIter, &nByte);
- pSorter->iReadOff = pIter->iEof;
- assert( rc!=SQLITE_OK || pSorter->iReadOff<=pSorter->iWriteOff );
- if( rc!=SQLITE_OK || pSorter->iReadOff>=pSorter->iWriteOff ) break;
- }
-
- /* Initialize the aTree[] array. */
- for(i=pSorter->nTree-1; rc==SQLITE_OK && i>0; i--){
- rc = vdbeSorterDoCompare(pCsr, i);
+static int vdbeSorterCountPMA(VdbeSorter *pSorter){
+ int nPMA = 0;
+ int i;
+ for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
+ nPMA += pSorter->aThread[i].nPMA;
}
-
- *pnByte = nByte;
- return rc;
+ return nPMA;
}
/*
*/
int sqlite3VdbeSorterRewind(sqlite3 *db, const VdbeCursor *pCsr, int *pbEof){
VdbeSorter *pSorter = pCsr->pSorter;
- int rc; /* Return code */
- sqlite3_file *pTemp2 = 0; /* Second temp file to use */
- i64 iWrite2 = 0; /* Write offset for pTemp2 */
- int nIter; /* Number of iterators used */
- int nByte; /* Bytes of space required for aIter/aTree */
- int N = 2; /* Power of 2 >= nIter */
+ int rc = SQLITE_OK; /* Return code */
assert( pSorter );
/* If no data has been written to disk, then do not do so now. Instead,
** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
** from the in-memory list. */
- if( pSorter->nPMA==0 ){
- *pbEof = !pSorter->pRecord;
- assert( pSorter->aTree==0 );
- return vdbeSorterSort(pCsr);
+ if( pSorter->bUsePMA==0 ){
+ if( pSorter->pRecord ){
+ SorterThread *pThread = &pSorter->aThread[0];
+ *pbEof = 0;
+ pThread->pList = pSorter->pRecord;
+ pThread->eWork = SORTER_THREAD_SORT;
+ rc = vdbeSorterRunThread(pThread);
+ pSorter->pRecord = pThread->pList;
+ pThread->pList = 0;
+ }else{
+ *pbEof = 1;
+ }
+ return rc;
}
/* Write the current in-memory list to a PMA. */
- rc = vdbeSorterListToPMA(db, pCsr);
- if( rc!=SQLITE_OK ) return rc;
+ if( pSorter->pRecord ){
+ rc = vdbeSorterFlushPMA(db, pCsr, 1);
+ }
- /* Allocate space for aIter[] and aTree[]. */
- nIter = pSorter->nPMA;
- if( nIter>SORTER_MAX_MERGE_COUNT ) nIter = SORTER_MAX_MERGE_COUNT;
- assert( nIter>0 );
- while( N<nIter ) N += N;
- nByte = N * (sizeof(int) + sizeof(VdbeSorterIter));
- pSorter->aIter = (VdbeSorterIter *)sqlite3DbMallocZero(db, nByte);
- if( !pSorter->aIter ) return SQLITE_NOMEM;
- pSorter->aTree = (int *)&pSorter->aIter[N];
- pSorter->nTree = N;
-
- do {
- int iNew; /* Index of new, merged, PMA */
-
- for(iNew=0;
- rc==SQLITE_OK && iNew*SORTER_MAX_MERGE_COUNT<pSorter->nPMA;
- iNew++
- ){
- int rc2; /* Return code from fileWriterFinish() */
- FileWriter writer; /* Object used to write to disk */
- i64 nWrite; /* Number of bytes in new PMA */
-
- memset(&writer, 0, sizeof(FileWriter));
-
- /* If there are SORTER_MAX_MERGE_COUNT or less PMAs in file pTemp1,
- ** initialize an iterator for each of them and break out of the loop.
- ** These iterators will be incrementally merged as the VDBE layer calls
- ** sqlite3VdbeSorterNext().
- **
- ** Otherwise, if pTemp1 contains more than SORTER_MAX_MERGE_COUNT PMAs,
- ** initialize interators for SORTER_MAX_MERGE_COUNT of them. These PMAs
- ** are merged into a single PMA that is written to file pTemp2.
- */
- rc = vdbeSorterInitMerge(db, pCsr, &nWrite);
- assert( rc!=SQLITE_OK || pSorter->aIter[ pSorter->aTree[1] ].pFile );
- if( rc!=SQLITE_OK || pSorter->nPMA<=SORTER_MAX_MERGE_COUNT ){
- break;
+ /* Join all threads */
+ rc = vdbeSorterJoinAll(pSorter, rc);
+
+ /* If there are more than SORTER_MAX_MERGE_COUNT PMAs on disk, merge
+ ** some of them together so that this is no longer the case. */
+ assert( SORTER_MAX_MERGE_COUNT>=SQLITE_MAX_SORTER_THREAD );
+ if( vdbeSorterCountPMA(pSorter)>SORTER_MAX_MERGE_COUNT ){
+ int i;
+ for(i=0; rc==SQLITE_OK && i<SQLITE_MAX_SORTER_THREAD; i++){
+ SorterThread *pThread = &pSorter->aThread[i];
+ if( pThread->pTemp1 ){
+ pThread->nConsolidate = SORTER_MAX_MERGE_COUNT/SQLITE_MAX_SORTER_THREAD;
+ pThread->eWork = SORTER_THREAD_CONS;
+
+ if( i<(SQLITE_MAX_SORTER_THREAD-1) ){
+ void *pCtx = (void*)pThread;
+ rc = sqlite3ThreadCreate(&pThread->pThread,vdbeSorterThreadMain,pCtx);
+ }else{
+ rc = vdbeSorterRunThread(pThread);
+ }
}
+ }
+ }
- /* Open the second temp file, if it is not already open. */
- if( pTemp2==0 ){
- assert( iWrite2==0 );
- rc = vdbeSorterOpenTempFile(db, &pTemp2);
- }
+ /* Join all threads */
+ rc = vdbeSorterJoinAll(pSorter, rc);
- if( rc==SQLITE_OK ){
- int bEof = 0;
- fileWriterInit(db, pTemp2, &writer, iWrite2);
- fileWriterWriteVarint(&writer, nWrite);
- while( rc==SQLITE_OK && bEof==0 ){
- VdbeSorterIter *pIter = &pSorter->aIter[ pSorter->aTree[1] ];
- assert( pIter->pFile );
+ /* Assuming no errors have occurred, set up a merger structure to read
+ ** and merge all remaining PMAs. */
+ assert( pSorter->pMerger==0 );
+ if( rc==SQLITE_OK ){
+ int nIter = 0; /* Number of iterators used */
+ int i;
+ SorterMerger *pMerger;
+ for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
+ nIter += pSorter->aThread[i].nPMA;
+ }
- fileWriterWriteVarint(&writer, pIter->nKey);
- fileWriterWrite(&writer, pIter->aKey, pIter->nKey);
- rc = sqlite3VdbeSorterNext(db, pCsr, &bEof);
+ pSorter->pMerger = pMerger = vdbeSorterMergerNew(nIter);
+ if( pMerger==0 ){
+ rc = SQLITE_NOMEM;
+ }else{
+ int iIter = 0;
+ int iThread = 0;
+ for(iThread=0; iThread<SQLITE_MAX_SORTER_THREAD; iThread++){
+ int iPMA;
+ i64 iReadOff = 0;
+ SorterThread *pThread = &pSorter->aThread[iThread];
+ for(iPMA=0; iPMA<pThread->nPMA && rc==SQLITE_OK; iPMA++){
+ i64 nDummy = 0;
+ VdbeSorterIter *pIter = &pMerger->aIter[iIter++];
+ rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nDummy);
+ iReadOff = pIter->iEof;
}
- rc2 = fileWriterFinish(db, &writer, &iWrite2);
- if( rc==SQLITE_OK ) rc = rc2;
}
- }
- if( pSorter->nPMA<=SORTER_MAX_MERGE_COUNT ){
- break;
- }else{
- sqlite3_file *pTmp = pSorter->pTemp1;
- pSorter->nPMA = iNew;
- pSorter->pTemp1 = pTemp2;
- pTemp2 = pTmp;
- pSorter->iWriteOff = iWrite2;
- pSorter->iReadOff = 0;
- iWrite2 = 0;
+ for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
+ rc = vdbeSorterDoCompare(&pSorter->aThread[0], pMerger, i);
+ }
}
- }while( rc==SQLITE_OK );
+ }
- if( pTemp2 ){
- sqlite3OsCloseFree(pTemp2);
+ if( rc==SQLITE_OK ){
+ *pbEof = (pSorter->pMerger->aIter[pSorter->pMerger->aTree[1]].pFile==0);
}
- *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
return rc;
}
VdbeSorter *pSorter = pCsr->pSorter;
int rc; /* Return code */
- if( pSorter->aTree ){
- int iPrev = pSorter->aTree[1];/* Index of iterator to advance */
- int i; /* Index of aTree[] to recalculate */
-
- rc = vdbeSorterIterNext(db, &pSorter->aIter[iPrev]);
- for(i=(pSorter->nTree+iPrev)/2; rc==SQLITE_OK && i>0; i=i/2){
- rc = vdbeSorterDoCompare(pCsr, i);
- }
-
- *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
+ if( pSorter->pMerger ){
+ rc = vdbeSorterNext(&pSorter->aThread[0], pSorter->pMerger, pbEof);
}else{
SorterRecord *pFree = pSorter->pRecord;
pSorter->pRecord = pFree->pNext;
int *pnKey /* OUT: Size of current key in bytes */
){
void *pKey;
- if( pSorter->aTree ){
+ if( pSorter->pMerger ){
VdbeSorterIter *pIter;
- pIter = &pSorter->aIter[ pSorter->aTree[1] ];
+ pIter = &pSorter->pMerger->aIter[ pSorter->pMerger->aTree[1] ];
*pnKey = pIter->nKey;
pKey = pIter->aKey;
}else{
int *pRes /* OUT: Result of comparison */
){
VdbeSorter *pSorter = pCsr->pSorter;
+ SorterThread *pMain = &pSorter->aThread[0];
void *pKey; int nKey; /* Sorter key to compare pVal with */
pKey = vdbeSorterRowkey(pSorter, &nKey);
- vdbeSorterCompare(pCsr, nIgnore, pVal->z, pVal->n, pKey, nKey, pRes);
+ vdbeSorterCompare(pMain, nIgnore, pVal->z, pVal->n, pKey, nKey, pRes);
return SQLITE_OK;
}