]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Change the balance_nonroot() routine to reduce the amount of memcpy work that takes...
authordan <dan@noemail.net>
Thu, 9 Oct 2014 19:35:37 +0000 (19:35 +0000)
committerdan <dan@noemail.net>
Thu, 9 Oct 2014 19:35:37 +0000 (19:35 +0000)
FossilOrigin-Name: 29304499ea4b72dbb6701e10cc19b5d41f7e5ac9

manifest
manifest.uuid
src/btree.c
src/pager.c
src/pager.h

index 574bfe2c8d7aae49a371e42a058d26cc6e296fe0..aeefa53238e6e693bbcde97edda3620f16df7ab2 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Reduce\sthe\samount\sof\smemcpy()\srequired\sby\sdefragmentPage().
-D 2014-09-27T05:00:25.096
+C Change\sthe\sbalance_nonroot()\sroutine\sto\sreduce\sthe\samount\sof\smemcpy\swork\sthat\stakes\splace.\sThis\sis\sa\swork\sin\sprogress.
+D 2014-10-09T19:35:37.452
 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
 F Makefile.in cf57f673d77606ab0f2d9627ca52a9ba1464146a
 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@@ -172,7 +172,7 @@ F src/auth.c d8abcde53426275dab6243b441256fcd8ccbebb2
 F src/backup.c a31809c65623cc41849b94d368917f8bb66e6a7e
 F src/bitvec.c 19a4ba637bd85f8f63fc8c9bae5ade9fb05ec1cb
 F src/btmutex.c 49ca66250c7dfa844a4d4cb8272b87420d27d3a5
-F src/btree.c 95a942a6ebdb23eb2a5d925526d35169aa6742f6
+F src/btree.c 7b89fde3bffa5b7300e94c4aeb69ccff926ef513
 F src/btree.h a79aa6a71e7f1055f01052b7f821bd1c2dce95c8
 F src/btreeInt.h 1bd7957161a1346a914f1f09231610e777a8e58d
 F src/build.c bde83dd5cf812e310a7e5ad2846790a14745bef4
@@ -215,8 +215,8 @@ F src/os_setup.h c9d4553b5aaa6f73391448b265b89bed0b890faa
 F src/os_unix.c fb587121840f690101336879adfa6d0b2cd0e8c7
 F src/os_win.c 0a4042ef35f322e86fa01f6c8884c5e645b911e7
 F src/os_win.h 09e751b20bbc107ffbd46e13555dc73576d88e21
-F src/pager.c caab007743821d96752597c9cfd7351654697b06
-F src/pager.h ffd5607f7b3e4590b415b007a4382f693334d428
+F src/pager.c 0abcb0904a78d68b96357f360c6b160bcfc2a3e0
+F src/pager.h 8b6707cb32c788cf36bfc3d63f6d4b4fa689e7c2
 F src/parse.y b98772da2bb5415970085b707203f92569400aa8
 F src/pcache.c 4121a0571c18581ee9f82f086d5e2030051ebd6a
 F src/pcache.h 9b559127b83f84ff76d735c8262f04853be0c59a
@@ -1200,10 +1200,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1
 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
 F tool/warnings.sh 0abfd78ceb09b7f7c27c688c8e3fe93268a13b32
 F tool/win/sqlite.vsix deb315d026cc8400325c5863eef847784a219a2f
-P 83913515830aa850f9e38406f9422d7e88dcab66
-R 66a1e1f00a844450677737824735607d
-T *branch * defrag-opt
-T *sym-defrag-opt *
-T -sym-trunk *
-U drh
-Z f60a1f2e4650c574e91ad245c0c67dee
+P 3edab9957cc7bb90b52fd40b02613c2cb03fc166
+R 5fca1836b5a4d862df24682ecb47d048
+U dan
+Z 8527330c8f275358262176fc962502e2
index bf8d17f152f84a1ac844771e98281e6d749195c1..bb422fd7dd5206e0947064a1098829fdbf8a977b 100644 (file)
@@ -1 +1 @@
-3edab9957cc7bb90b52fd40b02613c2cb03fc166
\ No newline at end of file
+29304499ea4b72dbb6701e10cc19b5d41f7e5ac9
\ No newline at end of file
index c4832b4ee900e1c7359109f63c5e8c440f946782..dbf002ecdbf4105bc250dee77de946e875b8a5cf 100644 (file)
@@ -5977,6 +5977,49 @@ static void assemblePage(
   pPage->nCell = (u16)nCell;
 }
 
+
+static void rebuildPage(
+  MemPage *pPg,                   /* Edit this page */
+  int nRemove,                    /* Cells to remove from start of page */
+  int nCell,                      /* Final number of cells on page */
+  u8 **apCell,                    /* Array of nCell final cells */
+  u16 *szCell                     /* Array of nCell cell sizes */
+){
+  const int hdr = pPg->hdrOffset;          /* Offset of header on pPg */
+  u8 * const aData = pPg->aData;           /* Pointer to data for pPg */
+  const int usableSize = pPg->pBt->usableSize;
+  u8 * const pEnd = &aData[usableSize];
+  int i;
+  u8 *pCellptr = pPg->aCellIdx;
+  u8 *pTmp = sqlite3PagerTempSpace(pPg->pBt->pPager);
+  u8 *pData;
+
+  i = get2byte(&aData[hdr+5]);
+  memcpy(&pTmp[i], &aData[i], usableSize - i);
+  pData = &aData[usableSize];
+
+  for(i=0; i<nCell; i++){
+    u8 *pCell = apCell[i];
+    if( pCell>aData && pCell<pEnd ){
+      pCell = &pTmp[pCell - aData];
+    }
+    pData -= szCell[i];
+    memcpy(pData, pCell, szCell[i]);
+    put2byte(pCellptr, (pData - aData));
+    pCellptr += 2;
+    assert( szCell[i]==cellSizePtr(pPg, pCell) );
+  }
+
+  pPg->nFree = (pData - pCellptr);
+  pPg->nCell = nCell;
+  pPg->nOverflow = 0;
+
+  put2byte(&aData[hdr+1], 0);
+  put2byte(&aData[hdr+3], pPg->nCell);
+  put2byte(&aData[hdr+5], pData - aData);
+  aData[hdr+7] = 0x00;
+}
+
 /*
 ** The following parameters determine how many adjacent pages get involved
 ** in a balancing operation.  NN is the number of neighbors on either side
@@ -6098,7 +6141,7 @@ static int balance_quick(MemPage *pParent, MemPage *pPage, u8 *pSpace){
 }
 #endif /* SQLITE_OMIT_QUICKBALANCE */
 
-#if 0
+#if 1
 /*
 ** This function does not contribute anything to the operation of SQLite.
 ** it is sometimes activated temporarily while debugging code responsible 
@@ -6265,7 +6308,6 @@ static int balance_nonroot(
   int iOvflSpace = 0;          /* First unused byte of aOvflSpace[] */
   int szScratch;               /* Size of scratch memory requested */
   MemPage *apOld[NB];          /* pPage and up to two siblings */
-  MemPage *apCopy[NB];         /* Private copies of apOld[] pages */
   MemPage *apNew[NB+2];        /* pPage and up to NB siblings after balancing */
   u8 *pRight;                  /* Location in parent of right-sibling pointer */
   u8 *apDiv[NB-1];             /* Divider cells in pParent */
@@ -6276,6 +6318,13 @@ static int balance_nonroot(
   u8 *aSpace1;                 /* Space for copies of dividers cells */
   Pgno pgno;                   /* Temp var to store a page number in */
 
+  int aShiftLeft[NB+2];
+  int aShiftRight[NB+2];
+  u8 abDone[NB+2];
+  Pgno aPgno[NB+2];
+  u16 aPgFlags[NB+2];
+
+  memset(abDone, 0, sizeof(abDone));
   pBt = pParent->pBt;
   assert( sqlite3_mutex_held(pBt->mutex) );
   assert( sqlite3PagerIswriteable(pParent->pDbPage) );
@@ -6384,12 +6433,10 @@ static int balance_nonroot(
   /*
   ** Allocate space for memory structures
   */
-  k = pBt->pageSize + ROUND8(sizeof(MemPage));
   szScratch =
        nMaxCells*sizeof(u8*)                       /* apCell */
      + nMaxCells*sizeof(u16)                       /* szCell */
-     + pBt->pageSize                               /* aSpace1 */
-     + k*nOld;                                     /* Page copies (apCopy) */
+     + pBt->pageSize;                              /* aSpace1 */
   apCell = sqlite3ScratchMalloc( szScratch ); 
   if( apCell==0 ){
     rc = SQLITE_NOMEM;
@@ -6402,8 +6449,8 @@ static int balance_nonroot(
   /*
   ** Load pointers to all cells on sibling pages and the divider cells
   ** into the local apCell[] array.  Make copies of the divider cells
-  ** into space obtained from aSpace1[] and remove the divider cells
-  ** from pParent.
+  ** into space obtained from aSpace1[]. The divider cells have already
+  ** been removed from pParent.
   **
   ** If the siblings are on leaf pages, then the child pointers of the
   ** divider cells are stripped from the cells before they are copied
@@ -6419,15 +6466,7 @@ static int balance_nonroot(
   leafData = apOld[0]->intKeyLeaf;
   for(i=0; i<nOld; i++){
     int limit;
-    
-    /* Before doing anything else, take a copy of the i'th original sibling
-    ** The rest of this function will use data from the copies rather
-    ** that the original pages since the original pages will be in the
-    ** process of being overwritten.  */
-    MemPage *pOld = apCopy[i] = (MemPage*)&aSpace1[pBt->pageSize + k*i];
-    memcpy(pOld, apOld[i], sizeof(MemPage));
-    pOld->aData = (void*)&pOld[1];
-    memcpy(pOld->aData, apOld[i]->aData, pBt->pageSize);
+    MemPage *pOld = apOld[i];
 
     limit = pOld->nCell+pOld->nOverflow;
     if( pOld->nOverflow>0 ){
@@ -6556,10 +6595,10 @@ static int balance_nonroot(
   assert( cntNew[0]>0 || (pParent->pgno==1 && pParent->nCell==0) );
 #endif
 
-  TRACE(("BALANCE: old: %d %d %d  ",
-    apOld[0]->pgno, 
-    nOld>=2 ? apOld[1]->pgno : 0,
-    nOld>=3 ? apOld[2]->pgno : 0
+  TRACE(("BALANCE: old: %d(nc=%d) %d(nc=%d) %d(nc=%d)\n",
+    apOld[0]->pgno, apOld[0]->nCell,
+    nOld>=2 ? apOld[1]->pgno : 0, nOld>=2 ? apOld[1]->nCell : 0,
+    nOld>=3 ? apOld[2]->pgno : 0, nOld>=3 ? apOld[2]->nCell : 0
   ));
 
   /*
@@ -6582,6 +6621,7 @@ static int balance_nonroot(
       assert( i>0 );
       rc = allocateBtreePage(pBt, &pNew, &pgno, (bBulk ? 1 : pgno), 0);
       if( rc ) goto balance_cleanup;
+      zeroPage(pNew, pageFlags);
       apNew[i] = pNew;
       nNew++;
 
@@ -6595,135 +6635,223 @@ static int balance_nonroot(
     }
   }
 
-  /* Free any old pages that were not reused as new pages.
-  */
-  while( i<nOld ){
-    freePage(apOld[i], &rc);
-    if( rc ) goto balance_cleanup;
-    releasePage(apOld[i]);
-    apOld[i] = 0;
-    i++;
-  }
-
   /*
-  ** Put the new pages in ascending order.  This helps to
-  ** keep entries in the disk file in order so that a scan
-  ** of the table is a linear scan through the file.  That
-  ** in turn helps the operating system to deliver pages
-  ** from the disk more rapidly.
+  ** Reassign page numbers so that the new pages are in ascending order. 
+  ** This helps to keep entries in the disk file in order so that a scan
+  ** of the table is closer to a linear scan through the file. That in turn 
+  ** helps the operating system to deliver pages from the disk more rapidly.
   **
-  ** An O(n^2) insertion sort algorithm is used, but since
-  ** n is never more than NB (a small constant), that should
-  ** not be a problem.
+  ** An O(n^2) insertion sort algorithm is used, but since n is never more 
+  ** than (NB+2) (a small constant), that should not be a problem.
   **
-  ** When NB==3, this one optimization makes the database
-  ** about 25% faster for large insertions and deletions.
+  ** When NB==3, this one optimization makes the database about 25% faster 
+  ** for large insertions and deletions.
   */
-  for(i=0; i<k-1; i++){
-    int minV = apNew[i]->pgno;
-    int minI = i;
-    for(j=i+1; j<k; j++){
-      if( apNew[j]->pgno<(unsigned)minV ){
-        minI = j;
-        minV = apNew[j]->pgno;
+  for(i=0; i<nNew; i++){
+    aPgno[i] = apNew[i]->pgno;
+    aPgFlags[i] = apNew[i]->pDbPage->flags;
+  }
+  for(i=0; i<nNew; i++){
+    Pgno iGt = (i==0 ? 0 : apNew[i-1]->pgno);
+    Pgno iMin = 0;
+    u16 flags = 0;
+    for(j=0; j<nNew; j++){
+      Pgno iPgno = aPgno[j];
+      if( iPgno>iGt && (iMin==0 || iPgno<iMin) ){
+        iMin = iPgno;
+        flags = aPgFlags[j];
       }
     }
-    if( minI>i ){
-      MemPage *pT;
-      pT = apNew[i];
-      apNew[i] = apNew[minI];
-      apNew[minI] = pT;
+    if( apNew[i]->pgno!=iMin ){
+      apNew[i]->pDbPage->flags = flags;
+      sqlite3PagerRekey(apNew[i]->pDbPage, iMin);
+      apNew[i]->pgno = iMin;
     }
   }
-  TRACE(("new: %d(%d) %d(%d) %d(%d) %d(%d) %d(%d)\n",
-    apNew[0]->pgno, szNew[0],
+
+  TRACE(("BALANCE: new: %d(%d nc=%d) %d(%d nc=%d) %d(%d nc=%d) "
+         "%d(%d nc=%d) %d(%d nc=%d)\n",
+    apNew[0]->pgno, szNew[0], cntNew[0],
     nNew>=2 ? apNew[1]->pgno : 0, nNew>=2 ? szNew[1] : 0,
+    nNew>=2 ? cntNew[1] - cntNew[0] - !leafData : 0,
     nNew>=3 ? apNew[2]->pgno : 0, nNew>=3 ? szNew[2] : 0,
+    nNew>=3 ? cntNew[2] - cntNew[1] - !leafData : 0,
     nNew>=4 ? apNew[3]->pgno : 0, nNew>=4 ? szNew[3] : 0,
-    nNew>=5 ? apNew[4]->pgno : 0, nNew>=5 ? szNew[4] : 0));
+    nNew>=4 ? cntNew[3] - cntNew[2] - !leafData : 0,
+    nNew>=5 ? apNew[4]->pgno : 0, nNew>=5 ? szNew[4] : 0,
+    nNew>=5 ? cntNew[4] - cntNew[3] - !leafData : 0
+  ));
 
   assert( sqlite3PagerIswriteable(pParent->pDbPage) );
   put4byte(pRight, apNew[nNew-1]->pgno);
 
-  /*
-  ** Evenly distribute the data in apCell[] across the new pages.
-  ** Insert divider cells into pParent as necessary.
-  */
   j = 0;
   for(i=0; i<nNew; i++){
-    /* Assemble the new sibling page. */
-    MemPage *pNew = apNew[i];
-    assert( j<nMaxCells );
-    zeroPage(pNew, pageFlags);
-    assemblePage(pNew, cntNew[i]-j, &apCell[j], &szCell[j]);
-    assert( pNew->nCell>0 || (nNew==1 && cntNew[0]==0) );
-    assert( pNew->nOverflow==0 );
+    /* At this point, "j" is the apCell[] index of the first cell currently
+    ** stored on page apNew[i]. Or, if apNew[i] was not one of the original 
+    ** sibling pages, "j" should be set to nCell. Variable iFirst is set
+    ** to the apCell[] index of the first cell that will appear on the
+    ** page following this balancing operation.  */
+    int iFirst = (i==0 ? 0 : cntNew[i-1] + !leafData);     /* new first cell */
+    assert( i<nOld || j==nCell );
+    aShiftLeft[i] = j - iFirst;
+    j += apNew[i]->nCell + apNew[i]->nOverflow;
+    aShiftRight[i] = cntNew[i] - j;
+    assert( i!=nOld-1 || j==nCell );
+    if( j<nCell ) j += !leafData;
+  }
+
+  /* If the sibling pages are not leaves, ensure that the right-child pointer
+  ** of the right-most new sibling page is set to the value that was 
+  ** originally in the same field of the right-most old sibling page. */
+  if( (pageFlags & PTF_LEAF)==0 && nOld!=nNew ){
+    MemPage *pOld = (nNew>nOld ? apNew : apOld)[nOld-1];
+    memcpy(&apNew[nNew-1]->aData[8], &pOld->aData[8], 4);
+  }
+
+  /* Make any required updates to pointer map entries associated with 
+  ** cells stored on sibling pages following the balance operation. Pointer
+  ** map entries associated with divider cells are set by the insertCell()
+  ** routine. The associated pointer map entries are:
+  **
+  **   a) if the cell contains a reference to an overflow chain, the
+  **      entry associated with the first page in the overflow chain, and
+  **
+  **   b) if the sibling pages are not leaves, the child page associated
+  **      with the cell.
+  **
+  ** If the sibling pages are not leaves, then the pointer map entry 
+  ** associated with the right-child of each sibling may also need to be 
+  ** updated. This happens below, after the sibling pages have been 
+  ** populated, not here.
+  */
+  if( ISAUTOVACUUM ){
+    MemPage *pNew = apNew[0];
+    u8 *aOld = pNew->aData;
+    int cntOldNext = pNew->nCell + pNew->nOverflow;
+    int usableSize = pBt->usableSize;
+    int iNew = 0;
+    int iOld = 0;
 
-    j = cntNew[i];
+    for(i=0; i<nCell; i++){
+      u8 *pCell = apCell[i];
+      if( i==cntOldNext ){
+        MemPage *pOld = (++iOld)<nNew ? apNew[iOld] : apOld[iOld];
+        cntOldNext += pOld->nCell + pOld->nOverflow + !leafData;
+        aOld = pOld->aData;
+      }
+      if( i==cntNew[iNew] ){
+        pNew = apNew[++iNew];
+        if( !leafData ) continue;
+      }
 
-    /* If the sibling page assembled above was not the right-most sibling,
-    ** insert a divider cell into the parent page.
-    */
-    assert( i<nNew-1 || j==nCell );
-    if( j<nCell ){
-      u8 *pCell;
-      u8 *pTemp;
-      int sz;
-
-      assert( j<nMaxCells );
-      pCell = apCell[j];
-      sz = szCell[j] + leafCorrection;
-      pTemp = &aOvflSpace[iOvflSpace];
-      if( !pNew->leaf ){
-        memcpy(&pNew->aData[8], pCell, 4);
-      }else if( leafData ){
-        /* If the tree is a leaf-data tree, and the siblings are leaves, 
-        ** then there is no divider cell in apCell[]. Instead, the divider 
-        ** cell consists of the integer key for the right-most cell of 
-        ** the sibling-page assembled above only.
-        */
-        CellInfo info;
-        j--;
-        btreeParseCellPtr(pNew, apCell[j], &info);
-        pCell = pTemp;
-        sz = 4 + putVarint(&pCell[4], info.nKey);
-        pTemp = 0;
-      }else{
-        pCell -= 4;
-        /* Obscure case for non-leaf-data trees: If the cell at pCell was
-        ** previously stored on a leaf node, and its reported size was 4
-        ** bytes, then it may actually be smaller than this 
-        ** (see btreeParseCellPtr(), 4 bytes is the minimum size of
-        ** any cell). But it is important to pass the correct size to 
-        ** insertCell(), so reparse the cell now.
-        **
-        ** Note that this can never happen in an SQLite data file, as all
-        ** cells are at least 4 bytes. It only happens in b-trees used
-        ** to evaluate "IN (SELECT ...)" and similar clauses.
-        */
-        if( szCell[j]==4 ){
-          assert(leafCorrection==4);
-          sz = cellSizePtr(pParent, pCell);
+      /* Cell pCell is destined for new sibling page pNew. Originally, it
+      ** was either part of sibling page iOld (possibly an overflow page), 
+      ** or else the divider cell to the left of sibling page iOld. So,
+      ** if sibling page iOld had the same page number as pNew, and if
+      ** pCell really was a part of sibling page iOld (not a divider or
+      ** overflow cell), we can skip updating the pointer map entries.  */
+      if( pNew->pgno!=aPgno[iOld] || pCell<aOld || pCell>=&aOld[usableSize] ){
+        if( !leafCorrection ){
+          ptrmapPut(pBt, get4byte(pCell), PTRMAP_BTREE, pNew->pgno, &rc);
+        }
+        if( szCell[i]>pNew->minLocal ){
+          ptrmapPutOvflPtr(pNew, pCell, &rc);
         }
       }
-      iOvflSpace += sz;
-      assert( sz<=pBt->maxLocal+23 );
-      assert( iOvflSpace <= (int)pBt->pageSize );
-      insertCell(pParent, nxDiv, pCell, sz, pTemp, pNew->pgno, &rc);
-      if( rc!=SQLITE_OK ) goto balance_cleanup;
-      assert( sqlite3PagerIswriteable(pParent->pDbPage) );
+    }
+  }
+
+  /* Insert new divider cells into pParent. */
+  for(i=0; i<nNew-1; i++){
+    u8 *pCell;
+    u8 *pTemp;
+    int sz;
+    MemPage *pNew = apNew[i];
+    j = cntNew[i];
 
-      j++;
-      nxDiv++;
+    assert( j<nMaxCells );
+    pCell = apCell[j];
+    sz = szCell[j] + leafCorrection;
+    pTemp = &aOvflSpace[iOvflSpace];
+    if( !pNew->leaf ){
+      memcpy(&pNew->aData[8], pCell, 4);
+    }else if( leafData ){
+      /* If the tree is a leaf-data tree, and the siblings are leaves, 
+      ** then there is no divider cell in apCell[]. Instead, the divider 
+      ** cell consists of the integer key for the right-most cell of 
+      ** the sibling-page assembled above only.
+      */
+      CellInfo info;
+      j--;
+      btreeParseCellPtr(pNew, apCell[j], &info);
+      pCell = pTemp;
+      sz = 4 + putVarint(&pCell[4], info.nKey);
+      pTemp = 0;
+    }else{
+      pCell -= 4;
+      /* Obscure case for non-leaf-data trees: If the cell at pCell was
+      ** previously stored on a leaf node, and its reported size was 4
+      ** bytes, then it may actually be smaller than this 
+      ** (see btreeParseCellPtr(), 4 bytes is the minimum size of
+      ** any cell). But it is important to pass the correct size to 
+      ** insertCell(), so reparse the cell now.
+      **
+      ** Note that this can never happen in an SQLite data file, as all
+      ** cells are at least 4 bytes. It only happens in b-trees used
+      ** to evaluate "IN (SELECT ...)" and similar clauses.
+      */
+      if( szCell[j]==4 ){
+        assert(leafCorrection==4);
+        sz = cellSizePtr(pParent, pCell);
+      }
     }
+    iOvflSpace += sz;
+    assert( sz<=pBt->maxLocal+23 );
+    assert( iOvflSpace <= (int)pBt->pageSize );
+    insertCell(pParent, nxDiv+i, pCell, sz, pTemp, pNew->pgno, &rc);
+    if( rc!=SQLITE_OK ) goto balance_cleanup;
+    assert( sqlite3PagerIswriteable(pParent->pDbPage) );
   }
-  assert( j==nCell );
+
+  /* Now update the actual sibling pages. The order in which they are updated
+  ** is important, as this code needs to avoid disrupting any page from which
+  ** cells may still to be read. In practice, this means:
+  **
+  **   1) If the aShiftLeft[] entry is less than 0, it is not safe to 
+  **      update the page until the page to the left of the current page
+  **      (apNew[i-1]) has already been updated.
+  **
+  **   2) If the aShiftRight[] entry is less than 0, it is not safe to 
+  **      update the page until the page to the right of the current page
+  **      (apNew[i+1]) has already been updated.
+  **
+  ** If neither of the above apply, the page is safe to update.
+  */
+  assert( aShiftRight[nNew-1]>=0 && aShiftLeft[0]==0 );
+  for(i=0; i<nNew*2; i++){
+    int iPg = (i>=nNew ? i-nNew : nNew-1-i);
+    if( abDone[iPg]==0 
+     && (aShiftLeft[iPg]>=0 || abDone[iPg-1])
+     && (aShiftRight[iPg]>=0 || abDone[iPg+1])
+    ){
+      MemPage *pNew = apNew[iPg];
+      int iLeft = ((iPg==0) ? 0 : cntNew[iPg-1] + !leafData);
+      rebuildPage(pNew, 
+          aShiftLeft[iPg] < 0 ? (aShiftLeft[iPg]*-1) : 0,
+          cntNew[iPg] - iLeft,
+          &apCell[iLeft],
+          &szCell[iLeft]
+      );
+      abDone[iPg] = 1;
+      assert( pNew->nOverflow==0 );
+      assert( pNew->nCell==(cntNew[iPg] - (iPg==0?0:cntNew[iPg-1]+!leafData)) );
+    }
+  }
+  assert( memcmp(abDone, "\01\01\01\01\01", nNew)==0 );
+
   assert( nOld>0 );
   assert( nNew>0 );
-  if( (pageFlags & PTF_LEAF)==0 ){
-    u8 *zChild = &apCopy[nOld-1]->aData[8];
-    memcpy(&apNew[nNew-1]->aData[8], zChild, 4);
-  }
 
   if( isRoot && pParent->nCell==0 && pParent->hdrOffset<=apNew[0]->nFree ){
     /* The root page of the b-tree now contains no cells. The only sibling
@@ -6746,116 +6874,36 @@ static int balance_nonroot(
     );
     copyNodeContent(apNew[0], pParent, &rc);
     freePage(apNew[0], &rc);
-  }else if( ISAUTOVACUUM ){
-    /* Fix the pointer-map entries for all the cells that were shifted around. 
-    ** There are several different types of pointer-map entries that need to
-    ** be dealt with by this routine. Some of these have been set already, but
-    ** many have not. The following is a summary:
-    **
-    **   1) The entries associated with new sibling pages that were not
-    **      siblings when this function was called. These have already
-    **      been set. We don't need to worry about old siblings that were
-    **      moved to the free-list - the freePage() code has taken care
-    **      of those.
-    **
-    **   2) The pointer-map entries associated with the first overflow
-    **      page in any overflow chains used by new divider cells. These 
-    **      have also already been taken care of by the insertCell() code.
-    **
-    **   3) If the sibling pages are not leaves, then the child pages of
-    **      cells stored on the sibling pages may need to be updated.
-    **
-    **   4) If the sibling pages are not internal intkey nodes, then any
-    **      overflow pages used by these cells may need to be updated
-    **      (internal intkey nodes never contain pointers to overflow pages).
-    **
-    **   5) If the sibling pages are not leaves, then the pointer-map
-    **      entries for the right-child pages of each sibling may need
-    **      to be updated.
-    **
-    ** Cases 1 and 2 are dealt with above by other code. The next
-    ** block deals with cases 3 and 4 and the one after that, case 5. Since
-    ** setting a pointer map entry is a relatively expensive operation, this
-    ** code only sets pointer map entries for child or overflow pages that have
-    ** actually moved between pages.  */
-    MemPage *pNew = apNew[0];
-    MemPage *pOld = apCopy[0];
-    int nOverflow = pOld->nOverflow;
-    int iNextOld = pOld->nCell + nOverflow;
-    int iOverflow = (nOverflow ? pOld->aiOvfl[0] : -1);
-    j = 0;                             /* Current 'old' sibling page */
-    k = 0;                             /* Current 'new' sibling page */
-    for(i=0; i<nCell; i++){
-      int isDivider = 0;
-      while( i==iNextOld ){
-        /* Cell i is the cell immediately following the last cell on old
-        ** sibling page j. If the siblings are not leaf pages of an
-        ** intkey b-tree, then cell i was a divider cell. */
-        assert( j+1 < ArraySize(apCopy) );
-        assert( j+1 < nOld );
-        pOld = apCopy[++j];
-        iNextOld = i + !leafData + pOld->nCell + pOld->nOverflow;
-        if( pOld->nOverflow ){
-          nOverflow = pOld->nOverflow;
-          iOverflow = i + !leafData + pOld->aiOvfl[0];
-        }
-        isDivider = !leafData;  
-      }
-
-      assert(nOverflow>0 || iOverflow<i );
-      assert(nOverflow<2 || pOld->aiOvfl[0]==pOld->aiOvfl[1]-1);
-      assert(nOverflow<3 || pOld->aiOvfl[1]==pOld->aiOvfl[2]-1);
-      if( i==iOverflow ){
-        isDivider = 1;
-        if( (--nOverflow)>0 ){
-          iOverflow++;
-        }
-      }
-
-      if( i==cntNew[k] ){
-        /* Cell i is the cell immediately following the last cell on new
-        ** sibling page k. If the siblings are not leaf pages of an
-        ** intkey b-tree, then cell i is a divider cell.  */
-        pNew = apNew[++k];
-        if( !leafData ) continue;
-      }
-      assert( j<nOld );
-      assert( k<nNew );
-
-      /* If the cell was originally divider cell (and is not now) or
-      ** an overflow cell, or if the cell was located on a different sibling
-      ** page before the balancing, then the pointer map entries associated
-      ** with any child or overflow pages need to be updated.  */
-      if( isDivider || pOld->pgno!=pNew->pgno ){
-        if( !leafCorrection ){
-          ptrmapPut(pBt, get4byte(apCell[i]), PTRMAP_BTREE, pNew->pgno, &rc);
-        }
-        if( szCell[i]>pNew->minLocal ){
-          ptrmapPutOvflPtr(pNew, apCell[i], &rc);
-        }
-      }
+  }else if( ISAUTOVACUUM && !leafCorrection ){
+    /* Fix the pointer map entries associated with the right-child of each
+    ** sibling page. All other pointer map entries have already been taken
+    ** care of.  */
+    for(i=0; i<nNew; i++){
+      u32 key = get4byte(&apNew[i]->aData[8]);
+      ptrmapPut(pBt, key, PTRMAP_BTREE, apNew[i]->pgno, &rc);
     }
+  }
 
-    if( !leafCorrection ){
-      for(i=0; i<nNew; i++){
-        u32 key = get4byte(&apNew[i]->aData[8]);
-        ptrmapPut(pBt, key, PTRMAP_BTREE, apNew[i]->pgno, &rc);
-      }
-    }
+  assert( pParent->isInit );
+  TRACE(("BALANCE: finished: old=%d new=%d cells=%d\n",
+          nOld, nNew, nCell));
 
-#if 0
+  /* Free any old pages that were not reused as new pages.
+  */
+  for(i=nNew; i<nOld; i++){
+    freePage(apOld[i], &rc);
+  }
+
+#if 1
+  if( ISAUTOVACUUM && rc==SQLITE_OK && apNew[0]->isInit ){
     /* The ptrmapCheckPages() contains assert() statements that verify that
     ** all pointer map pages are set correctly. This is helpful while 
     ** debugging. This is usually disabled because a corrupt database may
     ** cause an assert() statement to fail.  */
     ptrmapCheckPages(apNew, nNew);
     ptrmapCheckPages(&pParent, 1);
-#endif
   }
-
-  assert( pParent->isInit );
-  TRACE(("BALANCE: finished: old=%d new=%d cells=%d\n",
-          nOld, nNew, nCell));
+#endif
 
   /*
   ** Cleanup before returning.
index 79bfe15f10b7dce3a87e2a7f5ba6a6f57f0a239d..e68d147de589e8c7ad1626eb4ed2813b59fb3406 100644 (file)
@@ -6835,6 +6835,14 @@ int sqlite3PagerMovepage(Pager *pPager, DbPage *pPg, Pgno pgno, int isCommit){
 
   return SQLITE_OK;
 }
+
+void sqlite3PagerRekey(DbPage *pPage, Pgno iNew){
+  PgHdr *pPg = (PgHdr*)pPage;
+  assert( pPg->flags & PGHDR_DIRTY );
+  assert( !subjRequiresPage(pPg) );
+  sqlite3PcacheMove(pPg, iNew);
+}
+
 #endif
 
 /*
@@ -7235,4 +7243,5 @@ int sqlite3PagerWalFramesize(Pager *pPager){
 }
 #endif
 
+
 #endif /* SQLITE_OMIT_DISKIO */
index c9ca8553b9d7e703f47ec7f693aebbcf9a0a8e85..f3a04bbca6159186d289f5d3854d3868d453b9dd 100644 (file)
@@ -188,6 +188,8 @@ int sqlite3SectorSize(sqlite3_file *);
 /* Functions used to truncate the database file. */
 void sqlite3PagerTruncateImage(Pager*,Pgno);
 
+void sqlite3PagerRekey(DbPage*, Pgno);
+
 #if defined(SQLITE_HAS_CODEC) && !defined(SQLITE_OMIT_WAL)
 void *sqlite3PagerCodec(DbPage *);
 #endif