const char *(*escape_string)(struct sql_db *db, const char *string);
void (*exec)(struct sql_db *db, const char *query);
+ /* Only implement this if the driver can really do asynchronous callbacks,
+ otherwise let sql API handle it. */
void (*query)(struct sql_db *db, const char *query,
sql_query_callback_t *callback, void *context);
struct sql_result *(*query_s)(struct sql_db *db, const char *query);
struct sql_transaction_context *(*transaction_begin)(struct sql_db *db);
+ /* Only implement this if the driver can really do asynchronous callbacks,
+ otherwise let sql API handle it. */
void (*transaction_commit)(struct sql_transaction_context *ctx,
sql_commit_callback_t *callback,
void *context);
struct event *event;
HASH_TABLE(char *, struct sql_prepared_statement *) prepared_stmt_hash;
+ struct sql_query_result_delayed *query_delayed_list;
+ struct sql_commit_result_delayed *commit_delayed_list;
+
enum sql_db_state state;
/* last time we started connecting to this server
(which may or may not have succeeded) */
#include "array.h"
#include "ioloop.h"
#include "hash.h"
+#include "llist.h"
#include "str.h"
#include "time-util.h"
#include "settings.h"
#include <time.h>
+struct sql_query_result_delayed {
+ struct sql_query_result_delayed *prev, *next;
+
+ struct sql_db *db;
+ struct timeout *to;
+
+ struct sql_result *result;
+ sql_query_callback_t *callback;
+ void *context;
+};
+
+struct sql_commit_result_delayed {
+ struct sql_commit_result_delayed *prev, *next;
+
+ struct sql_db *db;
+ struct timeout *to;
+
+ char *error;
+ sql_commit_callback_t *callback;
+ void *context;
+};
+
struct event_category event_category_sql = {
.name = "sql",
};
struct sql_db_module_register sql_db_module_register = { 0 };
ARRAY_TYPE(sql_drivers) sql_drivers;
+static void sql_query_delayed_callback(struct sql_query_result_delayed *cb);
+static void sql_commit_delayed_callback(struct sql_commit_result_delayed *cb);
+
void sql_drivers_init_without_drivers(void)
{
i_array_init(&sql_drivers, 8);
return db->v.connect(db);
}
+static void sql_call_delayed_callbacks(struct sql_db *db)
+{
+ /* flush any pending results */
+ while (db->query_delayed_list != NULL)
+ sql_query_delayed_callback(db->query_delayed_list);
+ while (db->commit_delayed_list != NULL)
+ sql_commit_delayed_callback(db->commit_delayed_list);
+}
+
void sql_disconnect(struct sql_db *db)
{
timeout_remove(&db->to_reconnect);
+ sql_call_delayed_callbacks(db);
db->v.disconnect(db);
}
db->v.exec(db, query);
}
+static void sql_query_delayed_callback(struct sql_query_result_delayed *cb)
+{
+ timeout_remove(&cb->to);
+ DLLIST_REMOVE(&cb->db->query_delayed_list, cb);
+ cb->callback(cb->result, cb->context);
+ sql_result_unref(cb->result);
+ i_free(cb);
+}
+
#undef sql_query
void sql_query(struct sql_db *db, const char *query,
sql_query_callback_t *callback, void *context)
{
- db->v.query(db, query, callback, context);
+ if (db->v.query != NULL) {
+ db->v.query(db, query, callback, context);
+ return;
+ }
+
+ struct sql_query_result_delayed *cb = i_new(struct sql_query_result_delayed, 1);
+ cb->db = db;
+ cb->result = sql_query_s(db, query);
+ cb->callback = callback;
+ cb->context = context;
+ cb->to = timeout_add_short(0, sql_query_delayed_callback, cb);
+ DLLIST_PREPEND(&db->query_delayed_list, cb);
}
struct sql_result *sql_query_s(struct sql_db *db, const char *query)
sql_query_callback_t *callback, void *context)
{
struct sql_statement *stmt = *_stmt;
-
*_stmt = NULL;
+
if (stmt->db->v.statement_query != NULL)
stmt->db->v.statement_query(stmt, callback, context);
- else
+ else if (stmt->db->v.statement_query_s != NULL) {
+ struct sql_db *db = stmt->db;
+ struct sql_query_result_delayed *cb =
+ i_new(struct sql_query_result_delayed, 1);
+ cb->db = db;
+ cb->callback = callback;
+ cb->context = context;
+ cb->result = sql_statement_query_s(&stmt);
+ cb->to = timeout_add_short(0, sql_query_delayed_callback, cb);
+ DLLIST_PREPEND(&db->query_delayed_list, cb);
+ } else
default_sql_statement_query(stmt, callback, context);
}
ctx->non_atomic = TRUE;
}
+static void sql_commit_delayed_callback(struct sql_commit_result_delayed *cb)
+{
+ struct sql_commit_result result = {
+ .error = cb->error,
+ };
+ timeout_remove(&cb->to);
+ DLLIST_REMOVE(&cb->db->commit_delayed_list, cb);
+ cb->callback(&result, cb->context);
+ i_free(cb->error);
+ i_free(cb);
+}
+
#undef sql_transaction_commit
void sql_transaction_commit(struct sql_transaction_context **_ctx,
sql_commit_callback_t *callback, void *context)
{
struct sql_transaction_context *ctx = *_ctx;
-
+ struct sql_db *db = ctx->db;
*_ctx = NULL;
- ctx->db->v.transaction_commit(ctx, callback, context);
+
+ if (ctx->db->v.transaction_commit != NULL) {
+ ctx->db->v.transaction_commit(ctx, callback, context);
+ return;
+ }
+
+ struct sql_commit_result_delayed *cb = i_new(struct sql_commit_result_delayed, 1);
+ const char *error = NULL;
+ ctx->db->v.transaction_commit_s(ctx, &error);
+ cb->db = db;
+ cb->error = i_strdup(error);
+ cb->callback = callback;
+ cb->context = context;
+ cb->to = timeout_add_short(0, sql_commit_delayed_callback, cb);
+ DLLIST_PREPEND(&db->commit_delayed_list, cb);
}
int sql_transaction_commit_s(struct sql_transaction_context **_ctx,
{
if (db->v.wait != NULL)
db->v.wait(db);
+ sql_call_delayed_callbacks(db);
}
void sql_exec(struct sql_db *db, const char *query);
/* Execute SQL query and return result in callback. If fields list is given,
the returned fields are validated to be of correct type, and you can use
- sql_result_next_row_get() */
+ sql_result_next_row_get(). The callback is never called immediately. */
void sql_query(struct sql_db *db, const char *query,
sql_query_callback_t *callback, void *context);
#define sql_query(db, query, callback, context) \
/* Don't require transaction to be atomic. Currently this is implemented only
with Cassandra to use UNLOGGED BATCH operations. */
void sql_transaction_set_non_atomic(struct sql_transaction_context *ctx);
-/* Commit transaction. */
+/* Commit transaction. The callback is never called immediately. */
void sql_transaction_commit(struct sql_transaction_context **ctx,
sql_commit_callback_t *callback, void *context);
#define sql_transaction_commit(ctx, callback, context) \