]> git.ipfire.org Git - thirdparty/sqlite.git/commitdiff
Begin adding 'streaming' APIs to sessions module. This is a work in progress.
authordan <dan@noemail.net>
Tue, 23 Sep 2014 20:39:55 +0000 (20:39 +0000)
committerdan <dan@noemail.net>
Tue, 23 Sep 2014 20:39:55 +0000 (20:39 +0000)
FossilOrigin-Name: 3c7d3d950bbf5f5ed3696ebc61c77ca48bafe2b5

ext/session/sqlite3session.c
ext/session/sqlite3session.h
ext/session/test_session.c
manifest
manifest.uuid
test/permutations.test

index 4617e6a3c2ba9ce5ce7fb771d41395bdf0e334a2..ec0387e77578b466d337e2a7f68a5a2b0f9640bb 100644 (file)
 typedef struct SessionTable SessionTable;
 typedef struct SessionChange SessionChange;
 typedef struct SessionBuffer SessionBuffer;
+typedef struct SessionInput SessionInput;
+
+/*
+** Minimum chunk size used by streaming versions of functions.
+*/
+#define SESSIONS_STR_CHUNK_SIZE 1024
 
 /*
 ** Session handle structure.
@@ -30,13 +36,37 @@ struct sqlite3_session {
 };
 
 /*
-** Structure for changeset iterators.
+** Instances of this structure are used to build strings or binary records.
 */
-struct sqlite3_changeset_iter {
+struct SessionBuffer {
+  u8 *aBuf;                       /* Pointer to changeset buffer */
+  int nBuf;                       /* Size of buffer aBuf */
+  int nAlloc;                     /* Size of allocation containing aBuf */
+};
+
+/*
+** An object of this type is used internally as an abstraction for the 
+** input data read by changeset iterators. Input data may be supplied 
+** either as a single large buffer (sqlite3changeset_start()) or using
+** a stream function (sqlite3changeset_start_str()).
+*/
+struct SessionInput {
+  int iNext;                      /* Offset in aChangeset[] of next change */
   u8 *aChangeset;                 /* Pointer to buffer containing changeset */
   int nChangeset;                 /* Number of bytes in aChangeset */
+  SessionBuffer buf;              /* Current read buffer */
+  int (*xInput)(void*, void*, int*);        /* Input stream call (or NULL) */
+  void *pIn;                                /* First argument to xInput */
+  int bEof;                       /* Set to true after xInput finished */
+};
+
+/*
+** Structure for changeset iterators.
+*/
+struct sqlite3_changeset_iter {
+  SessionInput in;                /* Input buffer or stream */
+  SessionBuffer tblhdr;           /* Buffer to hold apValue/zTab/abPK/ */
   int bPatchset;                  /* True if this is a patchset */
-  u8 *pNext;                      /* Pointer to next change within aChangeset */
   int rc;                         /* Iterator error code */
   sqlite3_stmt *pConflict;        /* Points to conflicting row, if any */
   char *zTab;                     /* Current table */
@@ -165,15 +195,6 @@ struct SessionChange {
   SessionChange *pNext;           /* For hash-table collisions */
 };
 
-/*
-** Instances of this structure are used to build strings or binary records.
-*/
-struct SessionBuffer {
-  u8 *aBuf;                       /* Pointer to changeset buffer */
-  int nBuf;                       /* Size of buffer aBuf */
-  int nAlloc;                     /* Size of allocation containing aBuf */
-};
-
 /*
 ** Write a varint with value iVal into the buffer at aBuf. Return the 
 ** number of bytes written.
@@ -1291,7 +1312,7 @@ static int sessionBufferGrow(SessionBuffer *p, int nByte, int *pRc){
     int nNew = p->nAlloc ? p->nAlloc : 128;
     do {
       nNew = nNew*2;
-    }while( nNew<(p->nAlloc+nByte) );
+    }while( nNew<(p->nBuf+nByte) );
 
     aNew = (u8 *)sqlite3_realloc(p->aBuf, nNew);
     if( 0==aNew ){
@@ -1776,6 +1797,8 @@ static void sessionAppendTableHdr(
 int sessionGenerateChangeset(
   sqlite3_session *pSession,      /* Session object */
   int bPatchset,                  /* True for patchset, false for changeset */
+  int (*xOutput)(void *pOut, const void *pData, int nData),
+  void *pOut,                     /* First argument for xOutput */
   int *pnChangeset,               /* OUT: Size of buffer at *ppChangeset */
   void **ppChangeset              /* OUT: Buffer containing changeset */
 ){
@@ -1784,11 +1807,15 @@ int sessionGenerateChangeset(
   SessionBuffer buf = {0,0,0};    /* Buffer in which to accumlate changeset */
   int rc;                         /* Return code */
 
+  assert( xOutput==0 || (pnChangeset==0 && ppChangeset==0 ) );
+
   /* Zero the output variables in case an error occurs. If this session
   ** object is already in the error state (sqlite3_session.rc != SQLITE_OK),
   ** this call will be a no-op.  */
-  *pnChangeset = 0;
-  *ppChangeset = 0;
+  if( xOutput==0 ){
+    *pnChangeset = 0;
+    *ppChangeset = 0;
+  }
 
   if( pSession->rc ) return pSession->rc;
   rc = sqlite3_exec(pSession->db, "SAVEPOINT changeset", 0, 0, 0);
@@ -1846,6 +1873,19 @@ int sessionGenerateChangeset(
           if( rc==SQLITE_OK ){
             rc = sqlite3_reset(pSel);
           }
+
+          /* If the buffer is now larger than SESSIONS_STR_CHUNK_SIZE, pass
+          ** its contents to the xOutput() callback. */
+          if( xOutput 
+           && rc==SQLITE_OK 
+           && buf.nBuf>nNoop 
+           && buf.nBuf>SESSIONS_STR_CHUNK_SIZE 
+          ){
+            rc = xOutput(pOut, (void*)buf.aBuf, buf.nBuf);
+            nNoop = -1;
+            buf.nBuf = 0;
+          }
+
         }
       }
 
@@ -1858,12 +1898,16 @@ int sessionGenerateChangeset(
   }
 
   if( rc==SQLITE_OK ){
-    *pnChangeset = buf.nBuf;
-    *ppChangeset = buf.aBuf;
-  }else{
-    sqlite3_free(buf.aBuf);
+    if( xOutput==0 ){
+      *pnChangeset = buf.nBuf;
+      *ppChangeset = buf.aBuf;
+      buf.aBuf = 0;
+    }else if( buf.nBuf>0 ){
+      rc = xOutput(pOut, (void*)buf.aBuf, buf.nBuf);
+    }
   }
 
+  sqlite3_free(buf.aBuf);
   sqlite3_exec(db, "RELEASE changeset", 0, 0, 0);
   sqlite3_mutex_leave(sqlite3_db_mutex(db));
   return rc;
@@ -1881,7 +1925,29 @@ int sqlite3session_changeset(
   int *pnChangeset,               /* OUT: Size of buffer at *ppChangeset */
   void **ppChangeset              /* OUT: Buffer containing changeset */
 ){
-  return sessionGenerateChangeset(pSession, 0, pnChangeset, ppChangeset);
+  return sessionGenerateChangeset(pSession, 0, 0, 0, pnChangeset, ppChangeset);
+}
+
+/*
+** Streaming version of sqlite3session_changeset().
+*/
+int sqlite3session_changeset_str(
+  sqlite3_session *pSession,
+  int (*xOutput)(void *pOut, const void *pData, int nData),
+  void *pOut
+){
+  return sessionGenerateChangeset(pSession, 0, xOutput, pOut, 0, 0);
+}
+
+/*
+** Streaming version of sqlite3session_patchset().
+*/
+int sqlite3session_patchset_str(
+  sqlite3_session *pSession,
+  int (*xOutput)(void *pOut, const void *pData, int nData),
+  void *pOut
+){
+  return sessionGenerateChangeset(pSession, 1, xOutput, pOut, 0, 0);
 }
 
 /*
@@ -1896,7 +1962,7 @@ int sqlite3session_patchset(
   int *pnPatchset,                /* OUT: Size of buffer at *ppChangeset */
   void **ppPatchset               /* OUT: Buffer containing changeset */
 ){
-  return sessionGenerateChangeset(pSession, 1, pnPatchset, ppPatchset);
+  return sessionGenerateChangeset(pSession, 1, 0, 0, pnPatchset, ppPatchset);
 }
 
 /*
@@ -1945,16 +2011,20 @@ int sqlite3session_isempty(sqlite3_session *pSession){
 }
 
 /*
-** Create an iterator used to iterate through the contents of a changeset.
+** Do the work for either sqlite3changeset_start() or start_str().
 */
-int sqlite3changeset_start(
+int sessionChangesetStart(
   sqlite3_changeset_iter **pp,    /* OUT: Changeset iterator handle */
+  int (*xInput)(void *pIn, void *pData, int *pnData),
+  void *pIn,
   int nChangeset,                 /* Size of buffer pChangeset in bytes */
   void *pChangeset                /* Pointer to buffer containing changeset */
 ){
   sqlite3_changeset_iter *pRet;   /* Iterator to return */
   int nByte;                      /* Number of bytes to allocate for iterator */
 
+  assert( xInput==0 || (pChangeset==0 && nChangeset==0) );
+
   /* Zero the output variable in case an error occurs. */
   *pp = 0;
 
@@ -1963,15 +2033,80 @@ int sqlite3changeset_start(
   pRet = (sqlite3_changeset_iter *)sqlite3_malloc(nByte);
   if( !pRet ) return SQLITE_NOMEM;
   memset(pRet, 0, sizeof(sqlite3_changeset_iter));
-  pRet->aChangeset = (u8 *)pChangeset;
-  pRet->nChangeset = nChangeset;
-  pRet->pNext = pRet->aChangeset;
+  pRet->in.aChangeset = (u8 *)pChangeset;
+  pRet->in.nChangeset = nChangeset;
+  pRet->in.xInput = xInput;
+  pRet->in.pIn = pIn;
+  pRet->in.iNext = 0;
+  pRet->in.bEof = (xInput ? 0 : 1);
 
   /* Populate the output variable and return success. */
   *pp = pRet;
   return SQLITE_OK;
 }
 
+/*
+** Create an iterator used to iterate through the contents of a changeset.
+*/
+int sqlite3changeset_start(
+  sqlite3_changeset_iter **pp,    /* OUT: Changeset iterator handle */
+  int nChangeset,                 /* Size of buffer pChangeset in bytes */
+  void *pChangeset                /* Pointer to buffer containing changeset */
+){
+  return sessionChangesetStart(pp, 0, 0, nChangeset, pChangeset);
+}
+
+/*
+** Streaming version of sqlite3changeset_start().
+*/
+int sqlite3changeset_start_str(
+  sqlite3_changeset_iter **pp,    /* OUT: Changeset iterator handle */
+  int (*xInput)(void *pIn, void *pData, int *pnData),
+  void *pIn
+){
+  return sessionChangesetStart(pp, xInput, pIn, 0, 0);
+}
+
+/*
+** Ensure that there are at least nByte bytes available in the buffer. Or,
+** if there are not nByte bytes remaining in the input, that all available
+** data is in the buffer.
+**
+** Return an SQLite error code if an error occurs, or SQLITE_OK otherwise.
+*/
+static int sessionInputBuffer(SessionInput *pInput, int nByte){
+  int rc = SQLITE_OK;
+  if( pInput->xInput && !pInput->bEof ){
+    assert( 0 );
+  }
+  return rc;
+}
+
+/*
+** When this function is called, *ppRec points to the start of a record
+** that contains nCol values. This function advances the pointer *ppRec
+** until it points to the byte immediately following that record.
+*/
+static void sessionSkipRecord(
+  u8 **ppRec,                     /* IN/OUT: Record pointer */
+  int nCol                        /* Number of values in record */
+){
+  u8 *aRec = *ppRec;
+  int i;
+  for(i=0; i<nCol; i++){
+    int eType = *aRec++;
+    if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){
+      int nByte;
+      aRec += sessionVarintGet((u8*)aRec, &nByte);
+      aRec += nByte;
+    }else if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){
+      aRec += 8;
+    }
+  }
+
+  *ppRec = aRec;
+}
+
 /*
 ** Deserialize a single record from a buffer in memory. See "RECORD FORMAT"
 ** for details.
@@ -1997,37 +2132,46 @@ int sqlite3changeset_start(
 ** The apOut[] array may have been partially populated in this case.
 */
 static int sessionReadRecord(
-  u8 **paChange,                  /* IN/OUT: Pointer to binary record */
+  SessionInput *pIn,              /* Input data */
   int nCol,                       /* Number of values in record */
   u8 *abPK,                       /* Array of primary key flags, or NULL */
   sqlite3_value **apOut           /* Write values to this array */
 ){
   int i;                          /* Used to iterate through columns */
-  u8 *aRec = *paChange;           /* Cursor for the serialized record */
+  int rc = SQLITE_OK;
 
-  for(i=0; i<nCol; i++){
-    int eType;
+  for(i=0; i<nCol && rc==SQLITE_OK; i++){
+    int eType = 0;                /* Type of value (SQLITE_NULL, TEXT etc.) */
     if( abPK && abPK[i]==0 ) continue;
-    eType = *aRec++;              /* Type of value (SQLITE_NULL, TEXT etc.) */
+    rc = sessionInputBuffer(pIn, 9);
+    if( rc==SQLITE_OK ){
+      eType = pIn->aChangeset[pIn->iNext++];
+    }
+
     assert( !apOut || apOut[i]==0 );
     if( eType ){
       if( apOut ){
         apOut[i] = sqlite3ValueNew(0);
-        if( !apOut[i] ) return SQLITE_NOMEM;
+        if( !apOut[i] ) rc = SQLITE_NOMEM;
       }
+    }
 
+    if( rc==SQLITE_OK ){
+      u8 *aVal = &pIn->aChangeset[pIn->iNext];
       if( eType==SQLITE_TEXT || eType==SQLITE_BLOB ){
         int nByte;
-        aRec += sessionVarintGet(aRec, &nByte);
-        if( apOut ){
+        pIn->iNext += sessionVarintGet(aVal, &nByte);
+        rc = sessionInputBuffer(pIn, nByte);
+        if( apOut && rc==SQLITE_OK ){
+          u8 *aRec = &pIn->aChangeset[pIn->iNext];
           u8 enc = (eType==SQLITE_TEXT ? SQLITE_UTF8 : 0);
           sqlite3ValueSetStr(apOut[i], nByte, (char *)aRec, enc, SQLITE_STATIC);
         }
-        aRec += nByte;
+        pIn->iNext += nByte;
       }
       if( eType==SQLITE_INTEGER || eType==SQLITE_FLOAT ){
         if( apOut ){
-          sqlite3_int64 v = sessionGetI64(aRec);
+          sqlite3_int64 v = sessionGetI64(aVal);
           if( eType==SQLITE_INTEGER ){
             sqlite3VdbeMemSetInt64(apOut[i], v);
           }else{
@@ -2036,13 +2180,83 @@ static int sessionReadRecord(
             sqlite3VdbeMemSetDouble(apOut[i], d);
           }
         }
-        aRec += 8;
+        pIn->iNext += 8;
       }
     }
   }
 
-  *paChange = aRec;
-  return SQLITE_OK;
+  return rc;
+}
+
+/*
+** The input pointer currently points to the second byte of a table-header.
+** Specifically, to the following:
+**
+**   + number of columns in table (varint)
+**   + array of PK flags (1 byte per column),
+**   + table name (nul terminated).
+**
+** This function ensures that all of the above is present in the input 
+** buffer (i.e. that it can be accessed without any calls to xInput()).
+** If successful, SQLITE_OK is returned. Otherwise, an SQLite error code.
+** The input pointer is not moved.
+*/
+static int sessionChangesetBufferTblhdr(SessionInput *pIn, int *pnByte){
+  int rc = SQLITE_OK;
+  int nCol = 0;
+  int iIn = pIn->iNext;
+
+  rc = sessionInputBuffer(pIn, 9);
+  if( rc==SQLITE_OK ){
+    iIn += sessionVarintGet(&pIn->aChangeset[iIn], &nCol);
+    rc = sessionInputBuffer(pIn, nCol+100);
+    iIn += nCol;
+  }
+  while( rc==SQLITE_OK ){
+    while( iIn<pIn->nChangeset && pIn->aChangeset[iIn] ) iIn++;
+    if( pIn->aChangeset[iIn]==0 ) break;
+    rc = sessionInputBuffer(pIn, 100);
+  }
+  if( pnByte ) *pnByte = (iIn+1 - pIn->iNext);
+  return rc;
+}
+
+/*
+** The input pointer currently points to the second byte of a table-header.
+** Specifically, to the following:
+**
+**   + number of columns in table (varint)
+**   + array of PK flags (1 byte per column),
+**   + table name (nul terminated).
+*/
+static int sessionChangesetReadTblhdr(sqlite3_changeset_iter *p){
+  int rc;
+  int nCopy;
+  assert( p->rc==SQLITE_OK );
+
+  rc = sessionChangesetBufferTblhdr(&p->in, &nCopy);
+  if( rc==SQLITE_OK ){
+    int nByte;
+    int nVarint;
+    nVarint = sessionVarintGet(&p->in.aChangeset[p->in.iNext], &p->nCol);
+    nCopy -= nVarint;
+    p->in.iNext += nVarint;
+    nByte = p->nCol * sizeof(sqlite3_value*) * 2 + nCopy;
+    p->tblhdr.nBuf = 0;
+    sessionBufferGrow(&p->tblhdr, nByte, &rc);
+  }
+
+  if( rc==SQLITE_OK ){
+    int iPK = sizeof(sqlite3_value*)*p->nCol*2;
+    memset(p->tblhdr.aBuf, 0, iPK);
+    memcpy(&p->tblhdr.aBuf[iPK], &p->in.aChangeset[p->in.iNext], nCopy);
+    p->in.iNext += nCopy;
+  }
+
+  p->apValue = (sqlite3_value**)p->tblhdr.aBuf;
+  p->abPK = (u8*)&p->apValue[p->nCol*2];
+  p->zTab = (char*)&p->abPK[p->nCol];
+  return (p->rc = rc);
 }
 
 /*
@@ -2066,9 +2280,10 @@ static int sessionChangesetNext(
   u8 **paRec,                     /* If non-NULL, store record pointer here */
   int *pnRec                      /* If non-NULL, store size of record here */
 ){
-  u8 *aChange;
   int i;
+  u8 op;
 
+  assert( paRec==0 || p->in.xInput==0 ); /* fixme! */
   assert( (paRec==0 && pnRec==0) || (paRec && pnRec) );
 
   /* If the iterator is in the error-state, return immediately. */
@@ -2082,57 +2297,50 @@ static int sessionChangesetNext(
     memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2);
   }
 
+  /* Make sure the buffer contains at least 10 bytes of input data, or all
+  ** remaining data if there are less than 10 bytes available. This is
+  ** sufficient either for the 'T' or 'P' byte and the varint that follows
+  ** it, or for the two single byte values otherwise. */
+  p->rc = sessionInputBuffer(&p->in, 2);
+  if( p->rc!=SQLITE_OK ) return p->rc;
+
   /* If the iterator is already at the end of the changeset, return DONE. */
-  if( p->pNext>=&p->aChangeset[p->nChangeset] ){
+  if( p->in.iNext>=p->in.nChangeset ){
     return SQLITE_DONE;
   }
-  aChange = p->pNext;
-
-  if( aChange[0]=='T' || aChange[0]=='P' ){
-    int nByte;                    /* Bytes to allocate for apValue */
-    p->bPatchset = (aChange[0]=='P');
-    aChange++;
-    aChange += sessionVarintGet(aChange, &p->nCol);
-    p->abPK = (u8 *)aChange;
-    aChange += p->nCol;
-    p->zTab = (char *)aChange;
-    aChange += (sqlite3Strlen30((char *)aChange) + 1);
-    
-    if( paRec==0 ){
-      sqlite3_free(p->apValue);
-      nByte = sizeof(sqlite3_value *) * p->nCol * 2;
-      p->apValue = (sqlite3_value **)sqlite3_malloc(nByte);
-      if( !p->apValue ){
-        return (p->rc = SQLITE_NOMEM);
-      }
-      memset(p->apValue, 0, sizeof(sqlite3_value*)*p->nCol*2);
-    }
+
+  op = p->in.aChangeset[p->in.iNext++];
+  if( op=='T' || op=='P' ){
+    p->bPatchset = (op=='P');
+    if( sessionChangesetReadTblhdr(p) ) return p->rc;
+    if( (p->rc = sessionInputBuffer(&p->in, 2)) ) return p->rc;
+    op = p->in.aChangeset[p->in.iNext++];
   }
 
-  p->op = *(aChange++);
-  p->bIndirect = *(aChange++);
+  p->op = op;
+  p->bIndirect = p->in.aChangeset[p->in.iNext++];
   if( p->op!=SQLITE_UPDATE && p->op!=SQLITE_DELETE && p->op!=SQLITE_INSERT ){
     return (p->rc = SQLITE_CORRUPT);
   }
 
-  if( paRec ){ *paRec = aChange; }
+  if( paRec ){ *paRec = &p->in.aChangeset[p->in.iNext]; }
 
   /* If this is an UPDATE or DELETE, read the old.* record. */
   if( p->op!=SQLITE_INSERT && (p->bPatchset==0 || p->op==SQLITE_DELETE) ){
     u8 *abPK = p->bPatchset ? p->abPK : 0;
-    p->rc = sessionReadRecord(&aChange, p->nCol, abPK, paRec?0:p->apValue);
+    p->rc = sessionReadRecord(&p->in, p->nCol, abPK, paRec?0:p->apValue);
     if( p->rc!=SQLITE_OK ) return p->rc;
   }
 
   /* If this is an INSERT or UPDATE, read the new.* record. */
   if( p->op!=SQLITE_DELETE ){
     sqlite3_value **apOut = (paRec ? 0 : &p->apValue[p->nCol]);
-    p->rc = sessionReadRecord(&aChange, p->nCol, 0, apOut);
+    p->rc = sessionReadRecord(&p->in, p->nCol, 0, apOut);
     if( p->rc!=SQLITE_OK ) return p->rc;
   }
 
-  if( pnRec ){ 
-    *pnRec = (int)(aChange - *paRec); 
+  if( pnRec ){
+    *pnRec = (int)(&p->in.aChangeset[p->in.iNext] - *paRec);
   }else if( p->bPatchset && p->op==SQLITE_UPDATE ){
     /* If this is an UPDATE that is part of a patchset, then all PK and
     ** modified fields are present in the new.* record. The old.* record
@@ -2148,7 +2356,7 @@ static int sessionChangesetNext(
       }
     }
   }
-  p->pNext = aChange;
+
   return SQLITE_ROW;
 }
 
@@ -2321,7 +2529,7 @@ int sqlite3changeset_finalize(sqlite3_changeset_iter *p){
   if( p->apValue ){
     for(i=0; i<p->nCol*2; i++) sqlite3ValueFree(p->apValue[i]);
   }
-  sqlite3_free(p->apValue);
+  sqlite3_free(p->tblhdr.aBuf);
   sqlite3_free(p);
   return rc;
 }
@@ -2339,52 +2547,71 @@ int sqlite3changeset_invert(
   u8 *aOut;
   u8 *aIn;
   int i;
+  SessionInput sInput;
   int nCol = 0;                   /* Number of cols in current table */
   u8 *abPK = 0;                   /* PK array for current table */
   sqlite3_value **apVal = 0;      /* Space for values for UPDATE inversion */
+  SessionBuffer sPK = {0, 0, 0};  /* PK array for current table */
 
   /* Zero the output variables in case an error occurs. */
   *ppInverted = 0;
   *pnInverted = 0;
   if( nChangeset==0 ) return SQLITE_OK;
 
+  /* Set up the input stream */
+  memset(&sInput, 0, sizeof(SessionInput));
+  sInput.nChangeset = nChangeset;
+  sInput.aChangeset = (u8*)pChangeset;
+
   aOut = (u8 *)sqlite3_malloc(nChangeset);
   if( !aOut ) return SQLITE_NOMEM;
   aIn = (u8 *)pChangeset;
 
   i = 0;
   while( i<nChangeset ){
-    u8 eType = aIn[i];
+    u8 eType;
+    if( (rc = sessionInputBuffer(&sInput, 2)) ) goto finished_invert;
+    eType = sInput.aChangeset[sInput.iNext];
     switch( eType ){
       case 'T': {
         /* A 'table' record consists of:
         **
         **   * A constant 'T' character,
         **   * Number of columns in said table (a varint),
-        **   * An array of nCol bytes (abPK),
+        **   * An array of nCol bytes (sPK),
         **   * A nul-terminated table name.
         */
-        int nByte = 1 + sessionVarintGet(&aIn[i+1], &nCol);
-        abPK = &aIn[i+nByte];
-        nByte += nCol;
-        nByte += 1 + sqlite3Strlen30((char *)&aIn[i+nByte]);
-        memcpy(&aOut[i], &aIn[i], nByte);
-        i += nByte;
+        int nByte;
+        int nVarint;
+        int iNext = sInput.iNext;
+        sInput.iNext++;
+        if( (rc = sessionChangesetBufferTblhdr(&sInput, &nByte)) ){
+          goto finished_invert;
+        }
+        nVarint = sessionVarintGet(&sInput.aChangeset[iNext+1], &nCol);
+        sPK.nBuf = 0;
+        sessionAppendBlob(&sPK, &sInput.aChangeset[iNext+1+nVarint], nCol, &rc);
+        if( rc ) goto finished_invert;
+        sInput.iNext += nByte;
+        memcpy(&aOut[i], &sInput.aChangeset[iNext], nByte+1);
+        i += nByte+1;
         sqlite3_free(apVal);
         apVal = 0;
+        abPK = sPK.aBuf;
         break;
       }
 
       case SQLITE_INSERT:
       case SQLITE_DELETE: {
+        int iStart;
         int nByte;
-        u8 *aEnd = &aIn[i+2];
-
-        sessionReadRecord(&aEnd, nCol, 0, 0);
+        sInput.iNext += 2;
+        iStart = sInput.iNext;
+        sessionReadRecord(&sInput, nCol, 0, 0);
         aOut[i] = (eType==SQLITE_DELETE ? SQLITE_INSERT : SQLITE_DELETE);
-        aOut[i+1] = aIn[i+1];
-        nByte = (int)(aEnd - &aIn[i+2]);
-        memcpy(&aOut[i+2], &aIn[i+2], nByte);
+        aOut[i+1] = aIn[i+1];               /* indirect-flag */
+        nByte = sInput.iNext - iStart;
+        memcpy(&aOut[i+2], &sInput.aChangeset[iStart], nByte);
         i += 2 + nByte;
         break;
       }
@@ -2392,7 +2619,6 @@ int sqlite3changeset_invert(
       case SQLITE_UPDATE: {
         int iCol;
         int nWrite = 0;
-        u8 *aEnd = &aIn[i+2];
 
         if( 0==apVal ){
           apVal = (sqlite3_value **)sqlite3_malloc(sizeof(apVal[0])*nCol*2);
@@ -2403,17 +2629,18 @@ int sqlite3changeset_invert(
           memset(apVal, 0, sizeof(apVal[0])*nCol*2);
         }
 
-        /* Read the old.* and new.* records for the update change. */
-        rc = sessionReadRecord(&aEnd, nCol, 0, &apVal[0]);
-        if( rc==SQLITE_OK ){
-          rc = sessionReadRecord(&aEnd, nCol, 0, &apVal[nCol]);
-        }
-
         /* Write the header for the new UPDATE change. Same as the original. */
         aOut[i] = SQLITE_UPDATE;
-        aOut[i+1] = aIn[i+1];
+        aOut[i+1] = sInput.aChangeset[sInput.iNext+1];
         nWrite = 2;
 
+        /* Read the old.* and new.* records for the update change. */
+        sInput.iNext += 2;
+        rc = sessionReadRecord(&sInput, nCol, 0, &apVal[0]);
+        if( rc==SQLITE_OK ){
+          rc = sessionReadRecord(&sInput, nCol, 0, &apVal[nCol]);
+        }
+
         /* Write the new old.* record. Consists of the PK columns from the
         ** original old.* record, and the other values from the original
         ** new.* record. */
@@ -2439,7 +2666,7 @@ int sqlite3changeset_invert(
         }
 
         i += nWrite;
-        assert( &aIn[i]==aEnd );
+        assert( i==sInput.iNext );
         break;
       }
 
@@ -2458,6 +2685,7 @@ int sqlite3changeset_invert(
     sqlite3_free(aOut);
   }
   sqlite3_free(apVal);
+  sqlite3_free(sPK.aBuf);
   return rc;
 }
 
@@ -3276,7 +3504,7 @@ static int sessionChangeMerge(
         u8 *a1 = aRec;
         assert( op2==SQLITE_UPDATE );
         pNew->op = SQLITE_INSERT;
-        if( bPatchset==0 ) sessionReadRecord(&a1, pTab->nCol, 0, 0);
+        if( bPatchset==0 ) sessionSkipRecord(&a1, pTab->nCol);
         sessionMergeRecord(&aCsr, pTab->nCol, aExist, a1);
       }else if( op1==SQLITE_DELETE ){       /* DELETE + INSERT */
         assert( op2==SQLITE_INSERT );
@@ -3290,8 +3518,8 @@ static int sessionChangeMerge(
         u8 *a2 = aRec;
         assert( op1==SQLITE_UPDATE );
         if( bPatchset==0 ){
-          sessionReadRecord(&a1, pTab->nCol, 0, 0);
-          sessionReadRecord(&a2, pTab->nCol, 0, 0);
+          sessionSkipRecord(&a1, pTab->nCol);
+          sessionSkipRecord(&a2, pTab->nCol);
         }
         pNew->op = SQLITE_UPDATE;
         if( 0==sessionMergeUpdate(&aCsr, pTab, bPatchset, aRec, aExist,a1,a2) ){
@@ -3356,14 +3584,8 @@ static int sessionConcatChangeset(
       break;
     }
 
-    assert( pIter->apValue==0 );
     sqlite3changeset_op(pIter, &zNew, &nCol, &op, &bIndirect);
-
-    assert( zNew>=(char *)pChangeset && zNew-nChangeset<((char *)pChangeset) );
-    assert( !pTab || pTab->zName-nChangeset<(char *)pChangeset );
-    assert( !pTab || zNew>=pTab->zName );
-
-    if( !pTab || zNew!=pTab->zName ){
+    if( !pTab || sqlite3_stricmp(zNew, pTab->zName) ){
       /* Search the list for a matching table */
       int nNew = (int)strlen(zNew);
       u8 *abPK;
@@ -3373,21 +3595,23 @@ static int sessionConcatChangeset(
         if( 0==sqlite3_strnicmp(pTab->zName, zNew, nNew+1) ) break;
       }
       if( !pTab ){
-        pTab = sqlite3_malloc(sizeof(SessionTable));
+        pTab = sqlite3_malloc(sizeof(SessionTable) + nCol + nNew+1);
         if( !pTab ){
           rc = SQLITE_NOMEM;
           break;
         }
         memset(pTab, 0, sizeof(SessionTable));
         pTab->pNext = *ppTabList;
-        pTab->abPK = abPK;
         pTab->nCol = nCol;
+        pTab->abPK = (u8*)&pTab[1];
+        memcpy(pTab->abPK, abPK, nCol);
+        pTab->zName = (char*)&pTab->abPK[nCol];
+        memcpy(pTab->zName, zNew, nNew+1);
         *ppTabList = pTab;
       }else if( pTab->nCol!=nCol || memcmp(pTab->abPK, abPK, nCol) ){
         rc = SQLITE_SCHEMA;
         break;
       }
-      pTab->zName = (char *)zNew;
     }
 
     if( sessionGrowHash(bPatchset, pTab) ){
index ced984ecbe3b7a63ee35abc407f0e0c61e1f3027..4737bd953ff0c6baaa29ff414224b79f8ef27fa5 100644 (file)
@@ -273,6 +273,31 @@ int sqlite3session_changeset(
   void **ppChangeset              /* OUT: Buffer containing changeset */
 );
 
+
+/*
+** This function is similar to sqlite3session_changeset(), except that instead
+** of storing the output changeset in a buffer obtained from sqlite3_malloc()
+** it invokes the supplied xOutput() callback zero or more times to stream the
+** changeset to the application. This is useful in order to avoid large memory
+** allocations when working with very large changesets.
+**
+** The first parameter passed to each call to the xOutput callback is a copy
+** of the pOut parameter passed to this function. The following two parameters
+** are a pointer to the buffer containing the next chunk of the output changeset
+** and the size of that buffer in bytes.
+**
+** If the data is successfully processed by the xOutput callback, it should
+** return SQLITE_OK. Or, if an error occurs, some other SQLite error code. In
+** this case the sqlite3session_changeset_str() call is abandoned immediately
+** and returns a copy of the xOutput return code.
+*/
+int sqlite3session_changeset_str(
+  sqlite3_session *pSession,
+  int (*xOutput)(void *pOut, const void *pData, int nData),
+  void *pOut
+);
+
+
 /*
 ** CAPI3REF: Generate A Patchset From A Session Object
 **
@@ -302,6 +327,15 @@ int sqlite3session_patchset(
   void **ppPatchset               /* OUT: Buffer containing changeset */
 );
 
+/*
+** Streaming version of sqlite3session_patchset().
+*/
+int sqlite3session_patchset_str(
+  sqlite3_session *pSession,
+  int (*xOutput)(void *pOut, const void *pData, int nData),
+  void *pOut
+);
+
 /*
 ** CAPI3REF: Test if a changeset has recorded any changes.
 **
@@ -358,6 +392,30 @@ int sqlite3changeset_start(
   void *pChangeset                /* Pointer to blob containing changeset */
 );
 
+
+/*
+** This function is similar to sqlite3changeset_start(), except that instead
+** of reading data from a single buffer, it requests it one chunk at a time
+** from the application by invoking the supplied xInput() callback. The xInput()
+** callback may be invoked at any time during the lifetime of the iterator.
+**
+** Each time the xInput callback is invoked, the first argument passed is a
+** copy of the third parameter passed to this function. The second argument,
+** pData, points to a buffer (*pnData) bytes in size. Assuming no error occurs
+** the xInput method should copy up to (*pnData) bytes of data into the buffer
+** and set (*pnData) to the actual number of bytes copied before returning
+** SQLITE_OK. If the input is completely exhausted, (*pnData) should be set
+** to zero to indicate this. Or, if an error occurs, an SQLite error code
+** should be returned. In this case the iterator is put into an error state and
+** all subsequent calls to iterator methods return a copy of the xInput error
+** code.
+*/
+int sqlite3changeset_start_str(
+  sqlite3_changeset_iter **pp,
+  int (*xInput)(void *pIn, void *pData, int *pnData),
+  void *pIn
+);
+
 /*
 ** CAPI3REF: Advance A Changeset Iterator
 **
index 38e4be1481089d1f8fcf1e7cbd30d70d42742e80..bf4bde39a44424c39843d777de215b1b3feacb2d 100644 (file)
@@ -14,6 +14,23 @@ struct TestSession {
   Tcl_Obj *pFilterScript;
 };
 
+#define SESSION_STREAM_TCL_VAR "sqlite3session_streams"
+
+/*
+** Attempt to find the global variable zVar within interpreter interp
+** and extract a boolean value from it. Return this value.
+**
+** If the named variable cannot be found, or if it cannot be interpreted
+** as a boolean, return 0.
+*/
+static int test_tcl_boolean(Tcl_Interp *interp, const char *zVar){
+  Tcl_Obj *pObj;
+  int bVal = 0;
+  pObj = Tcl_ObjGetVar2(interp, Tcl_NewStringObj(zVar, -1), 0, TCL_GLOBAL_ONLY);
+  if( pObj ) Tcl_GetBooleanFromObj(0, pObj, &bVal);
+  return bVal;
+}
+
 static int test_session_error(Tcl_Interp *interp, int rc){
   extern const char *sqlite3ErrName(int);
   Tcl_SetObjResult(interp, Tcl_NewStringObj(sqlite3ErrName(rc), -1));
@@ -44,12 +61,38 @@ static int test_table_filter(void *pCtx, const char *zTbl){
   return bRes;
 }
 
+struct TestSessionsBlob {
+  void *p;
+  int n;
+};
+typedef struct TestSessionsBlob TestSessionsBlob;
+
+static int testSessionsOutput(
+  void *pCtx,
+  const void *pData,
+  int nData
+){
+  TestSessionsBlob *pBlob = (TestSessionsBlob*)pCtx;
+  char *pNew;
+
+  assert( nData>0 );
+  pNew = (char*)sqlite3_realloc(pBlob->p, pBlob->n + nData);
+  if( pNew==0 ){
+    return SQLITE_NOMEM;
+  }
+  pBlob->p = (void*)pNew;
+  memcpy(&pNew[pBlob->n], pData, nData);
+  pBlob->n += nData;
+  return SQLITE_OK;
+}
+
 /*
 ** Tclcmd:  $session attach TABLE
 **          $session changeset
 **          $session delete
 **          $session enable BOOL
 **          $session indirect INTEGER
+**          $session patchset
 **          $session table_filter SCRIPT
 */
 static int test_session_cmd(
@@ -105,17 +148,26 @@ static int test_session_cmd(
 
     case 7:        /* patchset */
     case 1: {      /* changeset */
-      int nChange;
-      void *pChange;
-      if( iSub==7 ){
-        rc = sqlite3session_patchset(pSession, &nChange, &pChange);
+      TestSessionsBlob o = {0, 0};
+      if( test_tcl_boolean(interp, SESSION_STREAM_TCL_VAR) ){
+        void *pCtx = (void*)&o;
+        if( iSub==7 ){
+          rc = sqlite3session_patchset_str(pSession, testSessionsOutput, pCtx);
+        }else{
+          rc = sqlite3session_changeset_str(pSession, testSessionsOutput, pCtx);
+        }
       }else{
-        rc = sqlite3session_changeset(pSession, &nChange, &pChange);
+        if( iSub==7 ){
+          rc = sqlite3session_patchset(pSession, &o.n, &o.p);
+        }else{
+          rc = sqlite3session_changeset(pSession, &o.n, &o.p);
+        }
       }
       if( rc==SQLITE_OK ){
-        Tcl_SetObjResult(interp, Tcl_NewByteArrayObj(pChange, nChange)); 
-        sqlite3_free(pChange);
-      }else{
+        Tcl_SetObjResult(interp, Tcl_NewByteArrayObj(o.p, o.n)); 
+      }
+      sqlite3_free(o.p);
+      if( rc!=SQLITE_OK ){
         return test_session_error(interp, rc);
       }
       break;
index ffe55892fcbb548ee1103a830c4767eb46989fc5..753640048a69a37591c89ab4f56840ae98659355 100644 (file)
--- a/manifest
+++ b/manifest
@@ -1,5 +1,5 @@
-C Merge\sall\srecent\strunk\schanges\sinto\sthe\ssessions\sbranch.
-D 2014-09-21T22:49:20.257
+C Begin\sadding\s'streaming'\sAPIs\sto\ssessions\smodule.\sThis\sis\sa\swork\sin\sprogress.
+D 2014-09-23T20:39:55.903
 F Makefile.arm-wince-mingw32ce-gcc d6df77f1f48d690bd73162294bbba7f59507c72f
 F Makefile.in dd5f245aa8c741bc65845747203c8ce2f3fb6c83
 F Makefile.linux-gcc 91d710bdc4998cb015f39edf3cb314ec4f4d7e23
@@ -158,9 +158,9 @@ F ext/session/sessionA.test eb05c13e4ef1ca8046a3a6dbf2d5f6f5b04a11d4
 F ext/session/sessionB.test 276267cd7fc37c2e2dd03f1e2ed9ada336a8bdb4
 F ext/session/session_common.tcl 1539d8973b2aea0025c133eb0cc4c89fcef541a5
 F ext/session/sessionfault.test e7965159a73d385c1a4af12d82c3a039ebdd71a6
-F ext/session/sqlite3session.c 4c7689bd8286147f7d9bf5d4b6ca5e7e7ee588ab
-F ext/session/sqlite3session.h 66c14a2f6193c47773770307636e88c43db6f839
-F ext/session/test_session.c a252fb669d3a1b3552ee7b87fe610debc0afeb7b
+F ext/session/sqlite3session.c ead909b1b0976aa6d08dcb7b487a902e358f7e4c
+F ext/session/sqlite3session.h d074a929d368b438d32c15af8f8fe2afa80afe3f
+F ext/session/test_session.c e39119c8554fe1b0925a038423ca137ddf6f6bd9
 F ext/userauth/sqlite3userauth.h 19cb6f0e31316d0ee4afdfb7a85ef9da3333a220
 F ext/userauth/user-auth.txt e6641021a9210364665fe625d067617d03f27b04
 F ext/userauth/userauth.c 5fa3bdb492f481bbc1709fc83c91ebd13460c69e
@@ -776,7 +776,7 @@ F test/pagesize.test 1dd51367e752e742f58e861e65ed7390603827a0
 F test/pcache.test b09104b03160aca0d968d99e8cd2c5b1921a993d
 F test/pcache2.test a83efe2dec0d392f814bfc998def1d1833942025
 F test/percentile.test b98fc868d71eb5619d42a1702e9ab91718cbed54
-F test/permutations.test 89f594fdba922586d46c3e0a7ab4990b5a7f8da7
+F test/permutations.test b8ca6c9ecec6f360485a8cb61ef1b8734b31797b
 F test/pragma.test 19d0241a007bcdd77fc2606ec60fc60357e7fc8b
 F test/pragma2.test aea7b3d82c76034a2df2b38a13745172ddc0bc13
 F test/printf.test ec9870c4dce8686a37818e0bf1aba6e6a1863552
@@ -1216,7 +1216,7 @@ F tool/vdbe_profile.tcl 67746953071a9f8f2f668b73fe899074e2c6d8c1
 F tool/warnings-clang.sh f6aa929dc20ef1f856af04a730772f59283631d4
 F tool/warnings.sh 0abfd78ceb09b7f7c27c688c8e3fe93268a13b32
 F tool/win/sqlite.vsix deb315d026cc8400325c5863eef847784a219a2f
-P c2885c6bb24cc55178467e57e77bf71df58b3b13 d5880abd63c83c88e135257373afa0a3fd88297e
-R 9d7f30c83131a55806d3c1dd053387df
-U drh
-Z c53e54ee2e290caf615c330d235d140a
+P 6406b77f2c447751a2fbb16f01c61cdcfd6af59e
+R 67d8884fcab24bc7ffc4823662b37f4a
+U dan
+Z 2bbe541c87040990ceab83529c1e7e80
index a34006bb050db630858ef8195de4cb955d5c92c6..424c64b57aa68306bc6f2013f83f297a1bcf3823 100644 (file)
@@ -1 +1 @@
-6406b77f2c447751a2fbb16f01c61cdcfd6af59e
\ No newline at end of file
+3c7d3d950bbf5f5ed3696ebc61c77ca48bafe2b5
\ No newline at end of file
index 7bea39eb7af4c4ddc624cf4bf3cd52bae05949d3..44f571793c28bc25324aaf26c76f2c0d88c8f00e 100644 (file)
@@ -938,6 +938,14 @@ test_suite "session_eec" -description {
   sqlite3_extended_result_codes $::dbhandle 1
 }
 
+test_suite "session_str" -description {
+  All session module related tests using the streaming APIs.
+} -files [
+  glob -nocomplain $::testdir/../ext/session/*.test
+] -dbconfig {
+  set ::sqlite3session_streams 1
+}
+
 test_suite "no_optimization" -description {
   Run test scripts with optimizations disabled using the
   sqlite3_test_control(SQLITE_TESTCTRL_OPTIMIZATIONS) interface.