}
/* Column numbers */
-#define LSM1_COLUMN_KEY 0
-#define LSM1_COLUMN_VALUE 1
-#define LSM1_COLUMN_COMMAND 2
+#define LSM1_COLUMN_KEY 0
+#define LSM1_COLUMN_BLOBKEY 1
+#define LSM1_COLUMN_VALUE 2
+#define LSM1_COLUMN_BLOBVALUE 3
+#define LSM1_COLUMN_COMMAND 4
rc = sqlite3_declare_vtab(db,
- "CREATE TABLE x(key,value,command hidden)");
+ "CREATE TABLE x("
+ " key," /* The primary key. Any non-NULL */
+ " blobkey," /* Pure BLOB primary key */
+ " value," /* The value associated with key. Any non-NULL */
+ " blobvalue," /* Pure BLOB value */
+ " command hidden" /* Insert here for control operations */
+ ");"
+ );
connect_failed:
if( rc!=SQLITE_OK ){
if( pNew ){
return pCur->atEof;
}
+/*
+** Rowids are not supported by the underlying virtual table. So always
+** return 0 for the rowid.
+*/
+static int lsm1Rowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid){
+ *pRowid = 0;
+ return SQLITE_OK;
+}
+
+/*
+** Type prefixes on LSM keys
+*/
+#define LSM1_TYPE_NEGATIVE 0
+#define LSM1_TYPE_POSITIVE 1
+#define LSM1_TYPE_TEXT 2
+#define LSM1_TYPE_BLOB 3
+
+/*
+** Generate a key encoding for pValue such that all keys compare in
+** lexicographical order. Return an SQLite error code or SQLITE_OK.
+**
+** The key encoding is *pnKey bytes in length written into *ppKey.
+** Space to hold the key is taken from pSpace if sufficient, or else
+** from sqlite3_malloc(). The caller is responsible for freeing malloced
+** space.
+*/
+static int lsm1EncodeKey(
+ sqlite3_value *pValue, /* Value to be encoded */
+ unsigned char **ppKey, /* Write the encoding here */
+ int *pnKey, /* Write the size of the encoding here */
+ unsigned char *pSpace, /* Use this space if it is large enough */
+ int nSpace /* Size of pSpace[] */
+){
+ int eType = sqlite3_value_type(pValue);
+ *ppKey = 0;
+ *pnKey = 0;
+ switch( eType ){
+ default: {
+ return SQLITE_ERROR; /* We cannot handle NULL keys */
+ }
+ case SQLITE_BLOB:
+ case SQLITE_TEXT: {
+ int nVal = sqlite3_value_bytes(pValue);
+ const void *pVal;
+ if( eType==SQLITE_BLOB ){
+ eType = LSM1_TYPE_BLOB;
+ pVal = sqlite3_value_blob(pValue);
+ }else{
+ eType = LSM1_TYPE_TEXT;
+ pVal = (const void*)sqlite3_value_text(pValue);
+ if( pVal==0 ) return SQLITE_NOMEM;
+ }
+ if( nVal+1>nSpace ){
+ pSpace = sqlite3_malloc( nVal+1 );
+ if( pSpace==0 ) return SQLITE_NOMEM;
+ }
+ pSpace[0] = eType;
+ memcpy(&pSpace[1], pVal, nVal);
+ *ppKey = pSpace;
+ *pnKey = nVal+1;
+ break;
+ }
+ }
+ return SQLITE_OK;
+}
+
/*
** Return values of columns for the row at which the lsm1_cursor
** is currently pointing.
){
lsm1_cursor *pCur = (lsm1_cursor*)cur;
switch( i ){
+ case LSM1_COLUMN_BLOBKEY: {
+ const void *pVal;
+ int nVal;
+ if( lsm_csr_key(pCur->pLsmCur, &pVal, &nVal)==LSM_OK ){
+ sqlite3_result_blob(ctx, pVal, nVal, SQLITE_TRANSIENT);
+ }
+ break;
+ }
case LSM1_COLUMN_KEY: {
+ const unsigned char *pVal;
+ int nVal;
+ if( lsm_csr_key(pCur->pLsmCur, (const void**)&pVal, &nVal)==LSM_OK
+ && nVal>=1
+ ){
+ if( pVal[0]==LSM1_TYPE_BLOB ){
+ sqlite3_result_blob(ctx, (const void*)&pVal[1],nVal-1,
+ SQLITE_TRANSIENT);
+ }else if( pVal[0]==LSM1_TYPE_TEXT ){
+ sqlite3_result_text(ctx, (const char*)&pVal[1],nVal-1,
+ SQLITE_TRANSIENT);
+ }
+ }
+ break;
+ }
+ case LSM1_COLUMN_BLOBVALUE: {
const void *pVal;
int nVal;
- if( lsm_csr_key(pCur->pLsmCur, (const void**)&pVal, &nVal)==LSM_OK ){
+ if( lsm_csr_value(pCur->pLsmCur, (const void**)&pVal, &nVal)==LSM_OK ){
sqlite3_result_blob(ctx, pVal, nVal, SQLITE_TRANSIENT);
}
break;
case SQLITE_INTEGER: {
sqlite3_uint64 x = 0;
int j;
- for(j=1; j<=8; j++){
+ for(j=1; j<nVal; j++){
x = (x<<8) | aVal[j];
}
if( aVal[0]==SQLITE_INTEGER ){
return SQLITE_OK;
}
-/*
-** Rowids are not supported by the underlying virtual table. So always
-** return 0 for the rowid.
-*/
-static int lsm1Rowid(sqlite3_vtab_cursor *cur, sqlite_int64 *pRowid){
- *pRowid = 0;
- return SQLITE_OK;
-}
-
/* Move to the first row to return.
*/
static int lsm1Filter(
** Only comparisons against the key are allowed. The idxNum defines
** which comparisons are available:
**
-** 0 Full table scan only
-** 1 key==? single argument for ?
-**
+** 0 Full table scan only
+** bit 1 key==?1 single argument for ?1
+** bit 2 key>?1
+** bit 3 key>=?1
+** bit 4 key<?N (N==1 if bits 2,3 clear, or 2 if bits2,3 set)
+** bit 5 key<=?N (N==1 if bits 2,3 clear, or 2 if bits2,3 set)
+** bit 6 Use blobkey instead of key
+**
+** To put it another way:
+**
+** 0 Full table scan.
+** 1 key==?1
+** 2 key>?1
+** 4 key>=?1
+** 8 key<?1
+** 10 key>?1 AND key<?2
+** 12 key>=?1 AND key<?2
+** 16 key<=?1
+** 18 key>?1 AND key<=?2
+** 20 key>=?1 AND key<=?2
+** 33..52 Use blobkey in place of key...
*/
static int lsm1BestIndex(
sqlite3_vtab *tab,
int eType;
int rc;
sqlite3_value *pValue;
+ const unsigned char *pVal;
+ unsigned char *pData;
+ int nVal;
+ unsigned char pSpace[100];
+
if( argc==1 ){
pVTab->zErrMsg = sqlite3_mprintf("cannot DELETE");
return SQLITE_ERROR;
if( sqlite3_value_type(argv[2+LSM1_COLUMN_COMMAND])!=SQLITE_NULL ){
return SQLITE_OK;
}
- if( sqlite3_value_type(argv[2+LSM1_COLUMN_KEY])!=SQLITE_BLOB ){
- pVTab->zErrMsg = sqlite3_mprintf("BLOB keys only");
- return SQLITE_ERROR;
+ if( sqlite3_value_type(argv[2+LSM1_COLUMN_BLOBKEY])==SQLITE_BLOB ){
+ /* Use the blob key exactly as supplied */
+ pKey = sqlite3_value_blob(argv[2+LSM1_COLUMN_BLOBKEY]);
+ nKey = sqlite3_value_bytes(argv[2+LSM1_COLUMN_BLOBKEY]);
+ }else{
+ /* Use a key encoding that sorts in lexicographical order */
+ rc = lsm1EncodeKey(argv[2+LSM1_COLUMN_KEY],
+ (unsigned char**)&pKey,&nKey,
+ pSpace,sizeof(pSpace));
+ if( rc ) return rc;
}
- pKey = sqlite3_value_blob(argv[2+LSM1_COLUMN_KEY]);
- nKey = sqlite3_value_bytes(argv[2+LSM1_COLUMN_KEY]);
- pValue = argv[2+LSM1_COLUMN_VALUE];
- eType = sqlite3_value_type(pValue);
- switch( eType ){
- case SQLITE_NULL: {
- rc = lsm_delete(p->pDb, pKey, nKey);
- break;
- }
- case SQLITE_BLOB:
- case SQLITE_TEXT: {
- const unsigned char *pVal;
- unsigned char *pData;
- int nVal;
- if( eType==SQLITE_TEXT ){
- pVal = sqlite3_value_text(pValue);
- }else{
- pVal = (unsigned char*)sqlite3_value_blob(pValue);
- }
- nVal = sqlite3_value_bytes(pValue);
- pData = sqlite3_malloc( nVal+1 );
- if( pData==0 ){
- rc = SQLITE_NOMEM;
- }else{
- pData[0] = eType;
- memcpy(&pData[1], pVal, nVal);
- rc = lsm_insert(p->pDb, pKey, nKey, pData, nVal+1);
- sqlite3_free(pData);
+ if( sqlite3_value_type(argv[2+LSM1_COLUMN_BLOBVALUE])==SQLITE_BLOB ){
+ pVal = sqlite3_value_blob(argv[2+LSM1_COLUMN_BLOBVALUE]);
+ nVal = sqlite3_value_bytes(argv[2+LSM1_COLUMN_BLOBVALUE]);
+ rc = lsm_insert(p->pDb, pKey, nKey, pVal, nVal);
+ }else{
+ pValue = argv[2+LSM1_COLUMN_VALUE];
+ eType = sqlite3_value_type(pValue);
+ switch( eType ){
+ case SQLITE_NULL: {
+ rc = lsm_delete(p->pDb, pKey, nKey);
+ break;
}
- break;
- }
- case SQLITE_INTEGER:
- case SQLITE_FLOAT: {
- sqlite3_uint64 x;
- unsigned char aVal[9];
- int i;
- if( eType==SQLITE_INTEGER ){
- *(sqlite3_int64*)&x = sqlite3_value_int64(pValue);
- }else{
- *(double*)&x = sqlite3_value_double(pValue);
+ case SQLITE_BLOB:
+ case SQLITE_TEXT: {
+ if( eType==SQLITE_TEXT ){
+ pVal = sqlite3_value_text(pValue);
+ }else{
+ pVal = (unsigned char*)sqlite3_value_blob(pValue);
+ }
+ nVal = sqlite3_value_bytes(pValue);
+ pData = sqlite3_malloc( nVal+1 );
+ if( pData==0 ){
+ rc = SQLITE_NOMEM;
+ }else{
+ pData[0] = eType;
+ memcpy(&pData[1], pVal, nVal);
+ rc = lsm_insert(p->pDb, pKey, nKey, pData, nVal+1);
+ sqlite3_free(pData);
+ }
+ break;
}
- for(i=8; i>=1; i--){
- aVal[i] = x & 0xff;
- x >>= 8;
+ case SQLITE_INTEGER:
+ case SQLITE_FLOAT: {
+ sqlite3_uint64 x;
+ unsigned char aVal[9];
+ int i;
+ if( eType==SQLITE_INTEGER ){
+ *(sqlite3_int64*)&x = sqlite3_value_int64(pValue);
+ }else{
+ *(double*)&x = sqlite3_value_double(pValue);
+ }
+ for(i=8; x>0 && i>=1; i--){
+ aVal[i] = x & 0xff;
+ x >>= 8;
+ }
+ aVal[i] = eType;
+ rc = lsm_insert(p->pDb, pKey, nKey, &aVal[i], 9-i);
+ break;
}
- aVal[0] = eType;
- rc = lsm_insert(p->pDb, pKey, nKey, aVal, 9);
- break;
}
}
+ if( pKey!=(const void*)pSpace ) sqlite3_free((void*)pKey);
return rc==LSM_OK ? SQLITE_OK : SQLITE_ERROR;
}