return rc;
}
+/*
+** Finalize statement pStmt. If (*pRc) is SQLITE_OK when this function is
+** called, set it to the results of the sqlite3_finalize() call. Or, if
+** it is already set to an error code, leave it as is.
+*/
+static void sessionFinalizeStmt(sqlite3_stmt *pStmt, int *pRc){
+ int rc = sqlite3_finalize(pStmt);
+ if( *pRc==SQLITE_OK ) *pRc = rc;
+}
+
/*
** Table pTab has one or more existing change-records with old.* records
** with fewer than pTab->nCol columns. This function updates all such
}
}
+ sessionFinalizeStmt(pStmt, &rc);
pSession->rc = rc;
- rc = sqlite3_finalize(pStmt);
- if( pSession->rc==SQLITE_OK ) pSession->rc = rc;
return pSession->rc;
}
);
sessionAppendStr(&cols, "tbl, ?2, stat", &rc);
}else{
- #if 0
+#if 0
if( bRowid ){
sessionAppendStr(&cols, SESSIONS_ROWID, &rc);
}
- #endif
+#endif
for(i=0; i<nCol; i++){
if( cols.nBuf ) sessionAppendStr(&cols, ", ", &rc);
sessionAppendIdent(&cols, azCol[i], &rc);
u8 *aBlob = &pIter->in.aData[pIter->in.iCurrent];
int nBlob = pIter->in.iNext - pIter->in.iCurrent;
sessionAppendBlob(&p->constraints, aBlob, nBlob, &rc);
- return SQLITE_OK;
+ return rc;
}else if( p->bIgnoreNoop==0 || op!=SQLITE_DELETE
|| eType==SQLITE_CHANGESET_CONFLICT
){
}
/*
-** Retry the changes accumulated in the pApply->constraints buffer.
+** Create an iterator to iterate through the retry buffer pRetry.
+*/
+static int sessionRetryIterInit(
+ SessionBuffer *pRetry, /* Buffer to iterate through */
+ int bPatchset, /* True for patchset, false for changeset */
+ const char *zTab, /* Table name */
+ SessionApplyCtx *pApply, /* Session apply context */
+ sqlite3_changeset_iter **ppIter /* OUT: New iterator */
+){
+ sqlite3_changeset_iter *pRet = 0;
+ int rc = SQLITE_OK;
+
+ rc = sessionChangesetStart(
+ &pRet, 0, 0, pRetry->nBuf, pRetry->aBuf, pApply->bInvertConstraints, 1
+ );
+ if( rc==SQLITE_OK ){
+ size_t nByte = 2*pApply->nCol*sizeof(sqlite3_value*);
+ pRet->bPatchset = bPatchset;
+ pRet->zTab = (char*)zTab;
+ pRet->nCol = pApply->nCol;
+ pRet->abPK = pApply->abPK;
+ sessionBufferGrow(&pRet->tblhdr, nByte, &rc);
+ pRet->apValue = (sqlite3_value**)pRet->tblhdr.aBuf;
+ if( rc==SQLITE_OK ) memset(pRet->apValue, 0, nByte);
+ }
+
+ *ppIter = pRet;
+ return rc;
+}
+
+/*
+** Attempt to apply all the changes in retry buffer pRetry to the database.
+** Except, if parameter iSkip is greater than or equal to 0, skip change
+** iSkip.
+*/
+static int sessionApplyRetryBuffer(
+ SessionBuffer *pRetry, /* Buffer to apply changes from */
+ int iSkip, /* If >=0, index of change to omit */
+ sqlite3 *db, /* Database handle */
+ int bPatchset, /* True for patchset, false for changeset */
+ const char *zTab, /* Name of table to write to */
+ SessionApplyCtx *pApply, /* Apply context */
+ int(*xConflict)(void*, int, sqlite3_changeset_iter*),
+ void *pCtx /* First argument passed to xConflict */
+){
+ int rc = SQLITE_OK;
+ int rc2 = SQLITE_OK;
+ int ii = 0;
+ sqlite3_changeset_iter *pIter = 0;
+
+ assert( pApply->constraints.nBuf==0 );
+
+ rc = sessionRetryIterInit(pRetry, bPatchset, zTab, pApply, &pIter);
+
+ for(ii=0; rc==SQLITE_OK && SQLITE_ROW==sqlite3changeset_next(pIter); ii++){
+ if( ii!=iSkip ){
+ rc = sessionApplyOneWithRetry(db, pIter, pApply, xConflict, pCtx);
+ }
+ }
+
+ rc2 = sqlite3changeset_finalize(pIter);
+ if( rc==SQLITE_OK ) rc = rc2;
+ assert( pApply->bDeferConstraints || pApply->constraints.nBuf==0 );
+
+ return rc;
+}
+
+/*
+** Check if table zTab in the "main" database of db is a WITHOUT ROWID
+** table.
+**
+** If no error occurs, return SQLITE_OK and set output variable (*pbWR) to
+** true if zTab is a WITHOUT ROWID table, or false otherwise. Or, if an
+** error does occur, return an SQLite error code. The final value of (*pbWR)
+** is undefined in this case.
+*/
+static int sessionTableIsWithoutRowid(sqlite3 *db, const char *zTab, int *pbWR){
+ sqlite3_stmt *pList = 0;
+ char *zSql = 0;
+ int rc = SQLITE_OK;
+
+ zSql = sqlite3_mprintf("PRAGMA table_list = %Q", zTab);
+ if( zSql==0 ){
+ rc = SQLITE_NOMEM;
+ }else{
+ rc = sqlite3_prepare_v2(db, zSql, -1, &pList, 0);
+ sqlite3_free(zSql);
+ }
+
+ if( rc==SQLITE_OK ){
+ if( SQLITE_ROW==sqlite3_step(pList) ){
+ *pbWR = sqlite3_column_int(pList, 4);
+ }
+ rc = sqlite3_finalize(pList);
+ }
+
+ return rc;
+}
+
+/*
+** Iterator pUp points to an UPDATE change. This function deletes the
+** affected row from the database and creates an INSERT statement that
+** may be used to reinsert the row as it is after the UPDATE change
+** has been applied.
+**
+** If successful, SQLITE_OK is returned and output variable (*ppInsert)
+** is left pointing to a prepared INSERT statement. It is the responsibility
+** of the caller to eventually free this statement using sqlite3_finalize().
+** Or, if an error occurs, an SQLite error code is returned and (*ppInsert)
+** set to NULL. pApply->zErr may be set to an error message in this case.
+*/
+static int sessionUpdateToDeleteInsert(
+ sqlite3 *db, /* Database to write to */
+ const char *zTab, /* Table name */
+ SessionApplyCtx *pApply, /* Apply context */
+ sqlite3_changeset_iter *pUp, /* Iterator pointing to UPDATE change */
+ sqlite3_stmt **ppInsert /* OUT: INSERT statement */
+){
+ sqlite3_stmt *pRet = 0; /* The INSERT statement */
+ sqlite3_stmt *pSelect = 0; /* SELECT to read current values of row */
+ int rc = SQLITE_OK;
+ int bWR = 0;
+
+ rc = sessionTableIsWithoutRowid(db, zTab, &bWR);
+ if( rc==SQLITE_OK ){
+ char *zSelect = 0;
+ char *zInsert = 0;
+ SessionBuffer cols = {0, 0, 0};
+ SessionBuffer insbind = {0, 0, 0};
+ SessionBuffer pkcols = {0, 0, 0};
+ SessionBuffer selbind = {0, 0, 0};
+
+ const char *zComma = "";
+ const char *zComma2 = "";
+ int ii;
+ for(ii=0; ii<pApply->nCol; ii++){
+ sessionAppendStr(&cols, zComma, &rc);
+ sessionAppendIdent(&cols, pApply->azCol[ii], &rc);
+ sessionAppendStr(&insbind, zComma, &rc);
+ sessionAppendStr(&insbind, "?", &rc);
+ zComma = ", ";
+
+ if( pApply->abPK[ii] ){
+ sessionAppendStr(&pkcols, zComma2, &rc);
+ sessionAppendIdent(&pkcols, pApply->azCol[ii], &rc);
+ sessionAppendStr(&selbind, zComma2, &rc);
+ sessionAppendPrintf(&selbind, &rc, "?%d", ii+1);
+ zComma2 = ", ";
+ }
+ }
+ if( bWR==0 ){
+ sessionAppendStr(&cols, zComma, &rc);
+ sessionAppendStr(&cols, SESSIONS_ROWID, &rc);
+ sessionAppendStr(&insbind, zComma, &rc);
+ sessionAppendStr(&insbind, "?", &rc);
+ }
+
+ if( rc==SQLITE_OK ){
+ zSelect = sqlite3_mprintf("SELECT %s FROM %Q WHERE (%s) IS (%s)",
+ cols.aBuf, zTab, pkcols.aBuf, selbind.aBuf
+ );
+ if( zSelect==0 ) rc = SQLITE_NOMEM;
+ }
+ if( rc==SQLITE_OK ){
+ zInsert = sqlite3_mprintf("INSERT INTO %Q(%s) VALUES(%s)",
+ zTab, cols.aBuf, insbind.aBuf
+ );
+ if( zInsert==0 ) rc = SQLITE_NOMEM;
+ }
+
+ if( rc==SQLITE_OK ){
+ rc = sessionPrepare(db, &pSelect, &pApply->zErr, zSelect);
+ }
+ if( rc==SQLITE_OK ){
+ rc = sessionPrepare(db, &pRet, &pApply->zErr, zInsert);
+ }
+
+ sqlite3_free(zSelect);
+ sqlite3_free(zInsert);
+ sqlite3_free(cols.aBuf);
+ sqlite3_free(insbind.aBuf);
+ sqlite3_free(pkcols.aBuf);
+ sqlite3_free(selbind.aBuf);
+ }
+
+ if( rc==SQLITE_OK ){
+ rc = sessionBindRow(
+ pUp, sqlite3changeset_old, pApply->nCol, pApply->abPK, pSelect
+ );
+ }
+ if( rc==SQLITE_OK && sqlite3_step(pSelect)!=SQLITE_ROW ){
+ rc = SQLITE_ERROR;
+ }
+
+ if( rc==SQLITE_OK ){
+ int iCol;
+ for(iCol=0; iCol<pApply->nCol; iCol++){
+ sqlite3_value *pVal = pUp->apValue[iCol+pApply->nCol];
+ if( pVal==0 ){
+ pVal = sqlite3_column_value(pSelect, iCol);
+ }
+ rc = sqlite3_bind_value(pRet, iCol+1, pVal);
+ }
+ if( bWR==0 ){
+ sqlite3_bind_int64(pRet, iCol+1, sqlite3_column_int64(pSelect, iCol));
+ }
+ }
+ sessionFinalizeStmt(pSelect, &rc);
+
+ /* Delete the row from the database. */
+ if( rc==SQLITE_OK ){
+ rc = sessionBindRow(
+ pUp, sqlite3changeset_old, pApply->nCol, pApply->abPK, pApply->pDelete
+ );
+ sqlite3_bind_int(pApply->pDelete, pApply->nCol+1, 1);
+ }
+ if( rc==SQLITE_OK ){
+ sqlite3_step(pApply->pDelete);
+ rc = sqlite3_reset(pApply->pDelete);
+ }
+
+ if( rc!=SQLITE_OK ){
+ sqlite3_finalize(pRet);
+ pRet = 0;
+ }
+
+ *ppInsert = pRet;
+ return rc;
+}
+
+/*
+** Retry the changes accumulated in the pApply->constraints buffer. The
+** pApply->constraints buffer contains all changes to table zTab that
+** could not be applied due to SQLITE_CONSTRAINT errors. This function
+** attempts to apply them as follows:
+**
+** 1) It runs through the buffer and attempts to retry each change,
+** removing any that are successfully applied from the buffer. This
+** is repeated until no further progress can be made.
+**
+** 2) For each UPDATE change in the buffer, try the following in a
+** savepoint transaction:
+**
+** a) DELETE the affected row,
+** b) Attempt step (1) with remaining changes,
+** c) Attempt to INSERT a row equivalent to the one that would be
+** created by applying this UPDATE change.
+**
+** If the INSERT in (c) succeeds, the savepoint is committed and all
+** successfully applied changes are removed from the buffer. Step (2)
+** is then repeated.
+**
+** 3) Once step (2) has been attempted for each UPDATE in the change,
+** a final attempt is made to apply each remaining change. This time,
+** if an SQLITE_CONSTRAINT error is encountered, the conflict handler
+** is invoked and the user has to decide whether to omit the change
+** or rollback the entire _apply() operation.
*/
static int sessionRetryConstraints(
sqlite3 *db,
void *pCtx /* First argument passed to xConflict */
){
int rc = SQLITE_OK;
+ int iUpdate = 0;
+ /* Step (1) */
while( pApply->constraints.nBuf ){
- sqlite3_changeset_iter *pIter2 = 0;
SessionBuffer cons = pApply->constraints;
memset(&pApply->constraints, 0, sizeof(SessionBuffer));
- rc = sessionChangesetStart(
- &pIter2, 0, 0, cons.nBuf, cons.aBuf, pApply->bInvertConstraints, 1
+ rc = sessionApplyRetryBuffer(
+ &cons, -1, db, bPatchset, zTab, pApply, xConflict, pCtx
+ );
+
+ sqlite3_free(cons.aBuf);
+ if( rc!=SQLITE_OK ) break;
+
+ /* If no progress has been made this round, break out of the loop. */
+ if( pApply->constraints.nBuf>=cons.nBuf ) break;
+ }
+
+ /* Step (2) */
+ while( rc==SQLITE_OK && pApply->constraints.nBuf ){
+ SessionBuffer cons = {0, 0, 0};
+ sqlite3_changeset_iter *pUp = 0;
+ sqlite3_stmt *pInsert = 0;
+ int iSkip = 0;
+
+ rc = sessionRetryIterInit(
+ &pApply->constraints, bPatchset, zTab, pApply, &pUp
);
if( rc==SQLITE_OK ){
- size_t nByte = 2*pApply->nCol*sizeof(sqlite3_value*);
- int rc2;
- pIter2->bPatchset = bPatchset;
- pIter2->zTab = (char*)zTab;
- pIter2->nCol = pApply->nCol;
- pIter2->abPK = pApply->abPK;
- sessionBufferGrow(&pIter2->tblhdr, nByte, &rc);
- pIter2->apValue = (sqlite3_value**)pIter2->tblhdr.aBuf;
- if( rc==SQLITE_OK ) memset(pIter2->apValue, 0, nByte);
+ int iThis = -1;
+ while( SQLITE_ROW==sqlite3changeset_next(pUp) ){
+ if( pUp->op==SQLITE_UPDATE ) iThis++;
+ if( iThis==iUpdate ) break;
+ iSkip++;
+ }
+ if( iThis==iUpdate ){
+ rc = sqlite3_exec(db, "SAVEPOINT update_op", 0, 0, 0);
+ if( rc==SQLITE_OK ){
+ rc = sessionUpdateToDeleteInsert(db, zTab, pApply, pUp, &pInsert);
+ }
+ }
+ sqlite3changeset_finalize(pUp);
+ if( iThis!=iUpdate ) break;
+ }
- while( rc==SQLITE_OK && SQLITE_ROW==sqlite3changeset_next(pIter2) ){
- rc = sessionApplyOneWithRetry(db, pIter2, pApply, xConflict, pCtx);
+ if( rc==SQLITE_OK ){
+ cons = pApply->constraints;
+
+ while( rc==SQLITE_OK && pApply->constraints.nBuf>0 ){
+ SessionBuffer app = pApply->constraints;
+ memset(&pApply->constraints, 0, sizeof(SessionBuffer));
+ rc = sessionApplyRetryBuffer(
+ &app, iSkip, db, bPatchset, zTab, pApply, xConflict, pCtx
+ );
+ if( app.aBuf!=cons.aBuf ){
+ sqlite3_free(app.aBuf);
+ }
+ if( pApply->constraints.nBuf>=app.nBuf ){
+ break;
+ }
}
+ }
- rc2 = sqlite3changeset_finalize(pIter2);
- if( rc==SQLITE_OK ) rc = rc2;
+ iUpdate++;
+ if( rc==SQLITE_OK ){
+ sqlite3_step(pInsert);
+ rc = sqlite3_finalize(pInsert);
+ if( rc==SQLITE_CONSTRAINT ){
+ rc = sqlite3_exec(db, "ROLLBACK TO update_op", 0, 0, 0);
+ sqlite3_free(pApply->constraints.aBuf);
+ pApply->constraints = cons;
+ memset(&cons, 0, sizeof(cons));
+ }else if( rc==SQLITE_OK ){
+ iUpdate = 0;
+ }
+ if( rc==SQLITE_OK ){
+ rc = sqlite3_exec(db, "RELEASE update_op", 0, 0, 0);
+ }
}
- assert( pApply->bDeferConstraints || pApply->constraints.nBuf==0 );
sqlite3_free(cons.aBuf);
- if( rc!=SQLITE_OK ) break;
- if( pApply->constraints.nBuf>=cons.nBuf ){
- /* No progress was made on the last round. */
- pApply->bDeferConstraints = 0;
- }
+ }
+
+ /* Step (3) */
+ if( rc==SQLITE_OK && pApply->constraints.nBuf ){
+ SessionBuffer cons = pApply->constraints;
+ memset(&pApply->constraints, 0, sizeof(SessionBuffer));
+ pApply->bDeferConstraints = 0;
+ rc = sessionApplyRetryBuffer(
+ &cons, -1, db, bPatchset, zTab, pApply, xConflict, pCtx
+ );
+ sqlite3_free(cons.aBuf);
}
return rc;