**
*************************************************************************
** This file contains code for the VdbeSorter object, used in concert with
-** a VdbeCursor to sort large numbers of keys for CREATE TABLE statements
+** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
** or by SELECT statements with ORDER BY clauses that cannot be satisfied
** using indexes and without LIMIT clauses.
**
**
** The interfaces above must be called in a particular order. Write() can
** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and
-** Compare() can only occur in between Rewind() and Close()/Reset().
+** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
+**
+** Init()
+** for each record: Write()
+** Rewind()
+** Rowkey()/Compare()
+** Next()
+** Close()
**
** Algorithm:
**
-** Records to be sorted are initially held in memory, in the order in
-** which they arrive from Write(). When the amount of memory needed exceeds
-** a threshold, all in-memory records are sorted and then appended to
-** a temporary file as a "Packed-Memory-Array" or "PMA" and the memory is
-** reset. There is a single temporary file used for all PMAs. The PMAs
-** are packed one after another in the file. The VdbeSorter object keeps
-** track of the number of PMAs written.
-**
-** When the Rewind() is seen, any records still held in memory are sorted.
-** If no PMAs have been written (if all records are still held in memory)
-** then subsequent Rowkey(), Next(), and Compare() operations work directly
-** from memory. But if PMAs have been written things get a little more
-** complicated.
-**
-** When Rewind() is seen after PMAs have been written, any records still
-** in memory are sorted and written as a final PMA. Then all the PMAs
-** are merged together into a single massive PMA that Next(), Rowkey(),
-** and Compare() walk to extract the records in sorted order.
-**
-** If SQLITE_MAX_WORKER_THREADS is non-zero, various steps of the above
-** algorithm might be performed in parallel by separate threads. Threads
-** are only used when one or more PMA spill to disk. If the sort is small
-** enough to fit entirely in memory, everything happens on the main thread.
+** Records passed to the sorter via calls to Write() are initially held
+** unsorted in main memory. Assuming the amount of memory used never exceeds
+** a threshold, when Rewind() is called the set of records is sorted using
+** an in-memory merge sort. In this case, no temporary files are required
+** and subsequent calls to Rowkey(), Next() and Compare() read records
+** directly from main memory.
+**
+** If the amount of space used to store records in main memory exceeds the
+** threshold, then the set of records currently in memory are sorted and
+** written to a temporary file in "Packed Memory Array" (PMA) format.
+** A PMA created at this point is known as a "level-0 PMA". Higher levels
+** of PMAs may be created by merging existing PMAs together - for example
+** merging two or more level-0 PMAs together creates a level-1 PMA.
+**
+** The threshold for the amount of main memory to use before flushing
+** records to a PMA is roughly the same as the limit configured for the
+** page-cache of the main database. Specifically, the threshold is set to
+** the value returned multiplied by "PRAGMA main.page_size" multipled by
+** that returned by "PRAGMA main.cache_size", in bytes.
+**
+** If the sorter is running in single-threaded mode, then all PMAs generated
+** are appended to a single temporary file. Or, if the sorter is running in
+** multi-threaded mode then up to (N+1) temporary files may be opened, where
+** N is the configured number of worker threads. In this case, instead of
+** sorting the records and writing the PMA to a temporary file itself, the
+** calling thread usually launches a worker thread to do so. Except, if
+** there are already N worker threads running, the main thread does the work
+** itself.
+**
+** The sorter is running in multi-threaded mode if (a) the library was built
+** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
+** than zero, and (b) worker threads have been enabled at runtime by calling
+** sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, ...).
+**
+** When Rewind() is called, any data remaining in memory is flushed to a
+** final PMA. So at this point the data is stored in some number of sorted
+** PMAs within temporary files on disk. Within a single file sorter is
+** running in single threaded mode, or distributed between one or more files
+** for multi-threaded sorters.
+**
+** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
+** sorter is running in single-threaded mode, then these PMAs are merged
+** incrementally as keys are retreived from the sorter by the VDBE. See
+** comments above object MergeEngine below for details.
+**
+** Or, if running in multi-threaded mode, then a background thread is
+** launched to merge the existing PMAs. Once the background thread has
+** merged T bytes of data into a single sorted PMA, the main thread
+** begins reading keys from that PMA while the background thread proceeds
+** with merging the next T bytes of data. And so on.
+**
+** Parameter T is set to half the value of the memory threshold used
+** by Write() above to determine when to create a new PMA.
+**
+** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when
+** Rewind() is called, then a hierarchy of incremental-merges is used.
+** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on
+** disk are merged together. Then T bytes of data from the second set, and
+** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
+** PMAs at a time. This done is to improve locality.
+**
+** If running in multi-threaded mode and there are more than
+** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
+** than one background thread may be created. Specifically, there may be
+** one background thread for each temporary file on disk, and one background
+** thread to merge the output of each of the others to a single PMA for
+** the main thread to read from.
*/
#include "sqliteInt.h"
#include "vdbeInt.h"
*/
typedef struct MergeEngine MergeEngine; /* Merge PMAs together */
typedef struct PmaReader PmaReader; /* Incrementally read one PMA */
-typedef struct PmaWriter PmaWriter; /* Incrementally write on PMA */
+typedef struct PmaWriter PmaWriter; /* Incrementally write one PMA */
typedef struct SorterRecord SorterRecord; /* A record being sorted */
typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */
-typedef struct SorterFile SorterFile;
-typedef struct SorterThread SorterThread;
-typedef struct SorterList SorterList;
+typedef struct SorterFile SorterFile; /* Temporary file object wrapper */
+typedef struct SorterList SorterList; /* In-memory list of records */
typedef struct IncrMerger IncrMerger;
/*
};
/*
-** An object of this type is used to store the thread handle for each
-** background thread launched by the sorter. Before the thread is launched,
-** variable bDone is set to 0. Then, right before it exits, the thread
-** itself sets bDone to 1.
-**
-** This is then used for two purposes:
-**
-** 1. When flushing the contents of memory to a level-0 PMA on disk, to
-** attempt to select a SortSubtask for which there is not already an
-** active background thread (since doing so causes the main thread
-** to block until it finishes).
-**
-** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
-** to sqlite3ThreadJoin() is likely to block.
-**
-** In both cases, the effects of the main thread seeing (bDone==0) even
-** after the thread has finished are not dire. So we don't worry about
-** memory barriers and such here.
+** In memory linked list of records.
*/
-struct SorterThread {
- SQLiteThread *pThread;
- int bDone;
-};
-
struct SorterList {
SorterRecord *pList; /* Linked list of records */
u8 *aMemory; /* If non-NULL, blob of memory for pList */
int szPMA; /* Size of pList as PMA in bytes */
};
-/*
-** Sorting is divided up into smaller subtasks. Each subtask is controlled
-** by an instance of this object. A Subtask might run in either the main thread
-** or in a background thread.
-**
-** Exactly VdbeSorter.nTask instances of this object are allocated
-** as part of each VdbeSorter object. Instances are never allocated any other
-** way. VdbeSorter.nTask is set to the number of worker threads allowed
-** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
-**
-** When a background thread is launched to perform work, SortSubtask.bDone
-** is set to 0 and the SortSubtask.pTask variable set to point to the
-** thread handle. SortSubtask.bDone is set to 1 (to indicate to the main
-** thread that joining SortSubtask.pTask will not block) before the thread
-** exits. SortSubtask.pTask and bDone are always cleared after the
-** background thread has been joined.
-**
-** One object (specifically, VdbeSorter.aTask[VdbeSorter.nTask-1])
-** is reserved for the foreground thread.
-**
-** The nature of the work performed is determined by SortSubtask.eWork,
-** as follows:
-**
-** SORT_SUBTASK_SORT:
-** Sort the linked list of records at SortSubtask.pList.
-**
-** SORT_SUBTASK_TO_PMA:
-** Sort the linked list of records at SortSubtask.pList, and write
-** the results to a new PMA in temp file SortSubtask.pTemp1. Open
-** the temp file if it is not already open.
-**
-** SORT_SUBTASK_CONS:
-** Merge existing PMAs until SortSubtask.nConsolidate or fewer
-** remain in temp file SortSubtask.pTemp1.
-*/
-struct SortSubtask {
- SorterThread thread;
- sqlite3 *db; /* Database connection */
- VdbeSorter *pSorter; /* Sorter */
- KeyInfo *pKeyInfo; /* How to compare records */
- UnpackedRecord *pUnpacked; /* Space to unpack a record */
- int pgsz; /* Main database page size */
- SorterList list; /* List for thread to write to a PMA */
- int nPMA; /* Number of PMAs currently in file */
- SorterFile file; /* Temp file for level-0 PMAs */
- SorterFile file2; /* Space for other PMAs */
-};
-
-
/*
** The MergeEngine object is used to combine two or more smaller PMAs into
** one big PMA using a merge operation. Separate PMAs all need to be
PmaReader *aIter; /* Array of iterators to merge data from */
};
+/*
+** Exactly VdbeSorter.nTask instances of this object are allocated
+** as part of each VdbeSorter object. Instances are never allocated any
+** other way. VdbeSorter.nTask is set to the number of worker threads allowed
+** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).
+**
+** Essentially, this structure contains all those fields of the VdbeSorter
+** structure for which each thread requires a separate instance. For example,
+** each thread requries its own UnpackedRecord object to unpack records in
+** as part of comparison operations.
+**
+** Before a background thread is launched, variable bDone is set to 0. Then,
+** right before it exits, the thread itself sets bDone to 1. This is used for
+** two purposes:
+**
+** 1. When flushing the contents of memory to a level-0 PMA on disk, to
+** attempt to select a SortSubtask for which there is not already an
+** active background thread (since doing so causes the main thread
+** to block until it finishes).
+**
+** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
+** to sqlite3ThreadJoin() is likely to block. Cases that are likely to
+** block provoke debugging output.
+**
+** In both cases, the effects of the main thread seeing (bDone==0) even
+** after the thread has finished are not dire. So we don't worry about
+** memory barriers and such here.
+*/
+struct SortSubtask {
+ SQLiteThread *pThread; /* Background thread, if any */
+ int bDone; /* Set if thread is finished but not joined */
+ VdbeSorter *pSorter; /* Sorter that owns this sub-task */
+ UnpackedRecord *pUnpacked; /* Space to unpack a record */
+ SorterList list; /* List for thread to write to a PMA */
+ int nPMA; /* Number of PMAs currently in file */
+ SorterFile file; /* Temp file for level-0 PMAs */
+ SorterFile file2; /* Space for other PMAs */
+};
+
/*
** Main sorter structure. A single instance of this is allocated for each
** sorter cursor created by the VDBE.
struct VdbeSorter {
int mnPmaSize; /* Minimum PMA size, in bytes */
int mxPmaSize; /* Maximum PMA size, in bytes. 0==no limit */
+ int mxKeysize; /* Largest serialized key seen so far */
+ int pgsz; /* Main database page size */
PmaReader *pReader; /* Read data from here after Rewind() */
MergeEngine *pMerger; /* Or here, if bUseThreads==0 */
- int mxKeysize; /* Largest serialized key seen so far */
+ sqlite3 *db; /* Database connection */
+ KeyInfo *pKeyInfo; /* How to compare records */
UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */
SorterList list; /* List of in-memory records */
int iMemory; /* Offset of free space in list.aMemory */
** within a temp file. However, if the PmaReader.pIncr variable points to
** an object of the following type, it may be used to iterate/merge through
** multiple PMAs simultaneously.
+**
+** There are two types of IncrMerger object - single (bUseThread==0) and
+** multi-threaded (bUseThread==1).
+**
+** A multi-threaded IncrMerger object uses two temporary files - aFile[0]
+** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in
+** size. When the IncrMerger is initialized, it reads enough data from
+** pMerger to populate aFile[0]. It then sets variables within the
+** corresponding PmaReader object to read from that file and kicks off
+** a background thread to populate aFile[1] with the next mxSz bytes of
+** sorted record data from pMerger.
+**
+** When the PmaReader reaches the end of aFile[0], it blocks until the
+** background thread has finished populating aFile[1]. It then exchanges
+** the contents of the aFile[0] and aFile[1] variables within this structure,
+** sets the PmaReader fields to read from the new aFile[0] and kicks off
+** another background thread to populate the new aFile[1]. And so on, until
+** the contents of pMerger are exhausted.
+**
+** A single-threaded IncrMerger does not open any temporary files of its
+** own. Instead, it has exclusive access to mxSz bytes of space beginning
+** at offset iStartOff of file pTask->file2. And instead of using a
+** background thread to prepare data for the PmaReader, with a single
+** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with
+** keys from pMerger by the calling thread whenever the PmaReader runs out
+** of data.
*/
struct IncrMerger {
SortSubtask *pTask; /* Task that owns this merger */
- SorterThread thread; /* Thread for populating aFile[1] */
MergeEngine *pMerger; /* Merge engine thread reads data from */
i64 iStartOff; /* Offset to start writing file at */
int mxSz; /* Maximum bytes of data to store */
**
** How the linked list is connected depends on how memory is being managed
** by this module. If using a separate allocation for each in-memory record
-** (VdbeSorter.aMemory==0), then the list is always connected using the
+** (VdbeSorter.list.aMemory==0), then the list is always connected using the
** SorterRecord.u.pNext pointers.
**
-** Or, if using the single large allocation method (VdbeSorter.aMemory!=0),
+** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
** then while records are being accumulated the list is linked using the
** SorterRecord.u.iNext offset. This is because the aMemory[] array may
** be sqlite3Realloc()ed while records are being accumulated. Once the VM
#define SORTER_MAX_MERGE_COUNT 16
static int vdbeIncrSwap(IncrMerger*);
-static void vdbeIncrFree(IncrMerger*);
+static void vdbeIncrFree(IncrMerger *);
/*
** Free all memory belonging to the PmaReader object passed as the second
return SQLITE_OK;
}
+/*
+** Attempt to memory map file pFile. If successful, set *pp to point to the
+** new mapping and return SQLITE_OK. If the mapping is not attempted
+** (because the file is too large or the VFS layer is configured not to use
+** mmap), return SQLITE_OK and set *pp to NULL.
+**
+** Or, if an error occurs, return an SQLite error code. The final value of
+** *pp is undefined in this case.
+*/
static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
int rc = SQLITE_OK;
- if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){
+ if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
rc = sqlite3OsFetch(pFile->pFd, 0, pFile->iEof, (void**)pp);
}
return rc;
}
-static int vdbePmaReaderReinit(PmaReader *pIter){
- IncrMerger *pIncr = pIter->pIncr;
- SortSubtask *pTask = pIncr->pTask;
+/*
+** Seek iterator pIter to offset iOff within file pFile. Return SQLITE_OK
+** if successful, or an SQLite error code if an error occurs.
+*/
+static int vdbePmaReaderSeek(
+ SortSubtask *pTask, /* Task context */
+ PmaReader *pIter, /* Iterate to populate */
+ SorterFile *pFile, /* Sorter file to read from */
+ i64 iOff /* Offset in pFile */
+){
int rc = SQLITE_OK;
- assert( pIncr->bEof==0 );
+ assert( pIter->pIncr==0 || pIter->pIncr->bEof==0 );
if( pIter->aMap ){
sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap);
pIter->aMap = 0;
}
- pIter->iReadOff = pIncr->iStartOff;
- pIter->iEof = pIncr->aFile[0].iEof;
- pIter->pFile = pIncr->aFile[0].pFd;
+ pIter->iReadOff = iOff;
+ pIter->iEof = pFile->iEof;
+ pIter->pFile = pFile->pFd;
- rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap);
- if( rc==SQLITE_OK ){
- if( pIter->aMap==0 ){
- /* TODO: Combine this code with similar code in vdbePmaReaderInit() */
- int iBuf = pIter->iReadOff % pTask->pgsz;
- if( pIter->aBuffer==0 ){
- pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz);
- if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
- pIter->nBuffer = pTask->pgsz;
- }
- if( iBuf ){
- int nRead = pTask->pgsz - iBuf;
- if( (pIter->iReadOff + nRead) > pIter->iEof ){
- nRead = (int)(pIter->iEof - pIter->iReadOff);
- }
- rc = sqlite3OsRead(
- pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
- );
- assert( rc!=SQLITE_IOERR_SHORT_READ );
+ rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
+ if( rc==SQLITE_OK && pIter->aMap==0 ){
+ int pgsz = pTask->pSorter->pgsz;
+ int iBuf = pIter->iReadOff % pgsz;
+ if( pIter->aBuffer==0 ){
+ pIter->aBuffer = (u8*)sqlite3Malloc(pgsz);
+ if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM;
+ pIter->nBuffer = pgsz;
+ }
+ if( iBuf ){
+ int nRead = pgsz - iBuf;
+ if( (pIter->iReadOff + nRead) > pIter->iEof ){
+ nRead = (int)(pIter->iEof - pIter->iReadOff);
}
+ rc = sqlite3OsRead(
+ pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff
+ );
+ assert( rc!=SQLITE_IOERR_SHORT_READ );
}
}
return rc;
}
-
/*
** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if
** no error occurs, or an SQLite error code if one does.
u64 nRec = 0; /* Size of record in bytes */
if( pIter->iReadOff>=pIter->iEof ){
+ IncrMerger *pIncr = pIter->pIncr;
int bEof = 1;
- if( pIter->pIncr ){
- rc = vdbeIncrSwap(pIter->pIncr);
- if( rc==SQLITE_OK && pIter->pIncr->bEof==0 ){
- rc = vdbePmaReaderReinit(pIter);
+ if( pIncr ){
+ rc = vdbeIncrSwap(pIncr);
+ if( rc==SQLITE_OK && pIncr->bEof==0 ){
+ rc = vdbePmaReaderSeek(
+ pIncr->pTask, pIter, &pIncr->aFile[0], pIncr->iStartOff
+ );
bEof = 0;
}
}
PmaReader *pIter, /* Iterator to populate */
i64 *pnByte /* IN/OUT: Increment this value by PMA size */
){
- int rc = SQLITE_OK;
- int nBuf = pTask->pgsz;
+ int rc;
assert( pFile->iEof>iStart );
- assert( pIter->aAlloc==0 );
+ assert( pIter->aAlloc==0 && pIter->nAlloc==0 );
assert( pIter->aBuffer==0 );
- pIter->pFile = pFile->pFd;
- pIter->iReadOff = iStart;
- pIter->nAlloc = 128;
- pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc);
- if( pIter->aAlloc ){
- /* Try to xFetch() a mapping of the entire temp file. If this is possible,
- ** the PMA will be read via the mapping. Otherwise, use xRead(). */
- rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap);
- }else{
- rc = SQLITE_NOMEM;
- }
-
- if( rc==SQLITE_OK && pIter->aMap==0 ){
- pIter->nBuffer = nBuf;
- pIter->aBuffer = (u8*)sqlite3Malloc(nBuf);
- if( !pIter->aBuffer ){
- rc = SQLITE_NOMEM;
- }else{
- int iBuf = iStart % nBuf;
- if( iBuf ){
- int nRead = nBuf - iBuf;
- if( (iStart + nRead) > pFile->iEof ){
- nRead = (int)(pFile->iEof - iStart);
- }
- rc = sqlite3OsRead(
- pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart
- );
- assert( rc!=SQLITE_IOERR_SHORT_READ );
- }
- }
- }
+ assert( pIter->aMap==0 );
+ rc = vdbePmaReaderSeek(pTask, pIter, pFile, iStart);
if( rc==SQLITE_OK ){
u64 nByte; /* Size of PMA in bytes */
- pIter->iEof = pFile->iEof;
rc = vdbePmaReadVarint(pIter, &nByte);
pIter->iEof = pIter->iReadOff + nByte;
*pnByte += nByte;
){
UnpackedRecord *r2 = pTask->pUnpacked;
if( pKey2 ){
- sqlite3VdbeRecordUnpack(pTask->pKeyInfo, nKey2, pKey2, r2);
+ sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
}
return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0);
}
if( pSorter==0 ){
rc = SQLITE_NOMEM;
}else{
- pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
+ pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
pKeyInfo->db = 0;
if( nField && nWorker==0 ) pKeyInfo->nField = nField;
- pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
-
+ pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt);
pSorter->nTask = nWorker + 1;
pSorter->bUseThreads = (pSorter->nTask>1);
+ pSorter->db = db;
for(i=0; i<pSorter->nTask; i++){
SortSubtask *pTask = &pSorter->aTask[i];
- pTask->pKeyInfo = pKeyInfo;
- pTask->pgsz = pgsz;
- pTask->db = db;
pTask->pSorter = pSorter;
}
#if SQLITE_MAX_WORKER_THREADS>0
/*
-** Join thread p.
+** Join thread pTask->thread.
*/
-static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){
+static int vdbeSorterJoinThread(SortSubtask *pTask){
int rc = SQLITE_OK;
- if( p->pThread ){
+ if( pTask->pThread ){
#ifdef SQLITE_DEBUG_SORTER_THREADS
- int bDone = p->bDone;
+ int bDone = pTask->bDone;
#endif
void *pRet;
vdbeSorterBlockDebug(pTask, !bDone, "enter");
- rc = sqlite3ThreadJoin(p->pThread, &pRet);
+ rc = sqlite3ThreadJoin(pTask->pThread, &pRet);
vdbeSorterBlockDebug(pTask, !bDone, "exit");
if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet);
- assert( p->bDone==1 );
- p->bDone = 0;
- p->pThread = 0;
+ assert( pTask->bDone==1 );
+ pTask->bDone = 0;
+ pTask->pThread = 0;
}
return rc;
}
** Launch a background thread to run xTask(pIn).
*/
static int vdbeSorterCreateThread(
- SorterThread *p, /* Thread object to populate */
+ SortSubtask *pTask, /* Thread will use this task object */
void *(*xTask)(void*), /* Routine to run in a separate thread */
void *pIn /* Argument passed into xTask() */
){
- assert( p->pThread==0 && p->bDone==0 );
- return sqlite3ThreadCreate(&p->pThread, xTask, pIn);
+ assert( pTask->pThread==0 && pTask->bDone==0 );
+ return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
}
/*
int i;
for(i=0; i<pSorter->nTask; i++){
SortSubtask *pTask = &pSorter->aTask[i];
- int rc2 = vdbeSorterJoinThread(pTask, &pTask->thread);
+ int rc2 = vdbeSorterJoinThread(pTask);
if( rc==SQLITE_OK ) rc = rc2;
}
return rc;
}
#else
# define vdbeSorterJoinAll(x,rcin) (rcin)
-# define vdbeSorterJoinThread(pTask,p) SQLITE_OK
+# define vdbeSorterJoinThread(pTask) SQLITE_OK
#endif
/*
sqlite3_free(pMerger);
}
+/*
+** Free all resources associated with the IncrMerger object indicated by
+** the first argument.
+*/
+static void vdbeIncrFree(IncrMerger *pIncr){
+ if( pIncr ){
+#if SQLITE_MAX_WORKER_THREADS>0
+ if( pIncr->bUseThread ){
+ vdbeSorterJoinThread(pIncr->pTask);
+ if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
+ if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
+ }
+#endif
+ vdbeMergeEngineFree(pIncr->pMerger);
+ sqlite3_free(pIncr);
+ }
+}
+
/*
** Reset a sorting cursor back to its original empty state.
*/
return rc;
}
+/*
+** If it has not already been allocated, allocate the UnpackedRecord
+** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or
+** if no allocation was required), or SQLITE_NOMEM otherwise.
+*/
static int vdbeSortAllocUnpacked(SortSubtask *pTask){
if( pTask->pUnpacked==0 ){
char *pFree;
pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(
- pTask->pKeyInfo, 0, 0, &pFree
+ pTask->pSorter->pKeyInfo, 0, 0, &pFree
);
assert( pTask->pUnpacked==(UnpackedRecord*)pFree );
if( pFree==0 ) return SQLITE_NOMEM;
- pTask->pUnpacked->nField = pTask->pKeyInfo->nField;
+ pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField;
pTask->pUnpacked->errCode = 0;
}
return SQLITE_OK;
** key). The varint is the number of bytes in the blob of data.
*/
static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
+ sqlite3 *db = pTask->pSorter->db;
int rc = SQLITE_OK; /* Return code */
PmaWriter writer; /* Object used to write to the file */
/* If the first temporary PMA file has not been opened, open it now. */
if( pTask->file.pFd==0 ){
- rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file.pFd);
+ rc = vdbeSorterOpenTempFile(db->pVfs, &pTask->file.pFd);
assert( rc!=SQLITE_OK || pTask->file.pFd );
assert( pTask->file.iEof==0 );
assert( pTask->nPMA==0 );
/* Try to get the file to memory map */
if( rc==SQLITE_OK ){
- vdbeSorterExtendFile(pTask->db,
- pTask->file.pFd, pTask->file.iEof + pList->szPMA + 9
- );
+ vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
}
/* Sort the list */
SorterRecord *p;
SorterRecord *pNext = 0;
- vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz,
+ vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
pTask->file.iEof);
pTask->nPMA++;
vdbePmaWriteVarint(&writer, pList->szPMA);
}
/*
-** The main routine for sorter-thread operations.
+** The main routine for background threads that write level-0 PMAs.
*/
static void *vdbeSorterFlushThread(void *pCtx){
SortSubtask *pTask = (SortSubtask*)pCtx;
int rc; /* Return code */
- assert( pTask->thread.bDone==0 );
+ assert( pTask->bDone==0 );
rc = vdbeSorterListToPMA(pTask, &pTask->list);
- pTask->thread.bDone = 1;
+ pTask->bDone = 1;
return SQLITE_INT_TO_PTR(rc);
}
for(i=0; i<nWorker; i++){
int iTest = (pSorter->iPrev + i + 1) % nWorker;
pTask = &pSorter->aTask[iTest];
- if( pTask->thread.bDone ){
- rc = vdbeSorterJoinThread(pTask, &pTask->thread);
+ if( pTask->bDone ){
+ rc = vdbeSorterJoinThread(pTask);
}
- if( pTask->thread.pThread==0 || rc!=SQLITE_OK ) break;
+ if( pTask->pThread==0 || rc!=SQLITE_OK ) break;
}
if( rc==SQLITE_OK ){
u8 *aMem = pTask->list.aMemory;
void *pCtx = (void*)pTask;
- assert( pTask->thread.pThread==0 && pTask->thread.bDone==0 );
+ assert( pTask->pThread==0 && pTask->bDone==0 );
assert( pTask->list.pList==0 );
assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
if( !pSorter->list.aMemory ) return SQLITE_NOMEM;
}
- rc = vdbeSorterCreateThread(&pTask->thread, vdbeSorterFlushThread, pCtx);
+ rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
}
}
int rc2;
i64 iStart = pIncr->iStartOff;
SorterFile *pOut = &pIncr->aFile[1];
+ SortSubtask *pTask = pIncr->pTask;
MergeEngine *pMerger = pIncr->pMerger;
PmaWriter writer;
assert( pIncr->bEof==0 );
- vdbeSorterPopulateDebug(pIncr->pTask, "enter");
+ vdbeSorterPopulateDebug(pTask, "enter");
- vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, iStart);
+ vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
while( rc==SQLITE_OK ){
int dummy;
PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ];
/* Write the next key to the output. */
vdbePmaWriteVarint(&writer, nKey);
vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
- rc = vdbeSorterNext(pIncr->pTask, pIncr->pMerger, &dummy);
+ rc = vdbeSorterNext(pTask, pIncr->pMerger, &dummy);
}
rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
if( rc==SQLITE_OK ) rc = rc2;
- vdbeSorterPopulateDebug(pIncr->pTask, "exit");
+ vdbeSorterPopulateDebug(pTask, "exit");
return rc;
}
+#if SQLITE_MAX_WORKER_THREADS>0
+/*
+** The main routine for background threads that populate aFile[1] of
+** multi-threaded IncrMerger objects.
+*/
static void *vdbeIncrPopulateThread(void *pCtx){
IncrMerger *pIncr = (IncrMerger*)pCtx;
void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
- pIncr->thread.bDone = 1;
+ pIncr->pTask->bDone = 1;
return pRet;
}
-#if SQLITE_MAX_WORKER_THREADS>0
+/*
+** Launch a background thread to populate aFile[1] of pIncr.
+*/
static int vdbeIncrBgPopulate(IncrMerger *pIncr){
- void *pCtx = (void*)pIncr;
+ void *p = (void*)pIncr;
assert( pIncr->bUseThread );
- return vdbeSorterCreateThread(&pIncr->thread, vdbeIncrPopulateThread, pCtx);
+ return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
}
#endif
+/*
+** This function is called when the PmaReader corresponding to pIncr has
+** finished reading the contents of aFile[0]. Its purpose is to "refill"
+** aFile[0] such that the iterator should start rereading it from the
+** beginning.
+**
+** For single-threaded objects, this is accomplished by literally reading
+** keys from pIncr->pMerger and repopulating aFile[0].
+**
+** For multi-threaded objects, all that is required is to wait until the
+** background thread is finished (if it is not already) and then swap
+** aFile[0] and aFile[1] in place. If the contents of pMerger have not
+** been exhausted, this function also launches a new background thread
+** to populate the new aFile[1].
+**
+** SQLITE_OK is returned on success, or an SQLite error code otherwise.
+*/
static int vdbeIncrSwap(IncrMerger *pIncr){
int rc = SQLITE_OK;
#if SQLITE_MAX_WORKER_THREADS>0
if( pIncr->bUseThread ){
- rc = vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
+ rc = vdbeSorterJoinThread(pIncr->pTask);
if( rc==SQLITE_OK ){
SorterFile f0 = pIncr->aFile[0];
return rc;
}
-static void vdbeIncrFree(IncrMerger *pIncr){
- if( pIncr ){
-#if SQLITE_MAX_WORKER_THREADS>0
- vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread);
- if( pIncr->bUseThread ){
- if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
- if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
- }
-#endif
- vdbeMergeEngineFree(pIncr->pMerger);
- sqlite3_free(pIncr);
- }
-}
-
+/*
+** Allocate and return a new IncrMerger object to read data from pMerger.
+*/
static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){
IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger));
if( pIncr ){
return pIncr;
}
+/*
+** Set the "use-threads" flag on object pIncr.
+*/
static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){
if( bUseThread ){
pIncr->bUseThread = 1;
IncrMerger *pIncr = pIter->pIncr;
if( pIncr ){
SortSubtask *pTask = pIncr->pTask;
+ sqlite3 *db = pTask->pSorter->db;
rc = vdbeIncrInitMerger(pTask, pIncr->pMerger, eMode);
if( rc==SQLITE_OK ){
if( pIncr->bUseThread==0 ){
if( pTask->file2.pFd==0 ){
- rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd);
+ rc = vdbeSorterOpenTempFile(db->pVfs, &pTask->file2.pFd);
assert( pTask->file2.iEof>0 );
if( rc==SQLITE_OK ){
- vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof);
+ vdbeSorterExtendFile(db, pTask->file2.pFd, pTask->file2.iEof);
pTask->file2.iEof = 0;
}
}
pTask->file2.iEof += pIncr->mxSz;
}
}else{
- rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd);
+ rc = vdbeSorterOpenTempFile(db->pVfs, &pIncr->aFile[0].pFd);
if( rc==SQLITE_OK ){
- rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd);
+ rc = vdbeSorterOpenTempFile(db->pVfs, &pIncr->aFile[1].pFd);
}
}
}
static void *vdbeIncrInit2Thread(void *pCtx){
PmaReader *pReader = (PmaReader*)pCtx;
void *pRet = SQLITE_INT_TO_PTR( vdbeIncrInit2(pReader, INCRINIT2_TASK) );
- pReader->pIncr->thread.bDone = 1;
+ pReader->pIncr->pTask->bDone = 1;
return pRet;
}
static int vdbeIncrBgInit2(PmaReader *pIter){
void *pCtx = (void*)pIter;
- return vdbeSorterCreateThread(
- &pIter->pIncr->thread, vdbeIncrInit2Thread, pCtx
- );
+ return vdbeSorterCreateThread(pIter->pIncr->pTask, vdbeIncrInit2Thread, pCtx);
}
#endif
static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){
SortSubtask *pTask0 = &pSorter->aTask[0];
MergeEngine *pMain = 0;
- sqlite3 *db = pTask0->db;
+ sqlite3 *db = pTask0->pSorter->db;
int rc = SQLITE_OK;
int iTask;