"copy of ORIGIN\n"
;
+typedef unsigned char u8;
+
/* Context for the run */
typedef struct SQLiteRsync SQLiteRsync;
struct SQLiteRsync {
const char *zReplica; /* Name of the replica */
FILE *pOut; /* Transmit to the other side */
FILE *pIn; /* Receive from the other side */
- sqlite3_uint64 nOut; /* Bytes transmitted */
- sqlite3_uint64 nIn; /* Bytes received */
sqlite3 *db; /* Database connection */
int nErr; /* Number of errors encountered */
- int eVerbose; /* Bigger for more output. 0 means none. */
- int bCommCheck; /* True to debug the communication protocol */
- int isRemote; /* On the remote side of a connection */
+ u8 eVerbose; /* Bigger for more output. 0 means none. */
+ u8 bCommCheck; /* True to debug the communication protocol */
+ u8 isRemote; /* On the remote side of a connection */
+ sqlite3_uint64 nOut; /* Bytes transmitted */
+ sqlite3_uint64 nIn; /* Bytes received */
+ unsigned int nPage; /* Total number of pages in the database */
+ unsigned int szPage; /* Database page size */
+ unsigned int nHashSent; /* Hashes sent (replica to origin) */
+ unsigned int nPageSent; /* Page contents sent (origin to replica) */
};
unsigned char buf[4];
if( fread(buf, sizeof(buf), 1, p->pIn)==1 ){
*pU = (buf[0]<<24) | (buf[1]<<16) | (buf[2]<<8) | buf[3];
+ p->nIn += 4;
return 0;
}else{
p->nErr++;
p->nErr++;
return 1;
}
+ p->nOut += 4;
return 0;
}
+/* Read a single byte from the wire.
+*/
+int readByte(SQLiteRsync *p){
+ int c = fgetc(p->pIn);
+ if( c!=EOF ) p->nIn++;
+ return c;
+}
+
+/* Write a single byte into the wire.
+*/
+void writeByte(SQLiteRsync *p, int c){
+ fputc(c, p->pOut);
+ p->nOut++;
+}
+
+/* Read an array of bytes from the wire.
+*/
+void readBytes(SQLiteRsync *p, int nByte, void *pData){
+ if( fread(pData, 1, nByte, p->pIn)==nByte ){
+ p->nIn += nByte;
+ }else{
+ p->nErr++;
+ }
+}
+
+/* Write an array of bytes onto the wire.
+*/
+void writeBytes(SQLiteRsync *p, int nByte, const void *pData){
+ if( fwrite(pData, 1, nByte, p->pOut)==nByte ){
+ p->nOut += nByte;
+ }else{
+ p->nErr++;
+ }
+}
+
/* Report an error.
**
** If this happens on the remote side, we send back a REMOTE_ERROR
putc(ORIGIN_ERROR, p->pOut);
}
writeUint32(p, nMsg);
- fwrite(zMsg, nMsg, 1, p->pOut);
+ writeBytes(p, nMsg, zMsg);
fflush(p->pOut);
}else{
fprintf(stderr, "%s\n", zMsg);
return;
}
memset(zMsg, 0, n+1);
- fread(zMsg, 1, n, p->pIn);
+ readBytes(p, n, zMsg);
fprintf(stderr,"ERROR: %s\n", zMsg);
sqlite3_free(zMsg);
}
va_end(ap);
if( pStmt ){
int rc = sqlite3_step(pStmt);
+ if( rc==SQLITE_ROW ) rc = sqlite3_step(pStmt);
if( rc!=SQLITE_OK && rc!=SQLITE_DONE ){
reportError(p, "SQL statement [%s] failed: %s", zSql,
sqlite3_errmsg(p->db));
/*
** Run the origin-side protocol.
**
-** 1. Send the origin-begin message
-** 2. Receive replica-begin message
-** - Error check and abort if necessary
-** 3. Receive replica-hash messages
-** 4. BEGIN
-** 5. Send changed pages
-** 6. COMMIT
-** 7. Send origin-end message
+** Begin by sending the ORIGIN_BEGIN message with two arguments,
+** nPage, and szPage. Then enter a loop responding to message from
+** the replica:
+**
+** REPLICA_ERROR size text
+**
+** Report an error from the replica and quit
+**
+** REPLICA_END
+**
+** The replica is terminating. Stop processing now.
+**
+** REPLICA_HASH hash
+**
+** The argument is the 20-byte SHA1 hash for the next page
+** page hashes appear in sequential order with no gaps.
+**
+** REPLICA_READY
+**
+** The replica has sent all the hashes that it intends to send.
+** This side (the origin) can now start responding with page
+** content for pages that do not have a matching hash.
*/
static void originSide(SQLiteRsync *p){
int rc = 0;
int c = 0;
unsigned int nPage = 0;
+ unsigned int iPage = 0;
unsigned int szPg = 0;
- char buf[100];
+ sqlite3_stmt *pCkHash = 0;
+ char buf[200];
if( p->bCommCheck ){
fprintf(p->pOut, "sqlite3-rsync origin-begin %s\n", p->zOrigin);
}
/* Open the ORIGIN database. */
- rc = sqlite3_open_v2(p->zOrigin, &p->db, SQLITE_OPEN_READONLY, 0);
+ rc = sqlite3_open_v2(p->zOrigin, &p->db, SQLITE_OPEN_READWRITE, 0);
if( rc ){
reportError(p, "unable to open origin database file \"%s\": %s",
sqlite3_errmsg(p->db));
if( p->nErr==0 ){
/* Send the ORIGIN_BEGIN message */
- fputc(ORIGIN_BEGIN, p->pOut);
+ writeByte(p, ORIGIN_BEGIN);
writeUint32(p, nPage);
writeUint32(p, szPg);
fflush(p->pOut);
+ p->nPage = nPage;
+ p->szPage = szPg;
}
/* Respond to message from the replica */
- while( p->nErr==0 && (c = fgetc(p->pIn))!=EOF ){
+ while( p->nErr==0 && (c = readByte(p))!=EOF && c!=REPLICA_END ){
switch( c ){
case REPLICA_ERROR: {
readAndDisplayError(p);
break;
}
- case REPLICA_BEGIN: {
- break;
- }
- case REPLICA_END: {
- break;
- }
case REPLICA_HASH: {
+ if( pCkHash==0 ){
+ runSql(p, "CREATE TEMP TABLE badHash(pgno INTEGER PRIMARY KEY)");
+ pCkHash = prepareStmt(p,
+ "INSERT INTO badHash SELECT pgno FROM sqlite_dbpage('main')"
+ " WHERE pgno=?1 AND sha1b(data)!=?2"
+ );
+ if( pCkHash==0 ) break;
+ }
+ p->nHashSent++;
+ iPage++;
+ sqlite3_bind_int64(pCkHash, 1, iPage);
+ readBytes(p, 20, buf);
+ sqlite3_bind_blob(pCkHash, 2, buf, 20, SQLITE_STATIC);
+ rc = sqlite3_step(pCkHash);
+ if( rc!=SQLITE_DONE ){
+ reportError(p, "SQL statement [%s] failed: %s",
+ sqlite3_sql(pCkHash), sqlite3_errmsg(p->db));
+ }
+ sqlite3_reset(pCkHash);
break;
}
case REPLICA_READY: {
- break;
+ sqlite3_stmt *pStmt;
+ sqlite3_finalize(pCkHash);
+ pCkHash = 0;
+ pStmt = prepareStmt(p,
+ "SELECT pgno, data"
+ " FROM badHash JOIN sqlite_dbpage('main') USING(pgno) "
+ "UNION ALL "
+ "SELECT pgno, data"
+ " FROM sqlite_dbpage('main')"
+ " WHERE pgno>%d",
+ iPage);
+ if( pStmt==0 ) break;
+ while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 ){
+ const void *pContent = sqlite3_column_blob(pStmt, 1);
+ writeByte(p, ORIGIN_PAGE);
+ writeUint32(p, (unsigned int)sqlite3_column_int64(pStmt, 0));
+ writeBytes(p, szPg, pContent);
+ p->nPageSent++;
+ }
+ sqlite3_finalize(pStmt);
+ writeByte(p, ORIGIN_TXN);
+ writeUint32(p, nPage);
+ writeByte(p, ORIGIN_END);
+ goto origin_end;
}
default: {
reportError(p, "Origin side received unknown message: 0x%02x", c);
}
}
+origin_end:
+ if( pCkHash ) sqlite3_finalize(pCkHash);
closeDb(p);
}
/*
-** Run the replica-side protocol.
+** Run the replica-side protocol. The protocol is passive in the sense
+** that it only response to message from the origin side.
+**
+** ORIGIN_BEGIN nPage szPage
+**
+** The origin is reporting the number of pages and the size of each
+** pages. This procedure checks compatibility, and if everything is
+** ok, it sends hash for all its extant pages.
**
-** 1. Receive the origin-begin message
-** - Error check. If unable to continue, send replica-error and quit
-** 2. BEGIN IMMEDIATE
-** 3. Send replica-begin message
-** 4. Send replica-hash messages
-** 5. Receive changed pages and apply them
-** 6. Receive origin-end message
-** 7. COMMIT
+** ORIGIN_ERROR size text
+**
+** Report the received error and quit.
+**
+** ORIGIN_PAGE pgno content
+**
+** Update the content of the given page.
+**
+** ORIGIN_TXN pgno
+**
+** Close the update transaction. The total database size is pgno
+** pages.
+**
+** ORIGIN_END
+**
+** Expect no more transmissions from the origin.
*/
static void replicaSide(SQLiteRsync *p){
int c;
- char buf[100];
+ sqlite3_stmt *pIns = 0;
+ unsigned int szOPage = 0;
+ char buf[65536];
if( p->bCommCheck ){
echoOneLine(p);
fprintf(p->pOut, "replica-begin %s\n", p->zReplica);
/* Respond to message from the origin. The origin will initiate the
** the conversation with an ORIGIN_BEGIN message.
*/
- while( p->nErr==0 && (c = fgetc(p->pIn))!=EOF ){
+ while( p->nErr==0 && (c = readByte(p))!=EOF && c!=ORIGIN_END ){
switch( c ){
case ORIGIN_ERROR: {
readAndDisplayError(p);
break;
}
case ORIGIN_BEGIN: {
- unsigned int nOPage = 0, szOPage = 0;
+ unsigned int nOPage = 0;
unsigned int nRPage = 0, szRPage = 0;
int rc = 0;
sqlite3_stmt *pStmt = 0;
readUint32(p, &nOPage);
readUint32(p, &szOPage);
if( p->nErr ) break;
+ p->nPage = nOPage;
+ p->szPage = szOPage;
rc = sqlite3_open(p->zReplica, &p->db);
if( rc ){
reportError(p, "cannot open replica database \"%s\": %s",
break;
}
pStmt = prepareStmt(p,
- "SELECT pgno, sha1(data) FROM sqlite_dbpage"
- " WHERE pgno<=min(%d,%d)", nRPage, nOPage);
+ "SELECT sha1b(data) FROM sqlite_dbpage"
+ " WHERE pgno<=min(%d,%d)"
+ " ORDER BY pgno", nRPage, nOPage);
+ while( sqlite3_step(pStmt)==SQLITE_ROW && p->nErr==0 ){
+ const unsigned char *a = sqlite3_column_blob(pStmt, 0);
+ writeByte(p, REPLICA_HASH);
+ writeBytes(p, 20, a);
+ p->nHashSent++;
+ }
sqlite3_finalize(pStmt);
+ writeByte(p, REPLICA_READY);
+ fflush(p->pOut);
break;
}
- case ORIGIN_END: {
+ case ORIGIN_TXN: {
+ unsigned int nOPage = 0;
+ readUint32(p, &nOPage);
+ if( pIns==0 ){
+ /* Nothing has changed */
+ runSql(p, "COMMIT");
+ }else if( p->nErr ){
+ runSql(p, "ROLLBACK");
+ }else{
+ int rc;
+ sqlite3_bind_int64(pIns, 1, nOPage);
+ sqlite3_bind_null(pIns, 2);
+ rc = sqlite3_step(pIns);
+ if( rc!=SQLITE_DONE ){
+ reportError(p, "SQL statement [%s] failed: %s",
+ sqlite3_sql(pIns), sqlite3_errmsg(p->db));
+ }
+ sqlite3_reset(pIns);
+ runSql(p, "COMMIT");
+ }
break;
}
case ORIGIN_PAGE: {
+ unsigned int pgno = 0;
+ int rc;
+ readUint32(p, &pgno);
+ if( p->nErr ) break;
+ if( pIns==0 ){
+ pIns = prepareStmt(p,
+ "INSERT INTO sqlite_dbpage(pgno,data,schema) VALUES(?1,?2,'main')"
+ );
+ if( pIns==0 ) break;
+ }
+ readBytes(p, szOPage, buf);
+ if( p->nErr ) break;
+ p->nPageSent++;
+ sqlite3_bind_int64(pIns, 1, pgno);
+ sqlite3_bind_blob(pIns, 2, buf, szOPage, SQLITE_STATIC);
+ rc = sqlite3_step(pIns);
+ if( rc!=SQLITE_DONE ){
+ reportError(p, "SQL statement [%s] failed: %s",
+ sqlite3_sql(pIns), sqlite3_errmsg(p->db));
+ }
+ sqlite3_reset(pIns);
break;
}
default: {
}
}
+ if( pIns ) sqlite3_finalize(pIns);
closeDb(p);
}
+/*
+** The argument might be -vvv...vv with any number of "v"s. Return
+** the number of "v"s. Return 0 if the argument is not a -vvv...v.
+*/
+static int numVs(const char *z){
+ int n = 0;
+ if( z[0]!='-' ) return 0;
+ z++;
+ if( z[0]=='-' ) z++;
+ while( z[0]=='v' ){ n++; z++; }
+ if( z[0]==0 ) return n;
+ return 0;
+}
+
/*
** Parse command-line arguments. Dispatch subroutines to do the
isReplica = 1;
continue;
}
- if( strcmp(z, "-v")==0 ){
- ctx.eVerbose++;
+ if( numVs(z) ){
+ ctx.eVerbose += numVs(z);
continue;
}
if( strcmp(z, "--ssh")==0 ){
fprintf(stderr, "Could not start auxiliary process: %s\n", zCmd);
return 1;
}
- originSide(&ctx);
+ replicaSide(&ctx);
}else if( (zDiv = strchr(ctx.zReplica,':'))!=0 ){
/* Local ORIGIN and remote REPLICA */
sqlite3_str *pStr = sqlite3_str_new(0);
}else{
/* Local ORIGIN and REPLICA */
sqlite3_str *pStr = sqlite3_str_new(0);
- append_escaped_arg(pStr, zExe, 1);
+ append_escaped_arg(pStr, argv[0], 1);
append_escaped_arg(pStr, "--replica", 0);
if( ctx.bCommCheck ){
append_escaped_arg(pStr, "--commcheck", 0);
}
originSide(&ctx);
}
+ if( ctx.eVerbose ){
+ if( ctx.nErr ) printf("%d errors, ", ctx.nErr);
+ printf("%lld bytes sent, %lld bytes received\n", ctx.nOut, ctx.nIn);
+ if( ctx.eVerbose>=2 ){
+ printf("Database is %u pages of %u bytes each.\n",
+ ctx.nPage, ctx.szPage);
+ printf("Sent %u hashes, %u page contents\n",
+ ctx.nHashSent, ctx.nPageSent);
+ }
+ }
sqlite3_free(zCmd);
if( pIn!=0 && pOut!=0 ){
pclose2(pIn, pOut, childPid);