]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
cassandra: Add support for prepared statements
authorTimo Sirainen <timo.sirainen@dovecot.fi>
Tue, 22 Aug 2017 11:35:11 +0000 (14:35 +0300)
committerAki Tuomi <aki.tuomi@dovecot.fi>
Fri, 8 Sep 2017 10:18:32 +0000 (13:18 +0300)
src/lib-sql/driver-cassandra.c

index 97e5c79b512f4b4e35c225654bb9e5f606a54d9a..32c78f2ba315710889c91b4e1df88e4776c6949d 100644 (file)
@@ -99,6 +99,7 @@ struct cassandra_db {
 
        int fd_pipe[2];
        struct io *io_pipe;
+       ARRAY(struct cassandra_sql_prepared_statement *) pending_prepares;
        ARRAY(struct cassandra_callback *) callbacks;
        ARRAY(struct cassandra_result *) results;
        unsigned int callback_ids;
@@ -149,6 +150,7 @@ struct cassandra_transaction_context {
        sql_commit_callback_t *callback;
        void *context;
 
+       struct cassandra_sql_statement *stmt;
        char *query;
        char *error;
 
@@ -157,6 +159,42 @@ struct cassandra_transaction_context {
        bool failed:1;
 };
 
+struct cassandra_sql_arg {
+       unsigned int column_idx;
+
+       char *value_str;
+       unsigned char *value_binary;
+       size_t value_binary_size;
+       int64_t value_int64;
+};
+
+struct cassandra_sql_statement {
+       struct sql_statement stmt;
+
+       struct cassandra_sql_prepared_statement *prep;
+       CassStatement *cass_stmt;
+
+       ARRAY(struct cassandra_sql_arg) pending_args;
+       cass_int64_t pending_timestamp;
+
+       struct cassandra_result *result;
+};
+
+struct cassandra_sql_prepared_statement {
+       struct sql_prepared_statement prep_stmt;
+       char *query_template;
+
+       /* NULL, until the prepare is asynchronously finished */
+       const CassPrepared *prepared;
+       /* statements waiting for prepare to finish */
+       ARRAY(struct cassandra_sql_statement *) pending_statements;
+       /* an error here will cause the prepare to be retried on the next
+          execution attempt. */
+       char *error;
+
+       bool pending;
+};
+
 extern const struct sql_db driver_cassandra_db;
 extern const struct sql_result driver_cassandra_result;
 
@@ -189,6 +227,9 @@ static struct {
        { CASS_LOG_TRACE, "trace" }
 };
 
+static void driver_cassandra_prepare_pending(struct cassandra_db *db);
+static void
+prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt);
 static void driver_cassandra_result_send_query(struct cassandra_result *result);
 static void driver_cassandra_send_queries(struct cassandra_db *db);
 static void result_finish(struct cassandra_result *result);
@@ -232,6 +273,7 @@ static void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_stat
 
 static void driver_cassandra_close(struct cassandra_db *db, const char *error)
 {
+       struct cassandra_sql_prepared_statement *const *prep_stmtp;
        struct cassandra_result *const *resultp;
 
        if (db->io_pipe != NULL)
@@ -242,6 +284,13 @@ static void driver_cassandra_close(struct cassandra_db *db, const char *error)
        }
        driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
 
+       array_foreach(&db->pending_prepares, prep_stmtp) {
+               (*prep_stmtp)->pending = FALSE;
+               (*prep_stmtp)->error = i_strdup(error);
+               prepare_finish_pending_statements(*prep_stmtp);
+       }
+       array_clear(&db->pending_prepares);
+
        while (array_count(&db->results) > 0) {
                resultp = array_idx(&db->results, 0);
                if ((*resultp)->error == NULL)
@@ -365,6 +414,7 @@ static void connect_callback(CassFuture *future, void *context)
                   finish */
                io_loop_stop(db->ioloop);
        }
+       driver_cassandra_prepare_pending(db);
        driver_cassandra_send_queries(db);
 }
 
@@ -659,6 +709,7 @@ static struct sql_db *driver_cassandra_init_v(const char *connect_string)
                db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
        i_array_init(&db->results, 16);
        i_array_init(&db->callbacks, 16);
+       i_array_init(&db->pending_prepares, 16);
        return &db->api;
 }
 
@@ -672,6 +723,8 @@ static void driver_cassandra_deinit_v(struct sql_db *_db)
        array_free(&db->callbacks);
        i_assert(array_count(&db->results) == 0);
        array_free(&db->results);
+       i_assert(array_count(&db->pending_prepares) == 0);
+       array_free(&db->pending_prepares);
 
        cass_session_free(db->session);
        cass_cluster_free(db->cluster);
@@ -930,6 +983,8 @@ static void driver_cassandra_result_send_query(struct cassandra_result *result)
        struct cassandra_db *db = (struct cassandra_db *)result->api.db;
        CassFuture *future;
 
+       i_assert(result->statement != NULL);
+
        db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
        driver_cassandra_init_statement(result);
 
@@ -1028,7 +1083,7 @@ static void driver_cassandra_send_queries(struct cassandra_db *db)
 
        results = array_get(&db->results, &count);
        for (i = 0; i < count; i++) {
-               if (!results[i]->query_sent) {
+               if (!results[i]->query_sent && results[i]->statement != NULL) {
                        if (driver_cassandra_send_query(results[i]) <= 0)
                                break;
                }
@@ -1459,6 +1514,7 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
 {
        struct cassandra_transaction_context *ctx =
                (struct cassandra_transaction_context *)_ctx;
+       struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
        enum cassandra_query_type query_type;
        struct sql_commit_result result;
 
@@ -1466,20 +1522,37 @@ driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
        ctx->callback = callback;
        ctx->context = context;
 
-       if (ctx->failed || ctx->query == NULL) {
+       if (ctx->failed || (ctx->query == NULL && ctx->stmt == NULL)) {
                if (ctx->failed)
                        result.error = ctx->error;
 
                callback(&result, context);
                driver_cassandra_transaction_unref(&ctx);
-       } else {
-               /* just a single query, send it */
-               if (strncasecmp(ctx->query, "DELETE ", 7) == 0)
-                       query_type = CASSANDRA_QUERY_TYPE_DELETE;
-               else
-                       query_type = CASSANDRA_QUERY_TYPE_WRITE;
-               driver_cassandra_query_full(_ctx->db, ctx->query, query_type,
+               return;
+       }
+
+       /* just a single query, send it */
+       const char *query = ctx->query != NULL ?
+               ctx->query : sql_statement_get_query(&ctx->stmt->stmt);
+       if (strncasecmp(query, "DELETE ", 7) == 0)
+               query_type = CASSANDRA_QUERY_TYPE_DELETE;
+       else
+               query_type = CASSANDRA_QUERY_TYPE_WRITE;
+
+       if (ctx->query != NULL) {
+               driver_cassandra_query_full(_ctx->db, query, query_type,
                          transaction_commit_callback, ctx);
+       } else {
+               ctx->stmt->result =
+                       driver_cassandra_query_init(db, query, query_type,
+                               transaction_commit_callback, ctx);
+               if (ctx->stmt->cass_stmt == NULL) {
+                       /* wait for prepare to finish */
+               } else {
+                       ctx->stmt->result->statement = ctx->stmt->cass_stmt;
+                       (void)driver_cassandra_send_query(ctx->stmt->result);
+                       pool_unref(&ctx->stmt->stmt.pool);
+               }
        }
 }
 
@@ -1512,6 +1585,11 @@ driver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx,
        struct cassandra_transaction_context *ctx =
                (struct cassandra_transaction_context *)_ctx;
 
+       if (ctx->stmt != NULL) {
+               /* nothing should be using this - don't bother implementing */
+               i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
+       }
+
        if (ctx->query != NULL && !ctx->failed)
                driver_cassandra_try_commit_s(ctx);
        *error_r = t_strdup(ctx->error);
@@ -1541,7 +1619,7 @@ driver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
 
        i_assert(affected_rows == NULL);
 
-       if (ctx->query != NULL) {
+       if (ctx->query != NULL || ctx->stmt != NULL) {
                transaction_set_failed(ctx, "Multiple changes in transaction not supported");
                return;
        }
@@ -1559,9 +1637,376 @@ driver_cassandra_escape_blob(struct sql_db *_db ATTR_UNUSED,
        return str_c(str);
 }
 
+static CassError
+driver_cassandra_bind_int(struct cassandra_sql_statement *stmt,
+                         unsigned int column_idx, int64_t value)
+{
+       const CassDataType *data_type;
+       CassValueType value_type;
+
+       if (stmt->prep == NULL) {
+               value_type = value >= -2147483648 && value <= 2147483647 ?
+                       CASS_VALUE_TYPE_INT : CASS_VALUE_TYPE_BIGINT;
+       } else {
+               /* prepared statements require exactly correct value type */
+               data_type = cass_prepared_parameter_data_type(stmt->prep->prepared, column_idx);
+               value_type = cass_data_type_type(data_type);
+       }
+
+       switch (value_type) {
+       case CASS_VALUE_TYPE_INT:
+               if (value < -2147483648 || value > 2147483647)
+                       return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+               return cass_statement_bind_int32(stmt->cass_stmt, column_idx, value);
+       case CASS_VALUE_TYPE_BIGINT:
+               return cass_statement_bind_int64(stmt->cass_stmt, column_idx, value);
+       case CASS_VALUE_TYPE_SMALL_INT:
+               if (value < -32768 || value > 32767)
+                       return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+               return cass_statement_bind_int16(stmt->cass_stmt, column_idx, value);
+       case CASS_VALUE_TYPE_TINY_INT:
+               if (value < -128 || value > 127)
+                       return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+               return cass_statement_bind_int8(stmt->cass_stmt, column_idx, value);
+       default:
+               return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+       }
+}
+
+static void prepare_finish_arg(struct cassandra_sql_statement *stmt,
+                              const struct cassandra_sql_arg *arg)
+{
+       CassError rc;
+
+       if (arg->value_str != NULL) {
+               rc = cass_statement_bind_string(stmt->cass_stmt, arg->column_idx,
+                                               arg->value_str);
+       } else if (arg->value_binary != NULL) {
+               rc = cass_statement_bind_bytes(stmt->cass_stmt, arg->column_idx,
+                                              arg->value_binary,
+                                              arg->value_binary_size);
+       } else {
+               rc = driver_cassandra_bind_int(stmt, arg->column_idx,
+                                              arg->value_int64);
+       }
+       if (rc != CASS_OK) {
+               i_error("cassandra: Statement '%s': Failed to bind column %u: %s",
+                       stmt->stmt.query_template, arg->column_idx,
+                       cass_error_desc(rc));
+       }
+}
+
+static void prepare_finish_statement(struct cassandra_sql_statement *stmt)
+{
+       const struct cassandra_sql_arg *arg;
+
+       if (stmt->prep->prepared == NULL) {
+               i_assert(stmt->prep->error != NULL);
+
+               if (stmt->result != NULL) {
+                       stmt->result->error = i_strdup(stmt->prep->error);
+                       result_finish(stmt->result);
+               }
+               return;
+       }
+       stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared);
+
+       if (stmt->pending_timestamp != 0) {
+               cass_statement_set_timestamp(stmt->cass_stmt,
+                                            stmt->pending_timestamp);
+       }
+
+       if (array_is_created(&stmt->pending_args)) {
+               array_foreach(&stmt->pending_args, arg)
+                       prepare_finish_arg(stmt, arg);
+       }
+       if (stmt->result != NULL) {
+               stmt->result->statement = stmt->cass_stmt;
+               (void)driver_cassandra_send_query(stmt->result);
+               pool_unref(&stmt->stmt.pool);
+       }
+}
+
+static void
+prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt)
+{
+       struct cassandra_sql_statement *const *stmtp;
+
+       array_foreach(&prep_stmt->pending_statements, stmtp)
+               prepare_finish_statement(*stmtp);
+       array_clear(&prep_stmt->pending_statements);
+}
+
+static void prepare_callback(CassFuture *future, void *context)
+{
+       struct cassandra_sql_prepared_statement *prep_stmt = context;
+       CassError error = cass_future_error_code(future);
+
+       if (error != CASS_OK) {
+               const char *errmsg;
+               size_t errsize;
+
+               cass_future_error_message(future, &errmsg, &errsize);
+               i_free(prep_stmt->error);
+               prep_stmt->error = i_strndup(errmsg, errsize);
+       } else {
+               prep_stmt->prepared = cass_future_get_prepared(future);
+       }
+
+       prepare_finish_pending_statements(prep_stmt);
+}
+
+static void prepare_start(struct cassandra_sql_prepared_statement *prep_stmt)
+{
+       struct cassandra_db *db = (struct cassandra_db *)prep_stmt->prep_stmt.db;
+       CassFuture *future;
+
+       if (!SQL_DB_IS_READY(&db->api)) {
+               if (!prep_stmt->pending) {
+                       prep_stmt->pending = TRUE;
+                       array_append(&db->pending_prepares, &prep_stmt, 1);
+
+                       if (sql_connect(&db->api) < 0)
+                               i_unreached();
+               }
+               return;
+       }
+
+       /* clear the current error in case we're retrying */
+       i_free_and_null(prep_stmt->error);
+
+       future = cass_session_prepare(db->session, prep_stmt->query_template);
+       driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt);
+}
+
+static void driver_cassandra_prepare_pending(struct cassandra_db *db)
+{
+       struct cassandra_sql_prepared_statement *const *prep_stmtp;
+
+       i_assert(SQL_DB_IS_READY(&db->api));
+
+       array_foreach(&db->pending_prepares, prep_stmtp) {
+               (*prep_stmtp)->pending = FALSE;
+               prepare_start(*prep_stmtp);
+       }
+       array_clear(&db->pending_prepares);
+}
+
+static struct sql_prepared_statement *
+driver_cassandra_prepared_statement_init(struct sql_db *db,
+                                        const char *query_template)
+{
+       struct cassandra_sql_prepared_statement *prep_stmt =
+               i_new(struct cassandra_sql_prepared_statement, 1);
+       prep_stmt->prep_stmt.db = db;
+       prep_stmt->query_template = i_strdup(query_template);
+       i_array_init(&prep_stmt->pending_statements, 4);
+       prepare_start(prep_stmt);
+       return &prep_stmt->prep_stmt;
+}
+
+static void
+driver_cassandra_prepared_statement_deinit(struct sql_prepared_statement *_prep_stmt)
+{
+       struct cassandra_sql_prepared_statement *prep_stmt =
+               (struct cassandra_sql_prepared_statement *)_prep_stmt;
+
+       i_assert(array_count(&prep_stmt->pending_statements) == 0);
+       if (prep_stmt->prepared != NULL)
+               cass_prepared_free(prep_stmt->prepared);
+       array_free(&prep_stmt->pending_statements);
+       i_free(prep_stmt->query_template);
+       i_free(prep_stmt->error);
+       i_free(prep_stmt);
+}
+
+static struct sql_statement *
+driver_cassandra_statement_init(struct sql_db *db ATTR_UNUSED,
+                               const char *query_template)
+{
+       pool_t pool = pool_alloconly_create("cassandra sql statement", 1024);
+       struct cassandra_sql_statement *stmt =
+               p_new(pool, struct cassandra_sql_statement, 1);
+
+       /* Count the number of parameters in the query. We'll assume that all
+          the changing parameters are bound, so there shouldn't be any
+          quoted strings with '?' in them. */
+       const char *p = query_template;
+       size_t param_count = 0;
+       while ((p = strchr(p, '?')) != NULL) {
+               param_count++;
+               p++;
+       }
+
+       stmt->stmt.pool = pool;
+       stmt->cass_stmt = cass_statement_new(query_template, param_count);
+       return &stmt->stmt;
+}
+
+static struct sql_statement *
+driver_cassandra_statement_init_prepared(struct sql_prepared_statement *_prep_stmt)
+{
+       struct cassandra_sql_prepared_statement *prep_stmt =
+               (struct cassandra_sql_prepared_statement *)_prep_stmt;
+       pool_t pool = pool_alloconly_create("cassandra prepared sql statement", 1024);
+       struct cassandra_sql_statement *stmt =
+               p_new(pool, struct cassandra_sql_statement, 1);
+
+       stmt->stmt.pool = pool;
+       stmt->stmt.query_template =
+               p_strdup(stmt->stmt.pool, prep_stmt->query_template);
+       stmt->prep = prep_stmt;
+
+       if (prep_stmt->prepared != NULL) {
+               /* statement is already prepared. we can use it immediately. */
+               stmt->cass_stmt = cass_prepared_bind(prep_stmt->prepared);
+       } else {
+               if (prep_stmt->error != NULL)
+                       prepare_start(prep_stmt);
+               /* need to wait until prepare is finished */
+               array_append(&prep_stmt->pending_statements, &stmt, 1);
+       }
+       return &stmt->stmt;
+}
+
+static void
+driver_cassandra_statement_abort(struct sql_statement *_stmt)
+{
+       struct cassandra_sql_statement *stmt =
+               (struct cassandra_sql_statement *)_stmt;
+
+       if (stmt->cass_stmt != NULL)
+               cass_statement_free(stmt->cass_stmt);
+}
+
+static void
+driver_cassandra_statement_set_timestamp(struct sql_statement *_stmt,
+                                        const struct timespec *ts)
+{
+       struct cassandra_sql_statement *stmt =
+               (struct cassandra_sql_statement *)_stmt;
+       cass_int64_t ts_msecs =
+               (cass_int64_t)ts->tv_sec * 1000 +
+               ts->tv_nsec / 1000000;
+
+       if (stmt->cass_stmt != NULL)
+               cass_statement_set_timestamp(stmt->cass_stmt, ts_msecs);
+       else
+               stmt->pending_timestamp = ts_msecs;
+}
+
+static struct cassandra_sql_arg *
+driver_cassandra_add_pending_arg(struct cassandra_sql_statement *stmt,
+                                unsigned int column_idx)
+{
+       struct cassandra_sql_arg *arg;
+
+       if (!array_is_created(&stmt->pending_args))
+               p_array_init(&stmt->pending_args, stmt->stmt.pool, 8);
+       arg = array_append_space(&stmt->pending_args);
+       arg->column_idx = column_idx;
+       return arg;
+}
+
+static void
+driver_cassandra_statement_bind_str(struct sql_statement *_stmt,
+                                   unsigned int column_idx,
+                                   const char *value)
+{
+       struct cassandra_sql_statement *stmt =
+               (struct cassandra_sql_statement *)_stmt;
+       if (stmt->cass_stmt != NULL)
+               cass_statement_bind_string(stmt->cass_stmt, column_idx, value);
+       else {
+               struct cassandra_sql_arg *arg =
+                       driver_cassandra_add_pending_arg(stmt, column_idx);
+               arg->value_str = p_strdup(_stmt->pool, value);
+       }
+}
+
+static void
+driver_cassandra_statement_bind_binary(struct sql_statement *_stmt,
+                                      unsigned int column_idx,
+                                      const void *value, size_t value_size)
+{
+       struct cassandra_sql_statement *stmt =
+               (struct cassandra_sql_statement *)_stmt;
+
+       if (stmt->cass_stmt != NULL) {
+               cass_statement_bind_bytes(stmt->cass_stmt, column_idx,
+                                         value, value_size);
+       } else {
+               struct cassandra_sql_arg *arg =
+                       driver_cassandra_add_pending_arg(stmt, column_idx);
+               arg->value_binary = p_memdup(_stmt->pool, value, value_size);
+               arg->value_binary_size = value_size;
+       }
+}
+
+static void
+driver_cassandra_statement_bind_int64(struct sql_statement *_stmt,
+                                     unsigned int column_idx, int64_t value)
+{
+       struct cassandra_sql_statement *stmt =
+               (struct cassandra_sql_statement *)_stmt;
+
+       if (stmt->cass_stmt != NULL)
+               driver_cassandra_bind_int(stmt, column_idx, value);
+       else {
+               struct cassandra_sql_arg *arg =
+                       driver_cassandra_add_pending_arg(stmt, column_idx);
+               arg->value_int64 = value;
+       }
+}
+
+static void
+driver_cassandra_statement_query(struct sql_statement *_stmt,
+                                sql_query_callback_t *callback, void *context)
+{
+       struct cassandra_sql_statement *stmt =
+               (struct cassandra_sql_statement *)_stmt;
+       struct cassandra_db *db = (struct cassandra_db *)_stmt->db;
+
+       stmt->result = driver_cassandra_query_init(db, sql_statement_get_query(_stmt),
+                                                  CASSANDRA_QUERY_TYPE_READ,
+                                                  callback, context);
+       if (stmt->cass_stmt == NULL) {
+               /* wait for prepare to finish */
+       } else {
+               stmt->result->statement = stmt->cass_stmt;
+               (void)driver_cassandra_send_query(stmt->result);
+               pool_unref(&_stmt->pool);
+       }
+}
+
+static struct sql_result *
+driver_cassandra_statement_query_s(struct sql_statement *_stmt ATTR_UNUSED)
+{
+       i_panic("cassandra: sql_statement_query_s() not supported");
+}
+
+static void
+driver_cassandra_update_stmt(struct sql_transaction_context *_ctx,
+                            struct sql_statement *_stmt,
+                            unsigned int *affected_rows)
+{
+       struct cassandra_transaction_context *ctx =
+               (struct cassandra_transaction_context *)_ctx;
+       struct cassandra_sql_statement *stmt =
+               (struct cassandra_sql_statement *)_stmt;
+
+       i_assert(affected_rows == NULL);
+
+       if (ctx->query != NULL || ctx->stmt != NULL) {
+               transaction_set_failed(ctx, "Multiple changes in transaction not supported");
+               return;
+       }
+       ctx->stmt = stmt;
+}
+
 const struct sql_db driver_cassandra_db = {
        .name = "cassandra",
-       .flags = 0,
+       .flags = SQL_DB_FLAG_PREP_STATEMENTS,
 
        .v = {
                .init = driver_cassandra_init_v,
@@ -1581,6 +2026,19 @@ const struct sql_db driver_cassandra_db = {
                .update = driver_cassandra_update,
 
                .escape_blob = driver_cassandra_escape_blob,
+
+               .prepared_statement_init = driver_cassandra_prepared_statement_init,
+               .prepared_statement_deinit = driver_cassandra_prepared_statement_deinit,
+               .statement_init = driver_cassandra_statement_init,
+               .statement_init_prepared = driver_cassandra_statement_init_prepared,
+               .statement_abort = driver_cassandra_statement_abort,
+               .statement_set_timestamp = driver_cassandra_statement_set_timestamp,
+               .statement_bind_str = driver_cassandra_statement_bind_str,
+               .statement_bind_binary = driver_cassandra_statement_bind_binary,
+               .statement_bind_int64 = driver_cassandra_statement_bind_int64,
+               .statement_query = driver_cassandra_statement_query,
+               .statement_query_s = driver_cassandra_statement_query_s,
+               .update_stmt = driver_cassandra_update_stmt,
        }
 };