]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
This is the start of an experiment in getting sqlite3_rsync to use less
authordrh <>
Fri, 2 May 2025 17:39:21 +0000 (17:39 +0000)
committerdrh <>
Fri, 2 May 2025 17:39:21 +0000 (17:39 +0000)
bandwidth when the two databases are very similar, by sending hashes
over blocks of pages initially, rather than over individual pages, then
requesting more detail when hashes do not match.

FossilOrigin-Name: 266b4b8f0104bd4b1cff87ed78b0223006bf661a9650294a2b330d50c7ee8a0c

manifest
manifest.uuid
tool/sqlite3_rsync.c

index 96f52aa1fd509d8e732454ee7052e958cd452d3d..05160f3b5ae94ea6c6b6432ddb9c697da602a9f6 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Do\snot\sallow\ssqlite3_rsync\sto\sconvert\sthe\sreplica\sfrom\sWAL-mode\sinto\nDELETE-mode,\sas\sthat\scan\sdisrupt\sexisting\sclients\son\sthe\sreplica\sside.\nDELETE-mode\sto\sWAL-mode\sconversions\sare\sallowed,\showever.\s\sSee\n[forum:/forumpost/6b575b66156673ee|forum\sthread\s6b575b66156].
-D 2025-05-02T11:18:09.023
+C This\sis\sthe\sstart\sof\san\sexperiment\sin\sgetting\ssqlite3_rsync\sto\suse\sless\nbandwidth\swhen\sthe\stwo\sdatabases\sare\svery\ssimilar,\sby\ssending\shashes\nover\sblocks\sof\spages\sinitially,\srather\sthan\sover\sindividual\spages,\sthen\nrequesting\smore\sdetail\swhen\shashes\sdo\snot\smatch.
+D 2025-05-02T17:39:21.504
 F .fossil-settings/binary-glob 61195414528fb3ea9693577e1980230d78a1f8b0a54c78cf1b9b24d0a409ed6a x
 F .fossil-settings/empty-dirs dbb81e8fc0401ac46a1491ab34a7f2c7c0452f2f06b54ebb845d024ca8283ef1
 F .fossil-settings/ignore-glob 35175cdfcf539b2318cb04a9901442804be81cd677d8b889fcc9149c21f239ea
@@ -2189,7 +2189,7 @@ F tool/spellsift.tcl 52b4b04dc4333c7ab024f09d9d66ed6b6f7c6eb00b38497a09f338fa55d
 F tool/split-sqlite3c.tcl 07e18a1d8cc3f6b3a4a1f3528e63c9b29a5c8a7bca0b8d394b231da464ce1247
 F tool/sqldiff.c 134be7866be19f8beb32043d5aea5657f01aaeae2df8d33d758ff722c78666b9
 F tool/sqlite3_analyzer.c.in 14f02cb5ec3c264cd6107d1f1dad77092b1cf440fc196c30b69ae87b56a1a43b
-F tool/sqlite3_rsync.c a8e1962d9e0418b37d6865e483640c49498efe64bf542022e845b056f6eb9cce
+F tool/sqlite3_rsync.c 438ffaf829181863ea941c8e6d5faabfe9161d7a464b3dce1857ac2fabe09273
 F tool/sqltclsh.c.in 1bcc2e9da58fadf17b0bf6a50e68c1159e602ce057210b655d50bad5aaaef898
 F tool/sqltclsh.tcl 862f4cf1418df5e1315b5db3b5ebe88969e2a784525af5fbf9596592f14ed848
 F tool/src-verify.c d00f93263aa2fa6ba0cba0106d95458e6effb94fdb5fc634f56834f90c05bbb4
@@ -2207,8 +2207,11 @@ F tool/version-info.c 3b36468a90faf1bbd59c65fd0eb66522d9f941eedd364fabccd7227350
 F tool/warnings-clang.sh bbf6a1e685e534c92ec2bfba5b1745f34fb6f0bc2a362850723a9ee87c1b31a7
 F tool/warnings.sh 49a486c5069de041aedcbde4de178293e0463ae9918ecad7539eedf0ec77a139
 F tool/win/sqlite.vsix deb315d026cc8400325c5863eef847784a219a2f
-P 4b53603fe468c0c28b818762917e41bdd870de6d4cc143688f1cdea3136c81a4
-R 77c0a71cca6be2a9f28e992f31255bba
+P 660a035b6ce6684d429b882133e032181cc1664f4efadf1bc0e4ae27d45071c4
+R 2f18aca9a98c725fdd5da1ebbb1e7566
+T *branch * faster-rsync
+T *sym-faster-rsync *
+T -sym-trunk *
 U drh
-Z 4ae06cab268d08247b608c21e7458e1c
+Z 08dacd140b0440acc75d69379c13dfa6
 # Remove this line to create a well-formed Fossil manifest.
index 0d00d48c76d5540743061ce2e64ef821b7d1ecf5..9d75464a6e2291373ccc16b6f0ce73cd7961ad32 100644 (file)
@@ -1 +1 @@
-660a035b6ce6684d429b882133e032181cc1664f4efadf1bc0e4ae27d45071c4
+266b4b8f0104bd4b1cff87ed78b0223006bf661a9650294a2b330d50c7ee8a0c
index 34faaf0fd12451aa761e0c511504e21da5968eba..6fb3650949ae3f98f1e8b76990bc25c7cadc7429 100644 (file)
@@ -75,20 +75,26 @@ struct SQLiteRsync {
 
 /* Magic numbers to identify particular messages sent over the wire.
 */
+/**** Baseline: protocol version 1 ****/
 #define ORIGIN_BEGIN    0x41     /* Initial message */
 #define ORIGIN_END      0x42     /* Time to quit */
 #define ORIGIN_ERROR    0x43     /* Error message from the remote */
 #define ORIGIN_PAGE     0x44     /* New page data */
 #define ORIGIN_TXN      0x45     /* Transaction commit */
 #define ORIGIN_MSG      0x46     /* Informational message */
+/**** Added in protocol version 2 ****/
+#define ORIGIN_DETAIL   0x47     /* Request finer-grain hash info */
+#define ORIGIN_READY    0x48     /* Ready for next round of hash exchanges */
 
+/**** Baseline: protocol version 1 ****/
 #define REPLICA_BEGIN   0x61     /* Welcome message */
 #define REPLICA_ERROR   0x62     /* Error.  Report and quit. */
 #define REPLICA_END     0x63     /* Replica wants to stop */
 #define REPLICA_HASH    0x64     /* One or more pages hashes to report */
 #define REPLICA_READY   0x65     /* Read to receive page content */
 #define REPLICA_MSG     0x66     /* Informational message */
-
+/**** Added in protocol version 2 ****/
+#define REPLICA_CONFIG  0x67     /* Hash exchange configuration */
 
 /****************************************************************************
 ** Beginning of the popen2() implementation copied from Fossil  *************
@@ -796,11 +802,49 @@ static void hashFunc(
   sqlite3_result_blob(context, HashFinal(&cx), 160/8, SQLITE_TRANSIENT);
 }
 
+/*
+** Implementation of the agghash(X) function.
+**
+** Return a 160-bit BLOB which is the hash of the concatenation
+** of all X inputs.
+*/
+static void agghashStep(
+  sqlite3_context *context,
+  int argc,
+  sqlite3_value **argv
+){
+  HashContext *pCx;
+  int eType = sqlite3_value_type(argv[0]);
+  int nByte = sqlite3_value_bytes(argv[0]);
+  if( eType==SQLITE_NULL ) return;
+  pCx = (HashContext*)sqlite3_aggregate_context(context, sizeof(*pCx));
+  if( pCx==0 ) return;
+  if( pCx->iSize==0 ) HashInit(pCx, 160);
+  if( eType==SQLITE_BLOB ){
+    HashUpdate(pCx, sqlite3_value_blob(argv[0]), nByte);
+  }else{
+    HashUpdate(pCx, sqlite3_value_text(argv[0]), nByte);
+  }
+}
+static void agghashFinal(sqlite3_context *context){
+  HashContext *pCx = (HashContext*)sqlite3_aggregate_context(context, 0);
+  if( pCx ){
+    sqlite3_result_blob(context, HashFinal(pCx), 160/8, SQLITE_TRANSIENT);
+  }
+}
+
 /* Register the hash function */
 static int hashRegister(sqlite3 *db){
-   return sqlite3_create_function(db, "hash", 1,
+  int rc;
+  rc = sqlite3_create_function(db, "hash", 1,
                 SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC,
                 0, hashFunc, 0, 0);
+  if( rc==SQLITE_OK ){
+    rc = sqlite3_create_function(db, "agghash", 1,
+                SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC,
+                0, 0, agghashStep, agghashFinal);
+  }
+  return rc;
 }
 
 /* End of the hashing logic
@@ -1192,6 +1236,13 @@ static void closeDb(SQLiteRsync *p){
 ** nPage, and szPage.  Then enter a loop responding to message from
 ** the replica:
 **
+**    REPLICA_BEGIN  iProtocol
+**
+**         An optional message sent by the replica in response to the
+**         prior ORIGIN_BEGIN with a counter-proposal for the protocol
+**         level.  If seen, try to reduce the protocol level to what is
+**         requested and send a new ORGIN_BEGIN.
+**
 **    REPLICA_ERROR  size  text
 **
 **         Report an error from the replica and quit
@@ -1202,24 +1253,36 @@ static void closeDb(SQLiteRsync *p){
 **
 **    REPLICA_HASH  hash
 **
-**         The argument is the 20-byte SHA1 hash for the next page
-**         page hashes appear in sequential order with no gaps.
+**         The argument is the 20-byte SHA1 hash for the next page or
+**         block of pages.  Hashes appear in sequential order with no gaps,
+**         unless there is an intervening REPLICA_CONFIG message.
+**
+**    REPLICA_CONFIG   pgno   cnt
+**
+**         Set counters used by REPLICA_HASH.  The next hash will start
+**         on page pgno and all subsequent hashes will cover cnt pages
+**         each.  Note that for a multi-page hash, the hash value is
+**         actually a hash of the individual page hashes.
 **
 **    REPLICA_READY
 **
 **         The replica has sent all the hashes that it intends to send.
 **         This side (the origin) can now start responding with page
-**         content for pages that do not have a matching hash.
+**         content for pages that do not have a matching hash or with
+**         ORIGIN_DETAIL messages with requests for more detail.
 */
 static void originSide(SQLiteRsync *p){
   int rc = 0;
   int c = 0;
   unsigned int nPage = 0;
-  unsigned int iPage = 0;
+  unsigned int iHash = 1;               /* Pgno for next hash to receive */
+  unsigned int nHash = 1;               /* Number of pages per hash received */
   unsigned int lockBytePage = 0;
   unsigned int szPg = 0;
-  sqlite3_stmt *pCkHash = 0;
-  sqlite3_stmt *pInsHash = 0;
+  sqlite3_stmt *pCkHash = 0;            /* Verify hash on a single page */
+  sqlite3_stmt *pCkHashN = 0;           /* Verify a multi-page hash */
+  sqlite3_stmt *pInsHash = 0;           /* Record a bad hash */
+  unsigned int nMulti = 0;              /* Multi-page hashes not matched */
   char buf[200];
 
   p->isReplica = 0;
@@ -1270,11 +1333,16 @@ static void originSide(SQLiteRsync *p){
         ** that is larger than what it knows about.  The replica sends back
         ** a counter-proposal of an earlier protocol which the origin can
         ** accept by resending a new ORIGIN_BEGIN. */
-        p->iProtocol = readByte(p);
-        writeByte(p, ORIGIN_BEGIN);
-        writeByte(p, p->iProtocol);
-        writePow2(p, p->szPage);
-        writeUint32(p, p->nPage);
+        u8 newProtocol = readByte(p);
+        if( newProtocol < p->iProtocol ){
+          p->iProtocol = newProtocol;
+          writeByte(p, ORIGIN_BEGIN);
+          writeByte(p, p->iProtocol);
+          writePow2(p, p->szPage);
+          writeUint32(p, p->nPage);
+        }else{
+          reportError(p, "Invalid REPLICA_BEGIN reply");
+        }
         break;
       }
       case REPLICA_MSG:
@@ -1282,25 +1350,60 @@ static void originSide(SQLiteRsync *p){
         readAndDisplayMessage(p, c);
         break;
       }
+      case REPLICA_CONFIG: {
+        readUint32(p, &iHash);
+        readUint32(p, &nHash);
+        break;
+      }
       case REPLICA_HASH: {
         if( pCkHash==0 ){
-          runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)");
+          runSql(p, "CREATE TEMP TABLE badHash("
+                    " pgno INTEGER PRIMARY KEY,"
+                    " sz INT)");
           pCkHash = prepareStmt(p,
             "SELECT pgno FROM sqlite_dbpage('main')"
-            " WHERE pgno=?1 AND hash(data)!=?2"
+            " WHERE pgno=?1 AND hash(data)!=?3"
           );
           if( pCkHash==0 ) break;
-          pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?)");
+          pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?1,?2)");
           if( pInsHash==0 ) break;
         }
         p->nHashSent++;
-        iPage++;
-        sqlite3_bind_int64(pCkHash, 1, iPage);
-        readBytes(p, 20, buf);
-        sqlite3_bind_blob(pCkHash, 2, buf, 20, SQLITE_STATIC);
-        rc = sqlite3_step(pCkHash);
+        if( nHash>1 ){
+          if( pCkHashN==0 ){
+            pCkHashN = prepareStmt(p,
+              "WITH a1(pgno) AS "
+                "(VALUES(?1) UNION ALL SELECT pgno+1 FROM a1 WHERE pgno<?2)"
+              "SELECT 1 FROM a1 CROSS JOIN sqlite_dbpage('main')"
+                             " USING(pgno)"
+              " WHERE agghash(hash(data))!=?3");
+            if( pCkHashN==0 ) break;
+          }
+          sqlite3_bind_int64(pCkHashN, 1, iHash);
+          sqlite3_bind_int64(pCkHashN, 2, iHash + nHash);
+          readBytes(p, 20, buf);
+          sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC);
+          if( rc==SQLITE_ROW ){
+            nMulti++;
+          }else if( rc==SQLITE_ERROR ){
+            reportError(p, "SQL statement [%s] failed: %s",
+                        sqlite3_sql(pCkHashN), sqlite3_errmsg(p->db));
+          }
+          sqlite3_reset(pCkHashN);
+        }else{
+          sqlite3_bind_int64(pCkHash, 1, iHash);
+          readBytes(p, 20, buf);
+          sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC);
+          rc = sqlite3_step(pCkHash);
+          if( rc==SQLITE_ERROR ){
+            reportError(p, "SQL statement [%s] failed: %s",
+                        sqlite3_sql(pCkHash), sqlite3_errmsg(p->db));
+          }
+          sqlite3_reset(pCkHash);
+        }
         if( rc==SQLITE_ROW ){
-          sqlite3_bind_int64(pInsHash, 1, sqlite3_column_int64(pCkHash, 0));
+          sqlite3_bind_int64(pInsHash, 1, iHash);
+          sqlite3_bind_int64(pInsHash, 2, nHash);
           rc = sqlite3_step(pInsHash);
           if( rc!=SQLITE_DONE ){
             reportError(p, "SQL statement [%s] failed: %s",
@@ -1308,42 +1411,57 @@ static void originSide(SQLiteRsync *p){
           }
           sqlite3_reset(pInsHash);
         }
-        else if( rc!=SQLITE_DONE ){
-          reportError(p, "SQL statement [%s] failed: %s",
-                      sqlite3_sql(pCkHash), sqlite3_errmsg(p->db));
-        }
-        sqlite3_reset(pCkHash);
+        iHash += nHash;
         break;
       }
       case REPLICA_READY: {
-        sqlite3_stmt *pStmt;
-        sqlite3_finalize(pCkHash);
-        sqlite3_finalize(pInsHash);
-        pCkHash = 0;
-        pInsHash = 0;
-        if( iPage+1<p->nPage ){
-          runSql(p, "WITH RECURSIVE c(n) AS"
-                    " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
-                    " INSERT INTO badHash SELECT n FROM c",
-                    iPage+1, p->nPage);
-        }
-        runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage);
-        pStmt = prepareStmt(p,
-               "SELECT pgno, data"
-               "  FROM badHash JOIN sqlite_dbpage('main') USING(pgno)");
-        if( pStmt==0 ) break;
-        while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
-          unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0);
-          const void *pContent = sqlite3_column_blob(pStmt, 1);
-          writeByte(p, ORIGIN_PAGE);
-          writeUint32(p, pgno);
-          writeBytes(p, szPg, pContent);
-          p->nPageSent++;
+        if( nMulti>0 ){
+          sqlite3_stmt *pStmt;
+          pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1");
+          if( pStmt==0 ) break;
+          while( sqlite3_step(pStmt)==SQLITE_ROW ){
+            writeByte(p, ORIGIN_DETAIL);
+            writeUint32(p, sqlite3_column_int(pStmt, 0));
+            writeUint32(p, sqlite3_column_int(pStmt, 1));
+          }
+          sqlite3_finalize(pStmt);
+          runSql(p, "DELETE FROM badHash WHERE sz>1");
+          nMulti = 0;
+          writeByte(p, ORIGIN_READY);
+        }else{
+          sqlite3_stmt *pStmt;
+          sqlite3_finalize(pCkHash);
+          sqlite3_finalize(pCkHashN);
+          sqlite3_finalize(pInsHash);
+          pCkHash = 0;
+          pInsHash = 0;
+          if( iHash+1<p->nPage ){
+            runSql(p, "WITH RECURSIVE c(n) AS"
+                      " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
+                      " INSERT INTO badHash SELECT n, 1 FROM c",
+                      iHash+1, p->nPage);
+          }
+          runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage);
+          pStmt = prepareStmt(p,
+                 "SELECT pgno, data"
+                 "  FROM badHash JOIN sqlite_dbpage('main') USING(pgno)");
+          if( pStmt==0 ) break;
+          while( sqlite3_step(pStmt)==SQLITE_ROW
+              && p->nErr==0
+              && p->nWrErr==0
+          ){
+            unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0);
+            const void *pContent = sqlite3_column_blob(pStmt, 1);
+            writeByte(p, ORIGIN_PAGE);
+            writeUint32(p, pgno);
+            writeBytes(p, szPg, pContent);
+            p->nPageSent++;
+          }
+          sqlite3_finalize(pStmt);
+          writeByte(p, ORIGIN_TXN);
+          writeUint32(p, nPage);
+          writeByte(p, ORIGIN_END);
         }
-        sqlite3_finalize(pStmt);
-        writeByte(p, ORIGIN_TXN);
-        writeUint32(p, nPage);
-        writeByte(p, ORIGIN_END);
         fflush(p->pOut);
         break;
       }
@@ -1360,6 +1478,88 @@ static void originSide(SQLiteRsync *p){
   closeDb(p);
 }
 
+/*
+** Send a REPLICA_HASH message for each entry in the sendHash table.
+** The sendHash table looks like this:
+**
+**   CREATE TABLE sendHash(
+**      fpg INTEGER PRIMARY KEY,   -- Page number of the hash
+**      npg INT                    -- Number of pages in this hash
+**   );
+**
+** If iHash is page number for the next page that the origin will
+** be expecting, and nHash is the number of pages that the origin will
+** be expecting in the hash that follows.  Send a REPLICA_CONFIG message
+** if either of these values if not correct.
+*/
+static void sendHashMessages(
+  SQLiteRsync *p,       /* The replica-side of the sync */
+  unsigned int iHash,   /* Next page expected by origin */
+  unsigned int nHash    /* Next number of pages expected by origin */
+){
+  sqlite3_stmt *pStmt;
+  pStmt = prepareStmt(p,
+    "SELECT if(npg==1,"
+    "  (SELECT hash(data) FROM sqlite_dbpage('replica') WHERE pgno=fpg),"
+    "  (WITH RECURSIVE c(n) AS"
+    "     (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE n<fpg+npg)"
+    "   SELECT agghash(hash(data))"
+    "     FROM c CROSS JOIN sqlite_dbpage('replica') ON pgno=n)) AS hash,"
+    "  fpg,"
+    "  npg"
+    " FROM sendHash ORDER BY fpg"
+  );
+  while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
+    const unsigned char *a = sqlite3_column_blob(pStmt, 0);
+    unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt, 1);
+    unsigned int npg = (unsigned int)sqlite3_column_int64(pStmt, 2);
+    if( pgno!=iHash || npg!=nHash ){
+      writeByte(p, REPLICA_CONFIG);
+      writeUint32(p, pgno);
+      writeUint32(p, npg);
+    }
+    writeByte(p, REPLICA_HASH);
+    writeBytes(p, 20, a);
+    p->nHashSent++;
+    iHash = pgno + npg;
+    nHash = npg;
+  }
+  sqlite3_finalize(pStmt);
+  runSql(p, "DELETE FROM sendHash");
+  writeByte(p, REPLICA_READY);
+  fflush(p->pOut);
+}
+
+/*
+** Make entries in the sendHash table to send hashes for
+** npg (mnemonic: Number of PaGes) pages starting with fpg
+** (mnemonic: First PaGe).
+*/
+static void subdivideHashRange(
+  SQLiteRsync *p,       /* The replica-side of the sync */
+  unsigned int fpg,     /* First page of the range */
+  unsigned int npg      /* Number of pages */
+){
+  unsigned int nChunk;   /* How many pages to request per hash */
+  sqlite3_uint64 iEnd;   /* One more than the last page */
+  if( npg<=30 ){
+    nChunk = 1;
+  }else if( npg<=1000 ){
+    nChunk = 30;
+  }else{
+    nChunk = 1000;
+  }
+  iEnd = fpg;
+  iEnd += npg;
+  runSql(p,
+    "WITH RECURSIVE c(n) AS"
+    "  (VALUES(%u) UNION ALL SELECT n+%u FROM c WHERE n<%llu)"
+    "REPLACE INTO sendHash(fpg,npg)"
+    " SELECT n, min(%llu-fpg,%u) FROM c",
+    fpg, nChunk, iEnd, iEnd, nChunk
+  );
+}
+
 /*
 ** Run the replica-side protocol.  The protocol is passive in the sense
 ** that it only response to message from the origin side.
@@ -1370,15 +1570,35 @@ static void originSide(SQLiteRsync *p){
 **         each page in the origin database (sent as a single-byte power-of-2),
 **         and the number of pages in the origin database.
 **         This procedure checks compatibility, and if everything is ok,
-**         it starts sending hashes of pages already present back to the origin.
+**         it starts sending hashes back to the origin using REPLICA_HASH
+**         and/or REPLICA_CONFIG message, followed by a single REPLICA_READY.
+**         REPLICA_CONFIG is only sent if the protocol is 2 or greater.
+**
+**    ORIGIN_ERROR  size  text
+**
+**         Report an error and quit.
 **
-**    ORIGIN_ERROR  size text
+**    ORIGIN_DETAIL  pgno  cnt
 **
-**         Report the received error and quit.
+**         The origin reports that a multi-page hash starting at pgno and
+**         spanning cnt pages failed to match.  The origin is requesting
+**         details (more REPLICA_HASH message with a smaller cnt).  The
+**         replica must wait on ORIGIN_READY before sending its reply.
 **
-**    ORIGIN_PAGE  pgno content
+**    ORIGIN_READY
 **
-**         Update the content of the given page.
+**         After sending one or more ORIGIN_DETAIL messages, the ORIGIN_READY
+**         is sent by the origin to indicate that it has finished sending
+**         requests for detail and is ready for the replicate to reply
+**         with a new round of REPLICA_CONFIG and REPLICA_HASH messages.
+**
+**    ORIGIN_PAGE  pgno  content
+**
+**         Once the origin believes it knows exactly which pages need to be
+**         updated in the replica, it starts sending those pages using these
+**         messages.  These messages will only appear immediately after
+**         REPLICA_READY.  The origin never mixes ORIGIN_DETAIL and
+**         ORIGIN_PAGE messages in the same batch.
 **
 **    ORIGIN_TXN   pgno
 **
@@ -1418,7 +1638,6 @@ static void replicaSide(SQLiteRsync *p){
         unsigned int nOPage = 0;
         unsigned int nRPage = 0, szRPage = 0;
         int rc = 0;
-        sqlite3_stmt *pStmt = 0;
 
         closeDb(p);
         p->iProtocol = readByte(p);
@@ -1458,6 +1677,12 @@ static void replicaSide(SQLiteRsync *p){
           closeDb(p);
           break;
         }
+        runSql(p,
+          "CREATE TABLE sendHash("
+          "  fpg INTEGER PRIMARY KEY,"   /* The page number of hash to send */
+          "  npg INT"                    /* Number of pages in this hash */
+          ")"
+        );
         hashRegister(p->db);
         if( runSqlReturnUInt(p, &nRPage, "PRAGMA replica.page_count") ){
           break;
@@ -1484,23 +1709,30 @@ static void replicaSide(SQLiteRsync *p){
                          "replica is %d bytes", szOPage, szRPage);
           break;
         }
-
-        pStmt = prepareStmt(p,
-                   "SELECT hash(data) FROM sqlite_dbpage('replica')"
-                   " WHERE pgno<=min(%d,%d)"
-                   " ORDER BY pgno", nRPage, nOPage);
-        while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
-          const unsigned char *a = sqlite3_column_blob(pStmt, 0);
-          writeByte(p, REPLICA_HASH);
-          writeBytes(p, 20, a);
-          p->nHashSent++;
+        if( p->iProtocol<2 ){
+          runSql(p,
+            "WITH RECURSIVE c(n) AS"
+              "(VALUES(1) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
+            "INSERT INTO sendHash(fpg, npg) SELECT n, 1 FROM c",
+            nRPage);
+        }else{
+          subdivideHashRange(p, 1, nRPage);
         }
-        sqlite3_finalize(pStmt);
-        writeByte(p, REPLICA_READY);
-        fflush(p->pOut);
+        sendHashMessages(p, 1, 1);
         runSql(p, "PRAGMA writable_schema=ON");
         break;
       }
+      case ORIGIN_DETAIL: {
+        unsigned int fpg, npg;
+        readUint32(p, &fpg);
+        readUint32(p, &npg);
+        subdivideHashRange(p, fpg, npg);
+        break;
+      }
+      case ORIGIN_READY: {
+        sendHashMessages(p, 0, 0);
+        break;
+      }
       case ORIGIN_TXN: {
         unsigned int nOPage = 0;
         readUint32(p, &nOPage);