/*
** An instance of the following object is used to read records out of a
** PMA, in sorted order. The next key to be read is cached in nKey/aKey.
-** pFile==0 at EOF.
+** aKey might point into aMap or into aBuffer. If neither of those locations
+** contain a contiguous representation of the key, then aAlloc is allocated
+** and the key is copied into aAlloc and aKey is made to poitn to aAlloc.
+**
+** pFd==0 at EOF.
*/
struct PmaReader {
- i64 iReadOff; /* Current read offset */
- i64 iEof; /* 1 byte past EOF for this PmaReader */
- int nAlloc; /* Bytes of space at aAlloc */
- int nKey; /* Number of bytes in key */
- sqlite3_file *pFile; /* File we are reading from */
- u8 *aAlloc; /* Allocated space */
- u8 *aKey; /* Pointer to current key */
- u8 *aBuffer; /* Current read buffer */
- int nBuffer; /* Size of read buffer in bytes */
- u8 *aMap; /* Pointer to mapping of entire file */
- IncrMerger *pIncr; /* Incremental merger */
+ i64 iReadOff; /* Current read offset */
+ i64 iEof; /* 1 byte past EOF for this PmaReader */
+ int nAlloc; /* Bytes of space at aAlloc */
+ int nKey; /* Number of bytes in key */
+ sqlite3_file *pFd; /* File handle we are reading from */
+ u8 *aAlloc; /* Space for aKey if aBuffer and pMap wont work */
+ u8 *aKey; /* Pointer to current key */
+ u8 *aBuffer; /* Current read buffer */
+ int nBuffer; /* Size of read buffer in bytes */
+ u8 *aMap; /* Pointer to mapping of entire file */
+ IncrMerger *pIncr; /* Incremental merger */
};
/*
int iBufStart; /* First byte of buffer to write */
int iBufEnd; /* Last byte of buffer to write */
i64 iWriteOff; /* Offset of start of buffer in file */
- sqlite3_file *pFile; /* File to write to */
+ sqlite3_file *pFd; /* File handle to write to */
};
/*
static void vdbePmaReaderClear(PmaReader *pReadr){
sqlite3_free(pReadr->aAlloc);
sqlite3_free(pReadr->aBuffer);
- if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFile, 0, pReadr->aMap);
+ if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
vdbeIncrFree(pReadr->pIncr);
memset(pReadr, 0, sizeof(PmaReader));
}
assert( nRead>0 );
/* Readr data from the file. Return early if an error occurs. */
- rc = sqlite3OsRead(p->pFile, p->aBuffer, nRead, p->iReadOff);
+ rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff);
assert( rc!=SQLITE_IOERR_SHORT_READ );
if( rc!=SQLITE_OK ) return rc;
}
assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );
if( pReadr->aMap ){
- sqlite3OsUnfetch(pReadr->pFile, 0, pReadr->aMap);
+ sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
pReadr->aMap = 0;
}
pReadr->iReadOff = iOff;
pReadr->iEof = pFile->iEof;
- pReadr->pFile = pFile->pFd;
+ pReadr->pFd = pFile->pFd;
rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap);
if( rc==SQLITE_OK && pReadr->aMap==0 ){
nRead = (int)(pReadr->iEof - pReadr->iReadOff);
}
rc = sqlite3OsRead(
- pReadr->pFile, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
+ pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
);
testcase( rc!=SQLITE_OK );
}
p1 = &pMerger->aReadr[i1];
p2 = &pMerger->aReadr[i2];
- if( p1->pFile==0 ){
+ if( p1->pFd==0 ){
iRes = i2;
- }else if( p2->pFile==0 ){
+ }else if( p2->pFd==0 ){
iRes = i1;
}else{
int res;
** Whether or not the file does end up memory mapped of course depends on
** the specific VFS implementation.
*/
-static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFile, i64 nByte){
+static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){
if( nByte<=(i64)(db->nMaxSorterMmap) ){
- int rc = sqlite3OsTruncate(pFile, nByte);
+ int rc = sqlite3OsTruncate(pFd, nByte);
if( rc==SQLITE_OK ){
void *p = 0;
- sqlite3OsFetch(pFile, 0, (int)nByte, &p);
- sqlite3OsUnfetch(pFile, 0, p);
+ sqlite3OsFetch(pFd, 0, (int)nByte, &p);
+ sqlite3OsUnfetch(pFd, 0, p);
}
}
}
/*
** Allocate space for a file-handle and open a temporary file. If successful,
-** set *ppFile to point to the malloc'd file-handle and return SQLITE_OK.
-** Otherwise, set *ppFile to 0 and return an SQLite error code.
+** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK.
+** Otherwise, set *ppFd to 0 and return an SQLite error code.
*/
static int vdbeSorterOpenTempFile(
sqlite3 *db, /* Database handle doing sort */
i64 nExtend, /* Attempt to extend file to this size */
- sqlite3_file **ppFile
+ sqlite3_file **ppFd
){
int rc;
- rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFile,
+ rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd,
SQLITE_OPEN_TEMP_JOURNAL |
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
SQLITE_OPEN_EXCLUSIVE | SQLITE_OPEN_DELETEONCLOSE, &rc
);
if( rc==SQLITE_OK ){
i64 max = SQLITE_MAX_MMAP_SIZE;
- sqlite3OsFileControlHint(*ppFile, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
+ sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
if( nExtend>0 ){
- vdbeSorterExtendFile(db, *ppFile, nExtend);
+ vdbeSorterExtendFile(db, *ppFd, nExtend);
}
}
return rc;
** Initialize a PMA-writer object.
*/
static void vdbePmaWriterInit(
- sqlite3_file *pFile, /* File to write to */
+ sqlite3_file *pFd, /* File handle to write to */
PmaWriter *p, /* Object to populate */
int nBuf, /* Buffer size */
- i64 iStart /* Offset of pFile to begin writing at */
+ i64 iStart /* Offset of pFd to begin writing at */
){
memset(p, 0, sizeof(PmaWriter));
p->aBuffer = (u8*)sqlite3Malloc(nBuf);
p->iBufEnd = p->iBufStart = (iStart % nBuf);
p->iWriteOff = iStart - p->iBufStart;
p->nBuffer = nBuf;
- p->pFile = pFile;
+ p->pFd = pFd;
}
}
memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy);
p->iBufEnd += nCopy;
if( p->iBufEnd==p->nBuffer ){
- p->eFWErr = sqlite3OsWrite(p->pFile,
+ p->eFWErr = sqlite3OsWrite(p->pFd,
&p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
p->iWriteOff + p->iBufStart
);
static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){
int rc;
if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
- p->eFWErr = sqlite3OsWrite(p->pFile,
+ p->eFWErr = sqlite3OsWrite(p->pFd,
&p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart,
p->iWriteOff + p->iBufStart
);
for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
/* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
int iRes;
- if( pReadr1->pFile==0 ){
+ if( pReadr1->pFd==0 ){
iRes = +1;
- }else if( pReadr2->pFile==0 ){
+ }else if( pReadr2->pFd==0 ){
iRes = -1;
}else{
iRes = vdbeSorterCompare(pTask,
pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
pKey2 = pReadr2->aKey;
}else{
- if( pReadr1->pFile ) pKey2 = 0;
+ if( pReadr1->pFd ) pKey2 = 0;
pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
}
}
- *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFile==0);
+ *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
}
return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
/* Check if the output file is full or if the input has been exhausted.
** In either case exit the loop. */
- if( pReader->pFile==0 ) break;
+ if( pReader->pFd==0 ) break;
if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
/* Write the next key to the output. */
** pMerger argument before returning.
*/
static int vdbeIncrNew(
- SortSubtask *pTask,
- MergeEngine *pMerger,
- IncrMerger **ppOut
+ SortSubtask *pTask, /* The thread that will be using the new IncrMerger */
+ MergeEngine *pMerger, /* The MergeEngine that the IncrMerger will control */
+ IncrMerger **ppOut /* Write the new IncrMerger here */
){
int rc = SQLITE_OK;
IncrMerger *pIncr = *ppOut = (IncrMerger*)
static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
/*
-** Initialize the merger argument passed as the second argument. Once this
-** function returns, the first key of merged data may be read from the merger
-** object in the usual fashion.
+** Initialize the MergeEngine object passed as the second argument. Once this
+** function returns, the first key of merged data may be read from the
+** MergeEngine object in the usual fashion.
**
** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
** objects attached to the PmaReader objects that the merger reads from have
** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
*/
static int vdbeIncrInitMerger(
- SortSubtask *pTask,
- MergeEngine *pMerger,
+ SortSubtask *pTask, /* Thread that will run pMerger */
+ MergeEngine *pMerger, /* MergeEngine to initialize */
int eMode /* One of the INCRINIT_XXX constants */
){
int rc = SQLITE_OK; /* Return code */
int i; /* For looping over PmaReader objects */
int nTree = pMerger->nTree;
+ /* Verify that the MergeEngine is assigned to a single thread */
+ assert( pMerger->pTask==0 || pMerger->pTask==pTask );
+ pMerger->pTask = pTask;
+
for(i=0; rc==SQLITE_OK && i<nTree; i++){
if( eMode==INCRINIT_ROOT ){
/* PmaReaders should be normally initialized in order, as if they are
** to open and/or initialize the temp file related fields of the IncrMerge
** object at (pReadr->pIncr).
**
-** If argument eMode is set to INCRINIT_NORMAL, then PmaReader PmaReaders
+** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
** in the sub-tree headed by pReadr are also initialized. Data is then loaded
-** into the buffers belonging to this PmaReader, pReadr, and it is set to
+** into the buffers belonging to pReadr and it is set to
** point to the first key in its range.
**
-** If argument eMode is set to INCRINIT_TASK, then PmaReader is guaranteed
+** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
** to be a multi-threaded PmaReader and this function is being called in a
** background thread. In this case all PmaReaders in the sub-tree are
** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
-** pReadr is populated. However, the PmaReader itself is not set up to point
+** pReadr is populated. However, pReadr itself is not set up to point
** to its first key. A call to vdbePmaReaderNext() is still required to do
** that.
**
** The reason this function does not call vdbePmaReaderNext() immediately
-** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that has
+** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
** to block on thread (pTask->thread) before accessing aFile[1]. But, since
** this entire function is being run by thread (pTask->thread), that will
** lead to the current background thread attempting to join itself.
#if SQLITE_MAX_WORKER_THREADS>0
if( pSorter->bUseThreads ){
rc = vdbePmaReaderNext(pSorter->pReader);
- *pbEof = (pSorter->pReader->pFile==0);
+ *pbEof = (pSorter->pReader->pFd==0);
}else
#endif
/*if( !pSorter->bUseThreads )*/ {