** May you share freely, never taking more than you give.
**
*************************************************************************
-** $Id: btree.c,v 1.217 2004/11/13 13:19:56 danielk1977 Exp $
+** $Id: btree.c,v 1.218 2004/11/16 04:57:24 danielk1977 Exp $
**
** This file implements an external (disk-based) database using BTrees.
** For a detailed discussion of BTrees, refer to
** A cursor is a pointer to a particular entry in the BTree.
** The entry is identified by its MemPage and the index in
** MemPage.aCell[] of the entry.
+**
+** Normally, the BtCursor.delShift variable is 0. If non-zero, this
+** indicates that the entry to which the cursor logically points
+** was deleted (by a BtreeDelete() call). If this is the case, the
+** BtreeKeySize() and BtreeDataSize() calls both return 0.
+**
+** If BtCursor.delShift is +1, then do not move the cursor for a
+** BtreeNext() operation (it was already advanced when the entry the
+** cursor logically points to was deleted). If BtCursor.delShift is
+** -1, then ignore the next BtreePrevious() call.
*/
struct BtCursor {
Btree *pBt; /* The Btree to which this cursor belongs */
u8 wrFlag; /* True if writable */
u8 isValid; /* TRUE if points to a valid entry */
u8 status; /* Set to SQLITE_ABORT if cursors is invalidated */
+ int delShift; /* See above. */
};
/*
pBt->pCursor = pCur;
pCur->isValid = 0;
pCur->status = SQLITE_OK;
+ pCur->delShift = 0;
*ppCur = pCur;
return SQLITE_OK;
** itself, not the number of bytes in the key.
*/
int sqlite3BtreeKeySize(BtCursor *pCur, i64 *pSize){
- if( !pCur->isValid ){
+ if( !pCur->isValid || pCur->delShift ){
*pSize = 0;
}else{
getCellInfo(pCur);
** the database is empty) then *pSize is set to 0.
*/
int sqlite3BtreeDataSize(BtCursor *pCur, u32 *pSize){
- if( !pCur->isValid ){
+ if( !pCur->isValid || pCur->delShift ){
/* Not pointing at a valid entry - set *pSize to 0. */
*pSize = 0;
}else{
** the available payload.
*/
int sqlite3BtreeKey(BtCursor *pCur, u32 offset, u32 amt, void *pBuf){
- if( pCur->isValid==0 ){
+ if( !pCur->isValid || pCur->delShift ){
return pCur->status;
}
assert( pCur->pPage!=0 );
** the available payload.
*/
int sqlite3BtreeData(BtCursor *pCur, u32 offset, u32 amt, void *pBuf){
- if( !pCur->isValid ){
+ if( !pCur->isValid || pCur->delShift ){
return pCur->status ? pCur->status : SQLITE_INTERNAL;
}
assert( pCur->pPage!=0 );
rc = moveToChild(pCur, subpage);
}
pCur->isValid = pCur->pPage->nCell>0;
+ pCur->delShift = 0;
return rc;
}
}
assert( pPage->isInit );
assert( pCur->idx<pPage->nCell );
+
+ /* If BtCursor.delShift is 1, the cursor has already been advanced. */
+ if( pCur->delShift==1 ){
+ *pRes = 0;
+ pCur->delShift = 0;
+ return SQLITE_OK;
+ }else{
+ pCur->delShift = 0;
+ }
+
pCur->idx++;
pCur->info.nSize = 0;
if( pCur->idx>=pPage->nCell ){
*pRes = 1;
return SQLITE_OK;
}
+
+ /* If BtCursor.delShift is -1, the cursor has already been moved back. */
+ if( pCur->delShift==-1 ){
+ *pRes = 0;
+ pCur->delShift = 0;
+ return SQLITE_OK;
+ }else{
+ pCur->delShift = 0;
+ }
+
pPage = pCur->pPage;
assert( pPage->isInit );
assert( pCur->idx>=0 );
nNew>=4 ? pgnoNew[3] : 0, nNew>=4 ? szNew[3] : 0,
nNew>=5 ? pgnoNew[4] : 0, nNew>=5 ? szNew[4] : 0));
-#if 0
- /* The following block shows how cells migrated during the balance op. */
- if( sqlite3_btree_trace ){
- char zBuf[200];
- char *zCsr = zBuf;
- int a, b, c=0, d=0;
- *zCsr = '\0';
- for(a=0; a<nOld; a++){
- int nOldCells = apCopy[a]->nCell+apCopy[a]->nOverflow;
- for(b=0; b<(nOldCells+((a!=nOld-1&&!leafData)?1:0)); b++){
- int x = 0;
- Pgno iNewPage;
- Pgno iOldPage;
- int iNewIndex;
- int iOldIndex;
-
- if( b<nOldCells ){
- iOldPage = pgnoOld[a];
- iOldIndex = b;
- }else{
- iOldPage = pParent->pgno;
- iOldIndex = idxDiv[a];
- }
-
- while( cntNew[x]<=c ) x++;
- if( x>0 && c==cntNew[x-1] && !leafData ){
- iNewPage = pParent->pgno;
- iNewIndex = nxDiv + a;
- }else{
- assert( x<nNew );
- iNewPage = pgnoNew[x];
- iNewIndex = c-(x>0?cntNew[x-1]:0)-(leafData?0:1);
- }
-
- if( (&zBuf[sizeof(zBuf)])-zCsr > 100 &&
- (1 || iOldPage!=iNewPage || iOldIndex!=iNewIndex) ){
- zCsr += sprintf(zCsr, " %d.%d->%d.%d", iOldPage, iOldIndex,
- iNewPage, iNewIndex);
- }
- c++;
- if( (d==0 && strlen(zBuf)>35) || strlen(zBuf)>60 ){
- TRACE(("%s%s\n", d==0?"BALANCE: Cell migration:":"", zBuf));
- zCsr = zBuf;
- d = 1;
- }
- }
- }
- assert( c==nCell );
- if( zCsr!=zBuf ){
- TRACE(("%s%s\n", d==0?"BALANCE: Cell migration":"", zBuf));
- }
- }
-#endif
-
/* If there are other cursors that refer to one of the pages involved
** in the balancing, then adjust these cursors so that they still
** point to the same cells.
iCell += (apCopy[i]->nCell + apCopy[i]->nOverflow + 1);
}
}
- if( pCur->idx>=(nxDiv+nOld) ){
+ if( pCur->idx>=(nxDiv+nOld-1) ){
TRACE(("BALANCE: Cursor %p migrates from %d,%d to %d,%d\n",
pCur, pgno, pCur->idx, pgno, pCur->idx+(nNew-nOld)));
pCur->idx += (nNew-nOld);
Pgno pgnoNew;
int x = 0;
+ assert( iCell<nCell );
while( cntNew[x]<=iCell ) x++;
if( x>0 && !leafData && cntNew[x-1]==iCell ){
/* The cell that pCur points to is a divider cell in pParent. */
releasePage(pCur->pPage);
rc = getPage(pBt, pgnoNew, &pCur->pPage);
assert( rc==SQLITE_OK );
+ assert( pCur->pPage->isInit );
pCur->info.nSize = 0;
}
}
/* The child information will fit on the root page, so do the
** copy */
int i;
+ BtCursor *pCur;
zeroPage(pPage, pChild->aData[0]);
for(i=0; i<pChild->nCell; i++){
apCell[i] = findCell(pChild,i);
assemblePage(pPage, pChild->nCell, apCell, szCell);
freePage(pChild);
TRACE(("BALANCE: child %d transfer to page 1\n", pChild->pgno));
+ /* If there were cursors pointing at this page, point them at the
+ ** new page instead. Decrement the reference count for the old
+ ** page and increment it for the new one.
+ */
+ for(pCur=pBt->pCursor; pCur; pCur=pCur->pNext){
+ if( pCur->pPage==pChild ){
+ TRACE(("BALANCE: Cursor %p migrates from %d,%d to %d,%d\n",
+ pCur, pPage->pgno, pCur->idx, pPage->pgno, pCur->idx));
+ releasePage(pCur->pPage);
+ rc = getPage(pBt, 1, &pCur->pPage);
+ assert( rc==SQLITE_OK );
+ }
+ }
}else{
/* The child has more information than will fit on the root.
** The tree is already balanced. Do nothing. */
TRACE(("BALANCE: child %d will not fit on page 1\n", pChild->pgno));
}
}else{
+ BtCursor *pCur;
memcpy(pPage->aData, pChild->aData, pPage->pBt->usableSize);
pPage->isInit = 0;
pPage->pParent = 0;
freePage(pChild);
TRACE(("BALANCE: transfer child %d into root %d\n",
pChild->pgno, pPage->pgno));
+ for(pCur=pBt->pCursor; pCur; pCur=pCur->pNext){
+ if( pCur->pPage==pChild ){
+ TRACE(("BALANCE: Cursor %p migrates from %d,%d to %d,%d\n",
+ pCur, pChild->pgno, pCur->idx, pPage->pgno, pCur->idx));
+ releasePage(pCur->pPage);
+ rc = getPage(pBt, pPage->pgno, &pCur->pPage);
+ assert( rc==SQLITE_OK );
+ }
+ }
}
rc = reparentChildPages(pPage);
if( rc!=SQLITE_OK ) goto end_shallow_balance;
** wrFlag==0 then this routine returns SQLITE_LOCKED. If all
** cursors that point to pgnoRoot were opened with wrFlag==1
** then this routine returns SQLITE_OK.
-**
-** In addition to checking for read-locks (where a read-lock
-** means a cursor opened with wrFlag==0) this routine also moves
-** all cursors other than pExclude so that they are pointing to the
-** first Cell on root page. This is necessary because an insert
-** or delete might change the number of cells on a page or delete
-** a page entirely and we do not want to leave any cursors
-** pointing to non-existant pages or cells.
*/
static int checkReadLocks(Btree *pBt, Pgno pgnoRoot, BtCursor *pExclude){
BtCursor *p;
for(p=pBt->pCursor; p; p=p->pNext){
if( p->pgnoRoot!=pgnoRoot || p==pExclude ) continue;
if( p->wrFlag==0 ) return SQLITE_LOCKED;
- if( p->pPage->pgno!=p->pgnoRoot ){
-/* moveToRoot(p); */
- }
}
return SQLITE_OK;
}
rc = balance(pPage);
/* sqlite3BtreePageDump(pCur->pBt, pCur->pgnoRoot, 1); */
/* fflush(stdout); */
- if( rc==SQLITE_OK ){
- /* moveToRoot(pCur); */
- }
end_insert:
sqliteFree(newCell);
return rc;
int rc;
Pgno pgnoChild = 0;
Btree *pBt = pCur->pBt;
+ int idx; /* Index of the cell to delete */
+ BtCursor *pCur2; /* Iterator variable for the pBt.pCursor link-list */
assert( pPage->isInit );
if( pCur->status ){
rc = sqlite3pager_write(pPage->aData);
if( rc ) return rc;
+ /* Set idx to the index in pPage of the cell to delete. Also
+ ** increment the reference count for pPage. This allows us to move the
+ ** cursor pCur before the delete takes place.
+ */
+ idx = pCur->idx;
+ rc = getPage(pBt, pPage->pgno, &pPage);
+ if( rc ) return rc;
+ assert( pPage==pCur->pPage );
+
+ /* If there are any cursors that point to the cell being deleted,
+ ** move them to the next or previous entry in the table. It is preferable
+ ** to move the cursor to the 'next' location, rather than the 'previous'
+ ** one, as most table scans are done in the forward direction (also, code
+ ** below depends on this). If neither entry exists, declare the cursor
+ ** invalid.
+ */
+ for(pCur2=pBt->pCursor; pCur2; pCur2 = pCur2->pNext){
+ if( pCur2->pPage==pPage && pCur2->idx==idx && pCur2->isValid ){
+ int res;
+ pCur2->delShift = 0;
+ rc = sqlite3BtreeNext(pCur2, &res);
+ if( rc ) goto delete_out;
+ if( res ){
+ /* If the next tree entry cannot be found, then the cursor must
+ ** already point to the last table entry. So point it to the
+ ** second last by calling BtreeLast(), BtreePrevious().
+ */
+ rc = sqlite3BtreeLast(pCur2, &res);
+ if( rc ) goto delete_out;
+ assert( res==0 );
+ rc = sqlite3BtreePrevious(pCur2, &res);
+ if( rc ) goto delete_out;
+ pCur2->delShift = -1;
+ }else{
+ pCur2->delShift = 1;
+ }
+ }
+ }
+
/* Locate the cell within its page and leave pCell pointing to the
** data. The clearCell() call frees any overflow pages associated with the
** cell. The cell itself is still intact.
*/
- pCell = findCell(pPage, pCur->idx);
+ pCell = findCell(pPage, idx);
if( !pPage->leaf ){
pgnoChild = get4byte(pCell);
}
** do something we will leave a hole on an internal page.
** We have to fill the hole by moving in a cell from a leaf. The
** next Cell after the one to be deleted is guaranteed to exist and
- ** to be a leaf so we can use it.
+ ** to be a leaf so we can use it. Conveniently, pCur now points
+ ** at this cell (because it was advanced above).
*/
BtCursor leafCur;
unsigned char *pNext;
int notUsed;
unsigned char *tempCell;
assert( !pPage->leafData );
+
+ /* Make a copy of *pCur in leafCur. leafCur now points to the cell
+ ** that will be moved into the space left by the cell being deleted.
+ */
+ assert( pCur->delShift==1 );
+ assert( pCur->isValid );
getTempCursor(pCur, &leafCur);
- rc = sqlite3BtreeNext(&leafCur, &notUsed);
if( rc!=SQLITE_OK ){
if( rc!=SQLITE_NOMEM ){
rc = SQLITE_CORRUPT; /* bkpt-CORRUPT */
}
- return rc;
+ goto delete_out;
}
rc = sqlite3pager_write(leafCur.pPage->aData);
- if( rc ) return rc;
+ if( rc ) goto delete_out;
TRACE(("DELETE: table=%d delete internal from %d replace from leaf %d\n",
pCur->pgnoRoot, pPage->pgno, leafCur.pPage->pgno));
- dropCell(pPage, pCur->idx, cellSizePtr(pPage, pCell));
+
+ /* Drop the cell from the internal page. Make a copy of the cell from
+ ** the leaf page into memory obtained from malloc(). Insert it into
+ ** the internal page, at the position vacated by the delete. There
+ ** are now two copies of the leaf-cell in the tree.
+ */
+ dropCell(pPage, idx, cellSizePtr(pPage, pCell));
pNext = findCell(leafCur.pPage, leafCur.idx);
szNext = cellSizePtr(leafCur.pPage, pNext);
assert( MX_CELL_SIZE(pBt)>=szNext+4 );
tempCell = sqliteMallocRaw( MX_CELL_SIZE(pBt) );
- if( tempCell==0 ) return SQLITE_NOMEM;
- rc = insertCell(pPage, pCur->idx, pNext-4, szNext+4, tempCell);
- if( rc!=SQLITE_OK ) return rc;
- put4byte(findOverflowCell(pPage, pCur->idx), pgnoChild);
- pCur->isValid = 0;
+ if( tempCell==0 ){
+ rc = SQLITE_NOMEM;
+ goto delete_out;
+ }
+ rc = insertCell(pPage, idx, pNext-4, szNext+4, tempCell);
+ if( rc!=SQLITE_OK ) goto delete_out;
+ put4byte(findOverflowCell(pPage, idx), pgnoChild);
+ pPage->idxShift = 0;
+
+ /* If there are any cursors that point to the leaf-cell, move them
+ ** so that they point at the internal cell. This is easiest done by
+ ** calling BtreePrevious().
+ */
+ for(pCur2=pBt->pCursor; pCur2; pCur2 = pCur2->pNext){
+ if( pCur2->pPage==leafCur.pPage && pCur2->idx==leafCur.idx ){
+ int res;
+ int delShiftSave = pCur2->delShift;
+ assert( leafCur.idx==0 );
+ pCur2->delShift = 0;
+ rc = sqlite3BtreePrevious(pCur2, &res);
+ if( rc ) goto delete_out;
+ assert( res==0 );
+ assert( pCur2->pPage==pPage );
+ assert( pCur2->idx==idx );
+ pCur2->delShift = delShiftSave;
+ }
+ }
+
+ /* Balance the internal page. Free the memory allocated for the
+ ** copy of the leaf cell. Then delete the cell from the leaf page.
+ */
rc = balance(pPage);
sqliteFree(tempCell);
- if( rc ) return rc;
+ if( rc ) goto delete_out;
dropCell(leafCur.pPage, leafCur.idx, szNext);
+
+ for(pCur2=pBt->pCursor; pCur2; pCur2 = pCur2->pNext){
+ if( pCur2->pPage==leafCur.pPage && pCur2->idx>leafCur.idx ){
+ TRACE(("DELETE: Cursor %p migrates from %d,%d to %d,%d\n",
+ pCur2, pPage->pgno, pCur2->idx, pPage->pgno, pCur2->idx-1));
+ pCur2->idx--;
+ pCur2->info.nSize = 0;
+ }
+ }
+
rc = balance(leafCur.pPage);
releaseTempCursor(&leafCur);
}else{
- TRACE(("DELETE: table=%d delete from leaf %d\n",
- pCur->pgnoRoot, pPage->pgno));
- dropCell(pPage, pCur->idx, cellSizePtr(pPage, pCell));
- pCur->isValid = 0;
+ TRACE(("DELETE: table=%d delete %d from leaf %d\n",
+ pCur->pgnoRoot, idx, pPage->pgno));
+ dropCell(pPage, idx, cellSizePtr(pPage, pCell));
+
+ /* If there were cursors pointing to cells on pPage with index values
+ ** greater than idx, decrement the index values now.
+ */
+ for(pCur2=pBt->pCursor; pCur2; pCur2 = pCur2->pNext){
+ assert( !pCur2->isValid || pCur2->pPage!=pPage || pCur2->idx!=idx );
+ if( pCur2->pPage==pPage && pCur2->idx>idx ){
+ TRACE(("DELETE: Cursor %p migrates from %d,%d to %d,%d\n",
+ pCur2, pPage->pgno, pCur2->idx, pPage->pgno, pCur2->idx-1));
+ pCur2->idx--;
+ pCur2->info.nSize = 0;
+ }
+ }
+
rc = balance(pPage);
}
- moveToRoot(pCur);
+
+delete_out:
+ releasePage(pPage);
return rc;
}
#
#***********************************************************************
# This file implements regression tests for SQLite library. The
-# focus of this script is btree database backend.
+# focus of this script is btree database backend. Specifically,
+# this file tests that existing cursors are correctly repositioned
+# when entries are inserted into or deleted from btrees.
#
-# $Id: btree8.test,v 1.1 2004/11/13 13:19:56 danielk1977 Exp $
+# $Id: btree8.test,v 1.2 2004/11/16 04:57:25 danielk1977 Exp $
set testdir [file dirname $argv0]
source $testdir/tester.tcl
}
btree_commit $::bt
-# set btree_trace 1
-
# Now write more entries to the table (overwriting the ones that exist).
# After each write, check that the cursors created above still point to the
# same entries.
btree_begin_transaction $::bt
set ::write_csr [btree_cursor $::bt $::tnum 1]
+set first_entry $testnum
for {set i $testnum} {$i < 5000 && $nErr==0 } {incr i} {
set datalen [expr int(rand()*20.0)]
} $key
}
}
+
+# Now delete entries from the table.
+btree_first $::write_csr
+for {set i $first_entry} {$i < 5000 && $nErr==0 } {incr i} {
+
+ do_test btree8-2.$i.1 {
+ btree_key $::write_csr
+ } $i
+ do_test btree8-2.$i.2 {
+ btree_delete $::write_csr
+ btree_next $::write_csr
+ expr 0
+ } {0}
+ set testnum 2
+ foreach csr $csr_list key $keys {
+ incr testnum
+ if {$key <= $i } {
+ set key 0
+ }
+ do_test btree8-2.$i.$testnum {
+ btree_key $::csr
+ } $key
+ }
+}
+
btree_close_cursor $::write_csr
btree_commit $::bt
-if {$::nErr>0} { puts $::csr_list }
+if {$::nErr>0} { puts $::csr_list ; exit }
foreach csr $csr_list {
btree_close_cursor $csr
}
# [num_to_string 456 10] -> "0456.0456."
#
proc num_to_string {num len} {
+ set num [format %.4d $num]
return [string range [string repeat "$num." $len] 0 [expr $len-1]]
}
set testnum 0
foreach key $skeys {
incr testnum
- do_test btree-8-2.$testnum {
+ do_test btree-8-3.$testnum {
set csr [btree_cursor $::bt $::inum 1]
btree_insert $csr $key ""
lappend csr_list $csr
# same entries.
btree_begin_transaction $::bt
set ::write_csr [btree_cursor $::bt $::inum 1]
+set first_entry $testnum
for {set i $testnum} {$i < 5000 && $nErr==0 } {incr i} {
set skey [num_to_string $i 20]
- do_test btree8-2.$i.1 {
+ do_test btree-8-3.$i.1 {
btree_insert $::write_csr $skey ""
} {}
set testnum 1
foreach csr $csr_list key $skeys {
incr testnum
- do_test btree8-2.$i.$testnum {
+ do_test btree-8-3.$i.$testnum {
btree_key $::csr
} $key
}
}
+btree_commit $::bt
+btree_begin_transaction $::bt
+
+proc lremove {l key} {
+ set idx [lsearch $l $key]
+ return [concat [lrange $l 0 [expr $idx-1]] [lrange $l [expr $idx+1] end]]
+}
+proc K {x y} {set x}
+proc lshuffle { list } {
+ set n [llength $list]
+ while {$n>0} {
+ set j [expr {int(rand()*$n)}]
+ lappend slist [lindex $list $j]
+ set list [lreplace [K $list [set list {}]] $j $j]
+ incr n -1
+ }
+ return $slist
+}
+
+# Now delete entries from the index. Do this in a random order, to try to
+# ensure that internal and external nodes are deleted.
+for {set i $first_entry} {$i < 5000} {incr i} {
+ lappend delete_order $i
+}
+set delete_order [lshuffle $delete_order]
+
+btree_first $::write_csr
+foreach i $delete_order {
+ do_test btree8-4.$i.1 {
+ btree_move_to $::write_csr [num_to_string $i 20]
+ btree_key $::write_csr
+ } [num_to_string $i 20]
+ do_test btree8-4.$i.2 {
+ btree_delete $::write_csr
+ } {}
+
+ set delete_order [lremove $delete_order $i]
+ set testnum 2
+ foreach csr $csr_list key $keys {
+ incr testnum
+ if { [lsearch $delete_order $key]==-1 } {
+ set skey ""
+ } else {
+ set skey [num_to_string $key 20]
+ }
+ do_test btree8-4.$i.$testnum {
+ btree_key $::csr
+ } $skey
+ }
+}
+
btree_close_cursor $::write_csr
btree_commit $::bt
if {$::nErr>0} { puts $::csr_list }