int fd_pipe[2];
struct io *io_pipe;
+ ARRAY(struct cassandra_sql_prepared_statement *) pending_prepares;
ARRAY(struct cassandra_callback *) callbacks;
ARRAY(struct cassandra_result *) results;
unsigned int callback_ids;
sql_commit_callback_t *callback;
void *context;
+ struct cassandra_sql_statement *stmt;
char *query;
char *error;
bool failed:1;
};
+struct cassandra_sql_arg {
+ unsigned int column_idx;
+
+ char *value_str;
+ unsigned char *value_binary;
+ size_t value_binary_size;
+ int64_t value_int64;
+};
+
+struct cassandra_sql_statement {
+ struct sql_statement stmt;
+
+ struct cassandra_sql_prepared_statement *prep;
+ CassStatement *cass_stmt;
+
+ ARRAY(struct cassandra_sql_arg) pending_args;
+ cass_int64_t pending_timestamp;
+
+ struct cassandra_result *result;
+};
+
+struct cassandra_sql_prepared_statement {
+ struct sql_prepared_statement prep_stmt;
+ char *query_template;
+
+ /* NULL, until the prepare is asynchronously finished */
+ const CassPrepared *prepared;
+ /* statements waiting for prepare to finish */
+ ARRAY(struct cassandra_sql_statement *) pending_statements;
+ /* an error here will cause the prepare to be retried on the next
+ execution attempt. */
+ char *error;
+
+ bool pending;
+};
+
extern const struct sql_db driver_cassandra_db;
extern const struct sql_result driver_cassandra_result;
{ CASS_LOG_TRACE, "trace" }
};
+static void driver_cassandra_prepare_pending(struct cassandra_db *db);
+static void
+prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt);
static void driver_cassandra_result_send_query(struct cassandra_result *result);
static void driver_cassandra_send_queries(struct cassandra_db *db);
static void result_finish(struct cassandra_result *result);
static void driver_cassandra_close(struct cassandra_db *db, const char *error)
{
+ struct cassandra_sql_prepared_statement *const *prep_stmtp;
struct cassandra_result *const *resultp;
if (db->io_pipe != NULL)
}
driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
+ array_foreach(&db->pending_prepares, prep_stmtp) {
+ (*prep_stmtp)->pending = FALSE;
+ (*prep_stmtp)->error = i_strdup(error);
+ prepare_finish_pending_statements(*prep_stmtp);
+ }
+ array_clear(&db->pending_prepares);
+
while (array_count(&db->results) > 0) {
resultp = array_idx(&db->results, 0);
if ((*resultp)->error == NULL)
finish */
io_loop_stop(db->ioloop);
}
+ driver_cassandra_prepare_pending(db);
driver_cassandra_send_queries(db);
}
db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write, db);
i_array_init(&db->results, 16);
i_array_init(&db->callbacks, 16);
+ i_array_init(&db->pending_prepares, 16);
return &db->api;
}
array_free(&db->callbacks);
i_assert(array_count(&db->results) == 0);
array_free(&db->results);
+ i_assert(array_count(&db->pending_prepares) == 0);
+ array_free(&db->pending_prepares);
cass_session_free(db->session);
cass_cluster_free(db->cluster);
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
CassFuture *future;
+ i_assert(result->statement != NULL);
+
db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
driver_cassandra_init_statement(result);
results = array_get(&db->results, &count);
for (i = 0; i < count; i++) {
- if (!results[i]->query_sent) {
+ if (!results[i]->query_sent && results[i]->statement != NULL) {
if (driver_cassandra_send_query(results[i]) <= 0)
break;
}
{
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
+ struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
enum cassandra_query_type query_type;
struct sql_commit_result result;
ctx->callback = callback;
ctx->context = context;
- if (ctx->failed || ctx->query == NULL) {
+ if (ctx->failed || (ctx->query == NULL && ctx->stmt == NULL)) {
if (ctx->failed)
result.error = ctx->error;
callback(&result, context);
driver_cassandra_transaction_unref(&ctx);
- } else {
- /* just a single query, send it */
- if (strncasecmp(ctx->query, "DELETE ", 7) == 0)
- query_type = CASSANDRA_QUERY_TYPE_DELETE;
- else
- query_type = CASSANDRA_QUERY_TYPE_WRITE;
- driver_cassandra_query_full(_ctx->db, ctx->query, query_type,
+ return;
+ }
+
+ /* just a single query, send it */
+ const char *query = ctx->query != NULL ?
+ ctx->query : sql_statement_get_query(&ctx->stmt->stmt);
+ if (strncasecmp(query, "DELETE ", 7) == 0)
+ query_type = CASSANDRA_QUERY_TYPE_DELETE;
+ else
+ query_type = CASSANDRA_QUERY_TYPE_WRITE;
+
+ if (ctx->query != NULL) {
+ driver_cassandra_query_full(_ctx->db, query, query_type,
transaction_commit_callback, ctx);
+ } else {
+ ctx->stmt->result =
+ driver_cassandra_query_init(db, query, query_type,
+ transaction_commit_callback, ctx);
+ if (ctx->stmt->cass_stmt == NULL) {
+ /* wait for prepare to finish */
+ } else {
+ ctx->stmt->result->statement = ctx->stmt->cass_stmt;
+ (void)driver_cassandra_send_query(ctx->stmt->result);
+ pool_unref(&ctx->stmt->stmt.pool);
+ }
}
}
struct cassandra_transaction_context *ctx =
(struct cassandra_transaction_context *)_ctx;
+ if (ctx->stmt != NULL) {
+ /* nothing should be using this - don't bother implementing */
+ i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
+ }
+
if (ctx->query != NULL && !ctx->failed)
driver_cassandra_try_commit_s(ctx);
*error_r = t_strdup(ctx->error);
i_assert(affected_rows == NULL);
- if (ctx->query != NULL) {
+ if (ctx->query != NULL || ctx->stmt != NULL) {
transaction_set_failed(ctx, "Multiple changes in transaction not supported");
return;
}
return str_c(str);
}
+static CassError
+driver_cassandra_bind_int(struct cassandra_sql_statement *stmt,
+ unsigned int column_idx, int64_t value)
+{
+ const CassDataType *data_type;
+ CassValueType value_type;
+
+ if (stmt->prep == NULL) {
+ value_type = value >= -2147483648 && value <= 2147483647 ?
+ CASS_VALUE_TYPE_INT : CASS_VALUE_TYPE_BIGINT;
+ } else {
+ /* prepared statements require exactly correct value type */
+ data_type = cass_prepared_parameter_data_type(stmt->prep->prepared, column_idx);
+ value_type = cass_data_type_type(data_type);
+ }
+
+ switch (value_type) {
+ case CASS_VALUE_TYPE_INT:
+ if (value < -2147483648 || value > 2147483647)
+ return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+ return cass_statement_bind_int32(stmt->cass_stmt, column_idx, value);
+ case CASS_VALUE_TYPE_BIGINT:
+ return cass_statement_bind_int64(stmt->cass_stmt, column_idx, value);
+ case CASS_VALUE_TYPE_SMALL_INT:
+ if (value < -32768 || value > 32767)
+ return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+ return cass_statement_bind_int16(stmt->cass_stmt, column_idx, value);
+ case CASS_VALUE_TYPE_TINY_INT:
+ if (value < -128 || value > 127)
+ return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+ return cass_statement_bind_int8(stmt->cass_stmt, column_idx, value);
+ default:
+ return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
+ }
+}
+
+static void prepare_finish_arg(struct cassandra_sql_statement *stmt,
+ const struct cassandra_sql_arg *arg)
+{
+ CassError rc;
+
+ if (arg->value_str != NULL) {
+ rc = cass_statement_bind_string(stmt->cass_stmt, arg->column_idx,
+ arg->value_str);
+ } else if (arg->value_binary != NULL) {
+ rc = cass_statement_bind_bytes(stmt->cass_stmt, arg->column_idx,
+ arg->value_binary,
+ arg->value_binary_size);
+ } else {
+ rc = driver_cassandra_bind_int(stmt, arg->column_idx,
+ arg->value_int64);
+ }
+ if (rc != CASS_OK) {
+ i_error("cassandra: Statement '%s': Failed to bind column %u: %s",
+ stmt->stmt.query_template, arg->column_idx,
+ cass_error_desc(rc));
+ }
+}
+
+static void prepare_finish_statement(struct cassandra_sql_statement *stmt)
+{
+ const struct cassandra_sql_arg *arg;
+
+ if (stmt->prep->prepared == NULL) {
+ i_assert(stmt->prep->error != NULL);
+
+ if (stmt->result != NULL) {
+ stmt->result->error = i_strdup(stmt->prep->error);
+ result_finish(stmt->result);
+ }
+ return;
+ }
+ stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared);
+
+ if (stmt->pending_timestamp != 0) {
+ cass_statement_set_timestamp(stmt->cass_stmt,
+ stmt->pending_timestamp);
+ }
+
+ if (array_is_created(&stmt->pending_args)) {
+ array_foreach(&stmt->pending_args, arg)
+ prepare_finish_arg(stmt, arg);
+ }
+ if (stmt->result != NULL) {
+ stmt->result->statement = stmt->cass_stmt;
+ (void)driver_cassandra_send_query(stmt->result);
+ pool_unref(&stmt->stmt.pool);
+ }
+}
+
+static void
+prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt)
+{
+ struct cassandra_sql_statement *const *stmtp;
+
+ array_foreach(&prep_stmt->pending_statements, stmtp)
+ prepare_finish_statement(*stmtp);
+ array_clear(&prep_stmt->pending_statements);
+}
+
+static void prepare_callback(CassFuture *future, void *context)
+{
+ struct cassandra_sql_prepared_statement *prep_stmt = context;
+ CassError error = cass_future_error_code(future);
+
+ if (error != CASS_OK) {
+ const char *errmsg;
+ size_t errsize;
+
+ cass_future_error_message(future, &errmsg, &errsize);
+ i_free(prep_stmt->error);
+ prep_stmt->error = i_strndup(errmsg, errsize);
+ } else {
+ prep_stmt->prepared = cass_future_get_prepared(future);
+ }
+
+ prepare_finish_pending_statements(prep_stmt);
+}
+
+static void prepare_start(struct cassandra_sql_prepared_statement *prep_stmt)
+{
+ struct cassandra_db *db = (struct cassandra_db *)prep_stmt->prep_stmt.db;
+ CassFuture *future;
+
+ if (!SQL_DB_IS_READY(&db->api)) {
+ if (!prep_stmt->pending) {
+ prep_stmt->pending = TRUE;
+ array_append(&db->pending_prepares, &prep_stmt, 1);
+
+ if (sql_connect(&db->api) < 0)
+ i_unreached();
+ }
+ return;
+ }
+
+ /* clear the current error in case we're retrying */
+ i_free_and_null(prep_stmt->error);
+
+ future = cass_session_prepare(db->session, prep_stmt->query_template);
+ driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt);
+}
+
+static void driver_cassandra_prepare_pending(struct cassandra_db *db)
+{
+ struct cassandra_sql_prepared_statement *const *prep_stmtp;
+
+ i_assert(SQL_DB_IS_READY(&db->api));
+
+ array_foreach(&db->pending_prepares, prep_stmtp) {
+ (*prep_stmtp)->pending = FALSE;
+ prepare_start(*prep_stmtp);
+ }
+ array_clear(&db->pending_prepares);
+}
+
+static struct sql_prepared_statement *
+driver_cassandra_prepared_statement_init(struct sql_db *db,
+ const char *query_template)
+{
+ struct cassandra_sql_prepared_statement *prep_stmt =
+ i_new(struct cassandra_sql_prepared_statement, 1);
+ prep_stmt->prep_stmt.db = db;
+ prep_stmt->query_template = i_strdup(query_template);
+ i_array_init(&prep_stmt->pending_statements, 4);
+ prepare_start(prep_stmt);
+ return &prep_stmt->prep_stmt;
+}
+
+static void
+driver_cassandra_prepared_statement_deinit(struct sql_prepared_statement *_prep_stmt)
+{
+ struct cassandra_sql_prepared_statement *prep_stmt =
+ (struct cassandra_sql_prepared_statement *)_prep_stmt;
+
+ i_assert(array_count(&prep_stmt->pending_statements) == 0);
+ if (prep_stmt->prepared != NULL)
+ cass_prepared_free(prep_stmt->prepared);
+ array_free(&prep_stmt->pending_statements);
+ i_free(prep_stmt->query_template);
+ i_free(prep_stmt->error);
+ i_free(prep_stmt);
+}
+
+static struct sql_statement *
+driver_cassandra_statement_init(struct sql_db *db ATTR_UNUSED,
+ const char *query_template)
+{
+ pool_t pool = pool_alloconly_create("cassandra sql statement", 1024);
+ struct cassandra_sql_statement *stmt =
+ p_new(pool, struct cassandra_sql_statement, 1);
+
+ /* Count the number of parameters in the query. We'll assume that all
+ the changing parameters are bound, so there shouldn't be any
+ quoted strings with '?' in them. */
+ const char *p = query_template;
+ size_t param_count = 0;
+ while ((p = strchr(p, '?')) != NULL) {
+ param_count++;
+ p++;
+ }
+
+ stmt->stmt.pool = pool;
+ stmt->cass_stmt = cass_statement_new(query_template, param_count);
+ return &stmt->stmt;
+}
+
+static struct sql_statement *
+driver_cassandra_statement_init_prepared(struct sql_prepared_statement *_prep_stmt)
+{
+ struct cassandra_sql_prepared_statement *prep_stmt =
+ (struct cassandra_sql_prepared_statement *)_prep_stmt;
+ pool_t pool = pool_alloconly_create("cassandra prepared sql statement", 1024);
+ struct cassandra_sql_statement *stmt =
+ p_new(pool, struct cassandra_sql_statement, 1);
+
+ stmt->stmt.pool = pool;
+ stmt->stmt.query_template =
+ p_strdup(stmt->stmt.pool, prep_stmt->query_template);
+ stmt->prep = prep_stmt;
+
+ if (prep_stmt->prepared != NULL) {
+ /* statement is already prepared. we can use it immediately. */
+ stmt->cass_stmt = cass_prepared_bind(prep_stmt->prepared);
+ } else {
+ if (prep_stmt->error != NULL)
+ prepare_start(prep_stmt);
+ /* need to wait until prepare is finished */
+ array_append(&prep_stmt->pending_statements, &stmt, 1);
+ }
+ return &stmt->stmt;
+}
+
+static void
+driver_cassandra_statement_abort(struct sql_statement *_stmt)
+{
+ struct cassandra_sql_statement *stmt =
+ (struct cassandra_sql_statement *)_stmt;
+
+ if (stmt->cass_stmt != NULL)
+ cass_statement_free(stmt->cass_stmt);
+}
+
+static void
+driver_cassandra_statement_set_timestamp(struct sql_statement *_stmt,
+ const struct timespec *ts)
+{
+ struct cassandra_sql_statement *stmt =
+ (struct cassandra_sql_statement *)_stmt;
+ cass_int64_t ts_msecs =
+ (cass_int64_t)ts->tv_sec * 1000 +
+ ts->tv_nsec / 1000000;
+
+ if (stmt->cass_stmt != NULL)
+ cass_statement_set_timestamp(stmt->cass_stmt, ts_msecs);
+ else
+ stmt->pending_timestamp = ts_msecs;
+}
+
+static struct cassandra_sql_arg *
+driver_cassandra_add_pending_arg(struct cassandra_sql_statement *stmt,
+ unsigned int column_idx)
+{
+ struct cassandra_sql_arg *arg;
+
+ if (!array_is_created(&stmt->pending_args))
+ p_array_init(&stmt->pending_args, stmt->stmt.pool, 8);
+ arg = array_append_space(&stmt->pending_args);
+ arg->column_idx = column_idx;
+ return arg;
+}
+
+static void
+driver_cassandra_statement_bind_str(struct sql_statement *_stmt,
+ unsigned int column_idx,
+ const char *value)
+{
+ struct cassandra_sql_statement *stmt =
+ (struct cassandra_sql_statement *)_stmt;
+ if (stmt->cass_stmt != NULL)
+ cass_statement_bind_string(stmt->cass_stmt, column_idx, value);
+ else {
+ struct cassandra_sql_arg *arg =
+ driver_cassandra_add_pending_arg(stmt, column_idx);
+ arg->value_str = p_strdup(_stmt->pool, value);
+ }
+}
+
+static void
+driver_cassandra_statement_bind_binary(struct sql_statement *_stmt,
+ unsigned int column_idx,
+ const void *value, size_t value_size)
+{
+ struct cassandra_sql_statement *stmt =
+ (struct cassandra_sql_statement *)_stmt;
+
+ if (stmt->cass_stmt != NULL) {
+ cass_statement_bind_bytes(stmt->cass_stmt, column_idx,
+ value, value_size);
+ } else {
+ struct cassandra_sql_arg *arg =
+ driver_cassandra_add_pending_arg(stmt, column_idx);
+ arg->value_binary = p_memdup(_stmt->pool, value, value_size);
+ arg->value_binary_size = value_size;
+ }
+}
+
+static void
+driver_cassandra_statement_bind_int64(struct sql_statement *_stmt,
+ unsigned int column_idx, int64_t value)
+{
+ struct cassandra_sql_statement *stmt =
+ (struct cassandra_sql_statement *)_stmt;
+
+ if (stmt->cass_stmt != NULL)
+ driver_cassandra_bind_int(stmt, column_idx, value);
+ else {
+ struct cassandra_sql_arg *arg =
+ driver_cassandra_add_pending_arg(stmt, column_idx);
+ arg->value_int64 = value;
+ }
+}
+
+static void
+driver_cassandra_statement_query(struct sql_statement *_stmt,
+ sql_query_callback_t *callback, void *context)
+{
+ struct cassandra_sql_statement *stmt =
+ (struct cassandra_sql_statement *)_stmt;
+ struct cassandra_db *db = (struct cassandra_db *)_stmt->db;
+
+ stmt->result = driver_cassandra_query_init(db, sql_statement_get_query(_stmt),
+ CASSANDRA_QUERY_TYPE_READ,
+ callback, context);
+ if (stmt->cass_stmt == NULL) {
+ /* wait for prepare to finish */
+ } else {
+ stmt->result->statement = stmt->cass_stmt;
+ (void)driver_cassandra_send_query(stmt->result);
+ pool_unref(&_stmt->pool);
+ }
+}
+
+static struct sql_result *
+driver_cassandra_statement_query_s(struct sql_statement *_stmt ATTR_UNUSED)
+{
+ i_panic("cassandra: sql_statement_query_s() not supported");
+}
+
+static void
+driver_cassandra_update_stmt(struct sql_transaction_context *_ctx,
+ struct sql_statement *_stmt,
+ unsigned int *affected_rows)
+{
+ struct cassandra_transaction_context *ctx =
+ (struct cassandra_transaction_context *)_ctx;
+ struct cassandra_sql_statement *stmt =
+ (struct cassandra_sql_statement *)_stmt;
+
+ i_assert(affected_rows == NULL);
+
+ if (ctx->query != NULL || ctx->stmt != NULL) {
+ transaction_set_failed(ctx, "Multiple changes in transaction not supported");
+ return;
+ }
+ ctx->stmt = stmt;
+}
+
const struct sql_db driver_cassandra_db = {
.name = "cassandra",
- .flags = 0,
+ .flags = SQL_DB_FLAG_PREP_STATEMENTS,
.v = {
.init = driver_cassandra_init_v,
.update = driver_cassandra_update,
.escape_blob = driver_cassandra_escape_blob,
+
+ .prepared_statement_init = driver_cassandra_prepared_statement_init,
+ .prepared_statement_deinit = driver_cassandra_prepared_statement_deinit,
+ .statement_init = driver_cassandra_statement_init,
+ .statement_init_prepared = driver_cassandra_statement_init_prepared,
+ .statement_abort = driver_cassandra_statement_abort,
+ .statement_set_timestamp = driver_cassandra_statement_set_timestamp,
+ .statement_bind_str = driver_cassandra_statement_bind_str,
+ .statement_bind_binary = driver_cassandra_statement_bind_binary,
+ .statement_bind_int64 = driver_cassandra_statement_bind_int64,
+ .statement_query = driver_cassandra_statement_query,
+ .statement_query_s = driver_cassandra_statement_query_s,
+ .update_stmt = driver_cassandra_update_stmt,
}
};