** Followed by one or more changes to the table.
**
** 1 byte: Either SQLITE_INSERT, UPDATE or DELETE.
+** 1 byte: The "indirect-change" flag.
** old.* record: (delete and update only)
** new.* record: (insert and update only)
*/
}
/*
-** Based on the primary key values stored in change pChange, calculate a
+** Based on the primary key values stored in change aRecord, calculate a
** hash key, assuming the has table has nBucket buckets. The hash keys
** calculated by this function are compatible with those calculated by
** sessionPreupdateHash().
*/
static unsigned int sessionChangeHash(
- sqlite3 *db, /* Database handle */
SessionTable *pTab, /* Table handle */
- SessionChange *pChange, /* Change handle */
+ u8 *aRecord, /* Change record */
int nBucket /* Assume this many buckets in hash table */
){
unsigned int h = 0; /* Value to return */
int i; /* Used to iterate through columns */
- u8 *a = pChange->aRecord; /* Used to iterate through change record */
+ u8 *a = aRecord; /* Used to iterate through change record */
for(i=0; i<pTab->nCol; i++){
int eType = *a++;
return (h % nBucket);
}
+static int sessionSerialLen(u8 *a){
+ int e = *a;
+ int n;
+ if( e==0 ) return 1;
+ if( e==SQLITE_NULL ) return 1;
+ if( e==SQLITE_INTEGER || e==SQLITE_FLOAT ) return 9;
+ return sessionVarintGet(&a[1], &n) + 1 + n;
+}
+
+static int sessionChangeEqual(
+ SessionTable *pTab,
+ u8 *aLeft, /* Change record */
+ u8 *aRight /* Change record */
+){
+ u8 *a1 = aLeft;
+ u8 *a2 = aRight;
+ int i;
+
+ for(i=0; i<pTab->nCol; i++){
+ int n1 = sessionSerialLen(a1);
+ int n2 = sessionSerialLen(a2);
+
+ if( pTab->abPK[i] && (n1!=n2 || memcmp(a1, a2, n1)) ){
+ return 0;
+ }
+ a1 += n1;
+ a2 += n1;
+ }
+
+ return 1;
+}
+
+static void sessionMergeRecord(
+ u8 **paOut,
+ SessionTable *pTab,
+ u8 *aLeft,
+ u8 *aRight
+){
+ u8 *a1 = aLeft;
+ u8 *a2 = aRight;
+ u8 *aOut = *paOut;
+ int i;
+
+ for(i=0; i<pTab->nCol; i++){
+ int n1 = sessionSerialLen(a1);
+ int n2 = sessionSerialLen(a2);
+ if( *a2 ){
+ memcpy(aOut, a2, n2);
+ aOut += n2;
+ }else{
+ memcpy(aOut, a1, n1);
+ aOut += n1;
+ }
+ a1 += n1;
+ a2 += n2;
+ }
+
+ *paOut = aOut;
+}
+
+static u8 *sessionMergeValue(
+ u8 **paOne,
+ u8 **paTwo,
+ int *pnVal
+){
+ u8 *a1 = *paOne;
+ u8 *a2 = *paTwo;
+ u8 *pRet = 0;
+ int n1;
+
+ assert( a1 );
+ if( a2 ){
+ int n2 = sessionSerialLen(a2);
+ if( *a2 ){
+ *pnVal = n2;
+ pRet = a2;
+ }
+ *paTwo = &a2[n2];
+ }
+
+ n1 = sessionSerialLen(a1);
+ if( pRet==0 ){
+ *pnVal = n1;
+ pRet = a1;
+ }
+ *paOne = &a1[n1];
+
+ return pRet;
+}
+
+static int sessionMergeUpdate(
+ u8 **paOut,
+ SessionTable *pTab,
+ u8 *aOldRecord1,
+ u8 *aOldRecord2,
+ u8 *aNewRecord1,
+ u8 *aNewRecord2
+){
+ u8 *aOld1 = aOldRecord1;
+ u8 *aOld2 = aOldRecord2;
+ u8 *aNew1 = aNewRecord1;
+ u8 *aNew2 = aNewRecord2;
+
+ u8 *aOut = *paOut;
+ int i;
+ int bRequired = 0;
+
+ assert( aOldRecord1 && aNewRecord1 );
+
+ /* Write the old.* vector first. */
+ for(i=0; i<pTab->nCol; i++){
+ int nOld;
+ u8 *aOld;
+ int nNew;
+ u8 *aNew;
+
+ aOld = sessionMergeValue(&aOld1, &aOld2, &nOld);
+ aNew = sessionMergeValue(&aNew1, &aNew2, &nNew);
+ if( pTab->abPK[i] || nOld!=nNew || memcmp(aOld, aNew, nNew) ){
+ if( pTab->abPK[i]==0 ) bRequired = 1;
+ memcpy(aOut, aOld, nOld);
+ aOut += nOld;
+ }else{
+ *(aOut++) = '\0';
+ }
+ }
+
+ if( !bRequired ) return 0;
+
+ /* Write the new.* vector */
+ aOld1 = aOldRecord1;
+ aOld2 = aOldRecord2;
+ aNew1 = aNewRecord1;
+ aNew2 = aNewRecord2;
+ for(i=0; i<pTab->nCol; i++){
+ int nOld;
+ u8 *aOld;
+ int nNew;
+ u8 *aNew;
+
+ aOld = sessionMergeValue(&aOld1, &aOld2, &nOld);
+ aNew = sessionMergeValue(&aNew1, &aNew2, &nNew);
+ if( pTab->abPK[i] || (nOld==nNew && 0==memcmp(aOld, aNew, nNew)) ){
+ *(aOut++) = '\0';
+ }else{
+ memcpy(aOut, aNew, nNew);
+ aOut += nNew;
+ }
+ }
+
+ *paOut = aOut;
+ return 1;
+}
+
static int sessionPreupdateEqual(
sqlite3 *db,
SessionTable *pTab,
** Growing the hash table in this case is a performance optimization only,
** it is not required for correct operation.
*/
-static int sessionGrowHash(sqlite3_session *pSession, SessionTable *pTab){
+static int sessionGrowHash(SessionTable *pTab){
if( pTab->nChange==0 || pTab->nEntry>=(pTab->nChange/2) ){
int i;
SessionChange **apNew;
apNew = (SessionChange **)sqlite3_malloc(sizeof(SessionChange *) * nNew);
if( apNew==0 ){
if( pTab->nChange==0 ){
- pSession->rc = SQLITE_NOMEM;
return SQLITE_ERROR;
}
return SQLITE_OK;
SessionChange *p;
SessionChange *pNext;
for(p=pTab->apChange[i]; p; p=pNext){
- int iHash = sessionChangeHash(pSession->db, pTab, p, nNew);
+ int iHash = sessionChangeHash(pTab, p->aRecord, nNew);
pNext = p->pNext;
p->pNext = apNew[iHash];
apNew[iHash] = p;
if( sessionInitTable(pSession, pTab) ) return;
/* Grow the hash table if required */
- if( sessionGrowHash(pSession, pTab) ) return;
+ if( sessionGrowHash(pTab) ){
+ pSession->rc = SQLITE_NOMEM;
+ return;
+ }
/* Search the hash table for an existing entry for rowid=iKey2. If
** one is found, store a pointer to it in pChange and unlink it from
}
if( pC==0 ){
/* Create a new change object containing all the old values (if
- ** this is an SQLITE_UPDATE or SQLITE_DELETE), or just the PK
- ** values (if this is an INSERT). */
+ ** this is an SQLITE_UPDATE or SQLITE_DELETE), or just the PK
+ ** values (if this is an INSERT). */
SessionChange *pChange; /* New change object */
int nByte; /* Number of bytes to allocate */
int i; /* Used to iterate through columns */
return SQLITE_OK;
}
+void sessionDeleteTable(SessionTable *pList){
+ SessionTable *pNext;
+ SessionTable *pTab;
+
+ for(pTab=pList; pTab; pTab=pNext){
+ int i;
+ pNext = pTab->pNext;
+ for(i=0; i<pTab->nChange; i++){
+ SessionChange *p;
+ SessionChange *pNext;
+ for(p=pTab->apChange[i]; p; p=pNext){
+ pNext = p->pNext;
+ sqlite3_free(p);
+ }
+ }
+ sqlite3_free((char*)pTab->azCol); /* cast works around VC++ bug */
+ sqlite3_free(pTab->apChange);
+ sqlite3_free(pTab);
+ }
+}
+
/*
** Delete a session object previously allocated using sqlite3session_create().
*/
/* Delete all attached table objects. And the contents of their
** associated hash-tables. */
- while( pSession->pTable ){
- int i;
- SessionTable *pTab = pSession->pTable;
- pSession->pTable = pTab->pNext;
- for(i=0; i<pTab->nChange; i++){
- SessionChange *p;
- SessionChange *pNext;
- for(p=pTab->apChange[i]; p; p=pNext){
- pNext = p->pNext;
- sqlite3_free(p);
- }
- }
- sqlite3_free((char*)pTab->azCol); /* cast works around VC++ bug */
- sqlite3_free(pTab->apChange);
- sqlite3_free(pTab);
- }
+ sessionDeleteTable(pSession->pTable);
/* Free the session object itself. */
sqlite3_free(pSession);
return rc;
}
+static void sessionAppendTableHdr(
+ SessionBuffer *pBuf,
+ SessionTable *pTab,
+ int *pRc
+){
+ /* Write a table header */
+ sessionAppendByte(pBuf, 'T', pRc);
+ sessionAppendVarint(pBuf, pTab->nCol, pRc);
+ sessionAppendBlob(pBuf, pTab->abPK, pTab->nCol, pRc);
+ sessionAppendBlob(pBuf, (u8 *)pTab->zName, strlen(pTab->zName)+1, pRc);
+}
+
/*
** Obtain a changeset object containing all changes recorded by the
** session object passed as the first argument.
}
/* Write a table header */
- sessionAppendByte(&buf, 'T', &rc);
- sessionAppendVarint(&buf, nCol, &rc);
- sessionAppendBlob(&buf, pTab->abPK, nCol, &rc);
- sessionAppendBlob(&buf, (u8 *)zName, sqlite3Strlen30(zName)+1, &rc);
+ sessionAppendTableHdr(&buf, pTab, &rc);
/* Build and compile a statement to execute: */
if( rc==SQLITE_OK ){
if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){
int nByte;
- int enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0);
aRec += sessionVarintGet(aRec, &nByte);
if( apOut ){
+ int enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0);
sqlite3ValueSetStr(apOut[i], nByte, aRec, enc, SQLITE_STATIC);
}
aRec += nByte;
return SQLITE_OK;
}
-/*
-** Advance an iterator created by sqlite3changeset_start() to the next
-** change in the changeset. This function may return SQLITE_ROW, SQLITE_DONE
-** or SQLITE_CORRUPT.
-**
-** This function may not be called on iterators passed to a conflict handler
-** callback by changeset_apply().
-*/
-int sqlite3changeset_next(sqlite3_changeset_iter *p){
+static int sessionChangesetNext(
+ sqlite3_changeset_iter *p,
+ u8 **paRec,
+ int *pnRec
+){
u8 *aChange;
int i;
u8 c;
+ assert( (paRec==0 && pnRec==0) || (paRec && pnRec) );
+
/* If the iterator is in the error-state, return immediately. */
if( p->rc!=SQLITE_OK ) return p->rc;
- /* Free the current contents of p->apValue[]. */
+ /* Free the current contents of p->apValue[], if any. */
if( p->apValue ){
for(i=0; i<p->nCol*2; i++){
sqlite3ValueFree(p->apValue[i]);
}
aChange = p->pNext;
- c = *(aChange++);
- if( c=='T' ){
+ if( aChange[0]=='T' ){
int nByte; /* Bytes to allocate for apValue */
+ aChange++;
aChange += sessionVarintGet(aChange, &p->nCol);
p->abPK = (u8 *)aChange;
aChange += p->nCol;
p->zTab = (char *)aChange;
aChange += (sqlite3Strlen30((char *)aChange) + 1);
- p->op = *(aChange++);
- p->bIndirect = *(aChange++);
- sqlite3_free(p->apValue);
- nByte = sizeof(sqlite3_value *) * p->nCol * 2;
- p->apValue = (sqlite3_value **)sqlite3_malloc(nByte);
- if( !p->apValue ){
- return (p->rc = SQLITE_NOMEM);
+
+ if( paRec==0 ){
+ sqlite3_free(p->apValue);
+ nByte = sizeof(sqlite3_value *) * p->nCol * 2;
+ p->apValue = (sqlite3_value **)sqlite3_malloc(nByte);
+ if( !p->apValue ){
+ return (p->rc = SQLITE_NOMEM);
+ }
+ memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2);
}
- memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2);
- }else{
- p->op = c;
- p->bIndirect = *(aChange++);
}
+
+ p->op = *(aChange++);
+ p->bIndirect = *(aChange++);
if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){
return (p->rc = SQLITE_CORRUPT);
}
+ if( paRec ){ *paRec = aChange; }
+
/* If this is an UPDATE or DELETE, read the old.* record. */
if( p->op!=SQLITE_INSERT ){
- p->rc = sessionReadRecord(&aChange, p->nCol, p->apValue);
+ p->rc = sessionReadRecord(&aChange, p->nCol, paRec?0:p->apValue);
if( p->rc!=SQLITE_OK ) return p->rc;
}
/* If this is an INSERT or UPDATE, read the new.* record. */
if( p->op!=SQLITE_DELETE ){
- p->rc = sessionReadRecord(&aChange, p->nCol, &p->apValue[p->nCol]);
+ p->rc = sessionReadRecord(&aChange, p->nCol, paRec?0:&p->apValue[p->nCol]);
if( p->rc!=SQLITE_OK ) return p->rc;
}
+ if( pnRec ){ *pnRec = aChange - *paRec; }
p->pNext = aChange;
return SQLITE_ROW;
}
+/*
+** Advance an iterator created by sqlite3changeset_start() to the next
+** change in the changeset. This function may return SQLITE_ROW, SQLITE_DONE
+** or SQLITE_CORRUPT.
+**
+** This function may not be called on iterators passed to a conflict handler
+** callback by changeset_apply().
+*/
+int sqlite3changeset_next(sqlite3_changeset_iter *p){
+ return sessionChangesetNext(p, 0, 0);
+
+}
+
/*
** The following function extracts information on the current change
** from a changeset iterator. They may only be called after changeset_next()
return rc;
}
+static int sessionChangeMerge(
+ SessionTable *pTab,
+ SessionChange *pExist,
+ int op2,
+ int bIndirect,
+ u8 *aRec,
+ int nRec,
+ SessionChange **ppNew
+){
+ SessionChange *pNew = 0;
+
+ if( !pExist ){
+ pNew = (SessionChange *)sqlite3_malloc(sizeof(SessionChange));
+ if( !pNew ){
+ return SQLITE_NOMEM;
+ }
+ memset(pNew, 0, sizeof(SessionChange));
+ pNew->bInsert = op2;
+ pNew->bIndirect = bIndirect;
+ pNew->nRecord = nRec;
+ pNew->aRecord = aRec;
+ }else{
+ int op1 = pExist->bInsert;
+
+ /*
+ ** op1=INSERT, op2=INSERT -> Unsupported. Discard op2.
+ ** op1=INSERT, op2=UPDATE -> INSERT.
+ ** op1=INSERT, op2=DELETE -> (none)
+ **
+ ** op1=UPDATE, op2=INSERT -> Unsupported. Discard op2.
+ ** op1=UPDATE, op2=UPDATE -> UPDATE.
+ ** op1=UPDATE, op2=DELETE -> DELETE.
+ **
+ ** op1=DELETE, op2=INSERT -> UPDATE.
+ ** op1=DELETE, op2=UPDATE -> Unsupported. Discard op2.
+ ** op1=DELETE, op2=DELETE -> Unsupported. Discard op2.
+ */
+ if( (op1==SQLITE_INSERT && op2==SQLITE_INSERT)
+ || (op1==SQLITE_UPDATE && op2==SQLITE_INSERT)
+ || (op1==SQLITE_DELETE && op2==SQLITE_UPDATE)
+ || (op1==SQLITE_DELETE && op2==SQLITE_DELETE)
+ ){
+ pNew = pExist;
+ }else if( op1==SQLITE_INSERT && op2==SQLITE_DELETE ){
+ sqlite3_free(pExist);
+ assert( pNew==0 );
+ }else{
+ int nByte;
+ u8 *aCsr;
+
+ nByte = sizeof(SessionChange) + pExist->nRecord + nRec;
+ pNew = (SessionChange *)sqlite3_malloc(nByte);
+ if( !pNew ){
+ return SQLITE_NOMEM;
+ }
+ memset(pNew, 0, sizeof(SessionChange));
+ pNew->bIndirect = (bIndirect && pExist->bIndirect);
+ aCsr = pNew->aRecord = (u8 *)&pNew[1];
+
+ if( op1==SQLITE_INSERT && op2==SQLITE_UPDATE ){
+ u8 *a1 = aRec;
+ pNew->bInsert = SQLITE_INSERT;
+ sessionReadRecord(&a1, pTab->nCol, 0);
+ sessionMergeRecord(&aCsr, pTab, pExist->aRecord, a1);
+ }
+ else if( op1==SQLITE_UPDATE && op2==SQLITE_UPDATE ){
+ u8 *a1 = pExist->aRecord;
+ u8 *a2 = aRec;
+ sessionReadRecord(&a1, pTab->nCol, 0);
+ sessionReadRecord(&a2, pTab->nCol, 0);
+ pNew->bInsert = SQLITE_UPDATE;
+ if( 0==sessionMergeUpdate(&aCsr, pTab, aRec, pExist->aRecord, a1, a2) ){
+ sqlite3_free(pNew);
+ pNew = 0;
+ }
+ }
+ else if( op1==SQLITE_UPDATE && op2==SQLITE_DELETE ){
+ pNew->bInsert = SQLITE_DELETE;
+ sessionMergeRecord(&aCsr, pTab, aRec, pExist->aRecord);
+ }
+ else if( op1==SQLITE_DELETE && op2==SQLITE_INSERT ){
+ pNew->bInsert = SQLITE_UPDATE;
+ if( 0==sessionMergeUpdate(&aCsr, pTab, pExist->aRecord, 0, aRec, 0) ){
+ sqlite3_free(pNew);
+ pNew = 0;
+ }
+ }
+
+ if( pNew ){
+ pNew->nRecord = (aCsr - pNew->aRecord);
+ }
+ sqlite3_free(pExist);
+ }
+ }
+
+ *ppNew = pNew;
+ return SQLITE_OK;
+}
+
+int sessionConcatChangeset(
+ int nChangeset,
+ void *pChangeset,
+ SessionTable **ppTabList
+){
+ u8 *aRec;
+ int nRec;
+ sqlite3_changeset_iter *pIter;
+ int rc;
+ SessionTable *pTab = 0;
+
+ rc = sqlite3changeset_start(&pIter, nChangeset, pChangeset);
+ if( rc!=SQLITE_OK ) return rc;
+
+ while( SQLITE_ROW==sessionChangesetNext(pIter, &aRec, &nRec) ){
+ const char *zNew;
+ int nCol;
+ int op;
+ int iHash;
+ int bIndirect;
+ SessionChange *pChange;
+ SessionChange *pExist = 0;
+ SessionChange **pp;
+
+ assert( pIter->apValue==0 );
+ sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect);
+
+ if( !pTab || zNew!=pTab->zName ){
+ /* Search the list for a matching table */
+ int nNew = strlen(zNew);
+ for(pTab = *ppTabList; pTab; pTab=pTab->pNext){
+ if( 0==sqlite3_strnicmp(pTab->zName, zNew, nNew+1) ) break;
+ }
+ if( !pTab ){
+ pTab = sqlite3_malloc(sizeof(SessionTable));
+ if( !pTab ) break;
+ memset(pTab, 0, sizeof(SessionTable));
+ pTab->pNext = *ppTabList;
+ *ppTabList = pTab;
+ }
+ pTab->zName = (char *)zNew;
+ pTab->nCol = nCol;
+ sqlite3changeset_pk(pIter, &pTab->abPK, 0);
+ }
+
+ if( sessionGrowHash(pTab) ) break;
+ iHash = sessionChangeHash(pTab, aRec, pTab->nChange);
+
+ /* Search for existing entry. If found, remove it from the hash table.
+ ** Code below may link it back in.
+ */
+ for(pp=&pTab->apChange[iHash]; *pp; pp=&(*pp)->pNext){
+ if( sessionChangeEqual(pTab, (*pp)->aRecord, aRec) ){
+ pExist = *pp;
+ *pp = (*pp)->pNext;
+ pTab->nEntry--;
+ break;
+ }
+ }
+
+ rc = sessionChangeMerge(pTab, pExist, op, bIndirect, aRec, nRec, &pChange);
+ if( rc ) break;
+ if( pChange ){
+ pChange->pNext = pTab->apChange[iHash];
+ pTab->apChange[iHash] = pChange;
+ pTab->nEntry++;
+ }
+ }
+
+ if( rc==SQLITE_OK ){
+ rc = sqlite3changeset_finalize(pIter);
+ }else{
+ sqlite3changeset_finalize(pIter);
+ }
+ return rc;
+}
+
+
+/*
+** 1. Iterate through the left-hand changeset. Add an entry to a table
+** specific hash table for each change in the changeset. The hash table
+** key is the PK of the row affected by the change.
+**
+** 2. Then interate through the right-hand changeset. Attempt to add an
+** entry to a hash table for each component change. If a change already
+** exists with the same PK values, combine the two into a single change.
+**
+** 3. Write an output changeset based on the contents of the hash table.
+*/
+int sqlite3changeset_concat(
+ int nLeft, /* Number of bytes in lhs input */
+ void *pLeft, /* Lhs input changeset */
+ int nRight /* Number of bytes in rhs input */,
+ void *pRight, /* Rhs input changeset */
+ int *pnOut, /* OUT: Number of bytes in output changeset */
+ void **ppOut /* OUT: changeset (left <concat> right) */
+){
+ SessionTable *pList = 0; /* List of SessionTable objects */
+ int rc; /* Return code */
+
+ *pnOut = 0;
+ *ppOut = 0;
+
+ rc = sessionConcatChangeset(nLeft, pLeft, &pList);
+ if( rc==SQLITE_OK ){
+ rc = sessionConcatChangeset(nRight, pRight, &pList);
+ }
+
+ /* Create the serialized output changeset based on the contents of the
+ ** hash tables attached to the SessionTable objects in list pList.
+ */
+ if( rc==SQLITE_OK ){
+ SessionTable *pTab;
+ SessionBuffer buf = {0, 0, 0};
+ for(pTab=pList; pTab; pTab=pTab->pNext){
+ int i;
+ if( pTab->nEntry==0 ) continue;
+
+ sessionAppendTableHdr(&buf, pTab, &rc);
+ for(i=0; i<pTab->nChange; i++){
+ SessionChange *p;
+ for(p=pTab->apChange[i]; p; p=p->pNext){
+ sessionAppendByte(&buf, p->bInsert, &rc);
+ sessionAppendByte(&buf, p->bIndirect, &rc);
+ sessionAppendBlob(&buf, p->aRecord, p->nRecord, &rc);
+ }
+ }
+ }
+
+ if( rc==SQLITE_OK ){
+ *ppOut = buf.aBuf;
+ *pnOut = buf.nBuf;
+ }else{
+ sqlite3_free(buf.aBuf);
+ }
+ }
+
+ concat_out:
+ sessionDeleteTable(pList);
+ return rc;
+}
+
#endif /* SQLITE_ENABLE_SESSION && SQLITE_ENABLE_PREUPDATE_HOOK */