From: dan Date: Tue, 15 Apr 2014 19:52:34 +0000 (+0000) Subject: Fix further code and documentation issues in vdbesort.c. X-Git-Tag: version-3.8.7~132^2~74 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=1a088a8ef58bb012e5ecb2e2e807530797db18c3;p=thirdparty%2Fsqlite.git Fix further code and documentation issues in vdbesort.c. FossilOrigin-Name: d03f5b8622d304f029f73c7cd0bee3182a81d081 --- diff --git a/manifest b/manifest index f04882a0e8..0ca4cbc345 100644 --- a/manifest +++ b/manifest @@ -1,5 +1,5 @@ -C Allow\sthe\ssorter\sto\sbegin\sreturning\sdata\sto\sthe\sVDBE\sas\ssoon\sas\sit\sis\savailable,\sinstead\sof\swaiting\suntil\sall\skeys\shave\sbeen\ssorted. -D 2014-04-14T19:23:18.139 +C Fix\sfurther\scode\sand\sdocumentation\sissues\sin\svdbesort.c. +D 2014-04-15T19:52:34.797 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f F Makefile.in ad0921c4b2780d01868cf69b419a4f102308d125 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23 @@ -286,7 +286,7 @@ F src/vdbeapi.c 0ed6053f947edd0b30f64ce5aeb811872a3450a4 F src/vdbeaux.c d8dc38965507a34b0e150c0d7fc82b02f8cf25ea F src/vdbeblob.c 15377abfb59251bccedd5a9c7d014a895f0c04aa F src/vdbemem.c 6fc77594c60f6155404f3f8d71bf36d1fdeb4447 -F src/vdbesort.c 15f59dc56c7c4509b4783ad20fb25479ac63d267 +F src/vdbesort.c ceb8e16055327d0c52bdd2087fcdd7d132fe314f F src/vdbetrace.c 6f52bc0c51e144b7efdcfb2a8f771167a8816767 F src/vtab.c 21b932841e51ebd7d075e2d0ad1415dce8d2d5fd F src/wal.c 76e7fc6de229bea8b30bb2539110f03a494dc3a8 @@ -1163,7 +1163,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4 F tool/warnings.sh d1a6de74685f360ab718efda6265994b99bbea01 F tool/win/sqlite.vsix 030f3eeaf2cb811a3692ab9c14d021a75ce41fff -P e54dded2012f0ab486ee138e9bd57c528af33980 02610cd9b77caa2c181210056088beb3ad6ce30f -R b74f7a073d8c77e32a2dce332520aa10 +P cb0ab20c48962cdee03115efa93d7d501780ac73 +R 44c59b06ba4f012b63c2d0e26d30a7e6 
U dan -Z ebd968da8bc859eb7bfdd863767ac91c +Z 2cc8bdff8b0228a6efe530348d9564c8 diff --git a/manifest.uuid b/manifest.uuid index 1714edb86b..b7e5cbc645 100644 --- a/manifest.uuid +++ b/manifest.uuid @@ -1 +1 @@ -cb0ab20c48962cdee03115efa93d7d501780ac73 \ No newline at end of file +d03f5b8622d304f029f73c7cd0bee3182a81d081 \ No newline at end of file diff --git a/src/vdbesort.c b/src/vdbesort.c index 857b7ba690..8b6cce6259 100644 --- a/src/vdbesort.c +++ b/src/vdbesort.c @@ -10,7 +10,7 @@ ** ************************************************************************* ** This file contains code for the VdbeSorter object, used in concert with -** a VdbeCursor to sort large numbers of keys for CREATE TABLE statements +** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements ** or by SELECT statements with ORDER BY clauses that cannot be satisfied ** using indexes and without LIMIT clauses. ** @@ -57,33 +57,84 @@ ** ** The interfaces above must be called in a particular order. Write() can ** only occur in between Init()/Reset() and Rewind(). Next(), Rowkey(), and -** Compare() can only occur in between Rewind() and Close()/Reset(). +** Compare() can only occur in between Rewind() and Close()/Reset(). i.e. +** +** Init() +** for each record: Write() +** Rewind() +** Rowkey()/Compare() +** Next() +** Close() ** ** Algorithm: ** -** Records to be sorted are initially held in memory, in the order in -** which they arrive from Write(). When the amount of memory needed exceeds -** a threshold, all in-memory records are sorted and then appended to -** a temporary file as a "Packed-Memory-Array" or "PMA" and the memory is -** reset. There is a single temporary file used for all PMAs. The PMAs -** are packed one after another in the file. The VdbeSorter object keeps -** track of the number of PMAs written. -** -** When the Rewind() is seen, any records still held in memory are sorted. 
-** If no PMAs have been written (if all records are still held in memory) -** then subsequent Rowkey(), Next(), and Compare() operations work directly -** from memory. But if PMAs have been written things get a little more -** complicated. -** -** When Rewind() is seen after PMAs have been written, any records still -** in memory are sorted and written as a final PMA. Then all the PMAs -** are merged together into a single massive PMA that Next(), Rowkey(), -** and Compare() walk to extract the records in sorted order. -** -** If SQLITE_MAX_WORKER_THREADS is non-zero, various steps of the above -** algorithm might be performed in parallel by separate threads. Threads -** are only used when one or more PMA spill to disk. If the sort is small -** enough to fit entirely in memory, everything happens on the main thread. +** Records passed to the sorter via calls to Write() are initially held +** unsorted in main memory. Assuming the amount of memory used never exceeds +** a threshold, when Rewind() is called the set of records is sorted using +** an in-memory merge sort. In this case, no temporary files are required +** and subsequent calls to Rowkey(), Next() and Compare() read records +** directly from main memory. +** +** If the amount of space used to store records in main memory exceeds the +** threshold, then the set of records currently in memory are sorted and +** written to a temporary file in "Packed Memory Array" (PMA) format. +** A PMA created at this point is known as a "level-0 PMA". Higher levels +** of PMAs may be created by merging existing PMAs together - for example +** merging two or more level-0 PMAs together creates a level-1 PMA. +** +** The threshold for the amount of main memory to use before flushing +** records to a PMA is roughly the same as the limit configured for the +** page-cache of the main database. 
Specifically, the threshold is set to +** the value returned multiplied by "PRAGMA main.page_size" multipled by +** that returned by "PRAGMA main.cache_size", in bytes. +** +** If the sorter is running in single-threaded mode, then all PMAs generated +** are appended to a single temporary file. Or, if the sorter is running in +** multi-threaded mode then up to (N+1) temporary files may be opened, where +** N is the configured number of worker threads. In this case, instead of +** sorting the records and writing the PMA to a temporary file itself, the +** calling thread usually launches a worker thread to do so. Except, if +** there are already N worker threads running, the main thread does the work +** itself. +** +** The sorter is running in multi-threaded mode if (a) the library was built +** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater +** than zero, and (b) worker threads have been enabled at runtime by calling +** sqlite3_config(SQLITE_CONFIG_WORKER_THREADS, ...). +** +** When Rewind() is called, any data remaining in memory is flushed to a +** final PMA. So at this point the data is stored in some number of sorted +** PMAs within temporary files on disk. Within a single file sorter is +** running in single threaded mode, or distributed between one or more files +** for multi-threaded sorters. +** +** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the +** sorter is running in single-threaded mode, then these PMAs are merged +** incrementally as keys are retreived from the sorter by the VDBE. See +** comments above object MergeEngine below for details. +** +** Or, if running in multi-threaded mode, then a background thread is +** launched to merge the existing PMAs. Once the background thread has +** merged T bytes of data into a single sorted PMA, the main thread +** begins reading keys from that PMA while the background thread proceeds +** with merging the next T bytes of data. And so on. 
+** +** Parameter T is set to half the value of the memory threshold used +** by Write() above to determine when to create a new PMA. +** +** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when +** Rewind() is called, then a hierarchy of incremental-merges is used. +** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on +** disk are merged together. Then T bytes of data from the second set, and +** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT +** PMAs at a time. This done is to improve locality. +** +** If running in multi-threaded mode and there are more than +** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more +** than one background thread may be created. Specifically, there may be +** one background thread for each temporary file on disk, and one background +** thread to merge the output of each of the others to a single PMA for +** the main thread to read from. */ #include "sqliteInt.h" #include "vdbeInt.h" @@ -102,12 +153,11 @@ */ typedef struct MergeEngine MergeEngine; /* Merge PMAs together */ typedef struct PmaReader PmaReader; /* Incrementally read one PMA */ -typedef struct PmaWriter PmaWriter; /* Incrementally write on PMA */ +typedef struct PmaWriter PmaWriter; /* Incrementally write one PMA */ typedef struct SorterRecord SorterRecord; /* A record being sorted */ typedef struct SortSubtask SortSubtask; /* A sub-task in the sort process */ -typedef struct SorterFile SorterFile; -typedef struct SorterThread SorterThread; -typedef struct SorterList SorterList; +typedef struct SorterFile SorterFile; /* Temporary file object wrapper */ +typedef struct SorterList SorterList; /* In-memory list of records */ typedef struct IncrMerger IncrMerger; /* @@ -120,85 +170,14 @@ struct SorterFile { }; /* -** An object of this type is used to store the thread handle for each -** background thread launched by the sorter. Before the thread is launched, -** variable bDone is set to 0. 
Then, right before it exits, the thread -** itself sets bDone to 1. -** -** This is then used for two purposes: -** -** 1. When flushing the contents of memory to a level-0 PMA on disk, to -** attempt to select a SortSubtask for which there is not already an -** active background thread (since doing so causes the main thread -** to block until it finishes). -** -** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call -** to sqlite3ThreadJoin() is likely to block. -** -** In both cases, the effects of the main thread seeing (bDone==0) even -** after the thread has finished are not dire. So we don't worry about -** memory barriers and such here. +** In memory linked list of records. */ -struct SorterThread { - SQLiteThread *pThread; - int bDone; -}; - struct SorterList { SorterRecord *pList; /* Linked list of records */ u8 *aMemory; /* If non-NULL, blob of memory for pList */ int szPMA; /* Size of pList as PMA in bytes */ }; -/* -** Sorting is divided up into smaller subtasks. Each subtask is controlled -** by an instance of this object. A Subtask might run in either the main thread -** or in a background thread. -** -** Exactly VdbeSorter.nTask instances of this object are allocated -** as part of each VdbeSorter object. Instances are never allocated any other -** way. VdbeSorter.nTask is set to the number of worker threads allowed -** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). -** -** When a background thread is launched to perform work, SortSubtask.bDone -** is set to 0 and the SortSubtask.pTask variable set to point to the -** thread handle. SortSubtask.bDone is set to 1 (to indicate to the main -** thread that joining SortSubtask.pTask will not block) before the thread -** exits. SortSubtask.pTask and bDone are always cleared after the -** background thread has been joined. -** -** One object (specifically, VdbeSorter.aTask[VdbeSorter.nTask-1]) -** is reserved for the foreground thread. 
-** -** The nature of the work performed is determined by SortSubtask.eWork, -** as follows: -** -** SORT_SUBTASK_SORT: -** Sort the linked list of records at SortSubtask.pList. -** -** SORT_SUBTASK_TO_PMA: -** Sort the linked list of records at SortSubtask.pList, and write -** the results to a new PMA in temp file SortSubtask.pTemp1. Open -** the temp file if it is not already open. -** -** SORT_SUBTASK_CONS: -** Merge existing PMAs until SortSubtask.nConsolidate or fewer -** remain in temp file SortSubtask.pTemp1. -*/ -struct SortSubtask { - SorterThread thread; - sqlite3 *db; /* Database connection */ - VdbeSorter *pSorter; /* Sorter */ - KeyInfo *pKeyInfo; /* How to compare records */ - UnpackedRecord *pUnpacked; /* Space to unpack a record */ - int pgsz; /* Main database page size */ - SorterList list; /* List for thread to write to a PMA */ - int nPMA; /* Number of PMAs currently in file */ - SorterFile file; /* Temp file for level-0 PMAs */ - SorterFile file2; /* Space for other PMAs */ -}; - - /* ** The MergeEngine object is used to combine two or more smaller PMAs into ** one big PMA using a merge operation. Separate PMAs all need to be @@ -268,6 +247,45 @@ struct MergeEngine { PmaReader *aIter; /* Array of iterators to merge data from */ }; +/* +** Exactly VdbeSorter.nTask instances of this object are allocated +** as part of each VdbeSorter object. Instances are never allocated any +** other way. VdbeSorter.nTask is set to the number of worker threads allowed +** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread). +** +** Essentially, this structure contains all those fields of the VdbeSorter +** structure for which each thread requires a separate instance. For example, +** each thread requries its own UnpackedRecord object to unpack records in +** as part of comparison operations. +** +** Before a background thread is launched, variable bDone is set to 0. Then, +** right before it exits, the thread itself sets bDone to 1. 
This is used for +** two purposes: +** +** 1. When flushing the contents of memory to a level-0 PMA on disk, to +** attempt to select a SortSubtask for which there is not already an +** active background thread (since doing so causes the main thread +** to block until it finishes). +** +** 2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call +** to sqlite3ThreadJoin() is likely to block. Cases that are likely to +** block provoke debugging output. +** +** In both cases, the effects of the main thread seeing (bDone==0) even +** after the thread has finished are not dire. So we don't worry about +** memory barriers and such here. +*/ +struct SortSubtask { + SQLiteThread *pThread; /* Background thread, if any */ + int bDone; /* Set if thread is finished but not joined */ + VdbeSorter *pSorter; /* Sorter that owns this sub-task */ + UnpackedRecord *pUnpacked; /* Space to unpack a record */ + SorterList list; /* List for thread to write to a PMA */ + int nPMA; /* Number of PMAs currently in file */ + SorterFile file; /* Temp file for level-0 PMAs */ + SorterFile file2; /* Space for other PMAs */ +}; + /* ** Main sorter structure. A single instance of this is allocated for each ** sorter cursor created by the VDBE. @@ -280,9 +298,12 @@ struct MergeEngine { struct VdbeSorter { int mnPmaSize; /* Minimum PMA size, in bytes */ int mxPmaSize; /* Maximum PMA size, in bytes. 
0==no limit */ + int mxKeysize; /* Largest serialized key seen so far */ + int pgsz; /* Main database page size */ PmaReader *pReader; /* Read data from here after Rewind() */ MergeEngine *pMerger; /* Or here, if bUseThreads==0 */ - int mxKeysize; /* Largest serialized key seen so far */ + sqlite3 *db; /* Database connection */ + KeyInfo *pKeyInfo; /* How to compare records */ UnpackedRecord *pUnpacked; /* Used by VdbeSorterCompare() */ SorterList list; /* List of in-memory records */ int iMemory; /* Offset of free space in list.aMemory */ @@ -318,10 +339,35 @@ struct PmaReader { ** within a temp file. However, if the PmaReader.pIncr variable points to ** an object of the following type, it may be used to iterate/merge through ** multiple PMAs simultaneously. +** +** There are two types of IncrMerger object - single (bUseThread==0) and +** multi-threaded (bUseThread==1). +** +** A multi-threaded IncrMerger object uses two temporary files - aFile[0] +** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in +** size. When the IncrMerger is initialized, it reads enough data from +** pMerger to populate aFile[0]. It then sets variables within the +** corresponding PmaReader object to read from that file and kicks off +** a background thread to populate aFile[1] with the next mxSz bytes of +** sorted record data from pMerger. +** +** When the PmaReader reaches the end of aFile[0], it blocks until the +** background thread has finished populating aFile[1]. It then exchanges +** the contents of the aFile[0] and aFile[1] variables within this structure, +** sets the PmaReader fields to read from the new aFile[0] and kicks off +** another background thread to populate the new aFile[1]. And so on, until +** the contents of pMerger are exhausted. +** +** A single-threaded IncrMerger does not open any temporary files of its +** own. Instead, it has exclusive access to mxSz bytes of space beginning +** at offset iStartOff of file pTask->file2. 
And instead of using a +** background thread to prepare data for the PmaReader, with a single +** threaded IncrMerger the allocate part of pTask->file2 is "refilled" with +** keys from pMerger by the calling thread whenever the PmaReader runs out +** of data. */ struct IncrMerger { SortSubtask *pTask; /* Task that owns this merger */ - SorterThread thread; /* Thread for populating aFile[1] */ MergeEngine *pMerger; /* Merge engine thread reads data from */ i64 iStartOff; /* Offset to start writing file at */ int mxSz; /* Maximum bytes of data to store */ @@ -354,10 +400,10 @@ struct PmaWriter { ** ** How the linked list is connected depends on how memory is being managed ** by this module. If using a separate allocation for each in-memory record -** (VdbeSorter.aMemory==0), then the list is always connected using the +** (VdbeSorter.list.aMemory==0), then the list is always connected using the ** SorterRecord.u.pNext pointers. ** -** Or, if using the single large allocation method (VdbeSorter.aMemory!=0), +** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0), ** then while records are being accumulated the list is linked using the ** SorterRecord.u.iNext offset. This is because the aMemory[] array may ** be sqlite3Realloc()ed while records are being accumulated. Once the VM @@ -390,7 +436,7 @@ struct SorterRecord { #define SORTER_MAX_MERGE_COUNT 16 static int vdbeIncrSwap(IncrMerger*); -static void vdbeIncrFree(IncrMerger*); +static void vdbeIncrFree(IncrMerger *); /* ** Free all memory belonging to the PmaReader object passed as the second @@ -531,56 +577,69 @@ static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){ return SQLITE_OK; } +/* +** Attempt to memory map file pFile. If successful, set *pp to point to the +** new mapping and return SQLITE_OK. If the mapping is not attempted +** (because the file is too large or the VFS layer is configured not to use +** mmap), return SQLITE_OK and set *pp to NULL. 
+** +** Or, if an error occurs, return an SQLite error code. The final value of +** *pp is undefined in this case. +*/ static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){ int rc = SQLITE_OK; - if( pFile->iEof<=(i64)(pTask->db->nMaxSorterMmap) ){ + if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){ rc = sqlite3OsFetch(pFile->pFd, 0, pFile->iEof, (void**)pp); } return rc; } -static int vdbePmaReaderReinit(PmaReader *pIter){ - IncrMerger *pIncr = pIter->pIncr; - SortSubtask *pTask = pIncr->pTask; +/* +** Seek iterator pIter to offset iOff within file pFile. Return SQLITE_OK +** if successful, or an SQLite error code if an error occurs. +*/ +static int vdbePmaReaderSeek( + SortSubtask *pTask, /* Task context */ + PmaReader *pIter, /* Iterate to populate */ + SorterFile *pFile, /* Sorter file to read from */ + i64 iOff /* Offset in pFile */ +){ int rc = SQLITE_OK; - assert( pIncr->bEof==0 ); + assert( pIter->pIncr==0 || pIter->pIncr->bEof==0 ); if( pIter->aMap ){ sqlite3OsUnfetch(pIter->pFile, 0, pIter->aMap); pIter->aMap = 0; } - pIter->iReadOff = pIncr->iStartOff; - pIter->iEof = pIncr->aFile[0].iEof; - pIter->pFile = pIncr->aFile[0].pFd; + pIter->iReadOff = iOff; + pIter->iEof = pFile->iEof; + pIter->pFile = pFile->pFd; - rc = vdbeSorterMapFile(pTask, &pIncr->aFile[0], &pIter->aMap); - if( rc==SQLITE_OK ){ - if( pIter->aMap==0 ){ - /* TODO: Combine this code with similar code in vdbePmaReaderInit() */ - int iBuf = pIter->iReadOff % pTask->pgsz; - if( pIter->aBuffer==0 ){ - pIter->aBuffer = (u8*)sqlite3Malloc(pTask->pgsz); - if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM; - pIter->nBuffer = pTask->pgsz; - } - if( iBuf ){ - int nRead = pTask->pgsz - iBuf; - if( (pIter->iReadOff + nRead) > pIter->iEof ){ - nRead = (int)(pIter->iEof - pIter->iReadOff); - } - rc = sqlite3OsRead( - pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff - ); - assert( rc!=SQLITE_IOERR_SHORT_READ ); + rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap); + 
if( rc==SQLITE_OK && pIter->aMap==0 ){ + int pgsz = pTask->pSorter->pgsz; + int iBuf = pIter->iReadOff % pgsz; + if( pIter->aBuffer==0 ){ + pIter->aBuffer = (u8*)sqlite3Malloc(pgsz); + if( pIter->aBuffer==0 ) rc = SQLITE_NOMEM; + pIter->nBuffer = pgsz; + } + if( iBuf ){ + int nRead = pgsz - iBuf; + if( (pIter->iReadOff + nRead) > pIter->iEof ){ + nRead = (int)(pIter->iEof - pIter->iReadOff); } + rc = sqlite3OsRead( + pIter->pFile, &pIter->aBuffer[iBuf], nRead, pIter->iReadOff + ); + assert( rc!=SQLITE_IOERR_SHORT_READ ); } } return rc; } - /* ** Advance iterator pIter to the next key in its PMA. Return SQLITE_OK if ** no error occurs, or an SQLite error code if one does. @@ -590,11 +649,14 @@ static int vdbePmaReaderNext(PmaReader *pIter){ u64 nRec = 0; /* Size of record in bytes */ if( pIter->iReadOff>=pIter->iEof ){ + IncrMerger *pIncr = pIter->pIncr; int bEof = 1; - if( pIter->pIncr ){ - rc = vdbeIncrSwap(pIter->pIncr); - if( rc==SQLITE_OK && pIter->pIncr->bEof==0 ){ - rc = vdbePmaReaderReinit(pIter); + if( pIncr ){ + rc = vdbeIncrSwap(pIncr); + if( rc==SQLITE_OK && pIncr->bEof==0 ){ + rc = vdbePmaReaderSeek( + pIncr->pTask, pIter, &pIncr->aFile[0], pIncr->iStartOff + ); bEof = 0; } } @@ -633,47 +695,16 @@ static int vdbePmaReaderInit( PmaReader *pIter, /* Iterator to populate */ i64 *pnByte /* IN/OUT: Increment this value by PMA size */ ){ - int rc = SQLITE_OK; - int nBuf = pTask->pgsz; + int rc; assert( pFile->iEof>iStart ); - assert( pIter->aAlloc==0 ); + assert( pIter->aAlloc==0 && pIter->nAlloc==0 ); assert( pIter->aBuffer==0 ); - pIter->pFile = pFile->pFd; - pIter->iReadOff = iStart; - pIter->nAlloc = 128; - pIter->aAlloc = (u8*)sqlite3Malloc(pIter->nAlloc); - if( pIter->aAlloc ){ - /* Try to xFetch() a mapping of the entire temp file. If this is possible, - ** the PMA will be read via the mapping. Otherwise, use xRead(). 
*/ - rc = vdbeSorterMapFile(pTask, pFile, &pIter->aMap); - }else{ - rc = SQLITE_NOMEM; - } - - if( rc==SQLITE_OK && pIter->aMap==0 ){ - pIter->nBuffer = nBuf; - pIter->aBuffer = (u8*)sqlite3Malloc(nBuf); - if( !pIter->aBuffer ){ - rc = SQLITE_NOMEM; - }else{ - int iBuf = iStart % nBuf; - if( iBuf ){ - int nRead = nBuf - iBuf; - if( (iStart + nRead) > pFile->iEof ){ - nRead = (int)(pFile->iEof - iStart); - } - rc = sqlite3OsRead( - pIter->pFile, &pIter->aBuffer[iBuf], nRead, iStart - ); - assert( rc!=SQLITE_IOERR_SHORT_READ ); - } - } - } + assert( pIter->aMap==0 ); + rc = vdbePmaReaderSeek(pTask, pIter, pFile, iStart); if( rc==SQLITE_OK ){ u64 nByte; /* Size of PMA in bytes */ - pIter->iEof = pFile->iEof; rc = vdbePmaReadVarint(pIter, &nByte); pIter->iEof = pIter->iReadOff + nByte; *pnByte += nByte; @@ -706,7 +737,7 @@ static int vdbeSorterCompare( ){ UnpackedRecord *r2 = pTask->pUnpacked; if( pKey2 ){ - sqlite3VdbeRecordUnpack(pTask->pKeyInfo, nKey2, pKey2, r2); + sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2); } return sqlite3VdbeRecordCompare(nKey1, pKey1, r2, 0); } @@ -788,19 +819,16 @@ int sqlite3VdbeSorterInit( if( pSorter==0 ){ rc = SQLITE_NOMEM; }else{ - pKeyInfo = (KeyInfo*)((u8*)pSorter + sz); + pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz); memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo); pKeyInfo->db = 0; if( nField && nWorker==0 ) pKeyInfo->nField = nField; - pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); - + pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(db->aDb[0].pBt); pSorter->nTask = nWorker + 1; pSorter->bUseThreads = (pSorter->nTask>1); + pSorter->db = db; for(i=0; i<pSorter->nTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; - pTask->pKeyInfo = pKeyInfo; - pTask->pgsz = pgsz; - pTask->db = db; pTask->pSorter = pSorter; } @@ -904,22 +932,22 @@ static void vdbeSorterBlockDebug( #if SQLITE_MAX_WORKER_THREADS>0 /* -** Join thread p. +** Join thread pTask->thread. 
*/ -static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){ +static int vdbeSorterJoinThread(SortSubtask *pTask){ int rc = SQLITE_OK; - if( p->pThread ){ + if( pTask->pThread ){ #ifdef SQLITE_DEBUG_SORTER_THREADS - int bDone = p->bDone; + int bDone = pTask->bDone; #endif void *pRet; vdbeSorterBlockDebug(pTask, !bDone, "enter"); - rc = sqlite3ThreadJoin(p->pThread, &pRet); + rc = sqlite3ThreadJoin(pTask->pThread, &pRet); vdbeSorterBlockDebug(pTask, !bDone, "exit"); if( rc==SQLITE_OK ) rc = SQLITE_PTR_TO_INT(pRet); - assert( p->bDone==1 ); - p->bDone = 0; - p->pThread = 0; + assert( pTask->bDone==1 ); + pTask->bDone = 0; + pTask->pThread = 0; } return rc; } @@ -928,12 +956,12 @@ static int vdbeSorterJoinThread(SortSubtask *pTask, SorterThread *p){ ** Launch a background thread to run xTask(pIn). */ static int vdbeSorterCreateThread( - SorterThread *p, /* Thread object to populate */ + SortSubtask *pTask, /* Thread will use this task object */ void *(*xTask)(void*), /* Routine to run in a separate thread */ void *pIn /* Argument passed into xTask() */ ){ - assert( p->pThread==0 && p->bDone==0 ); - return sqlite3ThreadCreate(&p->pThread, xTask, pIn); + assert( pTask->pThread==0 && pTask->bDone==0 ); + return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn); } /* @@ -945,14 +973,14 @@ static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){ int i; for(i=0; i<pSorter->nTask; i++){ SortSubtask *pTask = &pSorter->aTask[i]; - int rc2 = vdbeSorterJoinThread(pTask, &pTask->thread); + int rc2 = vdbeSorterJoinThread(pTask); if( rc==SQLITE_OK ) rc = rc2; } return rc; } #else # define vdbeSorterJoinAll(x,rcin) (rcin) -# define vdbeSorterJoinThread(pTask,p) SQLITE_OK +# define vdbeSorterJoinThread(pTask) SQLITE_OK #endif /* @@ -990,6 +1018,24 @@ static void vdbeMergeEngineFree(MergeEngine *pMerger){ sqlite3_free(pMerger); } +/* +** Free all resources associated with the IncrMerger object indicated by +** the first argument. 
+*/ +static void vdbeIncrFree(IncrMerger *pIncr){ + if( pIncr ){ +#if SQLITE_MAX_WORKER_THREADS>0 + if( pIncr->bUseThread ){ + vdbeSorterJoinThread(pIncr->pTask); + if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); + if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); + } +#endif + vdbeMergeEngineFree(pIncr->pMerger); + sqlite3_free(pIncr); + } +} + /* ** Reset a sorting cursor back to its original empty state. */ @@ -1051,15 +1097,20 @@ static int vdbeSorterOpenTempFile(sqlite3_vfs *pVfs, sqlite3_file **ppFile){ return rc; } +/* +** If it has not already been allocated, allocate the UnpackedRecord +** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or +** if no allocation was required), or SQLITE_NOMEM otherwise. +*/ static int vdbeSortAllocUnpacked(SortSubtask *pTask){ if( pTask->pUnpacked==0 ){ char *pFree; pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord( - pTask->pKeyInfo, 0, 0, &pFree + pTask->pSorter->pKeyInfo, 0, 0, &pFree ); assert( pTask->pUnpacked==(UnpackedRecord*)pFree ); if( pFree==0 ) return SQLITE_NOMEM; - pTask->pUnpacked->nField = pTask->pKeyInfo->nField; + pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nField; pTask->pUnpacked->errCode = 0; } return SQLITE_OK; @@ -1280,6 +1331,7 @@ static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){ ** key). The varint is the number of bytes in the blob of data. */ static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){ + sqlite3 *db = pTask->pSorter->db; int rc = SQLITE_OK; /* Return code */ PmaWriter writer; /* Object used to write to the file */ @@ -1295,7 +1347,7 @@ static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){ /* If the first temporary PMA file has not been opened, open it now. 
*/ if( pTask->file.pFd==0 ){ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file.pFd); + rc = vdbeSorterOpenTempFile(db->pVfs, &pTask->file.pFd); assert( rc!=SQLITE_OK || pTask->file.pFd ); assert( pTask->file.iEof==0 ); assert( pTask->nPMA==0 ); @@ -1303,9 +1355,7 @@ static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){ /* Try to get the file to memory map */ if( rc==SQLITE_OK ){ - vdbeSorterExtendFile(pTask->db, - pTask->file.pFd, pTask->file.iEof + pList->szPMA + 9 - ); + vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9); } /* Sort the list */ @@ -1317,7 +1367,7 @@ static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){ SorterRecord *p; SorterRecord *pNext = 0; - vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pgsz, + vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz, pTask->file.iEof); pTask->nPMA++; vdbePmaWriteVarint(&writer, pList->szPMA); @@ -1413,14 +1463,14 @@ static int vdbeSorterNext( } /* -** The main routine for sorter-thread operations. +** The main routine for background threads that write level-0 PMAs. 
*/ static void *vdbeSorterFlushThread(void *pCtx){ SortSubtask *pTask = (SortSubtask*)pCtx; int rc; /* Return code */ - assert( pTask->thread.bDone==0 ); + assert( pTask->bDone==0 ); rc = vdbeSorterListToPMA(pTask, &pTask->list); - pTask->thread.bDone = 1; + pTask->bDone = 1; return SQLITE_INT_TO_PTR(rc); } @@ -1453,10 +1503,10 @@ static int vdbeSorterFlushPMA(VdbeSorter *pSorter){ for(i=0; i<nWorker; i++){ int iTest = (pSorter->iPrev + i + 1) % nWorker; pTask = &pSorter->aTask[iTest]; - if( pTask->thread.bDone ){ - rc = vdbeSorterJoinThread(pTask, &pTask->thread); + if( pTask->bDone ){ + rc = vdbeSorterJoinThread(pTask); } - if( pTask->thread.pThread==0 || rc!=SQLITE_OK ) break; + if( pTask->pThread==0 || rc!=SQLITE_OK ) break; } if( rc==SQLITE_OK ){ @@ -1468,7 +1518,7 @@ static int vdbeSorterFlushPMA(VdbeSorter *pSorter){ u8 *aMem = pTask->list.aMemory; void *pCtx = (void*)pTask; - assert( pTask->thread.pThread==0 && pTask->thread.bDone==0 ); + assert( pTask->pThread==0 && pTask->bDone==0 ); assert( pTask->list.pList==0 ); assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 ); @@ -1484,7 +1534,7 @@ static int vdbeSorterFlushPMA(VdbeSorter *pSorter){ if( !pSorter->list.aMemory ) return SQLITE_NOMEM; } - rc = vdbeSorterCreateThread(&pTask->thread, vdbeSorterFlushThread, pCtx); + rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx); } } @@ -1597,13 +1647,14 @@ static int vdbeIncrPopulate(IncrMerger *pIncr){ int rc2; i64 iStart = pIncr->iStartOff; SorterFile *pOut = &pIncr->aFile[1]; + SortSubtask *pTask = pIncr->pTask; MergeEngine *pMerger = pIncr->pMerger; PmaWriter writer; assert( pIncr->bEof==0 ); - vdbeSorterPopulateDebug(pIncr->pTask, "enter"); + vdbeSorterPopulateDebug(pTask, "enter"); - vdbePmaWriterInit(pOut->pFd, &writer, pIncr->pTask->pgsz, iStart); + vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart); while( rc==SQLITE_OK ){ int dummy; PmaReader *pReader = &pMerger->aIter[ pMerger->aTree[1] ]; @@ -1618,36 +1669,60 @@ 
static int vdbeIncrPopulate(IncrMerger 
*pIncr){ /* Write the next key to the output. */ vdbePmaWriteVarint(&writer, nKey); vdbePmaWriteBlob(&writer, pReader->aKey, nKey); - rc = vdbeSorterNext(pIncr->pTask, pIncr->pMerger, &dummy); + rc = vdbeSorterNext(pTask, pIncr->pMerger, &dummy); } rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof); if( rc==SQLITE_OK ) rc = rc2; - vdbeSorterPopulateDebug(pIncr->pTask, "exit"); + vdbeSorterPopulateDebug(pTask, "exit"); return rc; } +#if SQLITE_MAX_WORKER_THREADS>0 +/* +** The main routine for background threads that populate aFile[1] of +** multi-threaded IncrMerger objects. +*/ static void *vdbeIncrPopulateThread(void *pCtx){ IncrMerger *pIncr = (IncrMerger*)pCtx; void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) ); - pIncr->thread.bDone = 1; + pIncr->pTask->bDone = 1; return pRet; } -#if SQLITE_MAX_WORKER_THREADS>0 +/* +** Launch a background thread to populate aFile[1] of pIncr. +*/ static int vdbeIncrBgPopulate(IncrMerger *pIncr){ - void *pCtx = (void*)pIncr; + void *p = (void*)pIncr; assert( pIncr->bUseThread ); - return vdbeSorterCreateThread(&pIncr->thread, vdbeIncrPopulateThread, pCtx); + return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p); } #endif +/* +** This function is called when the PmaReader corresponding to pIncr has +** finished reading the contents of aFile[0]. Its purpose is to "refill" +** aFile[0] such that the iterator should start rereading it from the +** beginning. +** +** For single-threaded objects, this is accomplished by literally reading +** keys from pIncr->pMerger and repopulating aFile[0]. +** +** For multi-threaded objects, all that is required is to wait until the +** background thread is finished (if it is not already) and then swap +** aFile[0] and aFile[1] in place. If the contents of pMerger have not +** been exhausted, this function also launches a new background thread +** to populate the new aFile[1]. +** +** SQLITE_OK is returned on success, or an SQLite error code otherwise. 
+*/ static int vdbeIncrSwap(IncrMerger *pIncr){ int rc = SQLITE_OK; #if SQLITE_MAX_WORKER_THREADS>0 if( pIncr->bUseThread ){ - rc = vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread); + rc = vdbeSorterJoinThread(pIncr->pTask); if( rc==SQLITE_OK ){ SorterFile f0 = pIncr->aFile[0]; @@ -1675,20 +1750,9 @@ static int vdbeIncrSwap(IncrMerger *pIncr){ return rc; } -static void vdbeIncrFree(IncrMerger *pIncr){ - if( pIncr ){ -#if SQLITE_MAX_WORKER_THREADS>0 - vdbeSorterJoinThread(pIncr->pTask, &pIncr->thread); - if( pIncr->bUseThread ){ - if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd); - if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd); - } -#endif - vdbeMergeEngineFree(pIncr->pMerger); - sqlite3_free(pIncr); - } -} - +/* +** Allocate and return a new IncrMerger object to read data from pMerger. +*/ static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){ IncrMerger *pIncr = sqlite3_malloc(sizeof(IncrMerger)); if( pIncr ){ @@ -1701,6 +1765,9 @@ static IncrMerger *vdbeIncrNew(SortSubtask *pTask, MergeEngine *pMerger){ return pIncr; } +/* +** Set the "use-threads" flag on object pIncr. 
+*/ static void vdbeIncrSetThreads(IncrMerger *pIncr, int bUseThread){ if( bUseThread ){ pIncr->bUseThread = 1; @@ -1742,6 +1809,7 @@ static int vdbeIncrInit2(PmaReader *pIter, int eMode){ IncrMerger *pIncr = pIter->pIncr; if( pIncr ){ SortSubtask *pTask = pIncr->pTask; + sqlite3 *db = pTask->pSorter->db; rc = vdbeIncrInitMerger(pTask, pIncr->pMerger, eMode); @@ -1749,10 +1817,10 @@ static int vdbeIncrInit2(PmaReader *pIter, int eMode){ if( rc==SQLITE_OK ){ if( pIncr->bUseThread==0 ){ if( pTask->file2.pFd==0 ){ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pTask->file2.pFd); + rc = vdbeSorterOpenTempFile(db->pVfs, &pTask->file2.pFd); assert( pTask->file2.iEof>0 ); if( rc==SQLITE_OK ){ - vdbeSorterExtendFile(pTask->db,pTask->file2.pFd,pTask->file2.iEof); + vdbeSorterExtendFile(db, pTask->file2.pFd, pTask->file2.iEof); pTask->file2.iEof = 0; } } @@ -1762,9 +1830,9 @@ static int vdbeIncrInit2(PmaReader *pIter, int eMode){ pTask->file2.iEof += pIncr->mxSz; } }else{ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[0].pFd); + rc = vdbeSorterOpenTempFile(db->pVfs, &pIncr->aFile[0].pFd); if( rc==SQLITE_OK ){ - rc = vdbeSorterOpenTempFile(pTask->db->pVfs, &pIncr->aFile[1].pFd); + rc = vdbeSorterOpenTempFile(db->pVfs, &pIncr->aFile[1].pFd); } } } @@ -1786,15 +1854,13 @@ static int vdbeIncrInit2(PmaReader *pIter, int eMode){ static void *vdbeIncrInit2Thread(void *pCtx){ PmaReader *pReader = (PmaReader*)pCtx; void *pRet = SQLITE_INT_TO_PTR( vdbeIncrInit2(pReader, INCRINIT2_TASK) ); - pReader->pIncr->thread.bDone = 1; + pReader->pIncr->pTask->bDone = 1; return pRet; } static int vdbeIncrBgInit2(PmaReader *pIter){ void *pCtx = (void*)pIter; - return vdbeSorterCreateThread( - &pIter->pIncr->thread, vdbeIncrInit2Thread, pCtx - ); + return vdbeSorterCreateThread(pIter->pIncr->pTask, vdbeIncrInit2Thread, pCtx); } #endif @@ -1885,7 +1951,7 @@ static int vdbeAddToBuilder( static int vdbePmaReaderIncrInit(VdbeSorter *pSorter){ SortSubtask *pTask0 = &pSorter->aTask[0]; 
MergeEngine *pMain = 0; - sqlite3 *db = pTask0->db; + sqlite3 *db = pTask0->pSorter->db; int rc = SQLITE_OK; int iTask;