/* Magic numbers to identify particular messages sent over the wire.
*/
+/**** Baseline: protocol version 1 ****/
#define ORIGIN_BEGIN 0x41 /* Initial message */
#define ORIGIN_END 0x42 /* Time to quit */
#define ORIGIN_ERROR 0x43 /* Error message from the remote */
#define ORIGIN_PAGE 0x44 /* New page data */
#define ORIGIN_TXN 0x45 /* Transaction commit */
#define ORIGIN_MSG 0x46 /* Informational message */
+/**** Added in protocol version 2 ****/
+#define ORIGIN_DETAIL 0x47 /* Request finer-grain hash info */
+#define ORIGIN_READY 0x48 /* Ready for next round of hash exchanges */
+/**** Baseline: protocol version 1 ****/
#define REPLICA_BEGIN 0x61 /* Welcome message */
#define REPLICA_ERROR 0x62 /* Error. Report and quit. */
#define REPLICA_END 0x63 /* Replica wants to stop */
#define REPLICA_HASH 0x64 /* One or more pages hashes to report */
#define REPLICA_READY 0x65 /* Read to receive page content */
#define REPLICA_MSG 0x66 /* Informational message */
-
+/**** Added in protocol version 2 ****/
+#define REPLICA_CONFIG 0x67 /* Hash exchange configuration */
/****************************************************************************
** Beginning of the popen2() implementation copied from Fossil *************
sqlite3_result_blob(context, HashFinal(&cx), 160/8, SQLITE_TRANSIENT);
}
+/*
+** Implementation of the agghash(X) function.
+**
+** Return a 160-bit BLOB which is the hash of the concatenation
+** of all X inputs.
+*/
+static void agghashStep(
+ sqlite3_context *context,
+ int argc,
+ sqlite3_value **argv
+){
+ HashContext *pCx;
+ int eType = sqlite3_value_type(argv[0]);
+ int nByte = sqlite3_value_bytes(argv[0]);
+ if( eType==SQLITE_NULL ) return;
+ pCx = (HashContext*)sqlite3_aggregate_context(context, sizeof(*pCx));
+ if( pCx==0 ) return;
+ if( pCx->iSize==0 ) HashInit(pCx, 160);
+ if( eType==SQLITE_BLOB ){
+ HashUpdate(pCx, sqlite3_value_blob(argv[0]), nByte);
+ }else{
+ HashUpdate(pCx, sqlite3_value_text(argv[0]), nByte);
+ }
+}
+static void agghashFinal(sqlite3_context *context){
+ HashContext *pCx = (HashContext*)sqlite3_aggregate_context(context, 0);
+ if( pCx ){
+ sqlite3_result_blob(context, HashFinal(pCx), 160/8, SQLITE_TRANSIENT);
+ }
+}
+
/* Register the hash function */
static int hashRegister(sqlite3 *db){
- return sqlite3_create_function(db, "hash", 1,
+ int rc;
+ rc = sqlite3_create_function(db, "hash", 1,
SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC,
0, hashFunc, 0, 0);
+ if( rc==SQLITE_OK ){
+ rc = sqlite3_create_function(db, "agghash", 1,
+ SQLITE_UTF8 | SQLITE_INNOCUOUS | SQLITE_DETERMINISTIC,
+ 0, 0, agghashStep, agghashFinal);
+ }
+ return rc;
}
/* End of the hashing logic
** nPage, and szPage. Then enter a loop responding to message from
** the replica:
**
+** REPLICA_BEGIN iProtocol
+**
+** An optional message sent by the replica in response to the
+** prior ORIGIN_BEGIN with a counter-proposal for the protocol
+** level. If seen, try to reduce the protocol level to what is
+** requested and send a new ORGIN_BEGIN.
+**
** REPLICA_ERROR size text
**
** Report an error from the replica and quit
**
** REPLICA_HASH hash
**
-** The argument is the 20-byte SHA1 hash for the next page
-** page hashes appear in sequential order with no gaps.
+** The argument is the 20-byte SHA1 hash for the next page or
+** block of pages. Hashes appear in sequential order with no gaps,
+** unless there is an intervening REPLICA_CONFIG message.
+**
+** REPLICA_CONFIG pgno cnt
+**
+** Set counters used by REPLICA_HASH. The next hash will start
+** on page pgno and all subsequent hashes will cover cnt pages
+** each. Note that for a multi-page hash, the hash value is
+** actually a hash of the individual page hashes.
**
** REPLICA_READY
**
** The replica has sent all the hashes that it intends to send.
** This side (the origin) can now start responding with page
-** content for pages that do not have a matching hash.
+** content for pages that do not have a matching hash or with
+** ORIGIN_DETAIL messages with requests for more detail.
*/
static void originSide(SQLiteRsync *p){
int rc = 0;
int c = 0;
unsigned int nPage = 0;
- unsigned int iPage = 0;
+ unsigned int iHash = 1; /* Pgno for next hash to receive */
+ unsigned int nHash = 1; /* Number of pages per hash received */
unsigned int lockBytePage = 0;
unsigned int szPg = 0;
- sqlite3_stmt *pCkHash = 0;
- sqlite3_stmt *pInsHash = 0;
+ sqlite3_stmt *pCkHash = 0; /* Verify hash on a single page */
+ sqlite3_stmt *pCkHashN = 0; /* Verify a multi-page hash */
+ sqlite3_stmt *pInsHash = 0; /* Record a bad hash */
+ unsigned int nMulti = 0; /* Multi-page hashes not matched */
char buf[200];
p->isReplica = 0;
** that is larger than what it knows about. The replica sends back
** a counter-proposal of an earlier protocol which the origin can
** accept by resending a new ORIGIN_BEGIN. */
- p->iProtocol = readByte(p);
- writeByte(p, ORIGIN_BEGIN);
- writeByte(p, p->iProtocol);
- writePow2(p, p->szPage);
- writeUint32(p, p->nPage);
+ u8 newProtocol = readByte(p);
+ if( newProtocol < p->iProtocol ){
+ p->iProtocol = newProtocol;
+ writeByte(p, ORIGIN_BEGIN);
+ writeByte(p, p->iProtocol);
+ writePow2(p, p->szPage);
+ writeUint32(p, p->nPage);
+ }else{
+ reportError(p, "Invalid REPLICA_BEGIN reply");
+ }
break;
}
case REPLICA_MSG:
readAndDisplayMessage(p, c);
break;
}
+ case REPLICA_CONFIG: {
+ readUint32(p, &iHash);
+ readUint32(p, &nHash);
+ break;
+ }
case REPLICA_HASH: {
if( pCkHash==0 ){
- runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)");
+ runSql(p, "CREATE TEMP TABLE badHash("
+ " pgno INTEGER PRIMARY KEY,"
+ " sz INT)");
pCkHash = prepareStmt(p,
"SELECT pgno FROM sqlite_dbpage('main')"
- " WHERE pgno=?1 AND hash(data)!=?2"
+ " WHERE pgno=?1 AND hash(data)!=?3"
);
if( pCkHash==0 ) break;
- pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?)");
+ pInsHash = prepareStmt(p, "INSERT INTO badHash VALUES(?1,?2)");
if( pInsHash==0 ) break;
}
p->nHashSent++;
- iPage++;
- sqlite3_bind_int64(pCkHash, 1, iPage);
- readBytes(p, 20, buf);
- sqlite3_bind_blob(pCkHash, 2, buf, 20, SQLITE_STATIC);
- rc = sqlite3_step(pCkHash);
+ if( nHash>1 ){
+ if( pCkHashN==0 ){
+ pCkHashN = prepareStmt(p,
+ "WITH a1(pgno) AS "
+ "(VALUES(?1) UNION ALL SELECT pgno+1 FROM a1 WHERE pgno<?2)"
+ "SELECT 1 FROM a1 CROSS JOIN sqlite_dbpage('main')"
+ " USING(pgno)"
+ " WHERE agghash(hash(data))!=?3");
+ if( pCkHashN==0 ) break;
+ }
+ sqlite3_bind_int64(pCkHashN, 1, iHash);
+ sqlite3_bind_int64(pCkHashN, 2, iHash + nHash);
+ readBytes(p, 20, buf);
+ sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC);
+ if( rc==SQLITE_ROW ){
+ nMulti++;
+ }else if( rc==SQLITE_ERROR ){
+ reportError(p, "SQL statement [%s] failed: %s",
+ sqlite3_sql(pCkHashN), sqlite3_errmsg(p->db));
+ }
+ sqlite3_reset(pCkHashN);
+ }else{
+ sqlite3_bind_int64(pCkHash, 1, iHash);
+ readBytes(p, 20, buf);
+ sqlite3_bind_blob(pCkHash, 3, buf, 20, SQLITE_STATIC);
+ rc = sqlite3_step(pCkHash);
+ if( rc==SQLITE_ERROR ){
+ reportError(p, "SQL statement [%s] failed: %s",
+ sqlite3_sql(pCkHash), sqlite3_errmsg(p->db));
+ }
+ sqlite3_reset(pCkHash);
+ }
if( rc==SQLITE_ROW ){
- sqlite3_bind_int64(pInsHash, 1, sqlite3_column_int64(pCkHash, 0));
+ sqlite3_bind_int64(pInsHash, 1, iHash);
+ sqlite3_bind_int64(pInsHash, 2, nHash);
rc = sqlite3_step(pInsHash);
if( rc!=SQLITE_DONE ){
reportError(p, "SQL statement [%s] failed: %s",
}
sqlite3_reset(pInsHash);
}
- else if( rc!=SQLITE_DONE ){
- reportError(p, "SQL statement [%s] failed: %s",
- sqlite3_sql(pCkHash), sqlite3_errmsg(p->db));
- }
- sqlite3_reset(pCkHash);
+ iHash += nHash;
break;
}
case REPLICA_READY: {
- sqlite3_stmt *pStmt;
- sqlite3_finalize(pCkHash);
- sqlite3_finalize(pInsHash);
- pCkHash = 0;
- pInsHash = 0;
- if( iPage+1<p->nPage ){
- runSql(p, "WITH RECURSIVE c(n) AS"
- " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
- " INSERT INTO badHash SELECT n FROM c",
- iPage+1, p->nPage);
- }
- runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage);
- pStmt = prepareStmt(p,
- "SELECT pgno, data"
- " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)");
- if( pStmt==0 ) break;
- while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
- unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0);
- const void *pContent = sqlite3_column_blob(pStmt, 1);
- writeByte(p, ORIGIN_PAGE);
- writeUint32(p, pgno);
- writeBytes(p, szPg, pContent);
- p->nPageSent++;
+ if( nMulti>0 ){
+ sqlite3_stmt *pStmt;
+ pStmt = prepareStmt(p,"SELECT pgno, sz FROM badHash WHERE sz>1");
+ if( pStmt==0 ) break;
+ while( sqlite3_step(pStmt)==SQLITE_ROW ){
+ writeByte(p, ORIGIN_DETAIL);
+ writeUint32(p, sqlite3_column_int(pStmt, 0));
+ writeUint32(p, sqlite3_column_int(pStmt, 1));
+ }
+ sqlite3_finalize(pStmt);
+ runSql(p, "DELETE FROM badHash WHERE sz>1");
+ nMulti = 0;
+ writeByte(p, ORIGIN_READY);
+ }else{
+ sqlite3_stmt *pStmt;
+ sqlite3_finalize(pCkHash);
+ sqlite3_finalize(pCkHashN);
+ sqlite3_finalize(pInsHash);
+ pCkHash = 0;
+ pInsHash = 0;
+ if( iHash+1<p->nPage ){
+ runSql(p, "WITH RECURSIVE c(n) AS"
+ " (VALUES(%d) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
+ " INSERT INTO badHash SELECT n, 1 FROM c",
+ iHash+1, p->nPage);
+ }
+ runSql(p, "DELETE FROM badHash WHERE pgno=%d", lockBytePage);
+ pStmt = prepareStmt(p,
+ "SELECT pgno, data"
+ " FROM badHash JOIN sqlite_dbpage('main') USING(pgno)");
+ if( pStmt==0 ) break;
+ while( sqlite3_step(pStmt)==SQLITE_ROW
+ && p->nErr==0
+ && p->nWrErr==0
+ ){
+ unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt,0);
+ const void *pContent = sqlite3_column_blob(pStmt, 1);
+ writeByte(p, ORIGIN_PAGE);
+ writeUint32(p, pgno);
+ writeBytes(p, szPg, pContent);
+ p->nPageSent++;
+ }
+ sqlite3_finalize(pStmt);
+ writeByte(p, ORIGIN_TXN);
+ writeUint32(p, nPage);
+ writeByte(p, ORIGIN_END);
}
- sqlite3_finalize(pStmt);
- writeByte(p, ORIGIN_TXN);
- writeUint32(p, nPage);
- writeByte(p, ORIGIN_END);
fflush(p->pOut);
break;
}
closeDb(p);
}
+/*
+** Send a REPLICA_HASH message for each entry in the sendHash table.
+** The sendHash table looks like this:
+**
+** CREATE TABLE sendHash(
+** fpg INTEGER PRIMARY KEY, -- Page number of the hash
+** npg INT -- Number of pages in this hash
+** );
+**
+** If iHash is page number for the next page that the origin will
+** be expecting, and nHash is the number of pages that the origin will
+** be expecting in the hash that follows. Send a REPLICA_CONFIG message
+** if either of these values if not correct.
+*/
+static void sendHashMessages(
+ SQLiteRsync *p, /* The replica-side of the sync */
+ unsigned int iHash, /* Next page expected by origin */
+ unsigned int nHash /* Next number of pages expected by origin */
+){
+ sqlite3_stmt *pStmt;
+ pStmt = prepareStmt(p,
+ "SELECT if(npg==1,"
+ " (SELECT hash(data) FROM sqlite_dbpage('replica') WHERE pgno=fpg),"
+ " (WITH RECURSIVE c(n) AS"
+ " (SELECT fpg UNION ALL SELECT n+1 FROM c WHERE n<fpg+npg)"
+ " SELECT agghash(hash(data))"
+ " FROM c CROSS JOIN sqlite_dbpage('replica') ON pgno=n)) AS hash,"
+ " fpg,"
+ " npg"
+ " FROM sendHash ORDER BY fpg"
+ );
+ while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
+ const unsigned char *a = sqlite3_column_blob(pStmt, 0);
+ unsigned int pgno = (unsigned int)sqlite3_column_int64(pStmt, 1);
+ unsigned int npg = (unsigned int)sqlite3_column_int64(pStmt, 2);
+ if( pgno!=iHash || npg!=nHash ){
+ writeByte(p, REPLICA_CONFIG);
+ writeUint32(p, pgno);
+ writeUint32(p, npg);
+ }
+ writeByte(p, REPLICA_HASH);
+ writeBytes(p, 20, a);
+ p->nHashSent++;
+ iHash = pgno + npg;
+ nHash = npg;
+ }
+ sqlite3_finalize(pStmt);
+ runSql(p, "DELETE FROM sendHash");
+ writeByte(p, REPLICA_READY);
+ fflush(p->pOut);
+}
+
+/*
+** Make entries in the sendHash table to send hashes for
+** npg (mnemonic: Number of PaGes) pages starting with fpg
+** (mnemonic: First PaGe).
+*/
+static void subdivideHashRange(
+ SQLiteRsync *p, /* The replica-side of the sync */
+ unsigned int fpg, /* First page of the range */
+ unsigned int npg /* Number of pages */
+){
+ unsigned int nChunk; /* How many pages to request per hash */
+ sqlite3_uint64 iEnd; /* One more than the last page */
+ if( npg<=30 ){
+ nChunk = 1;
+ }else if( npg<=1000 ){
+ nChunk = 30;
+ }else{
+ nChunk = 1000;
+ }
+ iEnd = fpg;
+ iEnd += npg;
+ runSql(p,
+ "WITH RECURSIVE c(n) AS"
+ " (VALUES(%u) UNION ALL SELECT n+%u FROM c WHERE n<%llu)"
+ "REPLACE INTO sendHash(fpg,npg)"
+ " SELECT n, min(%llu-fpg,%u) FROM c",
+ fpg, nChunk, iEnd, iEnd, nChunk
+ );
+}
+
/*
** Run the replica-side protocol. The protocol is passive in the sense
** that it only response to message from the origin side.
** each page in the origin database (sent as a single-byte power-of-2),
** and the number of pages in the origin database.
** This procedure checks compatibility, and if everything is ok,
-** it starts sending hashes of pages already present back to the origin.
+** it starts sending hashes back to the origin using REPLICA_HASH
+** and/or REPLICA_CONFIG message, followed by a single REPLICA_READY.
+** REPLICA_CONFIG is only sent if the protocol is 2 or greater.
+**
+** ORIGIN_ERROR size text
+**
+** Report an error and quit.
**
-** ORIGIN_ERROR size text
+** ORIGIN_DETAIL pgno cnt
**
-** Report the received error and quit.
+** The origin reports that a multi-page hash starting at pgno and
+** spanning cnt pages failed to match. The origin is requesting
+** details (more REPLICA_HASH message with a smaller cnt). The
+** replica must wait on ORIGIN_READY before sending its reply.
**
-** ORIGIN_PAGE pgno content
+** ORIGIN_READY
**
-** Update the content of the given page.
+** After sending one or more ORIGIN_DETAIL messages, the ORIGIN_READY
+** is sent by the origin to indicate that it has finished sending
+** requests for detail and is ready for the replicate to reply
+** with a new round of REPLICA_CONFIG and REPLICA_HASH messages.
+**
+** ORIGIN_PAGE pgno content
+**
+** Once the origin believes it knows exactly which pages need to be
+** updated in the replica, it starts sending those pages using these
+** messages. These messages will only appear immediately after
+** REPLICA_READY. The origin never mixes ORIGIN_DETAIL and
+** ORIGIN_PAGE messages in the same batch.
**
** ORIGIN_TXN pgno
**
unsigned int nOPage = 0;
unsigned int nRPage = 0, szRPage = 0;
int rc = 0;
- sqlite3_stmt *pStmt = 0;
closeDb(p);
p->iProtocol = readByte(p);
closeDb(p);
break;
}
+ runSql(p,
+ "CREATE TABLE sendHash("
+ " fpg INTEGER PRIMARY KEY," /* The page number of hash to send */
+ " npg INT" /* Number of pages in this hash */
+ ")"
+ );
hashRegister(p->db);
if( runSqlReturnUInt(p, &nRPage, "PRAGMA replica.page_count") ){
break;
"replica is %d bytes", szOPage, szRPage);
break;
}
-
- pStmt = prepareStmt(p,
- "SELECT hash(data) FROM sqlite_dbpage('replica')"
- " WHERE pgno<=min(%d,%d)"
- " ORDER BY pgno", nRPage, nOPage);
- while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 && p->nWrErr==0 ){
- const unsigned char *a = sqlite3_column_blob(pStmt, 0);
- writeByte(p, REPLICA_HASH);
- writeBytes(p, 20, a);
- p->nHashSent++;
+ if( p->iProtocol<2 ){
+ runSql(p,
+ "WITH RECURSIVE c(n) AS"
+ "(VALUES(1) UNION ALL SELECT n+1 FROM c WHERE n<%d)"
+ "INSERT INTO sendHash(fpg, npg) SELECT n, 1 FROM c",
+ nRPage);
+ }else{
+ subdivideHashRange(p, 1, nRPage);
}
- sqlite3_finalize(pStmt);
- writeByte(p, REPLICA_READY);
- fflush(p->pOut);
+ sendHashMessages(p, 1, 1);
runSql(p, "PRAGMA writable_schema=ON");
break;
}
+ case ORIGIN_DETAIL: {
+ unsigned int fpg, npg;
+ readUint32(p, &fpg);
+ readUint32(p, &npg);
+ subdivideHashRange(p, fpg, npg);
+ break;
+ }
+ case ORIGIN_READY: {
+ sendHashMessages(p, 0, 0);
+ break;
+ }
case ORIGIN_TXN: {
unsigned int nOPage = 0;
readUint32(p, &nOPage);