#include "sqliteInt.h"
#include "vdbeInt.h"
+/*
+** If SQLITE_DEBUG_SORTER_THREADS is defined, this module outputs various
+** messages to stderr that may be helpful in understanding the performance
+** characteristics of the sorter in multi-threaded mode.
+*/
+#if 0
+# define SQLITE_DEBUG_SORTER_THREADS 1
+#endif
+
/*
** Private objects used by the sorter
*/
typedef struct SorterRecord SorterRecord; /* A record being sorted */
typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */
typedef struct SorterFile SorterFile;
+typedef struct SorterThread SorterThread;
+typedef struct SorterList SorterList;
typedef struct IncrMerger IncrMerger;
+/*
+** A container for a temp file handle and the current amount of data
+** stored in the file.
+*/
+struct SorterFile {
+ sqlite3_file *pFd; /* File handle */
+ i64 iEof; /* Bytes of data stored in pFd */
+};
/*
-** Candidate values for SortSubtask.eWork
+** An object of this type is used to store the thread handle for each
+** background thread launched by the sorter. Before the thread is launched,
+** variable bDone is set to 0. Then, right before it exits, the thread
+** itself sets bDone to 1.
+**
+** This is then used for two purposes:
+**
+** 1. When flushing the contents of memory to a level-0 PMA on disk, to
+** attempt to select a SortSubtask for which there is not already an
+** active background thread (since doing so causes the main thread
+** to block until it finishes).
+**
+** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
+** to sqlite3ThreadJoin() is likely to block.
+**
+** In both cases, the effects of the main thread seeing (bDone==0) even
+** after the thread has finished are not dire. So we don't worry about
+** memory barriers and such here.
*/
-#define SORT_SUBTASK_SORT 1 /* Sort records on pList */
-#define SORT_SUBTASK_TO_PMA 2 /* Xfer pList to Packed-Memory-Array pTemp1 */
-#define SORT_SUBTASK_CONS 3 /* Consolidate multiple PMAs */
+struct SorterThread {
+ SQLiteThread *pThread; /* Thread handle, or 0 if no thread is outstanding */
+ int bDone; /* 0 before launch; set to 1 by the thread right before it exits */
+};
-struct SorterFile {
- sqlite3_file *pFd;
- i64 iEof;
+struct SorterList {
+ SorterRecord *pList; /* Linked list of records */
+ u8 *aMemory; /* If non-NULL, blob of memory for pList */
+ int szPMA; /* Size of pList as PMA in bytes */
};
/*
** remain in temp file SortSubtask.pTemp1.
*/
struct SortSubtask {
- SQLiteThread *pThread; /* Thread handle, or NULL */
- int bDone; /* Set to true by pTask when finished */
-
+ SorterThread thread;
sqlite3 *db; /* Database connection */
VdbeSorter *pSorter; /* Sorter */
KeyInfo *pKeyInfo; /* How to compare records */
UnpackedRecord *pUnpacked; /* Space to unpack a record */
int pgsz; /* Main database page size */
-
- u8 eWork; /* One of the SORT_SUBTASK_* constants */
- int nConsolidate; /* For SORT_SUBTASK_CONS, max final PMAs */
- SorterRecord *pList; /* List of records for pTask to sort */
- int nInMemory; /* Expected size of PMA based on pList */
- u8 *aListMemory; /* Records memory (or NULL) */
-
+ SorterList list; /* List for thread to write to a PMA */
int nPMA; /* Number of PMAs currently in file */
SorterFile file; /* Temp file for level-0 PMAs */
SorterFile file2; /* Space for other PMAs */
** largest record in the sorter.
*/
struct VdbeSorter {
- int nInMemory; /* Current size of pRecord list as PMA */
int mnPmaSize; /* Minimum PMA size, in bytes */
int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
int bUsePMA; /* True if one or more PMAs created */
int bUseThreads; /* True if one or more PMAs created */
- SorterRecord *pRecord; /* Head of in-memory record list */
PmaReader *pReader; /* Read data from here after Rewind() */
int mxKeysize; /* Largest serialized key seen so far */
UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */
+#if 0
+ int nInMemory; /* Current size of pRecord list as PMA */
+ SorterRecord *pRecord; /* Head of in-memory record list */
u8 *aMemory; /* Block of memory to alloc records from */
+#endif
+ SorterList list;
int iMemory; /* Offset of first free byte in aMemory */
int nMemory; /* Size of aMemory allocation in bytes */
int iPrev; /* Previous thread used to flush PMA */
*/
struct IncrMerger {
SortSubtask *pTask; /* Task that owns this merger */
- SQLiteThread *pThread; /* Thread currently populating aFile[1] */
+ SorterThread thread; /* Thread for populating aFile[1] */
MergeEngine *pMerger; /* Merge engine thread reads data from */
i64 iStartOff; /* Offset to start writing file at */
int mxSz; /* Maximum bytes of data to store */
if( sqlite3GlobalConfig.pHeap==0 ){
assert( pSorter->iMemory==0 );
pSorter->nMemory = pgsz;
- pSorter->aMemory = (u8*)sqlite3Malloc(pgsz);
- if( !pSorter->aMemory ) rc = SQLITE_NOMEM;
+ pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
+ if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM;
}
}
}
static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
sqlite3DbFree(db, pTask->pUnpacked);
pTask->pUnpacked = 0;
- if( pTask->aListMemory==0 ){
- vdbeSorterRecordFree(0, pTask->pList);
+ if( pTask->list.aMemory==0 ){
+ vdbeSorterRecordFree(0, pTask->list.pList);
}else{
- sqlite3_free(pTask->aListMemory);
- pTask->aListMemory = 0;
+ sqlite3_free(pTask->list.aMemory);
+ pTask->list.aMemory = 0;
}
- pTask->pList = 0;
+ pTask->list.pList = 0;
if( pTask->file.pFd ){
sqlite3OsCloseFree(pTask->file.pFd);
pTask->file.pFd = 0;
}
}
+#ifdef SQLITE_DEBUG_SORTER_THREADS
+static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){ /* Debug only: log zEvent for sub-task pTask on stderr */
+ i64 t; /* Time value obtained from the VFS below */
+ int iTask = (pTask - pTask->pSorter->aTask); /* Index of pTask within pSorter->aTask[] */
+ sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
+ fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent); /* Line format: <time>:<task index> <event> */
+}
+static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){ /* Debug only: log zEvent on stderr, tagged X instead of a task index */
+ i64 t; /* Time value obtained from the VFS below */
+ sqlite3OsCurrentTimeInt64(db->pVfs, &t);
+ fprintf(stderr, "%lld:X %s\n", t, zEvent); /* Line format: <time>:X <event> */
+}
+static void vdbeSorterPopulateDebug( /* Debug only: log zEvent on stderr, tagged bg<task index> */
+ SortSubtask *pTask, /* Sub-task the event belongs to */
+ const char *zEvent /* Text to print after the tag */
+){
+ i64 t; /* Time value obtained from the VFS below */
+ int iTask = (pTask - pTask->pSorter->aTask); /* Index of pTask within pSorter->aTask[] */
+ sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
+ fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent); /* Line format: <time>:bg<task index> <event> */
+}
+static void vdbeSorterBlockDebug( /* Debug only: log zEvent on stderr, tagged main, when bBlocked is set */
+ SortSubtask *pTask, /* Used only to reach the VFS for the time value */
+ int bBlocked, /* Nothing is printed when this is zero */
+ const char *zEvent /* Text to print after the tag */
+){
+ if( bBlocked ){
+ i64 t; /* Time value obtained from the VFS below */
+ sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
+ fprintf(stderr, "%lld:main %s\n", t, zEvent); /* Line format: <time>:main <event> */
+ }
+}
+#else
+# define vdbeSorterWorkDebug(x,y)
+# define vdbeSorterRewindDebug(x,y)
+# define vdbeSorterPopulateDebug(x,y)
+# define vdbeSorterBlockDebug(x,y,z)
+#endif
+
+#if SQLITE_MAX_WORKER_THREADS>0
/*
-** Join all threads.
+** Join thread p.
+*/
+static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){ /* pTask is used only for the debug trace calls */
+ int rc = SQLITE_OK; /* Stays SQLITE_OK when p has no outstanding thread */
+ if( p->pThread ){
+#ifdef SQLITE_DEBUG_SORTER_THREADS
+ int bDone = p->bDone; /* Sampled before joining: 0 means the join below may block */
+#endif
+ void *pRet; /* Value returned by the thread routine */
+ vdbeSorterBlockDebug(pTask, !bDone, "enter"); /* Expands to nothing in non-debug builds, where bDone is not declared */
+ rc = sqlite3ThreadJoin(p->pThread, &pRet);
+ vdbeSorterBlockDebug(pTask, !bDone, "exit");
+ if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); /* Join succeeded: report the thread routine's own result code */
+ assert( p->bDone==1 );
+ p->bDone = 0; /* Return p to the idle state that vdbeSorterCreateThread() asserts */
+ p->pThread = 0;
+ }
+ return rc;
+}
+
+/*
+** Launch a background thread to run xTask(pIn).
+*/
+static int vdbeSorterCreateThread(
+ SorterThread *p, /* Thread object to populate */
+ void *(*xTask)(void*), /* Routine to run in a separate thread */
+ void *pIn /* Argument passed into xTask() */
+){
+ assert( p->pThread==0 && p->bDone==0 ); /* p must be idle: never launched, or already joined */
+ return sqlite3ThreadCreate(&p->pThread, xTask, pIn); /* Result of sqlite3ThreadCreate() is returned unchanged */
+}
+
+/*
+** Join all outstanding threads launched by SorterWrite() to create
+** level-0 PMAs.
*/
-#if SQLITE_MAX_WORKER_THREADS>0
static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
int rc = rcin; /* Start from the caller's code; it is kept if already an error */
int i;
for(i=0; i<pSorter->nTask; i++){
SortSubtask *pTask = &pSorter->aTask[i];
- if( pTask->pThread ){
- void *pRet;
- int rc2 = sqlite3ThreadJoin(pTask->pThread, &pRet);
- pTask->pThread = 0;
- pTask->bDone = 0;
- if( rc==SQLITE_OK ) rc = rc2;
- if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
- }
+ int rc2 = vdbeSorterJoinThread(pTask, &pTask->thread); /* Returns SQLITE_OK at once if this task has no thread */
+ if( rc==SQLITE_OK ) rc = rc2; /* Remember the first error, but keep joining the remaining threads */
}
return rc;
}
#else
# define vdbeSorterJoinAll(x,rcin) (rcin)
+# define vdbeSorterJoinThread(pTask,p) SQLITE_OK
#endif
/*
SortSubtask *pTask = &pSorter->aTask[i];
vdbeSortSubtaskCleanup(db, pTask);
}
- if( pSorter->aMemory==0 ){
- vdbeSorterRecordFree(0, pSorter->pRecord);
+ if( pSorter->list.aMemory==0 ){
+ vdbeSorterRecordFree(0, pSorter->list.pList);
}
- pSorter->pRecord = 0;
- pSorter->nInMemory = 0;
+ pSorter->list.pList = 0;
+ pSorter->list.szPMA = 0;
pSorter->bUsePMA = 0;
pSorter->iMemory = 0;
pSorter->mxKeysize = 0;
VdbeSorter *pSorter = pCsr->pSorter;
if( pSorter ){
sqlite3VdbeSorterReset(db, pSorter);
- sqlite3_free(pSorter->aMemory);
+ sqlite3_free(pSorter->list.aMemory);
sqlite3DbFree(db, pSorter);
pCsr->pSorter = 0;
}
return rc;
}
+static int vdbeSortAllocUnpacked(SortSubtask *pTask){ /* Allocate pTask->pUnpacked if not yet done; SQLITE_OK or SQLITE_NOMEM */
+ if( pTask->pUnpacked==0 ){
+ char *pFree; /* Set by the call below; asserted to equal pTask->pUnpacked */
+ pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
+ pTask->pKeyInfo, 0, 0, &pFree
+ );
+ assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
+ if( pFree==0 ) return SQLITE_NOMEM; /* Allocation failed */
+ pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
+ pTask->pUnpacked->errCode = 0; /* vdbeSorterSort() inspects this after sorting */
+ }
+ return SQLITE_OK;
+}
+
+
/*
** Merge the two sorted lists p1 and p2 into a single list.
** Set *ppOut to the head of the new list.
** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if
** an error occurs.
*/
-static int vdbeSorterSort(SortSubtask *pTask){
+static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
int i;
SorterRecord **aSlot;
SorterRecord *p;
+ int rc;
+
+ rc = vdbeSortAllocUnpacked(pTask);
+ if( rc!=SQLITE_OK ) return rc;
aSlot = (SorterRecord **)sqlite3MallocZero(64 * sizeof(SorterRecord *));
if( !aSlot ){
return SQLITE_NOMEM;
}
- p = pTask->pList;
+ p = pList->pList;
while( p ){
SorterRecord *pNext;
- if( pTask->aListMemory ){
- if( (u8*)p==pTask->aListMemory ){
+ if( pList->aMemory ){
+ if( (u8*)p==pList->aMemory ){
pNext = 0;
}else{
- assert( p->u.iNext<sqlite3MallocSize(pTask->aListMemory) );
- pNext = (SorterRecord*)&pTask->aListMemory[p->u.iNext];
+ assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
+ pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
}
}else{
pNext = p->u.pNext;
for(i=0; i<64; i++){
vdbeSorterMerge(pTask, p, aSlot[i], &p);
}
- pTask->pList = p;
+ pList->pList = p;
sqlite3_free(aSlot);
+ if( pTask->pUnpacked->errCode ){
+ assert( pTask->pUnpacked->errCode==SQLITE_NOMEM );
+ return SQLITE_NOMEM;
+ }
return SQLITE_OK;
}
/*
-** Write the current contents of the in-memory linked-list to a PMA. Return
-** SQLITE_OK if successful, or an SQLite error code otherwise.
+** Write the current contents of in-memory linked-list pList to a level-0
+** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if
+** successful, or an SQLite error code otherwise.
**
** The format of a PMA is:
**
** Each record consists of a varint followed by a blob of data (the
** key). The varint is the number of bytes in the blob of data.
*/
-static int vdbeSorterListToPMA(SortSubtask *pTask){
+static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
int rc = SQLITE_OK; /* Return code */
PmaWriter writer; /* Object used to write to the file */
+#ifdef SQLITE_DEBUG
+ /* Set iSz to the expected size of file pTask->file after writing the PMA.
+ ** This is used by an assert() statement at the end of this function. */
+ i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
+#endif
+
+ vdbeSorterWorkDebug(pTask, "enter");
memset(&writer, 0, sizeof(PmaWriter));
- assert( pTask->nInMemory>0 );
+ assert( pList->szPMA>0 );
/* If the first temporary PMA file has not been opened, open it now. */
if( pTask->file.pFd==0 ){
/* Try to get the file to memory map */
if( rc==SQLITE_OK ){
vdbeSorterExtendFile(pTask->db,
- pTask->file.pFd, pTask->file.iEof + pTask->nInMemory + 9
+ pTask->file.pFd, pTask->file.iEof + pList->szPMA + 9
);
}
+ /* Sort the list */
+ if( rc==SQLITE_OK ){
+ rc = vdbeSorterSort(pTask, pList);
+ }
+
if( rc==SQLITE_OK ){
SorterRecord *p;
SorterRecord *pNext = 0;
vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz,
pTask->file.iEof);
pTask->nPMA++;
- vdbePmaWriteVarint(&writer, pTask->nInMemory);
- for(p=pTask->pList; p; p=pNext){
+ vdbePmaWriteVarint(&writer, pList->szPMA);
+ for(p=pList->pList; p; p=pNext){
pNext = p->u.pNext;
vdbePmaWriteVarint(&writer, p->nVal);
vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
- if( pTask->aListMemory==0 ) sqlite3_free(p);
+ if( pList->aMemory==0 ) sqlite3_free(p);
}
- pTask->pList = p;
+ pList->pList = p;
rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
}
- assert( pTask->pList==0 || rc!=SQLITE_OK );
+ vdbeSorterWorkDebug(pTask, "exit");
+ assert( rc!=SQLITE_OK || pList->pList==0 );
+ assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
return rc;
}
return rc;
}
-#if 0
-static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
- i64 t;
- int iTask = (pTask - pTask->pSorter->aTask);
- sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
- fprintf(stderr, "%lld:%d %s\n", t, iTask, zEvent);
-}
-static void vdbeSorterRewindDebug(sqlite3 *db, const char *zEvent){
- i64 t;
- sqlite3OsCurrentTimeInt64(db->pVfs, &t);
- fprintf(stderr, "%lld:X %s\n", t, zEvent);
-}
-static void vdbeSorterPopulateDebug(
- SortSubtask *pTask,
- const char *zEvent
-){
- i64 t;
- int iTask = (pTask - pTask->pSorter->aTask);
- sqlite3OsCurrentTimeInt64(pTask->db->pVfs, &t);
- fprintf(stderr, "%lld:bg%d %s\n", t, iTask, zEvent);
-}
-#else
-# define vdbeSorterWorkDebug(x,y)
-# define vdbeSorterRewindDebug(x,y)
-# define vdbeSorterPopulateDebug(x,y)
-#endif
-
-static int vdbeSortAllocUnpacked(SortSubtask *pTask){
- if( pTask->pUnpacked==0 ){
- char *pFree;
- pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
- pTask->pKeyInfo, 0, 0, &pFree
- );
- assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
- if( pFree==0 ) return SQLITE_NOMEM;
- pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
- pTask->pUnpacked->errCode = 0;
- }
- return SQLITE_OK;
-}
-
/*
** The main routine for sorter-thread operations.
*/
-static void *vdbeSortSubtaskMain(void *pCtx){
- int rc = SQLITE_OK;
+static void *vdbeSorterFlushThread(void *pCtx){
SortSubtask *pTask = (SortSubtask*)pCtx;
-
- assert( pTask->eWork==SORT_SUBTASK_SORT
- || pTask->eWork==SORT_SUBTASK_TO_PMA
- || pTask->eWork==SORT_SUBTASK_CONS
- );
- assert( pTask->bDone==0 );
-
- vdbeSorterWorkDebug(pTask, "enter");
-
- rc = vdbeSortAllocUnpacked(pTask);
- if( rc!=SQLITE_OK ) goto thread_out;
-
- if( pTask->eWork==SORT_SUBTASK_CONS ){
- assert( pTask->pList==0 );
- while( pTask->nPMA>pTask->nConsolidate && rc==SQLITE_OK ){
- int nIter = MIN(pTask->nPMA, SORTER_MAX_MERGE_COUNT);
- sqlite3_file *pTemp2 = 0; /* Second temp file to use */
- MergeEngine *pMerger; /* Object for reading/merging PMA data */
- i64 iReadOff = 0; /* Offset in pTemp1 to read from */
- i64 iWriteOff = 0; /* Offset in pTemp2 to write to */
- int i;
-
- /* Allocate a merger object to merge PMAs together. */
- pMerger = vdbeMergeEngineNew(nIter);
- if( pMerger==0 ){
- rc = SQLITE_NOMEM;
- break;
- }
-
- /* Open a second temp file to write merged data to */
- rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTemp2);
- if( rc==SQLITE_OK ){
- vdbeSorterExtendFile(pTask->db, pTemp2, pTask->file.iEof);
- }else{
- vdbeMergeEngineFree(pMerger);
- break;
- }
-
- /* This loop runs once for each output PMA. Each output PMA is made
- ** of data merged from up to SORTER_MAX_MERGE_COUNT input PMAs. */
- for(i=0; rc==SQLITE_OK && i<pTask->nPMA; i+=SORTER_MAX_MERGE_COUNT){
- PmaWriter writer; /* Object for writing data to pTemp2 */
- i64 nOut = 0; /* Bytes of data in output PMA */
- int bEof = 0;
- int rc2;
-
- /* Configure the merger object to read and merge data from the next
- ** SORTER_MAX_MERGE_COUNT PMAs in pTemp1 (or from all remaining PMAs,
- ** if that is fewer). */
- int iIter;
- for(iIter=0; iIter<SORTER_MAX_MERGE_COUNT; iIter++){
- PmaReader *pIter = &pMerger->aIter[iIter];
- rc = vdbePmaReaderInit(pTask, &pTask->file, iReadOff, pIter, &nOut);
- iReadOff = pIter->iEof;
- if( iReadOff>=pTask->file.iEof || rc!=SQLITE_OK ) break;
- }
- for(iIter=pMerger->nTree-1; rc==SQLITE_OK && iIter>0; iIter--){
- rc = vdbeSorterDoCompare(pTask, pMerger, iIter);
- }
-
- vdbePmaWriterInit(pTemp2, &writer, pTask->pgsz, iWriteOff);
- vdbePmaWriteVarint(&writer, nOut);
- while( rc==SQLITE_OK && bEof==0 ){
- PmaReader *pIter = &pMerger->aIter[ pMerger->aTree[1] ];
- assert( pIter->pFile!=0 ); /* pIter is not at EOF */
- vdbePmaWriteVarint(&writer, pIter->nKey);
- vdbePmaWriteBlob(&writer, pIter->aKey, pIter->nKey);
- rc = vdbeSorterNext(pTask, pMerger, &bEof);
- }
- rc2 = vdbePmaWriterFinish(&writer, &iWriteOff);
- if( rc==SQLITE_OK ) rc = rc2;
- }
-
- vdbeMergeEngineFree(pMerger);
- sqlite3OsCloseFree(pTask->file.pFd);
- pTask->file.pFd = pTemp2;
- pTask->nPMA = (i / SORTER_MAX_MERGE_COUNT);
- pTask->file.iEof = iWriteOff;
- }
- }else{
- /* Sort the pTask->pList list */
- rc = vdbeSorterSort(pTask);
-
- /* If required, write the list out to a PMA. */
- if( rc==SQLITE_OK && pTask->eWork==SORT_SUBTASK_TO_PMA ){
-#ifdef SQLITE_DEBUG
- i64 nExpect = pTask->nInMemory
- + sqlite3VarintLen(pTask->nInMemory)
- + pTask->file.iEof;
-#endif
- rc = vdbeSorterListToPMA(pTask);
- assert( rc!=SQLITE_OK || (nExpect==pTask->file.iEof) );
- }
- }
-
- thread_out:
- pTask->bDone = 1;
- if( rc==SQLITE_OK && pTask->pUnpacked->errCode ){
- assert( pTask->pUnpacked->errCode==SQLITE_NOMEM );
- rc = SQLITE_NOMEM;
- }
- vdbeSorterWorkDebug(pTask, "exit");
+ int rc; /* Return code */
+ assert( pTask->thread.bDone==0 );
+ rc = vdbeSorterListToPMA(pTask, &pTask->list);
+ pTask->thread.bDone = 1;
return SQLITE_INT_TO_PTR(rc);
}
/*
-** Run the activity scheduled by the object passed as the only argument
-** in the current thread.
-*/
-static int vdbeSorterRunTask(SortSubtask *pTask){
- int rc = SQLITE_PTR_TO_INT( vdbeSortSubtaskMain((void*)pTask) );
- assert( pTask->bDone );
- pTask->bDone = 0;
- return rc;
-}
-
-/*
-** Flush the current contents of VdbeSorter.pRecord to a new PMA, possibly
+** Flush the current contents of VdbeSorter.list to a new PMA, possibly
** using a background thread.
-**
-** If argument bFg is non-zero, the operation always uses the calling thread.
*/
-static int vdbeSorterFlushPMA(sqlite3 *db, const VdbeCursor *pCsr, int bFg){
- VdbeSorter *pSorter = pCsr->pSorter;
+static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
+#if SQLITE_MAX_WORKER_THREADS==0
+ /* No worker threads in this build: sub-task 0 writes the PMA in the
+ ** calling thread. */
+ pSorter->bUsePMA = 1;
+ return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
+#else
int rc = SQLITE_OK;
int i;
SortSubtask *pTask = 0; /* Thread context used to create new PMA */
int nWorker = (pSorter->nTask-1);
+ /* Set the flag to indicate that at least one PMA has been written.
+ ** Or will be, anyhow. */
pSorter->bUsePMA = 1;
+
+ /* Select a sub-task to sort and flush the current list of in-memory
+ ** records to disk. If the sorter is running in multi-threaded mode,
+ ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
+ ** the background thread from a sub-task's previous turn is still running,
+ ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
+ ** fall back to using the final sub-task. The first (pSorter->nTask-1)
+ ** sub-tasks are preferred as they use background threads - the final
+ ** sub-task uses the main thread. */
for(i=0; i<nWorker; i++){
int iTest = (pSorter->iPrev + i + 1) % nWorker;
pTask = &pSorter->aTask[iTest];
-#if SQLITE_MAX_WORKER_THREADS>0
- if( pTask->bDone ){
- void *pRet;
- assert( pTask->pThread );
- rc = sqlite3ThreadJoin(pTask->pThread, &pRet);
- pTask->pThread = 0;
- pTask->bDone = 0;
- if( rc==SQLITE_OK ){
- rc = SQLITE_PTR_TO_INT(pRet);
- }
+ if( pTask->thread.bDone ){
+ rc = vdbeSorterJoinThread(pTask, &pTask->thread);
}
-#endif
- if( pTask->pThread==0 ) break;
- pTask = 0;
- }
- if( pTask==0 ){
- pTask = &pSorter->aTask[nWorker];
+ if( pTask->thread.pThread==0 || rc!=SQLITE_OK ) break;
}
- pSorter->iPrev = (pTask - pSorter->aTask);
if( rc==SQLITE_OK ){
- assert( pTask->pThread==0 && pTask->bDone==0 );
- pTask->eWork = SORT_SUBTASK_TO_PMA;
- pTask->pList = pSorter->pRecord;
- pTask->nInMemory = pSorter->nInMemory;
- pSorter->nInMemory = 0;
- pSorter->pRecord = 0;
-
- if( pSorter->aMemory ){
- u8 *aMem = pTask->aListMemory;
- pTask->aListMemory = pSorter->aMemory;
- pSorter->aMemory = aMem;
- }
-
-#if SQLITE_MAX_WORKER_THREADS>0
- if( !bFg && pTask!=&pSorter->aTask[nWorker] ){
+ if( i==nWorker ){
+ /* Use the foreground thread for this operation */
+ rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
+ }else{
/* Launch a background thread for this operation */
+ u8 *aMem = pTask->list.aMemory;
void *pCtx = (void*)pTask;
- assert( pSorter->aMemory==0 || pTask->aListMemory!=0 );
- if( pTask->aListMemory ){
- if( pSorter->aMemory==0 ){
- pSorter->aMemory = sqlite3Malloc(pSorter->nMemory);
- if( pSorter->aMemory==0 ) return SQLITE_NOMEM;
- }else{
- pSorter->nMemory = sqlite3MallocSize(pSorter->aMemory);
- }
- }
- rc = sqlite3ThreadCreate(&pTask->pThread, vdbeSortSubtaskMain, pCtx);
- }else
-#endif
- {
- /* Use the foreground thread for this operation */
- rc = vdbeSorterRunTask(pTask);
- if( rc==SQLITE_OK ){
- u8 *aMem = pTask->aListMemory;
- pTask->aListMemory = pSorter->aMemory;
- pSorter->aMemory = aMem;
- assert( pTask->pList==0 );
+
+ assert( pTask->thread.pThread==0 && pTask->thread.bDone==0 );
+ assert( pTask->list.pList==0 );
+ assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
+
+ /* Hand the sorter's list (and its bulk buffer, if any) to the task. */
+ pSorter->iPrev = (pTask - pSorter->aTask);
+ pTask->list = pSorter->list;
+ pSorter->list.pList = 0;
+ pSorter->list.szPMA = 0;
+
+ /* Give the sorter a buffer to keep accumulating records in: reuse the
+ ** task's old buffer if it had one. Otherwise allocate a fresh one, but
+ ** only if the sorter is actually in bulk-memory mode. A sorter that
+ ** allocates each record separately (list.aMemory==0) must stay in that
+ ** mode and must not fail here on an unneeded allocation. */
+ if( aMem ){
+ pSorter->list.aMemory = aMem;
+ pSorter->nMemory = sqlite3MallocSize(aMem);
+ }else if( pSorter->list.aMemory ){
+ pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
+ if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
}
+
+ rc = vdbeSorterCreateThread(&pTask->thread, vdbeSorterFlushThread, pCtx);
}
}
return rc;
+#endif
}
/*
nReq = pVal->n + sizeof(SorterRecord);
nPMA = pVal->n + sqlite3VarintLen(pVal->n);
if( pSorter->mxPmaSize ){
- if( pSorter->aMemory ){
+ if( pSorter->list.aMemory ){
bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
}else{
bFlush = (
- (pSorter->nInMemory > pSorter->mxPmaSize)
- || (pSorter->nInMemory > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
+ (pSorter->list.szPMA > pSorter->mxPmaSize)
+ || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
);
}
if( bFlush ){
- rc = vdbeSorterFlushPMA(db, pCsr, 0);
- pSorter->nInMemory = 0;
+ rc = vdbeSorterFlushPMA(pSorter);
+ pSorter->list.szPMA = 0;
pSorter->iMemory = 0;
- assert( rc!=SQLITE_OK || pSorter->pRecord==0 );
+ assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
}
}
- pSorter->nInMemory += nPMA;
+ pSorter->list.szPMA += nPMA;
if( nPMA>pSorter->mxKeysize ){
pSorter->mxKeysize = nPMA;
}
- if( pSorter->aMemory ){
+ if( pSorter->list.aMemory ){
int nMin = pSorter->iMemory + nReq;
if( nMin>pSorter->nMemory ){
if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
if( nNew < nMin ) nNew = nMin;
- aNew = sqlite3Realloc(pSorter->aMemory, nNew);
+ aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
if( !aNew ) return SQLITE_NOMEM;
- pSorter->pRecord = (SorterRecord*)(
- aNew + ((u8*)pSorter->pRecord - pSorter->aMemory)
+ pSorter->list.pList = (SorterRecord*)(
+ aNew + ((u8*)pSorter->list.pList - pSorter->list.aMemory)
);
- pSorter->aMemory = aNew;
+ pSorter->list.aMemory = aNew;
pSorter->nMemory = nNew;
}
- pNew = (SorterRecord*)&pSorter->aMemory[pSorter->iMemory];
+ pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
pSorter->iMemory += ROUND8(nReq);
- pNew->u.iNext = (u8*)(pSorter->pRecord) - pSorter->aMemory;
+ pNew->u.iNext = (u8*)(pSorter->list.pList) - pSorter->list.aMemory;
}else{
pNew = (SorterRecord *)sqlite3Malloc(nReq);
if( pNew==0 ){
return SQLITE_NOMEM;
}
- pNew->u.pNext = pSorter->pRecord;
+ pNew->u.pNext = pSorter->list.pList;
}
memcpy(SRVAL(pNew), pVal->z, pVal->n);
pNew->nVal = pVal->n;
- pSorter->pRecord = pNew;
+ pSorter->list.pList = pNew;
return rc;
}
-/*
-** Return the total number of PMAs in all temporary files.
-*/
-static int vdbeSorterCountPMA(VdbeSorter *pSorter){
- int nPMA = 0;
- int i;
- for(i=0; i<pSorter->nTask; i++){
- nPMA += pSorter->aTask[i].nPMA;
- }
- return nPMA;
-}
-
/*
** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
** of the data stored in aFile[1] is the same as that used by regular PMAs,
return rc;
}
-static void *vdbeIncrPopulateThreadMain(void *pCtx){
+static void *vdbeIncrPopulateThread(void *pCtx){ /* Thread entry point; pCtx is the IncrMerger to populate */
IncrMerger *pIncr = (IncrMerger*)pCtx;
- return SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
+ void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) ); /* Result code, encoded as a pointer for the joiner */
+ pIncr->thread.bDone = 1; /* Set last, after the work is finished; cleared again by vdbeSorterJoinThread() */
+ return pRet;
}
-static int vdbeIncrBgPopulate(IncrMerger *pIncr){
- int rc;
- assert( pIncr->pThread==0 );
- if( pIncr->bUseThread==0 ){
- rc = vdbeIncrPopulate(pIncr);
- }
#if SQLITE_MAX_WORKER_THREADS>0
- else{
- void *pCtx = (void*)pIncr;
- rc = sqlite3ThreadCreate(&pIncr->pThread, vdbeIncrPopulateThreadMain, pCtx);
- }
-#endif
- return rc;
+static int vdbeIncrBgPopulate(IncrMerger *pIncr){ /* Launch a thread running vdbeIncrPopulateThread(pIncr); threaded builds only */
+ void *pCtx = (void*)pIncr; /* Argument handed to the thread routine */
+ assert( pIncr->bUseThread ); /* This function no longer handles the single-threaded case */
+ return vdbeSorterCreateThread(&pIncr->thread, vdbeIncrPopulateThread, pCtx);
}
+#endif
static int vdbeIncrSwap(IncrMerger *pIncr){
int rc = SQLITE_OK;
- if( pIncr->bUseThread ){
#if SQLITE_MAX_WORKER_THREADS>0
- if( pIncr->pThread ){
- void *pRet;
- assert( pIncr->bUseThread );
- rc = sqlite3ThreadJoin(pIncr->pThread, &pRet);
- if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
- pIncr->pThread = 0;
- }
-#endif
+ if( pIncr->bUseThread ){
+ rc = vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
if( rc==SQLITE_OK ){
SorterFile f0 = pIncr->aFile[0];
rc = vdbeIncrBgPopulate(pIncr);
}
}
- }else{
+ }else
+#endif
+ {
rc = vdbeIncrPopulate(pIncr);
pIncr->aFile[0] = pIncr->aFile[1];
if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
static void vdbeIncrFree(IncrMerger *pIncr){
if( pIncr ){
#if SQLITE_MAX_WORKER_THREADS>0
- if( pIncr->pThread ){
- void *pRet;
- sqlite3ThreadJoin(pIncr->pThread, &pRet);
- }
+ vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
if( pIncr->bUseThread ){
if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
pIncr->pTask = pTask;
pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
pTask->file2.iEof += pIncr->mxSz;
-
-#if 0
- /* Open the two temp files. */
- rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
- if( rc==SQLITE_OK ){
- rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
- }
- if( rc!=SQLITE_OK ){
- vdbeIncrFree(pIncr);
- pIncr = 0;
- }
-#endif
}
return pIncr;
}
for(i=0; rc==SQLITE_OK && i<pMerger->nTree; i++){
rc = vdbeIncrInit2(&pMerger->aIter[i]);
}
- for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
- rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
- }
/* Set up the required files for pIncr */
if( rc==SQLITE_OK ){
}
}
+ for(i=pMerger->nTree-1; rc==SQLITE_OK && i>0; i--){
+ rc = vdbeSorterDoCompare(pIncr->pTask, pMerger, i);
+ }
+
if( rc==SQLITE_OK && pIncr->bUseThread ){
rc = vdbeIncrBgPopulate(pIncr);
}
}
}
}
- if( rc==SQLITE_OK ){
- rc = vdbeIncrInit2(pIter);
- }
+ if( rc==SQLITE_OK ) rc = vdbeIncrInit2(pIter);
sqlite3_free(aMerge);
return rc;
** sort the VdbeSorter.list.pList list. The vdbe layer will read data directly
** from the in-memory list. */
if( pSorter->bUsePMA==0 ){
- if( pSorter->pRecord ){
- SortSubtask *pTask = &pSorter->aTask[0];
+ if( pSorter->list.pList ){
*pbEof = 0;
- pTask->pList = pSorter->pRecord;
- pTask->eWork = SORT_SUBTASK_SORT;
- assert( pTask->aListMemory==0 );
- pTask->aListMemory = pSorter->aMemory;
- rc = vdbeSorterRunTask(pTask);
- pTask->aListMemory = 0;
- pSorter->pRecord = pTask->pList;
- pTask->pList = 0;
+ rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
}else{
*pbEof = 1;
}
}
/* Write the current in-memory list to a PMA. */
- if( pSorter->pRecord ){
- rc = vdbeSorterFlushPMA(db, pCsr, 1);
+ if( pSorter->list.pList ){
+ rc = vdbeSorterFlushPMA(pSorter);
}
/* Join all threads */
rc = vdbePmaReaderNext(pSorter->pReader);
*pbEof = (pSorter->pReader->pFile==0);
}else{
- SorterRecord *pFree = pSorter->pRecord;
- pSorter->pRecord = pFree->u.pNext;
+ SorterRecord *pFree = pSorter->list.pList;
+ pSorter->list.pList = pFree->u.pNext;
pFree->u.pNext = 0;
- if( pSorter->aMemory==0 ) vdbeSorterRecordFree(db, pFree);
- *pbEof = !pSorter->pRecord;
+ if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
+ *pbEof = !pSorter->list.pList;
rc = SQLITE_OK;
}
return rc;
*pnKey = pSorter->pReader->nKey;
pKey = pSorter->pReader->aKey;
}else{
- *pnKey = pSorter->pRecord->nVal;
- pKey = SRVAL(pSorter->pRecord);
+ *pnKey = pSorter->list.pList->nVal;
+ pKey = SRVAL(pSorter->list.pList);
}
return pKey;
}