int i;
u8 op;
- assert( paRec==0 || p->in.xInput==0 ); /* fixme! */
assert( (paRec==0 && pnRec==0) || (paRec && pnRec) );
/* If the iterator is in the error-state, return immediately. */
return (p->rc = SQLITE_CORRUPT_BKPT);
}
- if( paRec ){ *paRec = &p->in.aData[p->in.iNext]; }
-
- /* If this is an UPDATE or DELETE, read the old.* record. */
- if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
- u8 *abPK = p->bPatchset ? p->abPK : 0;
- p->rc = sessionReadRecord(&p->in, p->nCol, abPK, paRec?0:p->apValue);
+ if( paRec ){
+ int nVal; /* Number of values to buffer */
+ if( p->bPatchset==0 && op==SQLITE_UPDATE ){
+ nVal = p->nCol * 2;
+ }else if( p->bPatchset && op==SQLITE_DELETE ){
+ nVal = 0;
+ for(i=0; i<p->nCol; i++) if( p->abPK[i] ) nVal++;
+ }else{
+ nVal = p->nCol;
+ }
+ p->rc = sessionChangesetBufferRecord(&p->in, nVal, pnRec);
if( p->rc!=SQLITE_OK ) return p->rc;
- }
+ *paRec = &p->in.aData[p->in.iNext];
+ p->in.iNext += *pnRec;
+ }else{
- /* If this is an INSERT or UPDATE, read the new.* record. */
- if( p->op!=SQLITE_DELETE ){
- sqlite3_value **apOut = (paRec ? 0 : &p->apValue[p->nCol]);
- p->rc = sessionReadRecord(&p->in, p->nCol, 0, apOut);
- if( p->rc!=SQLITE_OK ) return p->rc;
- }
+ /* If this is an UPDATE or DELETE, read the old.* record. */
+ if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
+ u8 *abPK = p->bPatchset ? p->abPK : 0;
+ p->rc = sessionReadRecord(&p->in, p->nCol, abPK, p->apValue);
+ if( p->rc!=SQLITE_OK ) return p->rc;
+ }
- if( pnRec ){
- *pnRec = (int)(&p->in.aData[p->in.iNext] - *paRec);
- }else if( p->bPatchset && p->op==SQLITE_UPDATE ){
- /* If this is an UPDATE that is part of a patchset, then all PK and
- ** modified fields are present in the new.* record. The old.* record
- ** is currently completely empty. This block shifts the PK fields from
- ** new.* to old.*, to accommodate the code that reads these arrays. */
- int i;
- for(i=0; i<p->nCol; i++){
- assert( p->apValue[i]==0 );
- assert( p->abPK[i]==0 || p->apValue[i+p->nCol] );
- if( p->abPK[i] ){
- p->apValue[i] = p->apValue[i+p->nCol];
- p->apValue[i+p->nCol] = 0;
+ /* If this is an INSERT or UPDATE, read the new.* record. */
+ if( p->op!=SQLITE_DELETE ){
+ p->rc = sessionReadRecord(&p->in, p->nCol, 0, &p->apValue[p->nCol]);
+ if( p->rc!=SQLITE_OK ) return p->rc;
+ }
+
+ if( p->bPatchset && p->op==SQLITE_UPDATE ){
+ /* If this is an UPDATE that is part of a patchset, then all PK and
+ ** modified fields are present in the new.* record. The old.* record
+ ** is currently completely empty. This block shifts the PK fields from
+ ** new.* to old.*, to accommodate the code that reads these arrays. */
+ int i;
+ for(i=0; i<p->nCol; i++){
+ assert( p->apValue[i]==0 );
+ assert( p->abPK[i]==0 || p->apValue[i+p->nCol] );
+ if( p->abPK[i] ){
+ p->apValue[i] = p->apValue[i+p->nCol];
+ p->apValue[i+p->nCol] = 0;
+ }
}
}
}
** callback by changeset_apply().
*/
int sqlite3changeset_finalize(sqlite3_changeset_iter *p){
- int i; /* Used to iterate through p->apValue[] */
- int rc = p->rc; /* Return code */
- if( p->apValue ){
- for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]);
+ int rc = SQLITE_OK;
+ if( p ){
+ int i; /* Used to iterate through p->apValue[] */
+ rc = p->rc;
+ if( p->apValue ){
+ for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]);
+ }
+ sqlite3_free(p->tblhdr.aBuf);
+ sqlite3_free(p->in.buf.aBuf);
+ sqlite3_free(p);
}
- sqlite3_free(p->tblhdr.aBuf);
- sqlite3_free(p->in.buf.aBuf);
- sqlite3_free(p);
return rc;
}
SessionChange *pNew = 0;
if( !pExist ){
- pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange));
+ pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange) + nRec);
if( !pNew ){
return SQLITE_NOMEM;
}
pNew->op = op2;
pNew->bIndirect = bIndirect;
pNew->nRecord = nRec;
- pNew->aRecord = aRec;
+ pNew->aRecord = (u8*)&pNew[1];
+ memcpy(pNew->aRecord, aRec, nRec);
}else{
int op1 = pExist->op;
** Add all changes in the changeset passed via the first two arguments to
** hash tables.
*/
-static int sessionConcatChangeset(
- int bPatchset, /* True to expect patchsets */
- int nChangeset, /* Number of bytes in pChangeset */
- void *pChangeset, /* Changeset buffer */
+static int sessionAddChangeset(
+ sqlite3_changeset_iter *pIter, /* Iterator to read from */
SessionTable **ppTabList /* IN/OUT: List of table objects */
){
u8 *aRec;
int nRec;
- sqlite3_changeset_iter *pIter;
- int rc;
+ int rc = SQLITE_OK;
SessionTable *pTab = 0;
- rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
- if( rc!=SQLITE_OK ) return rc;
-
while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec) ){
const char *zNew;
int nCol;
SessionChange *pExist = 0;
SessionChange **pp;
+#if 0
assert( bPatchset==0 || bPatchset==1 );
assert( pIter->bPatchset==0 || pIter->bPatchset==1 );
if( pIter->bPatchset!=bPatchset ){
rc = SQLITE_ERROR;
break;
}
+#endif
sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect);
if( !pTab || sqlite3_stricmp(zNew, pTab->zName) ){
}
}
- if( sessionGrowHash(bPatchset, pTab) ){
+ if( sessionGrowHash(pIter->bPatchset, pTab) ){
rc = SQLITE_NOMEM;
break;
}
iHash = sessionChangeHash(
- pTab, (bPatchset && op==SQLITE_DELETE), aRec, pTab->nChange
+ pTab, (pIter->bPatchset && op==SQLITE_DELETE), aRec, pTab->nChange
);
/* Search for existing entry. If found, remove it from the hash table.
for(pp=&pTab->apChange[iHash]; *pp; pp=&(*pp)->pNext){
int bPkOnly1 = 0;
int bPkOnly2 = 0;
- if( bPatchset ){
+ if( pIter->bPatchset ){
bPkOnly1 = (*pp)->op==SQLITE_DELETE;
bPkOnly2 = op==SQLITE_DELETE;
}
}
rc = sessionChangeMerge(pTab,
- bPatchset, pExist, op, bIndirect, aRec, nRec, &pChange
+ pIter->bPatchset, pExist, op, bIndirect, aRec, nRec, &pChange
);
if( rc ) break;
if( pChange ){
}
}
- if( rc==SQLITE_OK ){
- rc = sqlite3changeset_finalize(pIter);
- }else{
- sqlite3changeset_finalize(pIter);
- }
+ if( rc==SQLITE_OK ) rc = pIter->rc;
return rc;
}
-
/*
** 1. Iterate through the left-hand changeset. Add an entry to a table
** specific hash table for each change in the changeset. The hash table
**
** 3. Write an output changeset based on the contents of the hash table.
*/
-int sqlite3changeset_concat(
- int nLeft, /* Number of bytes in lhs input */
- void *pLeft, /* Lhs input changeset */
- int nRight /* Number of bytes in rhs input */,
- void *pRight, /* Rhs input changeset */
- int *pnOut, /* OUT: Number of bytes in output changeset */
- void **ppOut /* OUT: changeset (left <concat> right) */
+int sessionChangesetConcat(
+ sqlite3_changeset_iter *pLeft,
+ sqlite3_changeset_iter *pRight,
+ int (*xOutput)(void *pOut, const void *pData, int nData),
+ void *pOut,
+ int *pnOut,
+ void **ppOut
){
SessionTable *pList = 0; /* List of SessionTable objects */
int rc; /* Return code */
int bPatch; /* True for a patchset */
- *pnOut = 0;
- *ppOut = 0;
- bPatch = (nLeft>0 && *(char*)pLeft=='P') || (nRight>0 && *(char*)pRight=='P');
+ assert( xOutput==0 || (ppOut==0 && pnOut==0) );
- rc = sessionConcatChangeset(bPatch, nLeft, pLeft, &pList);
+ rc = sessionAddChangeset(pLeft, &pList);
if( rc==SQLITE_OK ){
- rc = sessionConcatChangeset(bPatch, nRight, pRight, &pList);
+ rc = sessionAddChangeset(pRight, &pList);
}
+ bPatch = pLeft->bPatchset || pRight->bPatchset;
/* Create the serialized output changeset based on the contents of the
** hash tables attached to the SessionTable objects in list pList.
if( rc==SQLITE_OK ){
SessionTable *pTab;
SessionBuffer buf = {0, 0, 0};
- for(pTab=pList; pTab; pTab=pTab->pNext){
+ for(pTab=pList; pTab && rc==SQLITE_OK; pTab=pTab->pNext){
int i;
if( pTab->nEntry==0 ) continue;
sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc);
}
}
+
+ if( rc==SQLITE_OK && xOutput && buf.nBuf>=SESSIONS_STR_CHUNK_SIZE ){
+ rc = xOutput(pOut, buf.aBuf, buf.nBuf);
+ buf.nBuf = 0;
+ }
}
if( rc==SQLITE_OK ){
- *ppOut = buf.aBuf;
- *pnOut = buf.nBuf;
- }else{
- sqlite3_free(buf.aBuf);
+ if( xOutput ){
+ if( buf.nBuf>0 ) rc = xOutput(pOut, buf.aBuf, buf.nBuf);
+ }else{
+ *ppOut = buf.aBuf;
+ *pnOut = buf.nBuf;
+ buf.aBuf = 0;
+ }
}
+ sqlite3_free(buf.aBuf);
}
sessionDeleteTable(pList);
return rc;
}
+/*
+** Combine two changesets together.
+*/
+int sqlite3changeset_concat(
+ int nLeft, /* Number of bytes in lhs input */
+ void *pLeft, /* Lhs input changeset */
+ int nRight /* Number of bytes in rhs input */,
+ void *pRight, /* Rhs input changeset */
+ int *pnOut, /* OUT: Number of bytes in output changeset */
+ void **ppOut /* OUT: changeset (left <concat> right) */
+){
+ sqlite3_changeset_iter *pIter1 = 0;
+ sqlite3_changeset_iter *pIter2 = 0;
+ int rc;
+
+ *pnOut = 0;
+ *ppOut = 0;
+ rc = sqlite3changeset_start(&pIter1, nLeft, pLeft);
+ if( rc==SQLITE_OK ){
+ rc = sqlite3changeset_start(&pIter2, nRight, pRight);
+ }
+ if( rc==SQLITE_OK ){
+ rc = sessionChangesetConcat(pIter1, pIter2, 0, 0, pnOut, ppOut);
+ }
+
+ sqlite3changeset_finalize(pIter1);
+ sqlite3changeset_finalize(pIter2);
+ return rc;
+}
+
+/*
+** Streaming version of sqlite3changeset_concat().
+*/
+int sqlite3changeset_concat_str(
+ int (*xInputA)(void *pIn, void *pData, int *pnData),
+ void *pInA,
+ int (*xInputB)(void *pIn, void *pData, int *pnData),
+ void *pInB,
+ int (*xOutput)(void *pOut, const void *pData, int nData),
+ void *pOut
+){
+ sqlite3_changeset_iter *pIter1 = 0;
+ sqlite3_changeset_iter *pIter2 = 0;
+ int rc;
+
+ rc = sqlite3changeset_start_str(&pIter1, xInputA, pInA);
+ if( rc==SQLITE_OK ){
+ rc = sqlite3changeset_start_str(&pIter2, xInputB, pInB);
+ }
+ if( rc==SQLITE_OK ){
+ rc = sessionChangesetConcat(pIter1, pIter2, xOutput, pOut, 0, 0);
+ }
+
+ sqlite3changeset_finalize(pIter1);
+ sqlite3changeset_finalize(pIter2);
+ return rc;
+}
+
#endif /* SQLITE_ENABLE_SESSION && SQLITE_ENABLE_PREUPDATE_HOOK */
Tcl_Obj *CONST objv[]
){
int rc; /* Return code from changeset_invert() */
- void *aLeft; /* Input changeset */
- int nLeft; /* Size of buffer aChangeset in bytes */
- void *aRight; /* Input changeset */
- int nRight; /* Size of buffer aChangeset in bytes */
- void *aOut; /* Output changeset */
- int nOut; /* Size of buffer aOut in bytes */
+
+ TestStreamInput sLeft; /* Input stream */
+ TestStreamInput sRight; /* Input stream */
+ TestSessionsBlob sOut = {0,0}; /* Output blob */
if( objc!=3 ){
Tcl_WrongNumArgs(interp, 1, objv, "LEFT RIGHT");
return TCL_ERROR;
}
- aLeft = (void *)Tcl_GetByteArrayFromObj(objv[1], &nLeft);
- aRight = (void *)Tcl_GetByteArrayFromObj(objv[2], &nRight);
- rc = sqlite3changeset_concat(nLeft, aLeft, nRight, aRight, &nOut, &aOut);
+ memset(&sLeft, 0, sizeof(sLeft));
+ memset(&sRight, 0, sizeof(sRight));
+ sLeft.aData = Tcl_GetByteArrayFromObj(objv[1], &sLeft.nData);
+ sRight.aData = Tcl_GetByteArrayFromObj(objv[2], &sRight.nData);
+ sLeft.nStream = test_tcl_integer(interp, SESSION_STREAM_TCL_VAR);
+ sRight.nStream = sLeft.nStream;
+
+ if( sLeft.nStream>0 ){
+ rc = sqlite3changeset_concat_str(
+ testStreamInput, (void*)&sLeft,
+ testStreamInput, (void*)&sRight,
+ testSessionsOutput, (void*)&sOut
+ );
+ }else{
+ rc = sqlite3changeset_concat(
+ sLeft.nData, sLeft.aData, sRight.nData, sRight.aData, &sOut.n, &sOut.p
+ );
+ }
+
if( rc!=SQLITE_OK ){
- return test_session_error(interp, rc);
+ rc = test_session_error(interp, rc);
+ }else{
+ Tcl_SetObjResult(interp,Tcl_NewByteArrayObj((unsigned char*)sOut.p,sOut.n));
}
- Tcl_SetObjResult(interp, Tcl_NewByteArrayObj((unsigned char *)aOut, nOut));
- sqlite3_free(aOut);
- return TCL_OK;
+ sqlite3_free(sOut.p);
+ return rc;
}
/*