** May you share freely, never taking more than you give.
**
*************************************************************************
-** $Id: btree.c,v 1.137 2004/05/14 16:50:06 drh Exp $
+** $Id: btree.c,v 1.138 2004/05/14 19:08:18 drh Exp $
**
** This file implements a external (disk-based) database using BTrees.
** For a detailed discussion of BTrees, refer to
};
typedef Btree Bt;
+/*
+** An instance of the following structure is used to hold information
+** about a cell. The parseCell() function fills the structure in.
+*/
+typedef struct CellInfo CellInfo;
+struct CellInfo {
+ i64 nKey; /* The key for INTKEY tables, or number of bytes in key */
+ u32 nData; /* Number of bytes of data */
+ u16 nHeader; /* Size of the header in bytes */
+ u16 nLocal; /* Amount of payload held locally */
+ u16 iOverflow; /* Offset to overflow page number. Zero if none */
+ u16 nSize; /* Size of the cell */
+};
+
/*
** A cursor is a pointer to a particular entry in the BTree.
** The entry is identified by its MemPage and the index in
Pgno pgnoRoot; /* The root page of this tree */
MemPage *pPage; /* Page that contains the entry */
int idx; /* Index of the entry in pPage->aCell[] */
+ CellInfo info; /* A parse of the cell we are pointing at */
+ u8 infoValid; /* True if information in BtCursor.info is valid */
u8 wrFlag; /* True if writable */
u8 iMatch; /* compare result from last sqlite3BtreeMoveto() */
u8 isValid; /* TRUE if points to a valid entry */
u8 status; /* Set to SQLITE_ABORT if cursors is invalidated */
};
-/*
-** An instance of the following structure is used to hold information
-** about a cell. The parseCell() function fills the structure in.
-*/
-typedef struct CellInfo CellInfo;
-struct CellInfo {
- i64 nKey; /* The key for INTKEY tables, or number of bytes in key */
- u32 nData; /* Number of bytes of data */
- int nHeader; /* Size of the header in bytes */
- int nLocal; /* Amount of payload held locally */
- int iOverflow; /* Offset to overflow page number. Zero if none */
- int nSize; /* Size of the cell */
-};
-
/*
** Read or write a two-, four-, and eight-byte big-endian integer values.
*/
*/
static int resizeCellArray(MemPage *pPage, int nNewSz){
if( pPage->nCellAlloc<nNewSz ){
- pPage->aCell = sqliteRealloc(pPage->aCell, nNewSz*sizeof(pPage->aCell[0]) );
+ int n = nNewSz*sizeof(pPage->aCell[0]);
+ if( pPage->aCell==0 ){
+ pPage->aCell = sqliteMallocRaw( n );
+ }else{
+ pPage->aCell = sqliteRealloc(pPage->aCell, n);
+ }
if( sqlite3_malloc_failed ) return SQLITE_NOMEM;
pPage->nCellAlloc = nNewSz;
}
pCur->pBt = pBt;
pCur->wrFlag = wrFlag;
pCur->idx = 0;
+ pCur->infoValid = 0;
pCur->pNext = pBt->pCursor;
if( pCur->pNext ){
pCur->pNext->pPrev = pCur;
MemPage *pPage;
Btree *pBt;
int ovflSize;
- CellInfo info;
+ u32 nKey;
assert( pCur!=0 && pCur->pPage!=0 );
assert( pCur->isValid );
pageIntegrity(pPage);
assert( pCur->idx>=0 && pCur->idx<pPage->nCell );
aPayload = pPage->aCell[pCur->idx];
- parseCell(pPage, aPayload, &info);
- aPayload += info.nHeader;
+ if( !pCur->infoValid ){
+ parseCell(pPage, aPayload, &pCur->info);
+ pCur->infoValid = 1;
+ }else{
+#ifndef NDEBUG
+ CellInfo info;
+ parseCell(pPage, aPayload, &info);
+ assert( memcmp(&info, &pCur->info, sizeof(info))==0 );
+#endif
+ }
+ aPayload += pCur->info.nHeader;
if( pPage->intKey ){
- info.nKey = 0;
+ nKey = 0;
+ }else{
+ nKey = pCur->info.nKey;
}
assert( offset>=0 );
if( skipKey ){
- offset += info.nKey;
+ offset += nKey;
}
- if( offset+amt > info.nKey+info.nData ){
+ if( offset+amt > nKey+pCur->info.nData ){
return SQLITE_ERROR;
}
- if( offset<info.nLocal ){
+ if( offset<pCur->info.nLocal ){
int a = amt;
- if( a+offset>info.nLocal ){
- a = info.nLocal - offset;
+ if( a+offset>pCur->info.nLocal ){
+ a = pCur->info.nLocal - offset;
}
memcpy(pBuf, &aPayload[offset], a);
if( a==amt ){
pBuf += a;
amt -= a;
}else{
- offset -= info.nLocal;
+ offset -= pCur->info.nLocal;
}
if( amt>0 ){
- nextPage = get4byte(&aPayload[info.nLocal]);
+ nextPage = get4byte(&aPayload[pCur->info.nLocal]);
}
ovflSize = pBt->usableSize - 4;
while( amt>0 && nextPage ){
unsigned char *aPayload;
MemPage *pPage;
Btree *pBt;
- CellInfo info;
+ u32 nKey;
+ int nLocal;
assert( pCur!=0 && pCur->pPage!=0 );
assert( pCur->isValid );
pageIntegrity(pPage);
assert( pCur->idx>=0 && pCur->idx<pPage->nCell );
aPayload = pPage->aCell[pCur->idx];
- parseCell(pPage, aPayload, &info);
- aPayload += info.nHeader;
+ if( !pCur->infoValid ){
+ parseCell(pPage, aPayload, &pCur->info);
+ pCur->infoValid = 1;
+ }else{
+#ifndef NDEBUG
+ CellInfo info;
+ parseCell(pPage, aPayload, &info);
+ assert( memcmp(&info, &pCur->info, sizeof(info))==0 );
+#endif
+ }
+ aPayload += pCur->info.nHeader;
if( pPage->intKey ){
- info.nKey = 0;
+ nKey = 0;
+ }else{
+ nKey = pCur->info.nKey;
}
if( skipKey ){
- aPayload += info.nKey;
- info.nLocal -= info.nKey;
- if( amt<0 ) amt = info.nData;
- assert( amt<=info.nData );
+ aPayload += nKey;
+ nLocal = pCur->info.nLocal - nKey;
+ if( amt<0 ) amt = pCur->info.nData;
+ assert( amt<=pCur->info.nData );
}else{
- if( amt<0 ) amt = info.nKey;
- assert( amt<=info.nKey );
+ nLocal = pCur->info.nLocal;
+ if( amt<0 ) amt = nKey;
+ assert( amt<=nKey );
}
- if( amt>info.nLocal ){
+ if( amt>nLocal ){
return 0; /* If any of the data is not local, return nothing */
}
return aPayload;
releasePage(pOldPage);
pCur->pPage = pNewPage;
pCur->idx = 0;
+ pCur->infoValid = 0;
if( pNewPage->nCell<1 ){
return SQLITE_CORRUPT;
}
oldPgno = pPage->pgno;
releasePage(pPage);
pCur->pPage = pParent;
+ pCur->infoValid = 0;
assert( pParent->idxShift==0 );
if( pParent->idxShift==0 ){
pCur->idx = idxParent;
pageIntegrity(pRoot);
pCur->pPage = pRoot;
pCur->idx = 0;
+ pCur->infoValid = 0;
if( pRoot->nCell==0 && !pRoot->leaf ){
Pgno subpage;
assert( pRoot->pgno==1 );
if( rc ) return rc;
}
pCur->idx = pPage->nCell - 1;
+ pCur->infoValid = 0;
return SQLITE_OK;
}
const void *pCellKey;
i64 nCellKey;
pCur->idx = (lwr+upr)/2;
+ pCur->infoValid = 0;
sqlite3BtreeKeySize(pCur, &nCellKey);
if( pPage->intKey ){
if( nCellKey<nKey ){
return SQLITE_OK;
}
pCur->idx = lwr;
+ pCur->infoValid = 0;
rc = moveToChild(pCur, chldPg);
if( rc ){
return rc;
assert( pPage->isInit );
assert( pCur->idx<pPage->nCell );
pCur->idx++;
+ pCur->infoValid = 0;
if( pCur->idx>=pPage->nCell ){
if( !pPage->leaf ){
rc = moveToChild(pCur, get4byte(&pPage->aData[pPage->hdrOffset+6]));
pPage = pCur->pPage;
}
pCur->idx--;
+ pCur->infoValid = 0;
if( pPage->leafData ){
rc = sqlite3BtreePrevious(pCur, pRes);
}else{
pPage->idxShift = 1;
}
+/*
+** Add a list of cells to a page. The page should be initially empty.
+** The cells are guaranteed to fit on the page.
+*/
+static void assemblePage(
+ MemPage *pPage, /* The page to be assemblied */
+ int nCell, /* The number of cells to add to this page */
+ u8 **apCell, /* Pointers to cell text */
+ int *aSize /* Sizes of the cells */
+){
+ int i; /* Loop counter */
+ int totalSize; /* Total size of all cells */
+ int hdr; /* Index of page header */
+ int pc, prevpc; /* Addresses of cells being inserted */
+ u8 *data; /* Data for the page */
+
+ assert( pPage->needRelink==0 );
+ assert( pPage->isOverfull==0 );
+ totalSize = 0;
+ for(i=0; i<nCell; i++){
+ totalSize += aSize[i];
+ }
+ assert( totalSize<=pPage->nFree );
+ assert( pPage->nCell==0 );
+ resizeCellArray(pPage, nCell);
+ pc = allocateSpace(pPage, totalSize);
+ data = pPage->aData;
+ hdr = pPage->hdrOffset;
+ prevpc = hdr+3;
+ for(i=0; i<nCell; i++){
+ memcpy(data+pc, apCell[i], aSize[i]);
+ put2byte(data+prevpc, pc);
+ pPage->aCell[i] = data+pc;
+ prevpc = pc;
+ pc += aSize[i];
+ assert( pc<=pPage->pBt->usableSize );
+ }
+ pPage->nCell = nCell;
+ put2byte(data+prevpc, 0);
+}
+
/*
** Rebuild the linked list of cells on a page so that the cells
** occur in the order specified by the pPage->aCell[] array.
/* The child information will fit on the root page, so do the
** copy */
zeroPage(pPage, pChild->aData[0]);
- resizeCellArray(pPage, pChild->nCell);
for(i=0; i<pChild->nCell; i++){
- insertCell(pPage, i, pChild->aCell[i],
- cellSize(pChild, pChild->aCell[i]), 0);
+ szCell[i] = cellSize(pChild, pChild->aCell[i]);
}
+ assemblePage(pPage, pChild->nCell, pChild->aCell, szCell);
freePage(pChild);
TRACE(("BALANCE: child %d transfer to page 1\n", pChild->pgno));
}else{
MemPage *pNew = apNew[i];
assert( pNew->pgno==pgnoNew[i] );
resizeCellArray(pNew, cntNew[i] - j);
- while( j<cntNew[i] ){
- assert( pNew->nFree>=szCell[j] );
- insertCell(pNew, pNew->nCell, apCell[j], szCell[j], 0);
- j++;
- }
+ assemblePage(pNew, cntNew[i]-j, &apCell[j], &szCell[j]);
+ j = cntNew[i];
assert( pNew->nCell>0 );
assert( !pNew->isOverfull );
relinkCellList(pNew);
}else if( loc<0 && pPage->nCell>0 ){
assert( pPage->leaf );
pCur->idx++;
+ pCur->infoValid = 0;
}else{
assert( pPage->leaf );
}