/*
** Minimum chunk size used by streaming versions of functions.
*/
+#ifdef SQLITE_TEST
+#define SESSIONS_STR_CHUNK_SIZE 1
+#else
#define SESSIONS_STR_CHUNK_SIZE 1024
+#endif
/*
** Session handle structure.
** a stream function (sqlite3changeset_start_str()).
*/
struct SessionInput {
- int iNext; /* Offset in aChangeset[] of next change */
- u8 *aChangeset; /* Pointer to buffer containing changeset */
- int nChangeset; /* Number of bytes in aChangeset */
+ int iNext; /* Offset in aData[] of next change */
+ u8 *aData; /* Pointer to buffer containing changeset */
+ int nData; /* Number of bytes in aData */
+
SessionBuffer buf; /* Current read buffer */
int (*xInput)(void*, void*, int*); /* Input stream call (or NULL) */
void *pIn; /* First argument to xInput */
pRet = (sqlite3_changeset_iter *)sqlite3_malloc(nByte);
if( !pRet ) return SQLITE_NOMEM;
memset(pRet, 0, sizeof(sqlite3_changeset_iter));
- pRet->in.aChangeset = (u8 *)pChangeset;
- pRet->in.nChangeset = nChangeset;
+ pRet->in.aData = (u8 *)pChangeset;
+ pRet->in.nData = nChangeset;
pRet->in.xInput = xInput;
pRet->in.pIn = pIn;
pRet->in.iNext = 0;
**
** Return an SQLite error code if an error occurs, or SQLITE_OK otherwise.
*/
-static int sessionInputBuffer(SessionInput *pInput, int nByte){
+static int sessionInputBuffer(SessionInput *pIn, int nByte){
int rc = SQLITE_OK;
- if( pInput->xInput && !pInput->bEof ){
- assert( 0 );
+ if( pIn->xInput ){
+ while( !pIn->bEof && (pIn->iNext+nByte)>=pIn->nData && rc==SQLITE_OK ){
+ int nNew = SESSIONS_STR_CHUNK_SIZE;
+
+ if( pIn->iNext>=SESSIONS_STR_CHUNK_SIZE ){
+ int nMove = pIn->buf.nBuf - pIn->iNext;
+ memmove(pIn->buf.aBuf, &pIn->buf.aBuf[pIn->iNext], nMove);
+ pIn->buf.nBuf -= pIn->iNext;
+ pIn->iNext = 0;
+ }
+
+ if( SQLITE_OK==sessionBufferGrow(&pIn->buf, nNew, &rc) ){
+ rc = pIn->xInput(pIn->pIn, &pIn->buf.aBuf[pIn->buf.nBuf], &nNew);
+ if( nNew==0 ){
+ pIn->bEof = 1;
+ }else{
+ pIn->buf.nBuf += nNew;
+ }
+ }
+
+ pIn->aData = pIn->buf.aBuf;
+ pIn->nData = pIn->buf.nBuf;
+ }
}
return rc;
}
*ppRec = aRec;
}
+/*
+** This function sets the value of the sqlite3_value object passed as the
+** first argument to a copy of the string or blob held in the aData[]
+** buffer. SQLITE_OK is returned if successful, or SQLITE_NOMEM if an OOM
+** error occurs.
+*/
+static int sessionValueSetStr(
+ sqlite3_value *pVal, /* Set the value of this object */
+ u8 *aData, /* Buffer containing string or blob data */
+ int nData, /* Size of buffer aData[] in bytes */
+ u8 enc /* String encoding (0 for blobs) */
+){
+ u8 *aCopy = sqlite3_malloc(nData);
+ if( aCopy==0 ) return SQLITE_NOMEM;
+ memcpy(aCopy, aData, nData);
+ sqlite3ValueSetStr(pVal, nData, (char*)aCopy, enc, sqlite3_free);
+ return SQLITE_OK;
+}
+
/*
** Deserialize a single record from a buffer in memory. See "RECORD FORMAT"
** for details.
if( abPK && abPK[i]==0 ) continue;
rc = sessionInputBuffer(pIn, 9);
if( rc==SQLITE_OK ){
- eType = pIn->aChangeset[pIn->iNext++];
+ eType = pIn->aData[pIn->iNext++];
}
assert( !apOut || apOut[i]==0 );
}
if( rc==SQLITE_OK ){
- u8 *aVal = &pIn->aChangeset[pIn->iNext];
+ u8 *aVal = &pIn->aData[pIn->iNext];
if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){
int nByte;
pIn->iNext += sessionVarintGet(aVal, &nByte);
rc = sessionInputBuffer(pIn, nByte);
if( apOut && rc==SQLITE_OK ){
- u8 *aRec = &pIn->aChangeset[pIn->iNext];
u8 enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0);
- sqlite3ValueSetStr(apOut[i], nByte, (char *)aRec, enc, SQLITE_STATIC);
+ rc = sessionValueSetStr(apOut[i],&pIn->aData[pIn->iNext],nByte,enc);
}
pIn->iNext += nByte;
}
static int sessionChangesetBufferTblhdr(SessionInput *pIn, int *pnByte){
int rc = SQLITE_OK;
int nCol = 0;
- int iIn = pIn->iNext;
+ int nRead = 0;
rc = sessionInputBuffer(pIn, 9);
if( rc==SQLITE_OK ){
- iIn += sessionVarintGet(&pIn->aChangeset[iIn], &nCol);
- rc = sessionInputBuffer(pIn, nCol+100);
- iIn += nCol;
+ nRead += sessionVarintGet(&pIn->aData[pIn->iNext + nRead], &nCol);
+ rc = sessionInputBuffer(pIn, nRead+nCol+100);
+ nRead += nCol;
}
+
while( rc==SQLITE_OK ){
- while( iIn<pIn->nChangeset && pIn->aChangeset[iIn] ) iIn++;
- if( pIn->aChangeset[iIn]==0 ) break;
- rc = sessionInputBuffer(pIn, 100);
+ while( (pIn->iNext + nRead)<pIn->nData && pIn->aData[pIn->iNext + nRead] ){
+ nRead++;
+ }
+ if( pIn->aData[pIn->iNext + nRead]==0 ) break;
+ rc = sessionInputBuffer(pIn, nRead + 100);
}
- if( pnByte ) *pnByte = (iIn+1 - pIn->iNext);
+ if( pnByte ) *pnByte = nRead+1;
return rc;
}
if( rc==SQLITE_OK ){
int nByte;
int nVarint;
- nVarint = sessionVarintGet(&p->in.aChangeset[p->in.iNext], &p->nCol);
+ nVarint = sessionVarintGet(&p->in.aData[p->in.iNext], &p->nCol);
nCopy -= nVarint;
p->in.iNext += nVarint;
nByte = p->nCol * sizeof(sqlite3_value*) * 2 + nCopy;
if( rc==SQLITE_OK ){
int iPK = sizeof(sqlite3_value*)*p->nCol*2;
memset(p->tblhdr.aBuf, 0, iPK);
- memcpy(&p->tblhdr.aBuf[iPK], &p->in.aChangeset[p->in.iNext], nCopy);
+ memcpy(&p->tblhdr.aBuf[iPK], &p->in.aData[p->in.iNext], nCopy);
p->in.iNext += nCopy;
}
if( p->rc!=SQLITE_OK ) return p->rc;
/* If the iterator is already at the end of the changeset, return DONE. */
- if( p->in.iNext>=p->in.nChangeset ){
+ if( p->in.iNext>=p->in.nData ){
return SQLITE_DONE;
}
- op = p->in.aChangeset[p->in.iNext++];
+ op = p->in.aData[p->in.iNext++];
if( op=='T' || op=='P' ){
p->bPatchset = (op=='P');
if( sessionChangesetReadTblhdr(p) ) return p->rc;
if( (p->rc = sessionInputBuffer(&p->in, 2)) ) return p->rc;
- op = p->in.aChangeset[p->in.iNext++];
+ op = p->in.aData[p->in.iNext++];
}
p->op = op;
- p->bIndirect = p->in.aChangeset[p->in.iNext++];
+ p->bIndirect = p->in.aData[p->in.iNext++];
if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){
- return (p->rc = SQLITE_CORRUPT);
+ return (p->rc = SQLITE_CORRUPT_BKPT);
}
- if( paRec ){ *paRec = &p->in.aChangeset[p->in.iNext]; }
+ if( paRec ){ *paRec = &p->in.aData[p->in.iNext]; }
/* If this is an UPDATE or DELETE, read the old.* record. */
if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
}
if( pnRec ){
- *pnRec = (int)(&p->in.aChangeset[p->in.iNext] - *paRec);
+ *pnRec = (int)(&p->in.aData[p->in.iNext] - *paRec);
}else if( p->bPatchset && p->op==SQLITE_UPDATE ){
/* If this is an UPDATE that is part of a patchset, then all PK and
** modified fields are present in the new.* record. The old.* record
for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]);
}
sqlite3_free(p->tblhdr.aBuf);
+ sqlite3_free(p->in.buf.aBuf);
sqlite3_free(p);
return rc;
}
/* Set up the input stream */
memset(&sInput, 0, sizeof(SessionInput));
- sInput.nChangeset = nChangeset;
- sInput.aChangeset = (u8*)pChangeset;
+ sInput.nData = nChangeset;
+ sInput.aData = (u8*)pChangeset;
aOut = (u8 *)sqlite3_malloc(nChangeset);
if( !aOut ) return SQLITE_NOMEM;
while( i<nChangeset ){
u8 eType;
if( (rc = sessionInputBuffer(&sInput, 2)) ) goto finished_invert;
- eType = sInput.aChangeset[sInput.iNext];
+ eType = sInput.aData[sInput.iNext];
switch( eType ){
case 'T': {
/* A 'table' record consists of:
if( (rc = sessionChangesetBufferTblhdr(&sInput, &nByte)) ){
goto finished_invert;
}
- nVarint = sessionVarintGet(&sInput.aChangeset[iNext+1], &nCol);
+ nVarint = sessionVarintGet(&sInput.aData[iNext+1], &nCol);
sPK.nBuf = 0;
- sessionAppendBlob(&sPK, &sInput.aChangeset[iNext+1+nVarint], nCol, &rc);
+ sessionAppendBlob(&sPK, &sInput.aData[iNext+1+nVarint], nCol, &rc);
if( rc ) goto finished_invert;
sInput.iNext += nByte;
- memcpy(&aOut[i], &sInput.aChangeset[iNext], nByte+1);
+ memcpy(&aOut[i], &sInput.aData[iNext], nByte+1);
i += nByte+1;
sqlite3_free(apVal);
apVal = 0;
aOut[i] = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE);
aOut[i+1] = aIn[i+1]; /* indirect-flag */
nByte = sInput.iNext - iStart;
- memcpy(&aOut[i+2], &sInput.aChangeset[iStart], nByte);
+ memcpy(&aOut[i+2], &sInput.aData[iStart], nByte);
i += 2 + nByte;
break;
}
/* Write the header for the new UPDATE change. Same as the original. */
aOut[i] = SQLITE_UPDATE;
- aOut[i+1] = sInput.aChangeset[sInput.iNext+1];
+ aOut[i+1] = sInput.aData[sInput.iNext+1];
nWrite = 2;
/* Read the old.* and new.* records for the update change. */
}
default:
- rc = SQLITE_CORRUPT;
+ rc = SQLITE_CORRUPT_BKPT;
goto finished_invert;
}
}
}
/*
-** Apply the changeset passed via pChangeset/nChangeset to the main database
-** attached to handle "db". Invoke the supplied conflict handler callback
-** to resolve any conflicts encountered while applying the change.
+** Argument pIter is a changeset iterator that has been initialized, but
+** not yet passed to sqlite3changeset_next(). This function applies the
+** changeset to the main database attached to handle "db". The supplied
+** conflict handler callback is invoked to resolve any conflicts encountered
+** while applying the change.
*/
-int sqlite3changeset_apply(
+static int sessionChangesetApply(
sqlite3 *db, /* Apply change to "main" db of this handle */
- int nChangeset, /* Size of changeset in bytes */
- void *pChangeset, /* Changeset blob */
+ sqlite3_changeset_iter *pIter, /* Changeset to apply */
int(*xFilter)(
void *pCtx, /* Copy of sixth arg to _apply() */
const char *zTab /* Table name */
void *pCtx /* First argument passed to xConflict */
){
int schemaMismatch = 0;
- sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
int rc; /* Return code */
const char *zTab = 0; /* Name of current table */
int nTab = 0; /* Result of sqlite3Strlen30(zTab) */
assert( xConflict!=0 );
memset(&sApply, 0, sizeof(sApply));
- rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
- if( rc!=SQLITE_OK ) return rc;
-
sqlite3_mutex_enter(sqlite3_db_mutex(db));
rc = sqlite3_exec(db, "SAVEPOINT changeset_apply", 0, 0, 0);
if( rc==SQLITE_OK ){
return rc;
}
+/*
+** Apply the changeset passed via pChangeset/nChangeset to the main database
+** attached to handle "db". Invoke the supplied conflict handler callback
+** to resolve any conflicts encountered while applying the change.
+*/
+int sqlite3changeset_apply(
+ sqlite3 *db, /* Apply change to "main" db of this handle */
+ int nChangeset, /* Size of changeset in bytes */
+ void *pChangeset, /* Changeset blob */
+ int(*xFilter)(
+ void *pCtx, /* Copy of sixth arg to _apply() */
+ const char *zTab /* Table name */
+ ),
+ int(*xConflict)(
+ void *pCtx, /* Copy of fifth arg to _apply() */
+ int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
+ sqlite3_changeset_iter *p /* Handle describing change and conflict */
+ ),
+ void *pCtx /* First argument passed to xConflict */
+){
+ sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
+ int rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
+ if( rc==SQLITE_OK ){
+ rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx);
+ }
+ return rc;
+}
+
+/*
+** Apply the changeset passed via xInput/pIn to the main database
+** attached to handle "db". Invoke the supplied conflict handler callback
+** to resolve any conflicts encountered while applying the change.
+*/
+int sqlite3changeset_apply_str(
+ sqlite3 *db, /* Apply change to "main" db of this handle */
+ int (*xInput)(void *pIn, void *pData, int *pnData), /* Input function */
+ void *pIn, /* First arg for xInput */
+ int(*xFilter)(
+ void *pCtx, /* Copy of sixth arg to _apply() */
+ const char *zTab /* Table name */
+ ),
+ int(*xConflict)(
+ void *pCtx, /* Copy of sixth arg to _apply() */
+ int eConflict, /* DATA, MISSING, CONFLICT, CONSTRAINT */
+ sqlite3_changeset_iter *p /* Handle describing change and conflict */
+ ),
+ void *pCtx /* First argument passed to xConflict */
+){
+ sqlite3_changeset_iter *pIter; /* Iterator to skip through changeset */
+ int rc = sqlite3changeset_start_str(&pIter, xInput, pIn);
+ if( rc==SQLITE_OK ){
+ rc = sessionChangesetApply(db, pIter, xFilter, xConflict, pCtx);
+ }
+ return rc;
+}
+
/*
** This function is called to merge two changes to the same row together as
** part of an sqlite3changeset_concat() operation. A new change object is
Tcl_Obj *pFilterScript;
};
+typedef struct TestStreamInput TestStreamInput;
+struct TestStreamInput {
+ int nStream; /* Maximum chunk size */
+ unsigned char *aData; /* Pointer to buffer containing data */
+ int nData; /* Size of buffer aData in bytes */
+ int iData; /* Bytes of data already read by sessions */
+};
+
#define SESSION_STREAM_TCL_VAR "sqlite3session_streams"
/*
** Attempt to find the global variable zVar within interpreter interp
-** and extract a boolean value from it. Return this value.
+** and extract an integer value from it. Return this value.
**
** If the named variable cannot be found, or if it cannot be interpreted
-** as a boolean, return 0.
+** as a integer, return 0.
*/
-static int test_tcl_boolean(Tcl_Interp *interp, const char *zVar){
+static int test_tcl_integer(Tcl_Interp *interp, const char *zVar){
Tcl_Obj *pObj;
- int bVal = 0;
+ int iVal = 0;
pObj = Tcl_ObjGetVar2(interp, Tcl_NewStringObj(zVar, -1), 0, TCL_GLOBAL_ONLY);
- if( pObj ) Tcl_GetBooleanFromObj(0, pObj, &bVal);
- return bVal;
+ if( pObj ) Tcl_GetIntFromObj(0, pObj, &iVal);
+ return iVal;
}
static int test_session_error(Tcl_Interp *interp, int rc){
case 7: /* patchset */
case 1: { /* changeset */
TestSessionsBlob o = {0, 0};
- if( test_tcl_boolean(interp, SESSION_STREAM_TCL_VAR) ){
+ if( test_tcl_integer(interp, SESSION_STREAM_TCL_VAR) ){
void *pCtx = (void*)&o;
if( iSub==7 ){
rc = sqlite3session_patchset_str(pSession, testSessionsOutput, pCtx);
return SQLITE_CHANGESET_OMIT;
}
+static int testStreamInput(
+ void *pCtx, /* Context pointer */
+ void *pData, /* Buffer to populate */
+ int *pnData /* IN/OUT: Bytes requested/supplied */
+){
+ TestStreamInput *p = (TestStreamInput*)pCtx;
+ int nReq = *pnData; /* Bytes of data requested */
+ int nRem = p->nData - p->iData; /* Bytes of data available */
+ int nRet = p->nStream; /* Bytes actually returned */
+
+ if( nRet>nReq ) nRet = nReq;
+ if( nRet>nRem ) nRet = nRem;
+
+ assert( nRet>=0 );
+ if( nRet>0 ){
+ memcpy(pData, &p->aData[p->iData], nRet);
+ p->iData += nRet;
+ }
+
+ *pnData = nRet;
+ return SQLITE_OK;
+}
+
+
/*
** sqlite3changeset_apply DB CHANGESET CONFLICT-SCRIPT ?FILTER-SCRIPT?
*/
void *pChangeset; /* Buffer containing changeset */
int nChangeset; /* Size of buffer aChangeset in bytes */
TestConflictHandler ctx;
+ TestStreamInput sStr;
+
+ memset(&sStr, 0, sizeof(sStr));
+ sStr.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);
if( objc!=4 && objc!=5 ){
Tcl_WrongNumArgs(interp, 1, objv,
ctx.pFilterScript = objc==5 ? objv[4] : 0;
ctx.interp = interp;
- rc = sqlite3changeset_apply(db, nChangeset, pChangeset,
- (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx
- );
+ if( sStr.nStream==0 ){
+ rc = sqlite3changeset_apply(db, nChangeset, pChangeset,
+ (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx
+ );
+ }else{
+ sStr.aData = (unsigned char*)pChangeset;
+ sStr.nData = nChangeset;
+ rc = sqlite3changeset_apply_str(db, testStreamInput, (void*)&sStr,
+ (objc==5) ? test_filter_handler : 0, test_conflict_handler, (void *)&ctx
+ );
+ }
+
if( rc!=SQLITE_OK ){
return test_session_error(interp, rc);
}
){
int rc; /* Return code from changeset_invert() */
void *aChangeset; /* Input changeset */
- int nChangeSet; /* Size of buffer aChangeset in bytes */
+ int nChangeset; /* Size of buffer aChangeset in bytes */
void *aOut; /* Output changeset */
int nOut; /* Size of buffer aOut in bytes */
Tcl_WrongNumArgs(interp, 1, objv, "CHANGESET");
return TCL_ERROR;
}
- aChangeset = (void *)Tcl_GetByteArrayFromObj(objv[1], &nChangeSet);
+ aChangeset = (void *)Tcl_GetByteArrayFromObj(objv[1], &nChangeset);
- rc = sqlite3changeset_invert(nChangeSet, aChangeset, &nOut, &aOut);
+ rc = sqlite3changeset_invert(nChangeset, aChangeset, &nOut, &aOut);
if( rc!=SQLITE_OK ){
return test_session_error(interp, rc);
}
int objc,
Tcl_Obj *CONST objv[]
){
- void *pChangeSet;
- int nChangeSet;
+ void *pChangeset;
+ int nChangeset;
sqlite3_changeset_iter *pIter;
int rc;
Tcl_Obj *pVarname;
Tcl_Obj *pScript;
int isCheckNext = 0;
+ TestStreamInput sStr;
+ memset(&sStr, 0, sizeof(sStr));
+
if( objc>1 ){
char *zOpt = Tcl_GetString(objv[1]);
isCheckNext = (strcmp(zOpt, "-next")==0);
pCS = objv[2+isCheckNext];
pScript = objv[3+isCheckNext];
- pChangeSet = (void *)Tcl_GetByteArrayFromObj(pCS, &nChangeSet);
- rc = sqlite3changeset_start(&pIter, nChangeSet, pChangeSet);
+ pChangeset = (void *)Tcl_GetByteArrayFromObj(pCS, &nChangeset);
+ sStr.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);
+ if( sStr.nStream==0 ){
+ rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
+ }else{
+ sStr.aData = (unsigned char*)pChangeset;
+ sStr.nData = nChangeset;
+ rc = sqlite3changeset_start_str(&pIter, testStreamInput, (void*)&sStr);
+ }
if( rc!=SQLITE_OK ){
return test_session_error(interp, rc);
}