**
** This file implements a simple virtual table wrapper around the LSM
** storage engine from SQLite4.
+**
+** USAGE
+**
+** CREATE VIRTUAL TABLE demo USING lsm1(filename,key,keytype,value1,...);
+**
+** The keytype must be one of: UINT, TEXT, BLOB. All keys must be of that
+** one type. "UINT" means unsigned integer. The values may be any
+** SQLite datatype.
+**
+** INTERNALS
+**
+** The key encoding for BLOB and TEXT is just a copy of the blob or text.
+** UTF-8 is used for text. The key encoding for UINT is the variable-length
+** integer format at https://sqlite.org/src4/doc/trunk/www/varint.wiki.
+**
+** The values are encoded as a single blob (since that is what lsm stores as
+** its content). There is a "type integer" followed by "content" for each
+** value, alternating back and forth. The content might be empty.
+**
+** TYPE1 CONTENT1 TYPE2 CONTENT2 TYPE3 CONTENT3 ....
+**
+** Each "type integer" is encoded as a variable-length integer in the
+** format of the link above. Let the type integer be T. The actual
+** datatype is an integer 0-5 equal to T%6. Values 1 through 5 correspond
+** to SQLITE_INTEGER through SQLITE_NULL. The size of the content in bytes
+** is T/6. Type value 0 means that the value is an integer whose actual
+** values is T/6 and there is no content. The type-value-0 integer format
+** only works for integers in the range of 0 through 40.
+**
+** There is no content for NULL or type-0 integers. For BLOB and TEXT
+** values, the content is the blob data or the UTF-8 text data. For
+** non-negative integers X, the content is a variable-length integer X*2.
+** For negative integers Y, the content is varaible-length integer (1-Y)*2+1.
+** For FLOAT values, the content is the variable length encoding of the
+** integer with the same bit pattern as the IEEE754 floating point value.
*/
#include "sqlite3ext.h"
SQLITE_EXTENSION_INIT1
/* Forward declaration of subclasses of virtual table objects */
typedef struct lsm1_vtab lsm1_vtab;
typedef struct lsm1_cursor lsm1_cursor;
+typedef struct lsm1_vblob lsm1_vblob;
/* Primitive types */
typedef unsigned char u8;
+typedef unsigned int u32;
+typedef sqlite3_uint64 u64;
/* An open connection to an LSM table */
struct lsm1_vtab {
sqlite3_vtab base; /* Base class - must be first */
lsm_db *pDb; /* Open connection to the LSM table */
+ u8 keyType; /* SQLITE_BLOB, _TEXT, or _INTEGER */
+ u32 nVal; /* Number of value columns */
};
u8 isDesc; /* 0: scan forward. 1: scan reverse */
u8 atEof; /* True if the scan is complete */
u8 bUnique; /* True if no more than one row of output */
+ u8 *zData; /* Content of the current row */
+ u32 nData; /* Number of bytes in the current row */
+ u8 *aeType; /* Types for all column values */
+ u32 *aiOfst; /* Offsets to the various fields */
+ u32 *aiLen; /* Length of each field */
+ u8 *pKey2; /* Loop termination key, or NULL */
+ u32 nKey2; /* Length of the loop termination key */
};
+/* An extensible buffer object.
+**
+** Content can be appended. Space to hold new content is automatically
+** allocated.
+*/
+struct lsm1_vblob {
+ u8 *a; /* Space to hold content, from sqlite3_malloc64() */
+ u64 n; /* Bytes of space used */
+ u64 nAlloc; /* Bytes of space allocated */
+ u8 errNoMem; /* True if a memory allocation error has been seen */
+};
+
+#if defined(__GNUC__)
+# define LSM1_NOINLINE __attribute__((noinline))
+#elif defined(_MSC_VER) && _MSC_VER>=1310
+# define LSM1_NOINLINE __declspec(noinline)
+#else
+# define LSM1_NOINLINE
+#endif
+
+
+/* Increase the available space in the vblob object so that it can hold
+** at least N more bytes. Return the number of errors.
+*/
+static int lsm1VblobEnlarge(lsm1_vblob *p, u32 N){
+ if( p->n+N>p->nAlloc ){
+ if( p->errNoMem ) return 1;
+ p->nAlloc += N + (p->nAlloc ? p->nAlloc : N);
+ p->a = sqlite3_realloc64(p->a, p->nAlloc);
+ if( p->a==0 ){
+ p->n = 0;
+ p->nAlloc = 0;
+ p->errNoMem = 1;
+ return 1;
+ }
+ p->nAlloc = sqlite3_msize(p->a);
+ }
+ return 0;
+}
+
+/* Append N bytes to a vblob after first enlarging it */
+static LSM1_NOINLINE void lsm1VblobEnlargeAndAppend(
+ lsm1_vblob *p,
+ const u8 *pData,
+ u32 N
+){
+ if( p->n+N>p->nAlloc && lsm1VblobEnlarge(p, N) ) return;
+ memcpy(p->a+p->n, pData, N);
+ p->n += N;
+}
+
+/* Append N bytes to a vblob */
+static void lsm1VblobAppend(lsm1_vblob *p, const u8 *pData, u32 N){
+ sqlite3_int64 n = p->n;
+ if( n+N>p->nAlloc ){
+ lsm1VblobEnlargeAndAppend(p, pData, N);
+ }else{
+ p->n += N;
+ memcpy(p->a+n, pData, N);
+ }
+}
+
+/* append text to a vblob */
+static void lsm1VblobAppendText(lsm1_vblob *p, const char *z){
+ lsm1VblobAppend(p, (u8*)z, (u32)strlen(z));
+}
+
/* Dequote the string */
static void lsm1Dequote(char *z){
int j;
lsm1_vtab *pNew;
int rc;
char *zFilename;
+ u8 keyType = 0;
+ int i;
+ lsm1_vblob sql;
+ static const char *azTypes[] = { "UINT", "TEXT", "BLOB" };
+ static const u8 aeTypes[] = { SQLITE_INTEGER, SQLITE_TEXT, SQLITE_BLOB };
+ static const char *azArgName[] = {"filename", "key", "key type", "value1" };
- if( argc!=4 || argv[3]==0 || argv[3][0]==0 ){
- *pzErr = sqlite3_mprintf("filename argument missing");
+ for(i=0; i<sizeof(azArgName)/sizeof(azArgName[0]); i++){
+ if( argc<i+4 || argv[i+3]==0 || argv[i+3][0]==0 ){
+ *pzErr = sqlite3_mprintf("%s (%r) argument missing",
+ azArgName[i], i+1);
+ return SQLITE_ERROR;
+ }
+ }
+ for(i=0; i<sizeof(azTypes)/sizeof(azTypes[0]); i++){
+ if( sqlite3_stricmp(azTypes[i],argv[5])==0 ){
+ keyType = aeTypes[i];
+ break;
+ }
+ }
+ if( keyType==0 ){
+ *pzErr = sqlite3_mprintf("key type should be INT, TEXT, or BLOB");
return SQLITE_ERROR;
}
*ppVtab = sqlite3_malloc( sizeof(*pNew) );
return SQLITE_NOMEM;
}
memset(pNew, 0, sizeof(*pNew));
+ pNew->keyType = keyType;
rc = lsm_new(0, &pNew->pDb);
if( rc ){
*pzErr = sqlite3_mprintf("lsm_new failed with error code %d", rc);
goto connect_failed;
}
-/* Column numbers */
-#define LSM1_COLUMN_KEY 0
-#define LSM1_COLUMN_BLOBKEY 1
-#define LSM1_COLUMN_VALUE 2
-#define LSM1_COLUMN_BLOBVALUE 3
-#define LSM1_COLUMN_COMMAND 4
-
- rc = sqlite3_declare_vtab(db,
- "CREATE TABLE x("
- " key," /* The primary key. Any non-NULL */
- " blobkey," /* Pure BLOB primary key */
- " value," /* The value associated with key. Any non-NULL */
- " blobvalue," /* Pure BLOB value */
- " command hidden" /* Insert here for control operations */
- ");"
- );
+ memset(&sql, 0, sizeof(sql));
+ lsm1VblobAppendText(&sql, "CREATE TABLE x(");
+ lsm1VblobAppendText(&sql, argv[4]);
+ lsm1VblobAppendText(&sql, " ");
+ lsm1VblobAppendText(&sql, argv[5]);
+ for(i=6; i<argc; i++){
+ lsm1VblobAppendText(&sql, ", ");
+ lsm1VblobAppendText(&sql, argv[i]);
+ pNew->nVal++;
+ }
+ lsm1VblobAppendText(&sql,
+ ", lsm1_command HIDDEN, lsm1_key HIDDEN, lsm1_value HIDDEN)");
+ lsm1VblobAppend(&sql, (u8*)"", 1);
+ if( sql.errNoMem ){
+ rc = SQLITE_NOMEM;
+ goto connect_failed;
+ }
+ rc = sqlite3_declare_vtab(db, (const char*)sql.a);
+ sqlite3_free(sql.a);
+
connect_failed:
if( rc!=SQLITE_OK ){
if( pNew ){
lsm1_vtab *p = (lsm1_vtab*)pVtab;
lsm1_cursor *pCur;
int rc;
- pCur = sqlite3_malloc( sizeof(*pCur) );
+ pCur = sqlite3_malloc64( sizeof(*pCur)
+ + p->nVal*(sizeof(pCur->aiOfst)+sizeof(pCur->aiLen)+1) );
if( pCur==0 ) return SQLITE_NOMEM;
memset(pCur, 0, sizeof(*pCur));
+ pCur->aiOfst = (u32*)&pCur[1];
+ pCur->aiLen = &pCur->aiOfst[p->nVal];
+ pCur->aeType = (u8*)&pCur->aiLen[p->nVal];
*ppCursor = &pCur->base;
rc = lsm_csr_open(p->pDb, &pCur->pLsmCur);
if( rc==LSM_OK ){
*/
static int lsm1Close(sqlite3_vtab_cursor *cur){
lsm1_cursor *pCur = (lsm1_cursor*)cur;
+ sqlite3_free(pCur->pKey2);
lsm_csr_close(pCur->pLsmCur);
sqlite3_free(pCur);
return SQLITE_OK;
if( rc==LSM_OK && lsm_csr_valid(pCur->pLsmCur)==0 ){
pCur->atEof = 1;
}
+ if( pCur->pKey2 && pCur->atEof==0 ){
+ const u8 *pVal;
+ u32 nVal;
+ assert( pCur->isDesc==0 );
+ rc = lsm_csr_key(pCur->pLsmCur, (const void**)&pVal, (int*)&nVal);
+ if( rc==LSM_OK ){
+ u32 len = pCur->nKey2;
+ int c;
+ if( len>nVal ) len = nVal;
+ c = memcmp(pVal, pCur->pKey2, len);
+ if( c==0 ) c = nVal - pCur->nKey2;
+ if( c>0 ) pCur->atEof = 1;
+ }
+ }
+ pCur->zData = 0;
}
return rc==LSM_OK ? SQLITE_OK : SQLITE_ERROR;
}
return 9;
}
+/* Append non-negative integer x as a variable-length integer.
+*/
+static void lsm1VblobAppendVarint(lsm1_vblob *p, sqlite3_uint64 x){
+ sqlite3_int64 n = p->n;
+ if( n+9>p->nAlloc && lsm1VblobEnlarge(p, 9) ) return;
+ p->n += lsm1PutVarint64(p->a+p->n, x);
+}
+
/*
** Decode the varint in the first n bytes z[]. Write the integer value
** into *pResult and return the number of bytes in the varint.
return 9;
}
-/*
-** Generate a key encoding for pValue such that all keys compare in
-** lexicographical order. Return an SQLite error code or SQLITE_OK.
+/* Encoded a signed integer as a varint. Numbers close to zero uses fewer
+** bytes than numbers far away from zero. However, the result is not in
+** lexicographical order.
**
-** The key encoding is *pnKey bytes in length written into *ppKey.
-** Space to hold the key is taken from pSpace if sufficient, or else
-** from sqlite3_malloc(). The caller is responsible for freeing malloced
-** space.
+** Encoding: Non-negative integer X is encoding as an unsigned
+** varint X*2. Negative integer Y is encoding as an unsigned
+** varint (1-Y)*2 + 1.
*/
-static int lsm1EncodeKey(
- sqlite3_value *pValue, /* Value to be encoded */
- unsigned char **ppKey, /* Write the encoding here */
- int *pnKey, /* Write the size of the encoding here */
- unsigned char *pSpace, /* Use this space if it is large enough */
- int nSpace /* Size of pSpace[] */
+static int lsm1PutSignedVarint64(u8 *z, sqlite3_int64 v){
+ sqlite3_uint64 u;
+ if( v>=0 ){
+ u = (sqlite3_uint64)v;
+ return lsm1PutVarint64(z, u*2);
+ }else{
+ u = (sqlite3_uint64)(1-v);
+ return lsm1PutVarint64(z, u*2+1);
+ }
+}
+
+/* Decoded a signed varint. */
+static int lsm1GetSignedVarint64(
+ const unsigned char *z,
+ int n,
+ sqlite3_int64 *pResult
){
- int eType = sqlite3_value_type(pValue);
- *ppKey = 0;
- *pnKey = 0;
- assert( nSpace>=32 );
- switch( eType ){
- default: {
- return SQLITE_ERROR; /* We cannot handle NULL keys */
- }
- case SQLITE_BLOB:
- case SQLITE_TEXT: {
- int nVal = sqlite3_value_bytes(pValue);
- const void *pVal;
- if( eType==SQLITE_BLOB ){
- eType = LSM1_TYPE_BLOB;
- pVal = sqlite3_value_blob(pValue);
- }else{
- eType = LSM1_TYPE_TEXT;
- pVal = (const void*)sqlite3_value_text(pValue);
- if( pVal==0 ) return SQLITE_NOMEM;
- }
- if( nVal+1>nSpace ){
- pSpace = sqlite3_malloc( nVal+1 );
- if( pSpace==0 ) return SQLITE_NOMEM;
- }
- pSpace[0] = (unsigned char)eType;
- memcpy(&pSpace[1], pVal, nVal);
- *ppKey = pSpace;
- *pnKey = nVal+1;
- break;
- }
- case SQLITE_INTEGER: {
- sqlite3_int64 iVal = sqlite3_value_int64(pValue);
- sqlite3_uint64 uVal;
- if( iVal<0 ){
- if( iVal==0xffffffffffffffffLL ) return SQLITE_ERROR;
- uVal = *(sqlite3_uint64*)&iVal;
- eType = LSM1_TYPE_NEGATIVE;
- }else{
- uVal = iVal;
- eType = LSM1_TYPE_POSITIVE;
- }
- pSpace[0] = (unsigned char)eType;
- *ppKey = pSpace;
- *pnKey = 1 + lsm1PutVarint64(&pSpace[1], uVal);
+ sqlite3_uint64 u = 0;
+ n = lsm1GetVarint64(z, n, &u);
+ if( u&1 ){
+ *pResult = -1 - (sqlite3_int64)(u>>1);
+ }else{
+ *pResult = (sqlite3_int64)(u>>1);
+ }
+ return n;
+}
+
+
+/*
+** Read the value part of the key-value pair and decode it into columns.
+*/
+static int lsm1DecodeValues(lsm1_cursor *pCur){
+ lsm1_vtab *pTab = (lsm1_vtab*)(pCur->base.pVtab);
+ int i, n;
+ int rc;
+ u8 eType;
+ sqlite3_uint64 v;
+
+ if( pCur->zData ) return 1;
+ rc = lsm_csr_value(pCur->pLsmCur, (const void**)&pCur->zData,
+ (int*)&pCur->nData);
+ if( rc ) return 0;
+ for(i=n=0; i<pTab->nVal; i++){
+ v = 0;
+ n += lsm1GetVarint64(pCur->zData+n, pCur->nData-n, &v);
+ pCur->aeType[i] = eType = (u8)(v%6);
+ if( eType==0 ){
+ pCur->aiOfst[i] = (u32)(v/6);
+ pCur->aiLen[i] = 0;
+ }else{
+ pCur->aiOfst[i] = n;
+ n += (pCur->aiLen[i] = (u32)(v/6));
}
+ if( n>pCur->nData ) break;
}
- return SQLITE_OK;
+ if( i<pTab->nVal ){
+ pCur->zData = 0;
+ return 0;
+ }
+ return 1;
}
/*
int i /* Which column to return */
){
lsm1_cursor *pCur = (lsm1_cursor*)cur;
- switch( i ){
- case LSM1_COLUMN_BLOBKEY: {
- const void *pVal;
- int nVal;
- if( lsm_csr_key(pCur->pLsmCur, &pVal, &nVal)==LSM_OK ){
+ lsm1_vtab *pTab = (lsm1_vtab*)(cur->pVtab);
+ if( i==0 ){
+ /* The key column */
+ const void *pVal;
+ int nVal;
+ if( lsm_csr_key(pCur->pLsmCur, &pVal, &nVal)==LSM_OK ){
+ if( pTab->keyType==SQLITE_BLOB ){
sqlite3_result_blob(ctx, pVal, nVal, SQLITE_TRANSIENT);
+ }else if( pTab->keyType==SQLITE_TEXT ){
+ sqlite3_result_text(ctx,(const char*)pVal, nVal, SQLITE_TRANSIENT);
+ }else{
+ const unsigned char *z = (const unsigned char*)pVal;
+ sqlite3_uint64 v1;
+ lsm1GetVarint64(z, nVal, &v1);
+ sqlite3_result_int64(ctx, (sqlite3_int64)v1);
}
- break;
}
- case LSM1_COLUMN_KEY: {
- const unsigned char *pVal;
+ }else if( i>pTab->nVal ){
+ if( i==pTab->nVal+2 ){ /* lsm1_key */
+ const void *pVal;
int nVal;
- if( lsm_csr_key(pCur->pLsmCur, (const void**)&pVal, &nVal)==LSM_OK
- && nVal>=1
- ){
- if( pVal[0]==LSM1_TYPE_BLOB ){
- sqlite3_result_blob(ctx, (const void*)&pVal[1],nVal-1,
- SQLITE_TRANSIENT);
- }else if( pVal[0]==LSM1_TYPE_TEXT ){
- sqlite3_result_text(ctx, (const char*)&pVal[1],nVal-1,
- SQLITE_TRANSIENT);
- }else if( nVal>=2 && nVal<=10 &&
- (pVal[0]==LSM1_TYPE_POSITIVE || pVal[0]==LSM1_TYPE_NEGATIVE)
- ){
- sqlite3_int64 iVal;
- lsm1GetVarint64(pVal+1, nVal-1, (sqlite3_uint64*)&iVal);
- sqlite3_result_int64(ctx, iVal);
- }
+ if( lsm_csr_key(pCur->pLsmCur, &pVal, &nVal)==LSM_OK ){
+ sqlite3_result_blob(ctx, pVal, nVal, SQLITE_TRANSIENT);
}
- break;
- }
- case LSM1_COLUMN_BLOBVALUE: {
+ }else if( i==pTab->nVal+3 ){ /* lsm1_value */
const void *pVal;
int nVal;
- if( lsm_csr_value(pCur->pLsmCur, (const void**)&pVal, &nVal)==LSM_OK ){
+ if( lsm_csr_value(pCur->pLsmCur, &pVal, &nVal)==LSM_OK ){
sqlite3_result_blob(ctx, pVal, nVal, SQLITE_TRANSIENT);
}
- break;
}
- case LSM1_COLUMN_VALUE: {
- const unsigned char *aVal;
- int nVal;
- if( lsm_csr_value(pCur->pLsmCur, (const void**)&aVal, &nVal)==LSM_OK
- && nVal>=1
- ){
- switch( aVal[0] ){
- case SQLITE_FLOAT:
- case SQLITE_INTEGER: {
- sqlite3_uint64 x = 0;
- int j;
- for(j=1; j<nVal; j++){
- x = (x<<8) | aVal[j];
- }
- if( aVal[0]==SQLITE_INTEGER ){
- sqlite3_result_int64(ctx, *(sqlite3_int64*)&x);
- }else{
- double r;
- assert( sizeof(r)==sizeof(x) );
- memcpy(&r, &x, sizeof(r));
- sqlite3_result_double(ctx, r);
- }
- break;
- }
- case SQLITE_TEXT: {
- sqlite3_result_text(ctx, (char*)&aVal[1], nVal-1, SQLITE_TRANSIENT);
- break;
- }
- case SQLITE_BLOB: {
- sqlite3_result_blob(ctx, &aVal[1], nVal-1, SQLITE_TRANSIENT);
- break;
- }
- }
+ }else if( lsm1DecodeValues(pCur) ){
+ /* The i-th value column (where leftmost is 1) */
+ const u8 *zData;
+ u32 nData;
+ i--;
+ zData = pCur->zData + pCur->aiOfst[i];
+ nData = pCur->aiLen[i];
+ switch( pCur->aeType[i] ){
+ case 0: { /* in-line integer */
+ sqlite3_result_int(ctx, pCur->aiOfst[i]);
+ break;
+ }
+ case SQLITE_INTEGER: {
+ sqlite3_int64 v;
+ lsm1GetSignedVarint64(zData, nData, &v);
+ sqlite3_result_int64(ctx, v);
+ break;
+ }
+ case SQLITE_FLOAT: {
+ sqlite3_uint64 v1 = 0;
+ double v;
+ lsm1GetVarint64(zData, nData, &v1);
+ memcpy(&v, &v1, sizeof(v));
+ sqlite3_result_double(ctx, v);
+ break;
+ }
+ case SQLITE_TEXT: {
+ sqlite3_result_text(ctx, (const char*)zData, nData, SQLITE_TRANSIENT);
+ }
+ case SQLITE_BLOB: {
+ sqlite3_result_blob(ctx, zData, nData, SQLITE_TRANSIENT);
+ }
+ default: {
+ /* A NULL. Do nothing */
}
- break;
- }
- default: {
- break;
}
}
return SQLITE_OK;
int argc, sqlite3_value **argv
){
lsm1_cursor *pCur = (lsm1_cursor *)pVtabCursor;
+ lsm1_vtab *pTab = (lsm1_vtab*)(pCur->base.pVtab);
int rc = LSM_OK;
+ int seekType = -1;
+ const void *pVal = 0;
+ int nVal;
+ u8 keyType = pTab->keyType;
+ u8 aKey1[16];
+
pCur->atEof = 1;
- if( idxNum==1 ){
- assert( argc==1 );
- pCur->isDesc = 0;
- pCur->bUnique = 1;
- if( sqlite3_value_type(argv[0])==SQLITE_BLOB ){
- const void *pVal = sqlite3_value_blob(argv[0]);
- int nVal = sqlite3_value_bytes(argv[0]);
- rc = lsm_csr_seek(pCur->pLsmCur, pVal, nVal, LSM_SEEK_EQ);
+ sqlite3_free(pCur->pKey2);
+ pCur->pKey2 = 0;
+ if( idxNum<99 ){
+ if( keyType==SQLITE_BLOB ){
+ pVal = sqlite3_value_blob(argv[0]);
+ nVal = sqlite3_value_bytes(argv[0]);
+ }else if( keyType==SQLITE_TEXT ){
+ pVal = sqlite3_value_text(argv[0]);
+ nVal = sqlite3_value_bytes(argv[0]);
+ }else{
+ sqlite3_int64 v = sqlite3_value_int64(argv[0]);
+ if( v<0 ) v = 0;
+ nVal = lsm1PutVarint64(aKey1, v);
+ pVal = aKey1;
+ }
+ }
+ switch( idxNum ){
+ case 0: { /* key==argv[0] */
+ assert( argc==1 );
+ seekType = LSM_SEEK_EQ;
+ pCur->isDesc = 0;
+ pCur->bUnique = 1;
+ break;
+ }
+ case 1: { /* key>=argv[0] AND key<=argv[1] */
+ u8 aKey[12];
+ seekType = LSM_SEEK_GE;
+ pCur->isDesc = 0;
+ pCur->bUnique = 0;
+ if( keyType==SQLITE_INTEGER ){
+ sqlite3_int64 v = sqlite3_value_int64(argv[1]);
+ if( v<0 ) v = 0;
+ pCur->nKey2 = lsm1PutVarint64(aKey, (sqlite3_uint64)v);
+ pCur->pKey2 = sqlite3_malloc( pCur->nKey2 );
+ if( pCur->pKey2==0 ) return SQLITE_NOMEM;
+ memcpy(pCur->pKey2, aKey, pCur->nKey2);
+ }else{
+ pCur->nKey2 = sqlite3_value_bytes(argv[1]);
+ pCur->pKey2 = sqlite3_malloc( pCur->nKey2 );
+ if( pCur->pKey2==0 ) return SQLITE_NOMEM;
+ if( keyType==SQLITE_BLOB ){
+ memcpy(pCur->pKey2, sqlite3_value_blob(argv[1]), pCur->nKey2);
+ }else{
+ memcpy(pCur->pKey2, sqlite3_value_text(argv[1]), pCur->nKey2);
+ }
+ }
+ break;
+ }
+ case 2: { /* key>=argv[0] */
+ seekType = LSM_SEEK_GE;
+ pCur->isDesc = 0;
+ pCur->bUnique = 0;
+ break;
+ }
+ case 3: { /* key<=argv[0] */
+ seekType = LSM_SEEK_LE;
+ pCur->isDesc = 1;
+ pCur->bUnique = 0;
+ break;
}
+ default: { /* full table scan */
+ pCur->isDesc = 0;
+ pCur->bUnique = 0;
+ break;
+ }
+ }
+ if( pVal ){
+ rc = lsm_csr_seek(pCur->pLsmCur, pVal, nVal, seekType);
}else{
rc = lsm_csr_first(pCur->pLsmCur);
- pCur->isDesc = 0;
- pCur->bUnique = 0;
}
if( rc==LSM_OK && lsm_csr_valid(pCur->pLsmCur)!=0 ){
pCur->atEof = 0;
** Only comparisons against the key are allowed. The idxNum defines
** which comparisons are available:
**
-** 0 Full table scan only
-** bit 1 key==?1 single argument for ?1
-** bit 2 key>?1
-** bit 3 key>=?1
-** bit 4 key<?N (N==1 if bits 2,3 clear, or 2 if bits2,3 set)
-** bit 5 key<=?N (N==1 if bits 2,3 clear, or 2 if bits2,3 set)
-** bit 6 Use blobkey instead of key
-**
-** To put it another way:
-**
-** 0 Full table scan.
-** 1 key==?1
-** 2 key>?1
-** 4 key>=?1
-** 8 key<?1
-** 10 key>?1 AND key<?2
-** 12 key>=?1 AND key<?2
-** 16 key<=?1
-** 18 key>?1 AND key<=?2
-** 20 key>=?1 AND key<=?2
-** 33..52 Use blobkey in place of key...
+** 0 key==?1
+** 1 key>=?1 AND key<=?2
+** 2 key>?1 or key>=?1
+** 3 key<?1 or key<=?1
+** 99 Full table scan only
*/
static int lsm1BestIndex(
sqlite3_vtab *tab,
sqlite3_index_info *pIdxInfo
){
int i; /* Loop over constraints */
- int idxNum = 0; /* The query plan bitmask */
+ int idxNum = 99; /* The query plan */
int nArg = 0; /* Number of arguments to xFilter */
- int eqIdx = -1; /* Index of the key== constraint, or -1 if none */
+ int argIdx = -1; /* Index of the key== constraint, or -1 if none */
+ int iIdx2 = -1; /* The index of the second key */
+ int omit1 = 0;
+ int omit2 = 0;
const struct sqlite3_index_constraint *pConstraint;
pConstraint = pIdxInfo->aConstraint;
for(i=0; i<pIdxInfo->nConstraint && idxNum<16; i++, pConstraint++){
if( pConstraint->usable==0 ) continue;
- if( pConstraint->iColumn!=LSM1_COLUMN_KEY ) continue;
- if( pConstraint->op!=SQLITE_INDEX_CONSTRAINT_EQ ) continue;
+ if( pConstraint->iColumn!=0 ) continue;
switch( pConstraint->op ){
case SQLITE_INDEX_CONSTRAINT_EQ: {
- eqIdx = i;
- idxNum = 1;
+ if( idxNum>0 ){
+ argIdx = i;
+ iIdx2 = -1;
+ idxNum = 0;
+ omit1 = 1;
+ }
+ break;
+ }
+ case SQLITE_INDEX_CONSTRAINT_GE:
+ case SQLITE_INDEX_CONSTRAINT_GT: {
+ if( idxNum==99 ){
+ argIdx = i;
+ idxNum = 2;
+ omit1 = pConstraint->op==SQLITE_INDEX_CONSTRAINT_GE;
+ }else if( idxNum==3 ){
+ iIdx2 = idxNum;
+ omit2 = omit1;
+ argIdx = i;
+ idxNum = 1;
+ omit1 = pConstraint->op==SQLITE_INDEX_CONSTRAINT_GE;
+ }
+ break;
+ }
+ case SQLITE_INDEX_CONSTRAINT_LE:
+ case SQLITE_INDEX_CONSTRAINT_LT: {
+ if( idxNum==99 ){
+ argIdx = i;
+ idxNum = 3;
+ omit1 = pConstraint->op==SQLITE_INDEX_CONSTRAINT_LE;
+ }else if( idxNum==2 ){
+ iIdx2 = i;
+ idxNum = 1;
+ omit1 = pConstraint->op==SQLITE_INDEX_CONSTRAINT_LE;
+ }
break;
}
}
}
- if( eqIdx>=0 ){
- pIdxInfo->aConstraintUsage[eqIdx].argvIndex = ++nArg;
- pIdxInfo->aConstraintUsage[eqIdx].omit = 1;
+ if( argIdx>=0 ){
+ pIdxInfo->aConstraintUsage[argIdx].argvIndex = ++nArg;
+ pIdxInfo->aConstraintUsage[argIdx].omit = omit1;
+ }
+ if( iIdx2>=0 ){
+ pIdxInfo->aConstraintUsage[iIdx2].argvIndex = ++nArg;
+ pIdxInfo->aConstraintUsage[iIdx2].omit = omit2;
}
- if( idxNum==1 ){
+ if( idxNum==0 ){
pIdxInfo->estimatedCost = (double)1;
pIdxInfo->estimatedRows = 1;
pIdxInfo->orderByConsumed = 1;
+ }else if( idxNum==1 ){
+ pIdxInfo->estimatedCost = (double)100;
+ pIdxInfo->estimatedRows = 100;
+ }else if( idxNum<99 ){
+ pIdxInfo->estimatedCost = (double)5000;
+ pIdxInfo->estimatedRows = 5000;
}else{
/* Full table scan */
pIdxInfo->estimatedCost = (double)2147483647;
sqlite_int64 *pRowid
){
lsm1_vtab *p = (lsm1_vtab*)pVTab;
- const void *pKey;
- void *pFree = 0;
int nKey;
- int eType;
+ int i;
int rc = LSM_OK;
- sqlite3_value *pValue;
- const unsigned char *pVal;
- unsigned char *pData;
- int nVal;
- unsigned char pSpace[100];
+ unsigned char *pKey;
+ unsigned char aKey[16];
+ unsigned char pSpace[16];
+ lsm1_vblob val;
if( argc==1 ){
pVTab->zErrMsg = sqlite3_mprintf("cannot DELETE");
return SQLITE_ERROR;
}
- /* "INSERT INTO tab(command) VALUES('....')" is used to implement
+ /* "INSERT INTO tab(lsm1_command) VALUES('....')" is used to implement
** special commands.
*/
- if( sqlite3_value_type(argv[2+LSM1_COLUMN_COMMAND])!=SQLITE_NULL ){
+ if( sqlite3_value_type(argv[3+p->nVal])!=SQLITE_NULL ){
return SQLITE_OK;
}
- if( sqlite3_value_type(argv[2+LSM1_COLUMN_BLOBKEY])==SQLITE_BLOB ){
- /* Use the blob key exactly as supplied */
- pKey = sqlite3_value_blob(argv[2+LSM1_COLUMN_BLOBKEY]);
- nKey = sqlite3_value_bytes(argv[2+LSM1_COLUMN_BLOBKEY]);
- }else{
- /* Use a key encoding that sorts in lexicographical order */
- rc = lsm1EncodeKey(argv[2+LSM1_COLUMN_KEY],
- (unsigned char**)&pKey,&nKey,
- pSpace,sizeof(pSpace));
- if( rc ) return rc;
- if( pKey!=(const void*)pSpace ) pFree = (void*)pKey;
- }
- if( sqlite3_value_type(argv[2+LSM1_COLUMN_BLOBVALUE])==SQLITE_BLOB ){
- pVal = sqlite3_value_blob(argv[2+LSM1_COLUMN_BLOBVALUE]);
- nVal = sqlite3_value_bytes(argv[2+LSM1_COLUMN_BLOBVALUE]);
- rc = lsm_insert(p->pDb, pKey, nKey, pVal, nVal);
+ if( p->keyType==SQLITE_BLOB ){
+ pKey = (u8*)sqlite3_value_blob(argv[2]);
+ nKey = sqlite3_value_bytes(argv[2]);
+ }else if( p->keyType==SQLITE_TEXT ){
+ pKey = (u8*)sqlite3_value_text(argv[2]);
+ nKey = sqlite3_value_bytes(argv[2]);
}else{
- pValue = argv[2+LSM1_COLUMN_VALUE];
- eType = sqlite3_value_type(pValue);
+ sqlite3_int64 v = sqlite3_value_int64(argv[2]);
+ if( v>=0 ){
+ nKey = lsm1PutVarint64(aKey, (sqlite3_uint64)v);
+ pKey = aKey;
+ }else{
+ pVTab->zErrMsg = sqlite3_mprintf("key must be non-negative");
+ return SQLITE_ERROR;
+ }
+ }
+ memset(&val, 0, sizeof(val));
+ for(i=0; i<p->nVal; i++){
+ u8 eType = sqlite3_value_type(argv[3+i]);
switch( eType ){
case SQLITE_NULL: {
- rc = lsm_delete(p->pDb, pKey, nKey);
+ lsm1VblobAppendVarint(&val, SQLITE_NULL);
break;
}
- case SQLITE_BLOB:
- case SQLITE_TEXT: {
- if( eType==SQLITE_TEXT ){
- pVal = sqlite3_value_text(pValue);
- }else{
- pVal = (unsigned char*)sqlite3_value_blob(pValue);
- }
- nVal = sqlite3_value_bytes(pValue);
- pData = sqlite3_malloc( nVal+1 );
- if( pData==0 ){
- rc = SQLITE_NOMEM;
+ case SQLITE_INTEGER: {
+ sqlite3_int64 v = sqlite3_value_int64(argv[3+i]);
+ if( v>=0 && v<=240/6 ){
+ lsm1VblobAppendVarint(&val, v*6);
}else{
- pData[0] = (unsigned char)eType;
- memcpy(&pData[1], pVal, nVal);
- rc = lsm_insert(p->pDb, pKey, nKey, pData, nVal+1);
- sqlite3_free(pData);
+ int n = lsm1PutSignedVarint64(pSpace, v);
+ lsm1VblobAppendVarint(&val, SQLITE_INTEGER + n*6);
+ lsm1VblobAppend(&val, pSpace, n);
}
break;
}
- case SQLITE_INTEGER:
case SQLITE_FLOAT: {
- sqlite3_uint64 x;
- unsigned char aVal[9];
- int i;
- if( eType==SQLITE_INTEGER ){
- *(sqlite3_int64*)&x = sqlite3_value_int64(pValue);
- }else{
- double r = sqlite3_value_double(pValue);
- assert( sizeof(r)==sizeof(x) );
- memcpy(&x, &r, sizeof(r));
- }
- for(i=8; x>0 && i>=1; i--){
- aVal[i] = x & 0xff;
- x >>= 8;
- }
- aVal[i] = (unsigned char)eType;
- rc = lsm_insert(p->pDb, pKey, nKey, &aVal[i], 9-i);
+ double r = sqlite3_value_double(argv[3+i]);
+ sqlite3_uint64 u;
+ int n;
+ memcpy(&u, &r, 8);
+ n = lsm1PutSignedVarint64(pSpace, u);
+ lsm1VblobAppendVarint(&val, SQLITE_FLOAT + n*6);
+ lsm1VblobAppend(&val, pSpace, n);
+ break;
+ }
+ case SQLITE_BLOB: {
+ int n = sqlite3_value_bytes(argv[3+i]);
+ lsm1VblobAppendVarint(&val, n*6 + SQLITE_BLOB);
+ lsm1VblobAppend(&val, sqlite3_value_blob(argv[2+i]), n);
+ break;
+ }
+ case SQLITE_TEXT: {
+ int n = sqlite3_value_bytes(argv[3+i]);
+ lsm1VblobAppendVarint(&val, n*6 + SQLITE_TEXT);
+ lsm1VblobAppend(&val, sqlite3_value_text(argv[2+i]), n);
break;
}
}
}
- sqlite3_free(pFree);
+ if( val.errNoMem ){
+ return SQLITE_NOMEM;
+ }
+ rc = lsm_insert(p->pDb, pKey, nKey, val.a, val.n);
+ sqlite3_free(val.a);
return rc==LSM_OK ? SQLITE_OK : SQLITE_ERROR;
}