** sqlite3changeset_start_strm()).
*/
struct SessionInput {
+ int bNoDiscard; /* If true, discard no data */
+ int iCurrent; /* Offset in aData[] of current change */
int iNext; /* Offset in aData[] of next change */
u8 *aData; /* Pointer to buffer containing changeset */
int nData; /* Number of bytes in aData */
pRet->in.nData = nChangeset;
pRet->in.xInput = xInput;
pRet->in.pIn = pIn;
- pRet->in.iNext = 0;
pRet->in.bEof = (xInput ? 0 : 1);
/* Populate the output variable and return success. */
return sessionChangesetStart(pp, xInput, pIn, 0, 0);
}
+/*
+** If the SessionInput object passed as the only argument is a streaming
+** object and the buffer is full, discard some data to free up space.
+*/
+static void sessionDiscardData(SessionInput *pIn){
+ if( pIn->bEof && pIn->xInput && pIn->iNext>=SESSIONS_STRM_CHUNK_SIZE ){
+ int nMove = pIn->buf.nBuf - pIn->iNext;
+ assert( nMove>=0 );
+ if( nMove>0 ){
+ memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove);
+ }
+ pIn->buf.nBuf -= pIn->iNext;
+ pIn->iNext = 0;
+ pIn->nData = pIn->buf.nBuf;
+ }
+}
+
/*
** Ensure that there are at least nByte bytes available in the buffer. Or,
** if there are not nByte bytes remaining in the input, that all available
while( !pIn->bEof && (pIn->iNext+nByte)>=pIn->nData && rc==SQLITE_OK ){
int nNew = SESSIONS_STRM_CHUNK_SIZE;
- if( pIn->iNext>=SESSIONS_STRM_CHUNK_SIZE ){
- int nMove = pIn->buf.nBuf - pIn->iNext;
- memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove);
- pIn->buf.nBuf -= pIn->iNext;
- pIn->iNext = 0;
- }
-
+ if( pIn->bNoDiscard==0 ) sessionDiscardData(pIn);
if( SQLITE_OK==sessionBufferGrow(&pIn->buf, nNew, &rc) ){
rc = pIn->xInput(pIn->pIn, &pIn->buf.aBuf[pIn->buf.nBuf], &nNew);
if( nNew==0 ){
return SQLITE_DONE;
}
+ sessionDiscardData(&p->in);
+ p->in.iCurrent = p->in.iNext;
+
op = p->in.aData[p->in.iNext++];
if( op=='T' || op=='P' ){
p->bPatchset = (op=='P');
if( sessionChangesetReadTblhdr(p) ) return p->rc;
if( (p->rc = sessionInputBuffer(&p->in, 2)) ) return p->rc;
+ p->in.iCurrent = p->in.iNext;
op = p->in.aData[p->in.iNext++];
}
int nCol; /* Size of azCol[] and abPK[] arrays */
const char **azCol; /* Array of column names */
u8 *abPK; /* Boolean array - true if column is in PK */
+
+ int bDeferConstraints; /* True to defer constraints */
+ SessionBuffer constraints; /* Deferred constraints are stored here */
};
/*
** transfers new.* values from the current iterator entry to statement
** pStmt. The table being inserted into has nCol columns.
**
-** New.* value $i 0 from the iterator is bound to variable ($i+1) of
+** New.* value $i from the iterator is bound to variable ($i+1) of
** statement pStmt. If parameter abPK is NULL, all values from 0 to (nCol-1)
** are transfered to the statement. Otherwise, if abPK is not NULL, it points
** to an array nCol elements in size. In this case only those values for
pIter->pConflict = 0;
rc = sqlite3_reset(p->pSelect);
}else if( rc==SQLITE_OK ){
- /* No other row with the new.* primary key. */
- res = xConflict(pCtx, eType+1, pIter);
- if( res==SQLITE_CHANGESET_REPLACE ) rc = SQLITE_MISUSE;
+ if( p->bDeferConstraints && eType==SQLITE_CHANGESET_CONFLICT ){
+ /* Instead of invoking the conflict handler, append the change blob
+ ** to the SessionApplyCtx.constraints buffer. */
+ u8 *aBlob = &pIter->in.aData[pIter->in.iCurrent];
+ int nBlob = pIter->in.iNext - pIter->in.iCurrent;
+ sessionAppendBlob(&p->constraints, aBlob, nBlob, &rc);
+ res = SQLITE_CHANGESET_OMIT;
+ }else{
+ /* No other row with the new.* primary key. */
+ res = xConflict(pCtx, eType+1, pIter);
+ if( res==SQLITE_CHANGESET_REPLACE ) rc = SQLITE_MISUSE;
+ }
}
if( rc==SQLITE_OK ){
return rc;
}
+static int sessionApplyOneWithRetry(
+ sqlite3 *db, /* Apply change to "main" db of this handle */
+ sqlite3_changeset_iter *pIter, /* Changeset iterator to read change from */
+ SessionApplyCtx *pApply, /* Apply context */
+ int(*xConflict)(void*, int, sqlite3_changeset_iter*),
+ void *pCtx /* First argument passed to xConflict */
+){
+ int bReplace = 0;
+ int bRetry = 0;
+ int rc;
+
+ rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, &bReplace, &bRetry);
+
+ if( rc==SQLITE_OK && bRetry ){
+ rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, &bReplace, 0);
+ }
+
+ if( bReplace ){
+ assert( pIter->op==SQLITE_INSERT );
+ rc = sqlite3_exec(db, "SAVEPOINT replace_op", 0, 0, 0);
+ if( rc==SQLITE_OK ){
+ rc = sessionBindRow(pIter,
+ sqlite3changeset_new, pApply->nCol, pApply->abPK, pApply->pDelete);
+ sqlite3_bind_int(pApply->pDelete, pApply->nCol+1, 1);
+ }
+ if( rc==SQLITE_OK ){
+ sqlite3_step(pApply->pDelete);
+ rc = sqlite3_reset(pApply->pDelete);
+ }
+ if( rc==SQLITE_OK ){
+ rc = sessionApplyOneOp(pIter, pApply, xConflict, pCtx, 0, 0);
+ }
+ if( rc==SQLITE_OK ){
+ rc = sqlite3_exec(db, "RELEASE replace_op", 0, 0, 0);
+ }
+ }
+
+ return rc;
+}
+
+/*
+** Retry the changes accumulated in the pApply->constraints buffer.
+*/
+static int sessionRetryConstraints(
+ sqlite3 *db,
+ int bPatchset,
+ const char *zTab,
+ SessionApplyCtx *pApply,
+ int(*xConflict)(void*, int, sqlite3_changeset_iter*),
+ void *pCtx /* First argument passed to xConflict */
+){
+ int rc = SQLITE_OK;
+
+ while( pApply->constraints.nBuf ){
+ sqlite3_changeset_iter *pIter2 = 0;
+ SessionBuffer cons = pApply->constraints;
+ memset(&pApply->constraints, 0, sizeof(SessionBuffer));
+
+ rc = sessionChangesetStart(&pIter2, 0, 0, cons.nBuf, cons.aBuf);
+ if( rc==SQLITE_OK ){
+ int nByte = 2*pApply->nCol*sizeof(sqlite3_value*);
+ int rc2;
+ pIter2->bPatchset = bPatchset;
+ pIter2->zTab = (char*)zTab;
+ pIter2->nCol = pApply->nCol;
+ pIter2->abPK = pApply->abPK;
+ sessionBufferGrow(&pIter2->tblhdr, nByte, &rc);
+ pIter2->apValue = (sqlite3_value**)pIter2->tblhdr.aBuf;
+ if( rc==SQLITE_OK ) memset(pIter2->apValue, 0, nByte);
+
+ while( rc==SQLITE_OK && SQLITE_ROW==sqlite3changeset_next(pIter2) ){
+ rc = sessionApplyOneWithRetry(db, pIter2, pApply, xConflict, pCtx);
+ }
+
+ rc2 = sqlite3changeset_finalize(pIter2);
+ if( rc==SQLITE_OK ) rc==rc2;
+ }
+ assert( pApply->bDeferConstraints || pApply->constraints.nBuf==0 );
+
+ sqlite3_free(cons.aBuf);
+ if( rc!=SQLITE_OK ) break;
+ if( pApply->constraints.nBuf>=cons.nBuf ){
+ /* No progress was made on the last round. */
+ pApply->bDeferConstraints = 0;
+ }
+ }
+
+ return rc;
+}
+
/*
** Argument pIter is a changeset iterator that has been initialized, but
** not yet passed to sqlite3changeset_next(). This function applies the
assert( xConflict!=0 );
+ pIter->in.bNoDiscard = 1;
memset(&sApply, 0, sizeof(sApply));
sqlite3_mutex_enter(sqlite3_db_mutex(db));
rc = sqlite3_exec(db, "SAVEPOINT changeset_apply", 0, 0, 0);
while( rc==SQLITE_OK && SQLITE_ROW==sqlite3changeset_next(pIter) ){
int nCol;
int op;
- int bReplace = 0;
- int bRetry = 0;
const char *zNew;
sqlite3changeset_op(pIter, &zNew, &nCol, &op, 0);
if( zTab==0 || sqlite3_strnicmp(zNew, zTab, nTab+1) ){
u8 *abPK;
+ rc = sessionRetryConstraints(
+ db, pIter->bPatchset, zTab, &sApply, xConflict, pCtx
+ );
+ if( rc!=SQLITE_OK ) break;
+
sqlite3_free((char*)sApply.azCol); /* cast works around VC++ bug */
sqlite3_finalize(sApply.pDelete);
sqlite3_finalize(sApply.pUpdate);
sqlite3_finalize(sApply.pSelect);
memset(&sApply, 0, sizeof(sApply));
sApply.db = db;
+ sApply.bDeferConstraints = 1;
/* If an xFilter() callback was specified, invoke it now. If the
** xFilter callback returns zero, skip this table. If it returns
** next change. A log message has already been issued. */
if( schemaMismatch ) continue;
- rc = sessionApplyOneOp(pIter, &sApply, xConflict, pCtx, &bReplace, &bRetry);
-
- if( rc==SQLITE_OK && bRetry ){
- rc = sessionApplyOneOp(pIter, &sApply, xConflict, pCtx, &bReplace, 0);
- }
+ rc = sessionApplyOneWithRetry(db, pIter, &sApply, xConflict, pCtx);
+ }
- if( bReplace ){
- assert( pIter->op==SQLITE_INSERT );
- rc = sqlite3_exec(db, "SAVEPOINT replace_op", 0, 0, 0);
- if( rc==SQLITE_OK ){
- rc = sessionBindRow(pIter,
- sqlite3changeset_new, sApply.nCol, sApply.abPK, sApply.pDelete);
- sqlite3_bind_int(sApply.pDelete, sApply.nCol+1, 1);
- }
- if( rc==SQLITE_OK ){
- sqlite3_step(sApply.pDelete);
- rc = sqlite3_reset(sApply.pDelete);
- }
- if( rc==SQLITE_OK ){
- rc = sessionApplyOneOp(pIter, &sApply, xConflict, pCtx, 0, 0);
- }
- if( rc==SQLITE_OK ){
- rc = sqlite3_exec(db, "RELEASE replace_op", 0, 0, 0);
- }
- }
+ if( rc==SQLITE_OK ){
+ rc = sessionRetryConstraints(
+ db, pIter->bPatchset, zTab, &sApply, xConflict, pCtx
+ );
}
if( rc==SQLITE_OK ){
sqlite3_finalize(sApply.pUpdate);
sqlite3_finalize(sApply.pSelect);
sqlite3_free((char*)sApply.azCol); /* cast works around VC++ bug */
+ sqlite3_free((char*)sApply.constraints.aBuf);
sqlite3_mutex_leave(sqlite3_db_mutex(db));
return rc;
}