--- /dev/null
+# 2014 August 16
+#
+# The author disclaims copyright to this source code. In place of
+# a legal notice, here is a blessing:
+#
+# May you do good and not evil.
+# May you find forgiveness for yourself and forgive others.
+# May you share freely, never taking more than you give.
+#
+#***********************************************************************
+#
+# This file implements regression tests for sessions SQLite extension.
+# Specifically, this file contains tests for "patchset" changes.
+#
+
+if {![info exists testdir]} {
+ set testdir [file join [file dirname [info script]] .. .. test]
+}
+source [file join [file dirname [info script]] session_common.tcl]
+source $testdir/tester.tcl
+ifcapable !session {finish_test; return}
+
+set testprefix sessionB
+
+#
+# 1.*: Test that the blobs returned by the session_patchset() API are
+# as expected. Also the sqlite3_changeset_iter functions.
+#
+# 2.*: Test that patchset blobs are handled by sqlite3changeset_apply().
+#
+# 3.*: Test that sqlite3changeset_invert() works with patchset blobs.
+# Correct behaviour is to return SQLITE_CORRUPT.
+
+proc do_patchset_test {tn session res} {
+ set r [list]
+ foreach x $res {lappend r $x}
+ uplevel do_test $tn [list [subst -nocommands {
+ set x [list]
+ sqlite3session_foreach c [$session patchset] { lappend x [set c] }
+ set x
+ }]] [list $r]
+}
+
+proc do_sql2patchset_test {tn sql res} {
+ sqlite3session S db main
+ S attach *
+ execsql $sql
+ uplevel [list do_patchset_test $tn S $res]
+ S delete
+}
+
+#-------------------------------------------------------------------------
+# Run simple tests of the _patchset() API.
+#
+do_execsql_test 1.0 {
+ CREATE TABLE t1(a, b, c, d, PRIMARY KEY(d, a));
+ INSERT INTO t1 VALUES(1, 2, 3, 4);
+ INSERT INTO t1 VALUES(5, 6, 7, 8);
+ INSERT INTO t1 VALUES(9, 10, 11, 12);
+}
+
+do_test 1.1 {
+ sqlite3session S db main
+ S attach t1
+ execsql {
+ INSERT INTO t1 VALUES('w', 'x', 'y', 'z');
+ DELETE FROM t1 WHERE d=4;
+ UPDATE t1 SET c = 14 WHERE a=5;
+ }
+} {}
+
+do_patchset_test 1.2 S {
+ {UPDATE t1 0 X..X {i 5 {} {} {} {} i 8} {{} {} {} {} i 14 {} {}}}
+ {INSERT t1 0 X..X {} {t w t x t y t z}}
+ {DELETE t1 0 X..X {i 1 {} {} {} {} i 4} {}}
+}
+
+do_test 1.3 {
+ S delete
+} {}
+
+do_sql2patchset_test 1.4 {
+ DELETE FROM t1;
+} {
+ {DELETE t1 0 X..X {i 5 {} {} {} {} i 8} {}}
+ {DELETE t1 0 X..X {t w {} {} {} {} t z} {}}
+ {DELETE t1 0 X..X {i 9 {} {} {} {} i 12} {}}
+}
+
+do_sql2patchset_test 1.5 {
+ INSERT INTO t1 VALUES(X'61626364', NULL, NULL, 4.2);
+ INSERT INTO t1 VALUES(4.2, NULL, NULL, X'61626364');
+} {
+ {INSERT t1 0 X..X {} {f 4.2 n {} n {} b abcd}}
+ {INSERT t1 0 X..X {} {b abcd n {} n {} f 4.2}}
+}
+
+do_sql2patchset_test 1.6 {
+ UPDATE t1 SET b=45 WHERE typeof(a)=='blob';
+ UPDATE t1 SET c='zzzz' WHERE typeof(a)!='blob';
+} {
+ {UPDATE t1 0 X..X {f 4.2 {} {} {} {} b abcd} {{} {} {} {} t zzzz {} {}}}
+ {UPDATE t1 0 X..X {b abcd {} {} {} {} f 4.2} {{} {} i 45 {} {} {} {}}}
+}
+
+do_sql2patchset_test 1.7 {
+ UPDATE t1 SET b='xyz' WHERE typeof(a)=='blob';
+ UPDATE t1 SET c='xyz' WHERE typeof(a)!='blob';
+ UPDATE t1 SET b=45 WHERE typeof(a)=='blob';
+ UPDATE t1 SET c='zzzz' WHERE typeof(a)!='blob';
+} {
+}
+
+do_sql2patchset_test 1.8 {
+ DELETE FROM t1;
+} {
+ {DELETE t1 0 X..X {f 4.2 {} {} {} {} b abcd} {}}
+ {DELETE t1 0 X..X {b abcd {} {} {} {} f 4.2} {}}
+}
+
+#-------------------------------------------------------------------------
+# Run simple tests of _apply() with patchset objects.
+#
+reset_db
+
+proc noop {args} { error $args }
+proc exec_rollback_replay {sql} {
+ sqlite3session S db main
+ S attach *
+ execsql BEGIN
+ execsql $sql
+ set patchset [S patchset]
+ S delete
+ execsql ROLLBACK
+ sqlite3changeset_apply db $patchset noop
+}
+
+do_execsql_test 2.0 {
+ CREATE TABLE t2(a, b, c, d, PRIMARY KEY(b,c));
+ CREATE TABLE t3(w, x, y, z, PRIMARY KEY(w));
+}
+
+do_test 2.1 {
+ exec_rollback_replay {
+ INSERT INTO t2 VALUES(1, 2, 3, 4);
+ INSERT INTO t2 VALUES('w', 'x', 'y', 'z');
+ }
+ execsql { SELECT * FROM t2 }
+} {1 2 3 4 w x y z}
+
+do_test 2.2 {
+ exec_rollback_replay {
+ DELETE FROM t2 WHERE a=1;
+ UPDATE t2 SET d = 'a';
+ }
+ execsql { SELECT * FROM t2 }
+} {w x y a}
+
+#-------------------------------------------------------------------------
+# sqlite3changeset_invert()
+#
+reset_db
+
+do_execsql_test 3.1 { CREATE TABLE t1(x PRIMARY KEY, y) }
+do_test 3.2 {
+ sqlite3session S db main
+ S attach *
+ execsql { INSERT INTO t1 VALUES(1, 2) }
+ set patchset [S patchset]
+ S delete
+ list [catch { sqlite3changeset_invert $patchset } msg] [set msg]
+} {1 SQLITE_CORRUPT}
+
+
+#-------------------------------------------------------------------------
+# sqlite3changeset_concat()
+#
+reset_db
+
+proc do_patchconcat_test {tn args} {
+ set nSql [expr [llength $args]-1]
+ set res [lindex $args $nSql]
+ set patchlist [list]
+
+ execsql BEGIN
+ foreach sql [lrange $args 0 end-1] {
+ sqlite3session S db main
+ S attach *
+ execsql $sql
+ lappend patchlist [S patchset]
+ S delete
+ }
+ execsql ROLLBACK
+
+ set patch [lindex $patchlist 0]
+ foreach p [lrange $patchlist 1 end] {
+ set patch [sqlite3changeset_concat $patch $p]
+ }
+
+ set x [list]
+ sqlite3session_foreach c $patch { lappend x $c }
+
+ uplevel [list do_test $tn [list set {} $x] [list {*}$res]]
+}
+
+do_execsql_test 4.1.1 {
+ CREATE TABLE t1(x PRIMARY KEY, y, z);
+}
+do_patchconcat_test 4.1.2 {
+ INSERT INTO t1 VALUES(1, 2, 3);
+} {
+ INSERT INTO t1 VALUES(4, 5, 6);
+} {
+ {INSERT t1 0 X.. {} {i 1 i 2 i 3}}
+ {INSERT t1 0 X.. {} {i 4 i 5 i 6}}
+}
+
+if 0 {
+do_execsql_test 4.2.1 {
+ INSERT INTO t1 VALUES(1, 2, 3);
+ INSERT INTO t1 VALUES(4, 5, 6);
+}
+do_patchconcat_test 4.2.2 {
+ UPDATE t1 SET z = 'abc' WHERE x=1
+} {
+ UPDATE t1 SET z = 'def' WHERE x=4
+} {
+}
+}
+
+finish_test
+
+
struct sqlite3_changeset_iter {
u8 *aChangeset; /* Pointer to buffer containing changeset */
int nChangeset; /* Number of bytes in aChangeset */
+ int bPatchset; /* True if this is a patchset */
u8 *pNext; /* Pointer to next change within aChangeset */
int rc; /* Iterator error code */
sqlite3_stmt *pConflict; /* Points to conflicting row, if any */
**
** 1 byte: Constant 0x54 (capital 'T')
** Varint: Big-endian integer set to the number of columns in the table.
+** nCol bytes: 0x01 for PK columns, 0x00 otherwise.
** N bytes: Unqualified table name (encoded using UTF-8). Nul-terminated.
**
** Followed by one or more changes to the table.
** 1 byte: The "indirect-change" flag.
** old.* record: (delete and update only)
** new.* record: (insert and update only)
+**
+** PATCHSET FORMAT:
+**
+** A patchset is also a collection of changes. It is similar to a changeset,
+** but omits those fields that are not useful if no conflict resolution
+** is required when applying the changeset.
+**
+** Each group of changes begins with a table header:
+**
+** 1 byte: Constant 0x50 (capital 'P')
+** Varint: Big-endian integer set to the number of columns in the table.
+** nCol bytes: 0x01 for PK columns, 0x00 otherwise.
+** N bytes: Unqualified table name (encoded using UTF-8). Nul-terminated.
+**
+** Followed by one or more changes to the table.
+**
+** 1 byte: Either SQLITE_INSERT, UPDATE or DELETE.
+** 1 byte: The "indirect-change" flag.
+** single record: (PK fields for DELETE, or full record for INSERT/UPDATE).
*/
/*
*/
static int sessionAppendUpdate(
SessionBuffer *pBuf, /* Buffer to append to */
+ int bPatchset, /* True for "patchset", 0 for "changeset" */
sqlite3_stmt *pStmt, /* Statement handle pointing at new row */
SessionChange *p, /* Object containing old values */
u8 *abPK /* Boolean array - true for PK columns */
}
}
- if( bChanged || abPK[i] ){
- sessionAppendBlob(pBuf, pCsr, nAdvance, &rc);
- }else{
- sessionAppendByte(pBuf, 0, &rc);
+ /* If at least one field has been modified, this is not a no-op. */
+ if( bChanged ) bNoop = 0;
+
+ /* Add a field to the old.* record. This is omitted if this modules is
+ ** currently generating a patchset. */
+ if( bPatchset==0 ){
+ if( bChanged || abPK[i] ){
+ sessionAppendBlob(pBuf, pCsr, nAdvance, &rc);
+ }else{
+ sessionAppendByte(pBuf, 0, &rc);
+ }
}
- if( bChanged ){
+ /* Add a field to the new.* record. Or the only record if currently
+ ** generating a patchset. */
+ if( bChanged || (bPatchset && abPK[i]) ){
sessionAppendCol(&buf2, pStmt, i, &rc);
- bNoop = 0;
}else{
sessionAppendByte(&buf2, 0, &rc);
}
return rc;
}
+static int sessionAppendDelete(
+ SessionBuffer *pBuf, /* Buffer to append to */
+ int bPatchset, /* True for "patchset", 0 for "changeset" */
+ sqlite3_stmt *pStmt, /* Statement handle pointing at new row */
+ SessionChange *p, /* Object containing old values */
+ u8 *abPK /* Boolean array - true for PK columns */
+){
+ int rc = SQLITE_OK;
+
+ sessionAppendByte(pBuf, SQLITE_DELETE, &rc);
+ sessionAppendByte(pBuf, p->bIndirect, &rc);
+
+ if( bPatchset==0 ){
+ sessionAppendBlob(pBuf, p->aRecord, p->nRecord, &rc);
+ }else{
+ int nCol = sqlite3_column_count(pStmt);
+ int i;
+ u8 *a = p->aRecord;
+ for(i=0; i<nCol; i++){
+ u8 *pStart = a;
+ int eType = *a++;
+
+ switch( eType ){
+ case 0:
+ case SQLITE_NULL:
+ assert( abPK[i]==0 );
+ break;
+
+ case SQLITE_FLOAT:
+ case SQLITE_INTEGER:
+ a += 8;
+ break;
+
+ default: {
+ int n;
+ a += sessionVarintGet(a, &n);
+ a += n;
+ break;
+ }
+ }
+ if( abPK[i] ){
+ sessionAppendBlob(pBuf, pStart, a-pStart, &rc);
+ }
+ }
+ assert( (a - p->aRecord)==p->nRecord );
+ }
+
+ return rc;
+}
+
/*
** Formulate and prepare a SELECT statement to retrieve a row from table
** zTab in database zDb based on its primary key. i.e.
*/
static void sessionAppendTableHdr(
SessionBuffer *pBuf,
+ int bPatchset,
SessionTable *pTab,
int *pRc
){
/* Write a table header */
- sessionAppendByte(pBuf, 'T', pRc);
+ sessionAppendByte(pBuf, (bPatchset ? 'P' : 'T'), pRc);
sessionAppendVarint(pBuf, pTab->nCol, pRc);
sessionAppendBlob(pBuf, pTab->abPK, pTab->nCol, pRc);
sessionAppendBlob(pBuf, (u8 *)pTab->zName, (int)strlen(pTab->zName)+1, pRc);
}
-/*
-** Obtain a changeset object containing all changes recorded by the
-** session object passed as the first argument.
-**
-** It is the responsibility of the caller to eventually free the buffer
-** using sqlite3_free().
-*/
-int sqlite3session_changeset(
+int sessionGenerateChangeset(
sqlite3_session *pSession, /* Session object */
+ int bPatchset, /* True for patchset, false for changeset */
int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */
void **ppChangeset /* OUT: Buffer containing changeset */
){
}
/* Write a table header */
- sessionAppendTableHdr(&buf, pTab, &rc);
+ sessionAppendTableHdr(&buf, bPatchset, pTab, &rc);
/* Build and compile a statement to execute: */
if( rc==SQLITE_OK ){
sessionAppendCol(&buf, pSel, iCol, &rc);
}
}else{
- rc = sessionAppendUpdate(&buf, pSel, p, abPK);
+ rc = sessionAppendUpdate(&buf, bPatchset, pSel, p, abPK);
}
}else if( p->op!=SQLITE_INSERT ){
- /* A DELETE change */
- sessionAppendByte(&buf, SQLITE_DELETE, &rc);
- sessionAppendByte(&buf, p->bIndirect, &rc);
- sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc);
+ rc = sessionAppendDelete(&buf, bPatchset, pSel, p, abPK);
}
if( rc==SQLITE_OK ){
rc = sqlite3_reset(pSel);
return rc;
}
+/*
+** Obtain a changeset object containing all changes recorded by the
+** session object passed as the first argument.
+**
+** It is the responsibility of the caller to eventually free the buffer
+** using sqlite3_free().
+*/
+int sqlite3session_changeset(
+ sqlite3_session *pSession, /* Session object */
+ int *pnChangeset, /* OUT: Size of buffer at *ppChangeset */
+ void **ppChangeset /* OUT: Buffer containing changeset */
+){
+ return sessionGenerateChangeset(pSession, 0, pnChangeset, ppChangeset);
+}
+
+/*
+** Obtain a patchset object containing all changes recorded by the
+** session object passed as the first argument.
+**
+** It is the responsibility of the caller to eventually free the buffer
+** using sqlite3_free().
+*/
+int sqlite3session_patchset(
+ sqlite3_session *pSession, /* Session object */
+ int *pnPatchset, /* OUT: Size of buffer at *ppChangeset */
+ void **ppPatchset /* OUT: Buffer containing changeset */
+){
+ return sessionGenerateChangeset(pSession, 1, pnPatchset, ppPatchset);
+}
+
/*
** Enable or disable the session object passed as the first argument.
*/
static int sessionReadRecord(
u8 **paChange, /* IN/OUT: Pointer to binary record */
int nCol, /* Number of values in record */
+ u8 *abPK, /* Array of primary key flags, or NULL */
sqlite3_value **apOut /* Write values to this array */
){
int i; /* Used to iterate through columns */
u8 *aRec = *paChange; /* Cursor for the serialized record */
for(i=0; i<nCol; i++){
- int eType = *aRec++; /* Type of value (SQLITE_NULL, TEXT etc.) */
+ int eType;
+ if( abPK && abPK[i]==0 ) continue;
+ eType = *aRec++; /* Type of value (SQLITE_NULL, TEXT etc.) */
assert( !apOut || apOut[i]==0 );
if( eType ){
if( apOut ){
}
aChange = p->pNext;
- if( aChange[0]=='T' ){
+ if( aChange[0]=='T' || aChange[0]=='P' ){
int nByte; /* Bytes to allocate for apValue */
+ p->bPatchset = (aChange[0]=='P');
aChange++;
aChange += sessionVarintGet(aChange, &p->nCol);
p->abPK = (u8 *)aChange;
if( paRec ){ *paRec = aChange; }
/* If this is an UPDATE or DELETE, read the old.* record. */
- if( p->op!=SQLITE_INSERT ){
- p->rc = sessionReadRecord(&aChange, p->nCol, paRec?0:p->apValue);
+ if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
+ u8 *abPK = p->bPatchset ? p->abPK : 0;
+ p->rc = sessionReadRecord(&aChange, p->nCol, abPK, paRec?0:p->apValue);
if( p->rc!=SQLITE_OK ) return p->rc;
}
/* If this is an INSERT or UPDATE, read the new.* record. */
if( p->op!=SQLITE_DELETE ){
- p->rc = sessionReadRecord(&aChange, p->nCol, paRec?0:&p->apValue[p->nCol]);
+ sqlite3_value **apOut = (paRec ? 0 : &p->apValue[p->nCol]);
+ p->rc = sessionReadRecord(&aChange, p->nCol, 0, apOut);
if( p->rc!=SQLITE_OK ) return p->rc;
}
- if( pnRec ){ *pnRec = (int)(aChange - *paRec); }
+ if( pnRec ){
+ *pnRec = (int)(aChange - *paRec);
+ }else if( p->bPatchset && p->op==SQLITE_UPDATE ){
+ /* If this is an UPDATE that is part of a patchset, then all PK and
+ ** modified fields are present in the new.* record. The old.* record
+ ** is currently completely empty. This block shifts the PK fields from
+ ** new.* to old.*, to accommodate the code that reads these arrays. */
+ int i;
+ for(i=0; i<p->nCol; i++){
+ assert( p->apValue[i]==0 );
+ assert( p->abPK[i]==0 || p->apValue[i+p->nCol] );
+ if( p->abPK[i] ){
+ p->apValue[i] = p->apValue[i+p->nCol];
+ p->apValue[i+p->nCol] = 0;
+ }
+ }
+ }
p->pNext = aChange;
return SQLITE_ROW;
}
int nByte;
u8 *aEnd = &aIn[i+2];
- sessionReadRecord(&aEnd, nCol, 0);
+ sessionReadRecord(&aEnd, nCol, 0, 0);
aOut[i] = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE);
aOut[i+1] = aIn[i+1];
nByte = (int)(aEnd - &aIn[i+2]);
}
/* Read the old.* and new.* records for the update change. */
- rc = sessionReadRecord(&aEnd, nCol, &apVal[0]);
+ rc = sessionReadRecord(&aEnd, nCol, 0, &apVal[0]);
if( rc==SQLITE_OK ){
- rc = sessionReadRecord(&aEnd, nCol, &apVal[nCol]);
+ rc = sessionReadRecord(&aEnd, nCol, 0, &apVal[nCol]);
}
/* Write the header for the new UPDATE change. Same as the original. */
if( op==SQLITE_DELETE ){
- /* Bind values to the DELETE statement. */
- rc = sessionBindRow(pIter, sqlite3changeset_old, nCol, 0, p->pDelete);
+ /* Bind values to the DELETE statement. If conflict handling is required,
+ ** bind values for all columns and set bound variable (nCol+1) to true.
+ ** Or, if conflict handling is not required, bind just the PK column
+ ** values and, if it exists, set (nCol+1) to false. Conflict handling
+ ** is not required if:
+ **
+ ** * this is a patchset, or
+ ** * (pbRetry==0), or
+ ** * all columns of the table are PK columns (in this case there is
+ ** no (nCol+1) variable to bind to).
+ */
+ u8 *abPK = (pIter->bPatchset ? p->abPK : 0);
+ rc = sessionBindRow(pIter, sqlite3changeset_old, nCol, abPK, p->pDelete);
if( rc==SQLITE_OK && sqlite3_bind_parameter_count(p->pDelete)>nCol ){
- rc = sqlite3_bind_int(p->pDelete, nCol+1, pbRetry==0);
+ rc = sqlite3_bind_int(p->pDelete, nCol+1, (pbRetry==0 || abPK));
}
if( rc!=SQLITE_OK ) return rc;
rc = sessionBindValue(p->pUpdate, i*3+3, pNew);
}
}
- if( rc==SQLITE_OK ) sqlite3_bind_int(p->pUpdate, nCol*3+1, pbRetry==0);
+ if( rc==SQLITE_OK ){
+ sqlite3_bind_int(p->pUpdate, nCol*3+1, pbRetry==0 || pIter->bPatchset);
+ }
if( rc!=SQLITE_OK ) return rc;
/* Attempt the UPDATE. In the case of a NOTFOUND or DATA conflict,
u8 *a1 = aRec;
assert( op2==SQLITE_UPDATE );
pNew->op = SQLITE_INSERT;
- sessionReadRecord(&a1, pTab->nCol, 0);
+ sessionReadRecord(&a1, pTab->nCol, 0, 0);
sessionMergeRecord(&aCsr, pTab->nCol, pExist->aRecord, a1);
}else if( op1==SQLITE_DELETE ){ /* DELETE + INSERT */
assert( op2==SQLITE_INSERT );
u8 *a1 = pExist->aRecord;
u8 *a2 = aRec;
assert( op1==SQLITE_UPDATE );
- sessionReadRecord(&a1, pTab->nCol, 0);
- sessionReadRecord(&a2, pTab->nCol, 0);
+ sessionReadRecord(&a1, pTab->nCol, 0, 0);
+ sessionReadRecord(&a2, pTab->nCol, 0, 0);
pNew->op = SQLITE_UPDATE;
if( 0==sessionMergeUpdate(&aCsr, pTab, aRec, pExist->aRecord, a1, a2) ){
sqlite3_free(pNew);
int i;
if( pTab->nEntry==0 ) continue;
- sessionAppendTableHdr(&buf, pTab, &rc);
+ sessionAppendTableHdr(&buf, 0, pTab, &rc);
for(i=0; i<pTab->nChange; i++){
SessionChange *p;
for(p=pTab->apChange[i]; p; p=p->pNext){