if( aBuf ) aBuf[0] = '\0';
}
- *pnWrite += nByte;
+ if( pnWrite ) *pnWrite += nByte;
return SQLITE_OK;
}
+
/*
** This macro is used to calculate hash key values for data structures. In
** order to use this macro, the entire data structure must be represented
return (*pRc!=SQLITE_OK);
}
+/*
+** Append the value passed as the second argument to the buffer passed
+** as the first.
+**
+** This function is a no-op if *pRc is non-zero when it is called.
+** Otherwise, if an error occurs, *pRc is set to an SQLite error code
+** before returning.
+*/
+static void sessionAppendValue(SessionBuffer *p, sqlite3_value *pVal, int *pRc){
+ int rc = *pRc;
+ if( rc==SQLITE_OK ){
+ int nByte = 0;
+ sessionSerializeValue(0, pVal, &nByte);
+ sessionBufferGrow(p, nByte, &rc);
+ if( rc==SQLITE_OK ){
+ rc = sessionSerializeValue(&p->aBuf[p->nBuf], pVal, 0);
+ p->nBuf += nByte;
+ }else{
+ *pRc = rc;
+ }
+ }
+}
+
/*
** This function is a no-op if *pRc is other than SQLITE_OK when it is
** called. Otherwise, append a single byte to the buffer.
return rc;
}
+/*
+** The input pointer currently points to the first byte of the first field
+** of a record consisting of nCol columns. This function ensures the entire
+** record is buffered.
+*/
+static int sessionChangesetBufferRecord(
+ SessionInput *pIn,
+ int nCol,
+ int *pnByte
+){
+ int rc = SQLITE_OK;
+ int nByte = 0;
+ int i;
+ for(i=0; rc==SQLITE_OK && i<nCol; i++){
+ int eType;
+ rc = sessionInputBuffer(pIn, nByte + 10);
+ if( rc==SQLITE_OK ){
+ eType = pIn->aData[pIn->iNext + nByte++];
+ if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){
+ int n;
+ nByte += sessionVarintGet(&pIn->aData[pIn->iNext+nByte], &n);
+ nByte += n;
+ rc = sessionInputBuffer(pIn, nByte);
+ }else if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){
+ nByte += 8;
+ }
+ }
+ }
+ *pnByte = nByte;
+ return rc;
+}
+
/*
** The input pointer currently points to the second byte of a table-header.
** Specifically, to the following:
return rc;
}
-/*
-** Invert a changeset object.
-*/
-int sqlite3changeset_invert(
- int nChangeset, /* Number of bytes in input */
- const void *pChangeset, /* Input changeset */
+static int sessionChangesetInvert(
+ SessionInput *pInput, /* Input changeset */
+ int (*xOutput)(void *pOut, const void *pData, int nData),
+ void *pOut,
int *pnInverted, /* OUT: Number of bytes in output changeset */
void **ppInverted /* OUT: Inverse of pChangeset */
){
int rc = SQLITE_OK; /* Return value */
- u8 *aOut;
- u8 *aIn;
- int i;
- SessionInput sInput;
+ SessionBuffer sOut; /* Output buffer */
int nCol = 0; /* Number of cols in current table */
u8 *abPK = 0; /* PK array for current table */
sqlite3_value **apVal = 0; /* Space for values for UPDATE inversion */
SessionBuffer sPK = {0, 0, 0}; /* PK array for current table */
+ /* Initialize the output buffer */
+ memset(&sOut, 0, sizeof(SessionBuffer));
+
/* Zero the output variables in case an error occurs. */
- *ppInverted = 0;
- *pnInverted = 0;
- if( nChangeset==0 ) return SQLITE_OK;
+ if( ppInverted ){
+ *ppInverted = 0;
+ *pnInverted = 0;
+ }
- /* Set up the input stream */
- memset(&sInput, 0, sizeof(SessionInput));
- sInput.nData = nChangeset;
- sInput.aData = (u8*)pChangeset;
+ while( 1 ){
+ u8 eType;
- aOut = (u8 *)sqlite3_malloc(nChangeset);
- if( !aOut ) return SQLITE_NOMEM;
- aIn = (u8 *)pChangeset;
+ /* Test for EOF. */
+ if( (rc = sessionInputBuffer(pInput, 2)) ) goto finished_invert;
+ if( pInput->iNext>=pInput->nData ) break;
+ eType = pInput->aData[pInput->iNext];
- i = 0;
- while( i<nChangeset ){
- u8 eType;
- if( (rc = sessionInputBuffer(&sInput, 2)) ) goto finished_invert;
- eType = sInput.aData[sInput.iNext];
switch( eType ){
case 'T': {
/* A 'table' record consists of:
** * A nul-terminated table name.
*/
int nByte;
- int nVarint;
- int iNext = sInput.iNext;
- sInput.iNext++;
- if( (rc = sessionChangesetBufferTblhdr(&sInput, &nByte)) ){
+ int nVar;
+ pInput->iNext++;
+ if( (rc = sessionChangesetBufferTblhdr(pInput, &nByte)) ){
goto finished_invert;
}
- nVarint = sessionVarintGet(&sInput.aData[iNext+1], &nCol);
+ nVar = sessionVarintGet(&pInput->aData[pInput->iNext], &nCol);
sPK.nBuf = 0;
- sessionAppendBlob(&sPK, &sInput.aData[iNext+1+nVarint], nCol, &rc);
+ sessionAppendBlob(&sPK, &pInput->aData[pInput->iNext+nVar], nCol, &rc);
+ sessionAppendByte(&sOut, eType, &rc);
+ sessionAppendBlob(&sOut, &pInput->aData[pInput->iNext], nByte, &rc);
if( rc ) goto finished_invert;
- sInput.iNext += nByte;
- memcpy(&aOut[i], &sInput.aData[iNext], nByte+1);
- i += nByte+1;
+
+ pInput->iNext += nByte;
sqlite3_free(apVal);
apVal = 0;
abPK = sPK.aBuf;
case SQLITE_INSERT:
case SQLITE_DELETE: {
- int iStart;
int nByte;
- sInput.iNext += 2;
- iStart = sInput.iNext;
- sessionReadRecord(&sInput, nCol, 0, 0);
- aOut[i] = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE);
- aOut[i+1] = aIn[i+1]; /* indirect-flag */
- nByte = sInput.iNext - iStart;
- memcpy(&aOut[i+2], &sInput.aData[iStart], nByte);
- i += 2 + nByte;
+ int bIndirect = pInput->aData[pInput->iNext+1];
+ int eType2 = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE);
+ pInput->iNext += 2;
+ assert( rc==SQLITE_OK );
+ rc = sessionChangesetBufferRecord(pInput, nCol, &nByte);
+ sessionAppendByte(&sOut, eType2, &rc);
+ sessionAppendByte(&sOut, bIndirect, &rc);
+ sessionAppendBlob(&sOut, &pInput->aData[pInput->iNext], nByte, &rc);
+ pInput->iNext += nByte;
+ if( rc ) goto finished_invert;
break;
}
case SQLITE_UPDATE: {
int iCol;
- int nWrite = 0;
if( 0==apVal ){
apVal = (sqlite3_value **)sqlite3_malloc(sizeof(apVal[0])*nCol*2);
}
/* Write the header for the new UPDATE change. Same as the original. */
- aOut[i] = SQLITE_UPDATE;
- aOut[i+1] = sInput.aData[sInput.iNext+1];
- nWrite = 2;
+ sessionAppendByte(&sOut, eType, &rc);
+ sessionAppendByte(&sOut, pInput->aData[pInput->iNext+1], &rc);
/* Read the old.* and new.* records for the update change. */
- sInput.iNext += 2;
- rc = sessionReadRecord(&sInput, nCol, 0, &apVal[0]);
+ pInput->iNext += 2;
+ rc = sessionReadRecord(pInput, nCol, 0, &apVal[0]);
if( rc==SQLITE_OK ){
- rc = sessionReadRecord(&sInput, nCol, 0, &apVal[nCol]);
+ rc = sessionReadRecord(pInput, nCol, 0, &apVal[nCol]);
}
/* Write the new old.* record. Consists of the PK columns from the
** new.* record. */
for(iCol=0; rc==SQLITE_OK && iCol<nCol; iCol++){
sqlite3_value *pVal = apVal[iCol + (abPK[iCol] ? 0 : nCol)];
- rc = sessionSerializeValue(&aOut[i+nWrite], pVal, &nWrite);
+ sessionAppendValue(&sOut, pVal, &rc);
}
/* Write the new new.* record. Consists of a copy of all values
** are set to "undefined". */
for(iCol=0; rc==SQLITE_OK && iCol<nCol; iCol++){
sqlite3_value *pVal = (abPK[iCol] ? 0 : apVal[iCol]);
- rc = sessionSerializeValue(&aOut[i+nWrite], pVal, &nWrite);
+ sessionAppendValue(&sOut, pVal, &rc);
}
for(iCol=0; iCol<nCol*2; iCol++){
goto finished_invert;
}
- i += nWrite;
- assert( i==sInput.iNext );
break;
}
rc = SQLITE_CORRUPT_BKPT;
goto finished_invert;
}
+
+ assert( rc==SQLITE_OK );
+ if( xOutput && sOut.nBuf>=SESSIONS_STR_CHUNK_SIZE ){
+ rc = xOutput(pOut, sOut.aBuf, sOut.nBuf);
+ sOut.nBuf = 0;
+ if( rc!=SQLITE_OK ) goto finished_invert;
+ }
}
assert( rc==SQLITE_OK );
- *pnInverted = nChangeset;
- *ppInverted = (void *)aOut;
+ if( pnInverted ){
+ *pnInverted = sOut.nBuf;
+ *ppInverted = sOut.aBuf;
+ sOut.aBuf = 0;
+ }else if( sOut.nBuf>0 ){
+ rc = xOutput(pOut, sOut.aBuf, sOut.nBuf);
+ }
finished_invert:
- if( rc!=SQLITE_OK ){
- sqlite3_free(aOut);
- }
+ sqlite3_free(sOut.aBuf);
sqlite3_free(apVal);
sqlite3_free(sPK.aBuf);
return rc;
}
+
+/*
+** Invert a changeset object.
+*/
+int sqlite3changeset_invert(
+ int nChangeset, /* Number of bytes in input */
+ const void *pChangeset, /* Input changeset */
+ int *pnInverted, /* OUT: Number of bytes in output changeset */
+ void **ppInverted /* OUT: Inverse of pChangeset */
+){
+ SessionInput sInput;
+
+ /* Set up the input stream */
+ memset(&sInput, 0, sizeof(SessionInput));
+ sInput.nData = nChangeset;
+ sInput.aData = (u8*)pChangeset;
+
+ return sessionChangesetInvert(&sInput, 0, 0, pnInverted, ppInverted);
+}
+
+/*
+** Streaming version of sqlite3changeset_invert().
+*/
+int sqlite3changeset_invert_str(
+ int (*xInput)(void *pIn, void *pData, int *pnData),
+ void *pIn,
+ int (*xOutput)(void *pOut, const void *pData, int nData),
+ void *pOut
+){
+ SessionInput sInput;
+ int rc;
+
+ /* Set up the input stream */
+ memset(&sInput, 0, sizeof(SessionInput));
+ sInput.xInput = xInput;
+ sInput.pIn = pIn;
+
+ rc = sessionChangesetInvert(&sInput, xOutput, pOut, 0, 0);
+ sqlite3_free(sInput.buf.aBuf);
+ return rc;
+}
+
typedef struct SessionApplyCtx SessionApplyCtx;
struct SessionApplyCtx {
sqlite3 *db;