typedef struct VdbeSorterIter VdbeSorterIter;
+typedef struct SorterThread SorterThread;
typedef struct SorterRecord SorterRecord;
+typedef struct SorterMerger SorterMerger;
typedef struct FileWriter FileWriter;
+
+/*
+** Maximum number of threads to use. Setting this value to 1 forces all
+** operations to be single-threaded.
+*/
+#ifndef SQLITE_MAX_SORTER_THREAD
+# define SQLITE_MAX_SORTER_THREAD 4
+#endif
+
+/*
+** Candidate values for SorterThread.eWork
+*/
+#define SORTER_THREAD_SORT 1
+#define SORTER_THREAD_TO_PMA 2
+#define SORTER_THREAD_CONS 3
+
+/*
+** Much of the work performed in this module to sort the list of records is
+** broken down into smaller units that may be performed in parallel. In order
+** to perform such a unit of work, an instance of the following structure
+** is configured and passed to vdbeSorterThreadMain() - either directly by
+** the main thread or via a background thread.
+**
+** Exactly SQLITE_MAX_SORTER_THREAD instances of this structure are allocated
+** as part of each VdbeSorter object. Instances are never allocated any other
+** way.
+**
+** When a background thread is launched to perform work, SorterThread.bDone
+** is set to 0 and the SorterThread.pThread variable set to point to the
+** thread handle. SorterThread.bDone is set to 1 (to indicate to the main
+** thread that joining SorterThread.pThread will not block) before the thread
+** exits. SorterThread.pThread and bDone are always cleared after the
+** background thread has been joined.
+**
+** One object (specifically, VdbeSorter.aThread[SQLITE_MAX_SORTER_THREAD-1])
+** is reserved for the foreground thread.
+**
+** The nature of the work performed is determined by SorterThread.eWork,
+** as follows:
+**
+** SORTER_THREAD_SORT:
+** Sort the linked list of records at SorterThread.pList.
+**
+** SORTER_THREAD_TO_PMA:
+** Sort the linked list of records at SorterThread.pList, and write
+** the results to a new PMA in temp file SorterThread.pTemp1. Open
+** the temp file if it is not already open.
+**
+** SORTER_THREAD_CONS:
+** Merge existing PMAs until SorterThread.nConsolidate or fewer
+** remain in temp file SorterThread.pTemp1.
+*/
+struct SorterThread {
+ SQLiteThread *pThread; /* Thread handle, or NULL if no thread is attached */
+ int bDone; /* Set to 1 by vdbeSorterThreadMain() just before it returns; cleared on join */
+
+ sqlite3_vfs *pVfs; /* VFS used to open temporary files */
+ KeyInfo *pKeyInfo; /* How to compare records */
+ UnpackedRecord *pUnpacked; /* Space to unpack a record; allocated on first use by vdbeSorterThreadMain() */
+ int pgsz; /* Main database page size */
+
+ u8 eWork; /* One of the SORTER_THREAD_* constants */
+ int nConsolidate; /* For THREAD_CONS, max final PMAs */
+ SorterRecord *pList; /* List of records for pThread to sort */
+ int nInMemory; /* Expected size of PMA based on pList */
++ u8 *aListMemory; /* Single allocation holding the pList records, or NULL if each record is a separate allocation */
+
+ int nPMA; /* Number of PMAs currently in pTemp1 */
+ i64 iTemp1Off; /* Offset to write to in pTemp1 */
+ sqlite3_file *pTemp1; /* File to write PMAs to, or NULL */
+};
+
+
/*
** NOTES ON DATA STRUCTURE USED FOR N-WAY MERGES:
**
** key comparison operations are required, where N is the number of segments
** being merged (rounded up to the next power of 2).
*/
+struct SorterMerger {
+ int nTree; /* Used size of aTree/aIter (power of 2) */
+ int *aTree; /* Current state of incremental merge; aTree[1] is the aIter[] index of the iterator holding the next record */
+ VdbeSorterIter *aIter; /* Array of iterators to merge data from; vdbeSorterMergerNew() places it directly after this struct */
+};
+
+/*
+** Main sorter structure. A single instance of this is allocated for each
+** sorter cursor created by the VDBE.
+*/
struct VdbeSorter {
- i64 iWriteOff; /* Current write offset within file pTemp1 */
- i64 iReadOff; /* Current read offset within file pTemp1 */
int nInMemory; /* Current size of pRecord list as PMA */
- int nTree; /* Used size of aTree/aIter (power of 2) */
- int nPMA; /* Number of PMAs stored in pTemp1 */
int mnPmaSize; /* Minimum PMA size, in bytes */
int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
- VdbeSorterIter *aIter; /* Array of iterators to merge */
- int *aTree; /* Current state of incremental merge */
- sqlite3_file *pTemp1; /* PMA file 1 */
+ int bUsePMA; /* True if one or more PMAs created */
SorterRecord *pRecord; /* Head of in-memory record list */
- UnpackedRecord *pUnpacked; /* Used to unpack keys */
- u8* aMemory; /* Block to allocate records from */
- int iMemory; /* Offset of free space in aMemory */
- int nMemory; /* Current size of allocation at aMemory */
+ SorterMerger *pMerger; /* For final merge of PMAs (by caller) */
++ u8 *aMemory; /* Block of memory to alloc records from */
++ int iMemory; /* Offset of first free byte in aMemory */
++ int nMemory; /* Size of aMemory allocation in bytes */
+ SorterThread aThread[SQLITE_MAX_SORTER_THREAD];
};
/*
u8 *aKey; /* Pointer to current key */
u8 *aBuffer; /* Current read buffer */
int nBuffer; /* Size of read buffer in bytes */
- u8 *aMap; /* Pointer to mapping of pFile */
++ u8 *aMap; /* Pointer to mapping of entire file */
};
/*
/*
** A structure to store a single record. All in-memory records are connected
- ** together into a linked list headed at VdbeSorter.pRecord using the
- ** SorterRecord.pNext pointer.
+ ** together into a linked list headed at VdbeSorter.pRecord.
+ **
+ ** How the linked list is connected depends on how memory is being managed
+ ** by this module. If using a separate allocation for each in-memory record
-** (VdbeSorter.aMemory==0), then the list is always connected using the
++** (VdbeSorter.aMemory==0), then the list is always connected using the
+ ** SorterRecord.u.pNext pointers.
+ **
+ ** Or, if using the single large allocation method (VdbeSorter.aMemory!=0),
+ ** then while records are being accumulated the list is linked using the
+ ** SorterRecord.u.iNext offset. This is because the aMemory[] array may
+ ** be sqlite3Realloc()ed while records are being accumulated. Once the VM
+ ** has finished passing records to the sorter, or when the in-memory buffer
+ ** is full, the list is sorted. As part of the sorting process, it is
+ ** converted to use the SorterRecord.u.pNext pointers. See function
+ ** vdbeSorterSort() for details.
*/
struct SorterRecord {
- void *pVal;
int nVal;
- SorterRecord *pNext;
+ union {
+ SorterRecord *pNext; /* Pointer to next record in list */
+ int iNext; /* Offset within aMemory of next record */
+ } u;
};
-/* Minimum allowable value for the VdbeSorter.nWorking variable */
+ /* Return a pointer to the buffer containing the record data for SorterRecord
+ ** object p. Should be used as if:
+ **
+ ** void *SRVAL(SorterRecord *p) { return (void*)&p[1]; }
+ */
+ #define SRVAL(p) ((void*)((SorterRecord*)(p) + 1))
+
+/* The minimum PMA size is set to this value multiplied by the database
+** page size in bytes. */
#define SORTER_MIN_WORKING 10
/* Maximum number of segments to merge in a single pass. */
** Free all memory belonging to the VdbeSorterIter object passed as the second
** argument. All structure fields are set to zero before returning.
*/
-static void vdbeSorterIterZero(sqlite3 *db, VdbeSorterIter *pIter){
- sqlite3DbFree(db, pIter->aAlloc);
- sqlite3DbFree(db, pIter->aBuffer);
+static void vdbeSorterIterZero(VdbeSorterIter *pIter){
+ sqlite3_free(pIter->aAlloc);
+ sqlite3_free(pIter->aBuffer);
+ if( pIter->aMap ) sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap); /* release the mapping obtained with sqlite3OsFetch() when the iterator was set up */
memset(pIter, 0, sizeof(VdbeSorterIter)); /* also clears pFile, which vdbeSorterNext() tests to detect EOF */
}
** Read a varint from the stream of data accessed by p. Set *pnOut to
** the value read.
*/
-static int vdbeSorterIterVarint(sqlite3 *db, VdbeSorterIter *p, u64 *pnOut){
+static int vdbeSorterIterVarint(VdbeSorterIter *p, u64 *pnOut){
int iBuf;
- iBuf = p->iReadOff % p->nBuffer;
- if( iBuf && (p->nBuffer-iBuf)>=9 ){
- p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
+ if( p->aMap ){
+ p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut);
}else{
- u8 aVarint[16], *a;
- int i = 0, rc;
- do{
- rc = vdbeSorterIterRead(p, 1, &a);
- if( rc ) return rc;
- aVarint[(i++)&0xf] = a[0];
- }while( (a[0]&0x80)!=0 );
- sqlite3GetVarint(aVarint, pnOut);
+ iBuf = p->iReadOff % p->nBuffer;
+ if( iBuf && (p->nBuffer-iBuf)>=9 ){
+ p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
+ }else{
+ u8 aVarint[16], *a;
+ int i = 0, rc;
+ do{
- rc = vdbeSorterIterRead(db, p, 1, &a);
++ rc = vdbeSorterIterRead(p, 1, &a);
+ if( rc ) return rc;
+ aVarint[(i++)&0xf] = a[0];
+ }while( (a[0]&0x80)!=0 );
+ sqlite3GetVarint(aVarint, pnOut);
+ }
}
return SQLITE_OK;
i64 *pnByte /* IN/OUT: Increment this value by PMA size */
){
int rc = SQLITE_OK;
- int nBuf;
- void *pMap;
-
- nBuf = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
+ int nBuf = pThread->pgsz;
++ void *pMap = 0; /* Mapping of temp file */
- assert( pSorter->iWriteOff>iStart );
+ assert( pThread->iTemp1Off>iStart );
assert( pIter->aAlloc==0 );
assert( pIter->aBuffer==0 );
- pIter->pFile = pSorter->pTemp1;
+ pIter->pFile = pThread->pTemp1;
pIter->iReadOff = iStart;
pIter->nAlloc = 128;
- pIter->aAlloc = (u8 *)sqlite3DbMallocRaw(db, pIter->nAlloc);
+ pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
- pIter->nBuffer = nBuf;
- pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
- if( !pIter->aBuffer ){
- rc = SQLITE_NOMEM;
- /* See if this PMA can be read using xFetch. */
- rc = sqlite3OsFetch(pIter->pFile, 0, pSorter->iWriteOff, &pMap);
- if( rc!=SQLITE_OK ) return rc;
- if( pMap ){
- pIter->aMap = (u8*)pMap;
-- }else{
- int iBuf;
- pIter->nBuffer = nBuf;
- pIter->aBuffer = (u8 *)sqlite3DbMallocRaw(db, nBuf);
++ /* Try to xFetch() a mapping of the entire temp file. If this is possible,
++ ** the PMA will be read via the mapping. Otherwise, use xRead(). */
++ rc = sqlite3OsFetch(pIter->pFile, 0, pThread->iTemp1Off, &pMap);
- iBuf = iStart % nBuf;
- if( iBuf ){
- int nRead = nBuf - iBuf;
- if( (iStart + nRead) > pThread->iTemp1Off ){
- nRead = (int)(pThread->iTemp1Off - iStart);
- if( !pIter->aBuffer ){
- rc = SQLITE_NOMEM;
++ if( rc==SQLITE_OK ){
++ if( pMap ){
++ pIter->aMap = (u8*)pMap;
+ }else{
- int iBuf;
-
- iBuf = iStart % nBuf;
- if( iBuf ){
- int nRead = nBuf - iBuf;
- if( (iStart + nRead) > pSorter->iWriteOff ){
- nRead = (int)(pSorter->iWriteOff - iStart);
++ pIter->nBuffer = nBuf;
++ pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
++ if( !pIter->aBuffer ){
++ rc = SQLITE_NOMEM;
++ }else{
++ int iBuf = iStart % nBuf;
++ if( iBuf ){
++ int nRead = nBuf - iBuf;
++ if( (iStart + nRead) > pThread->iTemp1Off ){
++ nRead = (int)(pThread->iTemp1Off - iStart);
++ }
++ rc = sqlite3OsRead(
++ pThread->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
++ );
++ assert( rc!=SQLITE_IOERR_SHORT_READ );
+ }
- rc = sqlite3OsRead(
- pSorter->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
- );
- assert( rc!=SQLITE_IOERR_SHORT_READ );
}
- rc = sqlite3OsRead(
- pThread->pTemp1, &pIter->aBuffer[iBuf], nRead, iStart
- );
- assert( rc!=SQLITE_IOERR_SHORT_READ );
}
+ }
- if( rc==SQLITE_OK ){
- u64 nByte;
- pIter->iEof = pThread->iTemp1Off;
- rc = vdbeSorterIterVarint(pIter, &nByte);
- pIter->iEof = pIter->iReadOff + nByte;
- *pnByte += nByte;
- }
+ if( rc==SQLITE_OK ){
- u64 nByte; /* Size of PMA in bytes */
- pIter->iEof = pSorter->iWriteOff;
- rc = vdbeSorterIterVarint(db, pIter, &nByte);
++ u64 nByte; /* Size of PMA in bytes */
++ pIter->iEof = pThread->iTemp1Off;
++ rc = vdbeSorterIterVarint(pIter, &nByte);
+ pIter->iEof = pIter->iReadOff + nByte;
+ *pnByte += nByte;
}
if( rc==SQLITE_OK ){
*/
int sqlite3VdbeSorterInit(sqlite3 *db, VdbeCursor *pCsr){
int pgsz; /* Page size of main database */
+ int i; /* Used to iterate through aThread[] */
int mxCache; /* Cache size */
VdbeSorter *pSorter; /* The new sorter */
- char *d; /* Dummy */
+ KeyInfo *pKeyInfo; /* Copy of pCsr->pKeyInfo with db==0 */
+ int szKeyInfo; /* Size of pCsr->pKeyInfo in bytes */
++ int rc = SQLITE_OK;
assert( pCsr->pKeyInfo && pCsr->pBt==0 );
- pCsr->pSorter = pSorter = sqlite3DbMallocZero(db, sizeof(VdbeSorter));
+ szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nField-1)*sizeof(CollSeq*);
+ pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sizeof(VdbeSorter)+szKeyInfo);
+ pCsr->pSorter = pSorter;
if( pSorter==0 ){
-- return SQLITE_NOMEM;
-- }
- pKeyInfo = (KeyInfo*)&pSorter[1];
- memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
- pKeyInfo->db = 0;
- pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
-
- pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pCsr->pKeyInfo, 0, 0, &d);
- if( pSorter->pUnpacked==0 ) return SQLITE_NOMEM;
- assert( pSorter->pUnpacked==(UnpackedRecord *)d );
- pSorter->pUnpacked->nField = pCsr->pKeyInfo->nField;
-
- if( !sqlite3TempInMemory(db) ){
++ rc = SQLITE_NOMEM;
++ }else{
++ pKeyInfo = (KeyInfo*)&pSorter[1];
++ memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
++ pKeyInfo->db = 0;
+ pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
- pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
- mxCache = db->aDb[0].pSchema->cache_size;
- if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
- pSorter->mxPmaSize = mxCache * pgsz;
-
- /* If the application is using memsys3 or memsys5, use a separate
- ** allocation for each sort-key in memory. Otherwise, use a single big
- ** allocation at pSorter->aMemory for all sort-keys. */
- if( sqlite3GlobalConfig.pHeap==0 ){
- assert( pSorter->iMemory==0 );
- pSorter->nMemory = pgsz;
- pSorter->aMemory = (u8*)sqlite3Malloc(pSorter->nMemory);
- if( !pSorter->aMemory ) return SQLITE_NOMEM;
+
- for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
- SorterThread *pThread = &pSorter->aThread[i];
- pThread->pKeyInfo = pKeyInfo;
- pThread->pVfs = db->pVfs;
- pThread->pgsz = pgsz;
- }
++ for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
++ SorterThread *pThread = &pSorter->aThread[i];
++ pThread->pKeyInfo = pKeyInfo;
++ pThread->pVfs = db->pVfs;
++ pThread->pgsz = pgsz;
++ }
+
- if( !sqlite3TempInMemory(db) ){
- pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
- mxCache = db->aDb[0].pSchema->cache_size;
- if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
- pSorter->mxPmaSize = mxCache * pgsz;
++ if( !sqlite3TempInMemory(db) ){
++ pSorter->mnPmaSize = SORTER_MIN_WORKING * pgsz;
++ mxCache = db->aDb[0].pSchema->cache_size;
++ if( mxCache<SORTER_MIN_WORKING ) mxCache = SORTER_MIN_WORKING;
++ pSorter->mxPmaSize = mxCache * pgsz;
++
++ /* If the application is using memsys3 or memsys5, use a separate
++ ** allocation for each sort-key in memory. Otherwise, use a single big
++ ** allocation at pSorter->aMemory for all sort-keys. */
++ if( sqlite3GlobalConfig.pHeap==0 ){
++ assert( pSorter->iMemory==0 );
++ pSorter->nMemory = pgsz;
++ pSorter->aMemory = (u8*)sqlite3Malloc(pgsz);
++ if( !pSorter->aMemory ) rc = SQLITE_NOMEM;
++ }
+ }
}
-- return SQLITE_OK;
++ return rc;
}
/*
}
/*
-** Reset a sorting cursor back to its original empty state.
+** Free all resources owned by the object indicated by argument pThread and
+** reset the state that describes them: the UnpackedRecord scratch space,
+** the record list (or the single allocation that holds it), the temp file
+** handle, and the PMA count and write offset that describe the contents of
+** that temp file.
+**
+** The configuration fields pVfs, pKeyInfo and pgsz are left untouched.
+** They are set once by sqlite3VdbeSorterInit() and are not restored by
+** sqlite3VdbeSorterReset(), so zeroing them here would break reuse of the
+** sorter after a reset.
*/
-void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
- if( pSorter->aIter ){
- int i;
- for(i=0; i<pSorter->nTree; i++){
- vdbeSorterIterZero(db, &pSorter->aIter[i]);
+static void vdbeSorterThreadCleanup(sqlite3 *db, SorterThread *pThread){
+  sqlite3DbFree(db, pThread->pUnpacked);
+  pThread->pUnpacked = 0;
- vdbeSorterRecordFree(0, pThread->pList);
++  if( pThread->aListMemory==0 ){
++    /* Records are separate allocations: hand the list to the record free
++    ** routine. */
++    vdbeSorterRecordFree(0, pThread->pList);
++  }else{
++    /* All records live inside aListMemory, so one free releases them. */
++    sqlite3_free(pThread->aListMemory);
++    pThread->aListMemory = 0;
++  }
+  pThread->pList = 0;
+  if( pThread->pTemp1 ){
+    sqlite3OsCloseFree(pThread->pTemp1);
+    pThread->pTemp1 = 0;
+  }
+
+  /* nPMA and iTemp1Off describe the contents of pTemp1. With the file gone
+  ** they must return to zero, otherwise the next PMA written through this
+  ** object would start at a stale offset in a brand new file (and trip the
+  ** iTemp1Off==0 / nPMA==0 assertions in vdbeSorterListToPMA()). */
+  pThread->nPMA = 0;
+  pThread->iTemp1Off = 0;
+}
+
+/*
+** Join every background thread still attached to one of the
+** pSorter->aThread[] slots, clearing the pThread handle and bDone flag of
+** each slot that is joined.
+**
+** Argument rcin is the error code so far. If it is not SQLITE_OK it is
+** returned unchanged (threads are still joined). Otherwise the return value
+** is the first error reported either by sqlite3ThreadJoin() or by a thread's
+** own result, or SQLITE_OK if there was none.
+*/
+static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
+  SorterThread *pSlot = &pSorter->aThread[0];
+  SorterThread *pEnd = &pSorter->aThread[SQLITE_MAX_SORTER_THREAD];
+  int rc = rcin;
+
+  for(; pSlot<pEnd; pSlot++){
+    void *pRet;
+    int rcJoin;
+
+    if( pSlot->pThread==0 ) continue;
+    rcJoin = sqlite3ThreadJoin(pSlot->pThread, &pRet);
+    pSlot->pThread = 0;
+    pSlot->bDone = 0;
+
+    /* Only the first error is remembered. */
+    if( rc==SQLITE_OK ){
+      rc = rcJoin;
+      if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
+    }
+  }
+  return rc;
+}
+
+/*
+** Allocate and return a new, zeroed SorterMerger with room for at least
+** nIter iterators. The iterator count is rounded up to a power of two (and
+** is never less than 2). The aIter[] and aTree[] arrays are carved out of
+** the same allocation as the object itself, so a single sqlite3_free()
+** releases everything.
+**
+** Returns NULL if the allocation fails.
+*/
+static SorterMerger *vdbeSorterMergerNew(int nIter){
+  SorterMerger *pMerger;          /* Object to return */
+  int nSlot;                      /* Smallest power of two >= nIter (min 2) */
+  int nByte;                      /* Size of the combined allocation */
+
+  assert( nIter<=SORTER_MAX_MERGE_COUNT );
+  for(nSlot=2; nSlot<nIter; nSlot*=2){ /* no-op */ }
+
+  nByte = sizeof(SorterMerger) + nSlot * (sizeof(int) + sizeof(VdbeSorterIter));
+  pMerger = (SorterMerger*)sqlite3MallocZero(nByte);
+  if( pMerger==0 ) return 0;
+
+  /* Layout: [SorterMerger][aIter: nSlot iterators][aTree: nSlot ints] */
+  pMerger->nTree = nSlot;
+  pMerger->aIter = (VdbeSorterIter*)&pMerger[1];
+  pMerger->aTree = (int*)&pMerger->aIter[nSlot];
+  return pMerger;
+}
+
+/*
+** Reset a merger: release the resources held by each of its nTree
+** iterators by calling vdbeSorterIterZero() on every aIter[] entry. The
+** SorterMerger object itself is not freed. A NULL pMerger is a no-op.
+*/
+static void vdbeSorterMergerReset(SorterMerger *pMerger){
+ int i;
+ if( pMerger ){
+ for(i=0; i<pMerger->nTree; i++){
+ vdbeSorterIterZero(&pMerger->aIter[i]);
}
- sqlite3DbFree(db, pSorter->aIter);
- pSorter->aIter = 0;
}
- if( pSorter->pTemp1 ){
- sqlite3OsCloseFree(pSorter->pTemp1);
- pSorter->pTemp1 = 0;
+}
+
+
+/*
+** Release the resources held by the iterators of pMerger and then free the
+** SorterMerger object itself. Passing NULL is a harmless no-op.
+*/
+static void vdbeSorterMergerFree(SorterMerger *pMerger){
+  if( pMerger==0 ) return;
+  vdbeSorterMergerReset(pMerger);
+  sqlite3_free(pMerger);
+}
+
+/*
+** Reset a sorting cursor back to its original empty state.
+**
+** All background threads are joined first, then every SorterThread slot is
+** cleaned up, the in-memory record list is discarded and the merger (if
+** any) is reset. The pSorter object, its aMemory buffer and the merger
+** object are not freed here.
+*/
+void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
+ int i;
+ vdbeSorterJoinAll(pSorter, SQLITE_OK); /* any error reported by a worker is discarded */
+ for(i=0; i<SQLITE_MAX_SORTER_THREAD; i++){
+ SorterThread *pThread = &pSorter->aThread[i];
+ vdbeSorterThreadCleanup(db, pThread);
}
- vdbeSorterRecordFree(0, pSorter->pRecord);
+ if( pSorter->aMemory==0 ){ /* records are separate allocations only when aMemory is not in use */
- vdbeSorterRecordFree(db, pSorter->pRecord);
++ vdbeSorterRecordFree(0, pSorter->pRecord);
+ }
+ vdbeSorterMergerReset(pSorter->pMerger);
pSorter->pRecord = 0;
- pSorter->iWriteOff = 0;
- pSorter->iReadOff = 0;
pSorter->nInMemory = 0;
- pSorter->nTree = 0;
- pSorter->nPMA = 0;
- pSorter->aTree = 0;
+ pSorter->bUsePMA = 0;
+ pSorter->iMemory = 0; /* aMemory itself is kept; it is simply refilled from offset 0 */
}
-
/*
** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
*/
VdbeSorter *pSorter = pCsr->pSorter;
if( pSorter ){
sqlite3VdbeSorterReset(db, pSorter);
- sqlite3DbFree(db, pSorter->pUnpacked);
- sqlite3DbFree(db, pSorter->aMemory);
+ vdbeSorterMergerFree(pSorter->pMerger);
++ sqlite3_free(pSorter->aMemory);
sqlite3DbFree(db, pSorter);
pCsr->pSorter = 0;
}
** set *ppFile to point to the malloc'd file-handle and return SQLITE_OK.
** Otherwise, set *ppFile to 0 and return an SQLite error code.
*/
-static int vdbeSorterOpenTempFile(sqlite3 *db, sqlite3_file **ppFile){
+static int vdbeSorterOpenTempFile(sqlite3_vfs *pVfs, sqlite3_file **ppFile){
- int dummy;
- return sqlite3OsOpenMalloc(pVfs, 0, ppFile,
+ int rc;
- rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFile,
++ rc = sqlite3OsOpenMalloc(pVfs, 0, ppFile,
SQLITE_OPEN_TEMP_JOURNAL |
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
- SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &dummy
+ SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &rc /* &rc only fills the don't-care output slot (formerly &dummy); rc is then overwritten by the return value */
);
+ if( rc==SQLITE_OK ){
+ i64 max = SQLITE_MAX_MMAP_SIZE;
+ sqlite3OsFileControlHint( *ppFile, SQLITE_FCNTL_MMAP_SIZE, (void*)&max); /* a hint only: no result is checked */
+ }
+ return rc;
}
/*
while( p1 && p2 ){
int res;
- vdbeSorterCompare(pThread, 0, p1->pVal, p1->nVal, pVal2, p2->nVal, &res);
- vdbeSorterCompare(pCsr, 0, SRVAL(p1), p1->nVal, pVal2, p2->nVal, &res);
++ vdbeSorterCompare(pThread, 0, SRVAL(p1), p1->nVal, pVal2, p2->nVal, &res);
if( res<=0 ){
*pp = p1;
- pp = &p1->pNext;
- p1 = p1->pNext;
+ pp = &p1->u.pNext;
+ p1 = p1->u.pNext;
pVal2 = 0;
}else{
*pp = p2;
return SQLITE_NOMEM;
}
- p = pSorter->pRecord;
+ p = pThread->pList;
while( p ){
- SorterRecord *pNext = p->pNext;
- p->pNext = 0;
+ SorterRecord *pNext;
- if( pSorter->aMemory ){
- assert( p->u.iNext<pSorter->nMemory );
- if( (u8*)p==pSorter->aMemory ){
++ if( pThread->aListMemory ){
++ if( (u8*)p==pThread->aListMemory ){
+ pNext = 0;
+ }else{
- pNext = (SorterRecord*)&pSorter->aMemory[p->u.iNext];
++ assert( p->u.iNext<sqlite3MallocSize(pThread->aListMemory) );
++ pNext = (SorterRecord*)&pThread->aListMemory[p->u.iNext];
+ }
+ }else{
+ pNext = p->u.pNext;
+ }
++
+ p->u.pNext = 0;
for(i=0; aSlot[i]; i++){
- vdbeSorterMerge(pCsr, p, aSlot[i], &p);
+ vdbeSorterMerge(pThread, p, aSlot[i], &p);
aSlot[i] = 0;
}
aSlot[i] = p;
fileWriterWrite(p, aByte, nByte);
}
-** is guaranteed to be nByte bytes or smaller in size. This function
+ #if SQLITE_MAX_MMAP_SIZE>0
+ /*
+ ** The first argument is a file-handle open on a temporary file. The file
-** Whether or not the file does end up memory mapped of course depends on
++** is guaranteed to be nByte bytes or smaller in size. This function
+ ** attempts to extend the file to nByte bytes in size and to ensure that
+ ** the VFS has memory mapped it.
+ **
++** Whether or not the file does end up memory mapped of course depends on
+ ** the specific VFS implementation.
+ */
+ static int vdbeSorterExtendFile(sqlite3_file *pFile, i64 nByte){
+   /* Grow (or shrink) the file to exactly nByte bytes. A failure here is the
+   ** only error this function reports. */
+   int rc = sqlite3OsTruncate(pFile, nByte);
+   if( rc==SQLITE_OK ){
+     void *p = 0;
+     /* Best effort: ask the VFS to map the whole file, then drop the
+     ** reference straight away. The result of the fetch is deliberately
+     ** ignored - a VFS that cannot map the file leaves p==0. */
+     sqlite3OsFetch(pFile, 0, nByte, &p);
+     /* Only release a reference that was actually obtained. Unfetching a
+     ** NULL pointer is not a no-op for the built-in VFS implementations: it
+     ** asks them to discard existing mappings, which is the opposite of
+     ** what this function is trying to achieve. */
+     if( p ) sqlite3OsUnfetch(pFile, 0, p);
+   }
+   return rc;
+ }
+ #else
+ # define vdbeSorterExtendFile(x,y) SQLITE_OK
+ #endif
+
++
/*
** Write the current contents of the in-memory linked-list to a PMA. Return
** SQLITE_OK if successful, or an SQLite error code otherwise.
** Each record consists of a varint followed by a blob of data (the
** key). The varint is the number of bytes in the blob of data.
+**
+** The list at pThread->pList must be non-empty (pThread->nInMemory>0 is
+** asserted) and is written in its current order: this function does not
+** sort it, vdbeSorterThreadMain() does that before calling here. The temp
+** file pThread->pTemp1 is opened on demand.
*/
-static int vdbeSorterListToPMA(sqlite3 *db, const VdbeCursor *pCsr){
+static int vdbeSorterListToPMA(SorterThread *pThread){
int rc = SQLITE_OK; /* Return code */
- VdbeSorter *pSorter = pCsr->pSorter;
- FileWriter writer;
+ FileWriter writer; /* Object used to write to the file */
memset(&writer, 0, sizeof(FileWriter));
-
- if( pSorter->nInMemory==0 ){
- assert( pSorter->pRecord==0 );
- return rc;
- }
-
- rc = vdbeSorterSort(pCsr);
+ assert( pThread->nInMemory>0 );
/* If the first temporary PMA file has not been opened, open it now. */
- if( rc==SQLITE_OK && pSorter->pTemp1==0 ){
- rc = vdbeSorterOpenTempFile(db, &pSorter->pTemp1);
- assert( rc!=SQLITE_OK || pSorter->pTemp1 );
- assert( pSorter->iWriteOff==0 );
- assert( pSorter->nPMA==0 );
+ if( pThread->pTemp1==0 ){
+ rc = vdbeSorterOpenTempFile(pThread->pVfs, &pThread->pTemp1);
+ assert( rc!=SQLITE_OK || pThread->pTemp1 );
+ assert( pThread->iTemp1Off==0 );
+ assert( pThread->nPMA==0 );
}
- pSorter->pTemp1, pSorter->iWriteOff + pSorter->nInMemory + 9
+ /* Try to get the file to memory map */
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterExtendFile(
++ pThread->pTemp1, pThread->iTemp1Off + pThread->nInMemory + 9 /* presumably +9 leaves room for the PMA-size varint - TODO confirm */
+ );
+ }
+
if( rc==SQLITE_OK ){
SorterRecord *p;
SorterRecord *pNext = 0;
- fileWriterInit(db, pSorter->pTemp1, &writer, pSorter->iWriteOff);
- pSorter->nPMA++;
- fileWriterWriteVarint(&writer, pSorter->nInMemory);
- for(p=pSorter->pRecord; p; p=pNext){
+ fileWriterInit(pThread->pTemp1, &writer, pThread->pgsz, pThread->iTemp1Off);
+ pThread->nPMA++;
+ fileWriterWriteVarint(&writer, pThread->nInMemory);
+ for(p=pThread->pList; p; p=pNext){
- pNext = p->pNext;
+ pNext = p->u.pNext;
fileWriterWriteVarint(&writer, p->nVal);
- fileWriterWrite(&writer, p->pVal, p->nVal);
- sqlite3_free(p);
+ fileWriterWrite(&writer, SRVAL(p), p->nVal);
- if( pSorter->aMemory==0 ) sqlite3DbFree(db, p);
++ if( pThread->aListMemory==0 ) sqlite3_free(p); /* records inside aListMemory are not freed one by one */
+ }
+ pThread->pList = p; /* p is NULL here: the loop only ends once the whole list is consumed */
+ rc = fileWriterFinish(&writer, &pThread->iTemp1Off);
+ }
+
++ assert( pThread->pList==0 || rc!=SQLITE_OK );
+ return rc;
+}
+
+/*
+** Step the merger pMerger forward by one record: advance the iterator that
+** currently holds the smallest key (the one indexed by aTree[1]) and then
+** re-run the comparisons on the path from that iterator's leaf up to the
+** root of aTree[].
+**
+** *pbEof is set to true if, afterwards, the iterator at the root of the
+** tree has no file attached, i.e. every iterator is exhausted. It is
+** written even when an error is returned.
+**
+** Return SQLITE_OK if successful or an error code if an error occurs.
+*/
+static int vdbeSorterNext(
+  SorterThread *pThread,
+  SorterMerger *pMerger,
+  int *pbEof
+){
+  const int iAdvance = pMerger->aTree[1];        /* Iterator to advance */
+  int iNode = (pMerger->nTree + iAdvance)/2;     /* aTree[] slot to redo */
+  int rc = vdbeSorterIterNext(&pMerger->aIter[iAdvance]);
+
+  /* Walk from the parent of the advanced leaf up to the root, stopping
+  ** early if a comparison fails. */
+  while( rc==SQLITE_OK && iNode>0 ){
+    rc = vdbeSorterDoCompare(pThread, pMerger, iNode);
+    iNode = iNode/2;
+  }
+
+  *pbEof = (pMerger->aIter[pMerger->aTree[1]].pFile==0);
+  return rc;
+}
+
+/*
+** The main routine for sorter-thread operations.
+**
+** pCtx points to the SorterThread object describing the unit of work to
+** perform (SorterThread.eWork is one of the SORTER_THREAD_* constants).
+** The routine is used both as the entry point of a background thread and,
+** via vdbeSorterRunThread(), directly on the calling thread.
+**
+** The SQLite result code is returned encoded as a pointer with
+** SQLITE_INT_TO_PTR(). SorterThread.bDone is set to 1 before returning on
+** every path, including errors.
+*/
+static void *vdbeSorterThreadMain(void *pCtx){
+  int rc = SQLITE_OK;
+  SorterThread *pThread = (SorterThread*)pCtx;
+
+  assert( pThread->eWork==SORTER_THREAD_SORT
+       || pThread->eWork==SORTER_THREAD_TO_PMA
+       || pThread->eWork==SORTER_THREAD_CONS
+  );
+  assert( pThread->bDone==0 );
+
+  /* Allocate the record-unpacking scratch space the first time this object
+  ** is used. It is kept for later units of work. */
+  if( pThread->pUnpacked==0 ){
+    char *pFree;
+    pThread->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
+        pThread->pKeyInfo, 0, 0, &pFree
+    );
+    assert( pThread->pUnpacked==(UnpackedRecord*)pFree );
+    if( pFree==0 ){
+      rc = SQLITE_NOMEM;
+      goto thread_out;
+    }
++    pThread->pUnpacked->nField = pThread->pKeyInfo->nField;
+  }
+
+  if( pThread->eWork==SORTER_THREAD_CONS ){
+    assert( pThread->pList==0 );
+
+    /* Each pass of this loop rewrites all of pTemp1 into a new temp file
+    ** containing fewer, larger PMAs. */
+    while( pThread->nPMA>pThread->nConsolidate && rc==SQLITE_OK ){
+      int nIter = MIN(pThread->nPMA, SORTER_MAX_MERGE_COUNT);
+      sqlite3_file *pTemp2 = 0;     /* Second temp file to use */
+      SorterMerger *pMerger;        /* Object for reading/merging PMA data */
+      i64 iReadOff = 0;             /* Offset in pTemp1 to read from */
+      i64 iWriteOff = 0;            /* Offset in pTemp2 to write to */
+      int i;
+
+      /* Allocate a merger object to merge PMAs together. */
+      pMerger = vdbeSorterMergerNew(nIter);
+      if( pMerger==0 ){
+        rc = SQLITE_NOMEM;
+        break;
+      }
+
+      /* Open a second temp file to write merged data to */
+      rc = vdbeSorterOpenTempFile(pThread->pVfs, &pTemp2);
++      if( rc==SQLITE_OK ){
++        rc = vdbeSorterExtendFile(pTemp2, pThread->iTemp1Off);
++      }
+      if( rc!=SQLITE_OK ){
+        /* pTemp2 is non-NULL here if the open succeeded but extending the
+        ** file failed. Nothing else owns the handle yet, so close it now
+        ** rather than leak it. */
+        if( pTemp2 ) sqlite3OsCloseFree(pTemp2);
+        vdbeSorterMergerFree(pMerger);
+        break;
+      }
+
+      /* This loop runs once for each output PMA. Each output PMA is made
+      ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs.
+      **
+      ** The loop stops at the first error. Carrying on would re-initialize
+      ** iterators that were never reset and would overwrite rc, so the
+      ** failure could be silently lost. */
+      for(i=0; rc==SQLITE_OK && i<pThread->nPMA; i+=SORTER_MAX_MERGE_COUNT){
+        FileWriter writer;          /* Object for writing data to pTemp2 */
+        i64 nOut = 0;               /* Bytes of data in output PMA */
+        int bEof = 0;
+        int rc2;
+
+        /* Configure the merger object to read and merge data from the next
+        ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs,
+        ** if that is fewer). */
+        int iIter;
+        for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
+          VdbeSorterIter *pIter = &pMerger->aIter[iIter];
+          rc = vdbeSorterIterInit(pThread, iReadOff, pIter, &nOut);
+          iReadOff = pIter->iEof;
+          if( iReadOff>=pThread->iTemp1Off || rc!=SQLITE_OK ) break;
+        }
+        for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
+          rc = vdbeSorterDoCompare(pThread, pMerger, iIter);
+        }
+
+        fileWriterInit(pTemp2, &writer, pThread->pgsz, iWriteOff);
+        fileWriterWriteVarint(&writer, nOut);
+        while( rc==SQLITE_OK && bEof==0 ){
+          VdbeSorterIter *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
+          assert( pIter->pFile!=0 );        /* pIter is not at EOF */
+          fileWriterWriteVarint(&writer, pIter->nKey);
+          fileWriterWrite(&writer, pIter->aKey, pIter->nKey);
+          rc = vdbeSorterNext(pThread, pMerger, &bEof);
+        }
+        /* Always finish the writer so its resources are released, but do
+        ** not let its result hide an earlier error. */
+        rc2 = fileWriterFinish(&writer, &iWriteOff);
+        if( rc==SQLITE_OK ) rc = rc2;
+      }
+
+      /* pTemp2 replaces pTemp1 even after an error. The handle is then
+      ** owned by pThread and closed when the object is cleaned up. */
+      vdbeSorterMergerFree(pMerger);
+      sqlite3OsCloseFree(pThread->pTemp1);
+      pThread->pTemp1 = pTemp2;
+      pThread->nPMA = (i / SORTER_MAX_MERGE_COUNT);
+      pThread->iTemp1Off = iWriteOff;
+    }
+  }else{
+    /* Sort the pThread->pList list */
+    rc = vdbeSorterSort(pThread);
+
+    /* If required, write the list out to a PMA. */
+    if( rc==SQLITE_OK && pThread->eWork==SORTER_THREAD_TO_PMA ){
+#ifdef SQLITE_DEBUG
+      i64 nExpect = pThread->nInMemory
+                  + sqlite3VarintLen(pThread->nInMemory)
+                  + pThread->iTemp1Off;
+#endif
+      rc = vdbeSorterListToPMA(pThread);
+      assert( rc!=SQLITE_OK || (nExpect==pThread->iTemp1Off) );
+    }
+  }
+
+ thread_out:
+  pThread->bDone = 1;
+  return SQLITE_INT_TO_PTR(rc);
+}
+
+/*
+** Perform the unit of work described by pThread synchronously, on the
+** calling thread, and return the resulting SQLite error code.
+**
+** vdbeSorterThreadMain() sets SorterThread.bDone before it returns. As
+** there is no thread to join in this case the flag is cleared again here.
+*/
+static int vdbeSorterRunThread(SorterThread *pThread){
+  void *pRes = vdbeSorterThreadMain((void*)pThread);
+  assert( pThread->bDone );
+  pThread->bDone = 0;
+  return SQLITE_PTR_TO_INT(pRes);
+}
+
+/*
+** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly
+** using a background thread.
+**
+** If argument bFg is non-zero, the operation always uses the calling thread.
+** The calling thread is also used when the only free SorterThread slot is
+** the last one, aThread[SQLITE_MAX_SORTER_THREAD-1].
+**
+** On entry the record list and its size are moved from pSorter to the
+** chosen SorterThread, leaving pSorter->pRecord==0 and
+** pSorter->nInMemory==0. When the single-allocation scheme is in use
+** (pSorter->aMemory!=0) the buffer is handed over as well, and pSorter is
+** given another buffer to accumulate the next batch of records into.
+**
+** Returns SQLITE_OK, or an SQLite error code (from joining a finished
+** thread, from an allocation failure, from thread creation, or from the
+** work itself when it runs in the foreground).
+*/
+static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
+  VdbeSorter *pSorter = pCsr->pSorter;
+  int rc = SQLITE_OK;
+  int i;
+  SorterThread *pThread;          /* Thread context used to create new PMA */
+
+  pSorter->bUsePMA = 1;
+
+  /* Find the first slot with no thread attached. Slots whose thread has
+  ** already finished (bDone set) are joined on the way, which frees them. */
+  for(i=0; ALWAYS( i<SQLITE_MAX_SORTER_THREAD ); i++){
+    pThread = &pSorter->aThread[i];
+    if( pThread->bDone ){
+      void *pRet;
+      assert( pThread->pThread );
+      rc = sqlite3ThreadJoin(pThread->pThread, &pRet);
+      pThread->pThread = 0;
+      pThread->bDone = 0;
+      if( rc==SQLITE_OK ){
+        rc = SQLITE_PTR_TO_INT(pRet);
+      }
+    }
+    if( pThread->pThread==0 ) break;
+  }
+
+  if( rc==SQLITE_OK ){
++    int bUseFg = (bFg || i==(SQLITE_MAX_SORTER_THREAD-1));
++
+    assert( pThread->pThread==0 && pThread->bDone==0 );
+    pThread->eWork = SORTER_THREAD_TO_PMA;
+    pThread->pList = pSorter->pRecord;
+    pThread->nInMemory = pSorter->nInMemory;
+    pSorter->nInMemory = 0;
+    pSorter->pRecord = 0;
+
- if( bFg || i<(SQLITE_MAX_SORTER_THREAD-1) ){
++    if( pSorter->aMemory ){
++      /* Hand the filled buffer to the worker; take its old one (if any). */
++      u8 *aMem = pThread->aListMemory;
++      pThread->aListMemory = pSorter->aMemory;
++      pSorter->aMemory = aMem;
++    }
++
++    if( bUseFg==0 ){
++      /* Launch a background thread for this operation */
+      void *pCtx = (void*)pThread;
+      if( pThread->aListMemory ){
+        /* Single-allocation scheme: the sorter needs a buffer of its own
+        ** while the worker owns the one just handed over. This must be
+        ** skipped when records are separate allocations: nMemory is 0 in
+        ** that mode, sqlite3Malloc(0) returns NULL, and the flush would
+        ** fail with a spurious SQLITE_NOMEM. */
++        if( pSorter->aMemory==0 ){
++          pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
++          if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
++        }else{
++          pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory);
++        }
+      }
+      rc = sqlite3ThreadCreate(&pThread->pThread, vdbeSorterThreadMain, pCtx);
+    }else{
+      /* Use the foreground thread for this operation */
++      u8 *aMem;
+      rc = vdbeSorterRunThread(pThread);
+      /* The work is complete, so swap the buffers back again. */
++      aMem = pThread->aListMemory;
++      pThread->aListMemory = pSorter->aMemory;
++      pSorter->aMemory = aMem;
    }
- pSorter->pRecord = p;
- rc = fileWriterFinish(db, &writer, &pSorter->iWriteOff);
  }
- if( pSorter->aMemory ) pSorter->pRecord = 0;
- assert( pSorter->pRecord==0 || rc!=SQLITE_OK );
  return rc;
}
** * The total memory allocated for the in-memory list is greater
** than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
*/
- if( rc==SQLITE_OK && pSorter->mxPmaSize>0 && (
- (pSorter->nInMemory>pSorter->mxPmaSize)
- || (pSorter->nInMemory>pSorter->mnPmaSize && sqlite3HeapNearlyFull())
- )){
+ nReq = pVal->n + sizeof(SorterRecord);
+ nPMA = pVal->n + sqlite3VarintLen(pVal->n);
+ if( pSorter->aMemory ){
+ bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
+ }else{
+ bFlush = (
+ (pSorter->nInMemory > pSorter->mxPmaSize)
+ || (pSorter->nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
+ );
+ }
+ if( bFlush ){
-#ifdef SQLITE_DEBUG
- i64 nExpect = pSorter->iWriteOff
- + sqlite3VarintLen(pSorter->nInMemory)
- + pSorter->nInMemory;
-#endif
- rc = vdbeSorterListToPMA(db, pCsr);
+ rc = vdbeSorterFlushPMA(db, pCsr, 0);
+ pSorter->nInMemory = 0;
+ pSorter->iMemory = 0;
- assert( rc!=SQLITE_OK || (nExpect==pSorter->iWriteOff) );
+ assert( rc!=SQLITE_OK || pSorter->pRecord==0 );
}
- pNew = (SorterRecord *)sqlite3DbMallocRaw(db, pVal->n+sizeof(SorterRecord));
+ pSorter->nInMemory += nPMA;
+
+ if( pSorter->aMemory ){
+ int nMin = pSorter->iMemory + nReq;
+
+ if( nMin>pSorter->nMemory ){
+ u8 *aNew;
+ int nNew = pSorter->nMemory * 2;
+ while( nNew < nMin ) nNew = nNew*2;
+ if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
+ if( nNew < nMin ) nNew = nMin;
+
+ aNew = sqlite3Realloc(pSorter->aMemory, nNew);
+ if( !aNew ) return SQLITE_NOMEM;
+ pSorter->pRecord = aNew + ((u8*)pSorter->pRecord - pSorter->aMemory);
+ pSorter->aMemory = aNew;
+ pSorter->nMemory = nNew;
+ }
+
+ pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory];
+ pSorter->iMemory += ROUND8(nReq);
+ pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory;
+ }else{
++ pNew = (SorterRecord *)sqlite3Malloc(pVal->n+sizeof(SorterRecord));
+ if( pNew==0 ){
+ return SQLITE_NOMEM;
+ }
+ pNew->u.pNext = pSorter->pRecord;
+ }
+
+ memcpy(SRVAL(pNew), pVal->z, pVal->n);
+ pNew->nVal = pVal->n;
+ pSorter->pRecord = pNew;
+
return rc;
}
/* If no data has been written to disk, then do not do so now. Instead,
** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
** from the in-memory list. */
- if( pSorter->nPMA==0 ){
- *pbEof = !pSorter->pRecord;
- assert( pSorter->aTree==0 );
- return vdbeSorterSort(pCsr);
+ if( pSorter->bUsePMA==0 ){
+ if( pSorter->pRecord ){
+ SorterThread *pThread = &pSorter->aThread[0];
+ *pbEof = 0;
+ pThread->pList = pSorter->pRecord;
+ pThread->eWork = SORTER_THREAD_SORT;
++ assert( pThread->aListMemory==0 );
++ pThread->aListMemory = pSorter->aMemory;
+ rc = vdbeSorterRunThread(pThread);
++ pThread->aListMemory = 0;
+ pSorter->pRecord = pThread->pList;
+ pThread->pList = 0;
+ }else{
+ *pbEof = 1;
+ }
+ return rc;
}
/* Write the current in-memory list to a PMA. */
VdbeSorter *pSorter = pCsr->pSorter;
int rc; /* Return code */
- if( pSorter->aTree ){
- int iPrev = pSorter->aTree[1];/* Index of iterator to advance */
- rc = vdbeSorterIterNext(db, &pSorter->aIter[iPrev]);
- if( rc==SQLITE_OK ){
- int i; /* Index of aTree[] to recalculate */
- VdbeSorterIter *pIter1; /* First iterator to compare */
- VdbeSorterIter *pIter2; /* Second iterator to compare */
- u8 *pKey2; /* To pIter2->aKey, or 0 if record cached */
-
- /* Find the first two iterators to compare. The one that was just
- ** advanced (iPrev) and the one next to it in the array. */
- pIter1 = &pSorter->aIter[(iPrev & 0xFFFE)];
- pIter2 = &pSorter->aIter[(iPrev | 0x0001)];
- pKey2 = pIter2->aKey;
-
- for(i=(pSorter->nTree+iPrev)/2; i>0; i=i/2){
- /* Compare pIter1 and pIter2. Store the result in variable iRes. */
- int iRes;
- if( pIter1->pFile==0 ){
- iRes = +1;
- }else if( pIter2->pFile==0 ){
- iRes = -1;
- }else{
- vdbeSorterCompare(pCsr, 0,
- pIter1->aKey, pIter1->nKey, pKey2, pIter2->nKey, &iRes
- );
- }
-
- /* If pIter1 contained the smaller value, set aTree[i] to its index.
- ** Then set pIter2 to the next iterator to compare to pIter1. In this
- ** case there is no cache of pIter2 in pSorter->pUnpacked, so set
- ** pKey2 to point to the record belonging to pIter2.
- **
- ** Alternatively, if pIter2 contains the smaller of the two values,
- ** set aTree[i] to its index and update pIter1. If vdbeSorterCompare()
- ** was actually called above, then pSorter->pUnpacked now contains
- ** a value equivalent to pIter2. So set pKey2 to NULL to prevent
- ** vdbeSorterCompare() from decoding pIter2 again.
- **
- ** If the two values were equal, then the value from the oldest
- ** PMA should be considered smaller. The VdbeSorter.aIter[] array
- ** is sorted from oldest to newest, so pIter1 contains older values
- ** than pIter2 iff (pIter1<pIter2). */
- if( iRes<0 || (iRes==0 && pIter1<pIter2) ){
- pSorter->aTree[i] = (int)(pIter1 - pSorter->aIter);
- pIter2 = &pSorter->aIter[ pSorter->aTree[i ^ 0x0001] ];
- pKey2 = pIter2->aKey;
- }else{
- if( pIter1->pFile ) pKey2 = 0;
- pSorter->aTree[i] = (int)(pIter2 - pSorter->aIter);
- pIter1 = &pSorter->aIter[ pSorter->aTree[i ^ 0x0001] ];
- }
- }
- *pbEof = (pSorter->aIter[pSorter->aTree[1]].pFile==0);
- }
+ if( pSorter->pMerger ){
+ rc = vdbeSorterNext(&pSorter->aThread[0], pSorter->pMerger, pbEof);
}else{
SorterRecord *pFree = pSorter->pRecord;
- pSorter->pRecord = pFree->pNext;
- pFree->pNext = 0;
- vdbeSorterRecordFree(db, pFree);
+ pSorter->pRecord = pFree->u.pNext;
+ pFree->u.pNext = 0;
- if( pSorter->aMemory==0 ){
- vdbeSorterRecordFree(db, pFree);
- }
++ if( pSorter->aMemory==0 ) vdbeSorterRecordFree(db, pFree);
*pbEof = !pSorter->pRecord;
rc = SQLITE_OK;
}