From: Aki Tuomi Date: Mon, 6 Oct 2025 09:39:50 +0000 (+0300) Subject: lib-sql: sql-api - Implement async calls for drivers that can't X-Git-Tag: 2.4.2~44 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=a78c015eae67a891a0c14b5434dd1f99838ae4cc;p=thirdparty%2Fdovecot%2Fcore.git lib-sql: sql-api - Implement async calls for drivers that can't Changes the behaviour of asynchronous functions to be truly asynchronous even if the underlying driver isn't capable of doing this. Implemented by adding immediate timeouts to call the callbacks after returning from the synchronous function and ending up back to ioloop. --- diff --git a/src/lib-sql/sql-api-private.h b/src/lib-sql/sql-api-private.h index 00b1d8bb63..fb41db775b 100644 --- a/src/lib-sql/sql-api-private.h +++ b/src/lib-sql/sql-api-private.h @@ -84,11 +84,15 @@ struct sql_db_vfuncs { const char *(*escape_string)(struct sql_db *db, const char *string); void (*exec)(struct sql_db *db, const char *query); + /* Only implement this if the driver can really do asynchronous callbacks, + otherwise let sql API handle it. */ void (*query)(struct sql_db *db, const char *query, sql_query_callback_t *callback, void *context); struct sql_result *(*query_s)(struct sql_db *db, const char *query); struct sql_transaction_context *(*transaction_begin)(struct sql_db *db); + /* Only implement this if the driver can really do asynchronous callbacks, + otherwise let sql API handle it. */ void (*transaction_commit)(struct sql_transaction_context *ctx, sql_commit_callback_t *callback, void *context); @@ -154,6 +158,9 @@ struct sql_db { struct event *event; HASH_TABLE(char *, struct sql_prepared_statement *) prepared_stmt_hash; + struct sql_query_result_delayed *query_delayed_list; + struct sql_commit_result_delayed *commit_delayed_list; + enum sql_db_state state; /* last time we started connecting to this server (which may or may not have succeeded) */ diff --git a/src/lib-sql/sql-api.c b/src/lib-sql/sql-api.c index 32ff57aa6e..24a1509afd 100644 --- a/src/lib-sql/sql-api.c +++ b/src/lib-sql/sql-api.c @@ -4,6 +4,7 @@ #include "array.h" #include "ioloop.h" #include "hash.h" +#include "llist.h" #include "str.h" #include "time-util.h" #include "settings.h" @@ -12,6 +13,28 @@ #include +struct sql_query_result_delayed { + struct sql_query_result_delayed *prev, *next; + + struct sql_db *db; + struct timeout *to; + + struct sql_result *result; + sql_query_callback_t *callback; + void *context; +}; + +struct sql_commit_result_delayed { + struct sql_commit_result_delayed *prev, *next; + + struct sql_db *db; + struct timeout *to; + + char *error; + sql_commit_callback_t *callback; + void *context; +}; + struct event_category event_category_sql = { .name = "sql", }; @@ -40,6 +63,9 @@ const struct setting_parser_info sql_setting_parser_info = { struct sql_db_module_register sql_db_module_register = { 0 }; ARRAY_TYPE(sql_drivers) sql_drivers; +static void sql_query_delayed_callback(struct sql_query_result_delayed *cb); +static void sql_commit_delayed_callback(struct sql_commit_result_delayed *cb); + void sql_drivers_init_without_drivers(void) { i_array_init(&sql_drivers, 8); @@ -228,9 +254,19 @@ int sql_connect(struct sql_db *db) return db->v.connect(db); } +static void sql_call_delayed_callbacks(struct sql_db *db) +{ + /* flush any pending results */ + while (db->query_delayed_list != NULL) + sql_query_delayed_callback(db->query_delayed_list); + while (db->commit_delayed_list != NULL) + sql_commit_delayed_callback(db->commit_delayed_list); +} + void sql_disconnect(struct sql_db *db) { timeout_remove(&db->to_reconnect); + sql_call_delayed_callbacks(db); db->v.disconnect(db); } @@ -250,11 +286,31 @@ void sql_exec(struct sql_db *db, const char *query) db->v.exec(db, query); } +static void sql_query_delayed_callback(struct sql_query_result_delayed *cb) +{ + timeout_remove(&cb->to); + DLLIST_REMOVE(&cb->db->query_delayed_list, cb); + cb->callback(cb->result, cb->context); + sql_result_unref(cb->result); + i_free(cb); +} + #undef sql_query void sql_query(struct sql_db *db, const char *query, sql_query_callback_t *callback, void *context) { - db->v.query(db, query, callback, context); + if (db->v.query != NULL) { + db->v.query(db, query, callback, context); + return; + } + + struct sql_query_result_delayed *cb = i_new(struct sql_query_result_delayed, 1); + cb->db = db; + cb->result = sql_query_s(db, query); + cb->callback = callback; + cb->context = context; + cb->to = timeout_add_short(0, sql_query_delayed_callback, cb); + DLLIST_PREPEND(&db->query_delayed_list, cb); } struct sql_result *sql_query_s(struct sql_db *db, const char *query) @@ -496,11 +552,21 @@ void sql_statement_query(struct sql_statement **_stmt, sql_query_callback_t *callback, void *context) { struct sql_statement *stmt = *_stmt; - *_stmt = NULL; + if (stmt->db->v.statement_query != NULL) stmt->db->v.statement_query(stmt, callback, context); - else + else if (stmt->db->v.statement_query_s != NULL) { + struct sql_db *db = stmt->db; + struct sql_query_result_delayed *cb = + i_new(struct sql_query_result_delayed, 1); + cb->db = db; + cb->callback = callback; + cb->context = context; + cb->result = sql_statement_query_s(&stmt); + cb->to = timeout_add_short(0, sql_query_delayed_callback, cb); + DLLIST_PREPEND(&db->query_delayed_list, cb); + } else default_sql_statement_query(stmt, callback, context); } @@ -757,14 +823,40 @@ void sql_transaction_set_non_atomic(struct sql_transaction_context *ctx) ctx->non_atomic = TRUE; } +static void sql_commit_delayed_callback(struct sql_commit_result_delayed *cb) +{ + struct sql_commit_result result = { + .error = cb->error, + }; + timeout_remove(&cb->to); + DLLIST_REMOVE(&cb->db->commit_delayed_list, cb); + cb->callback(&result, cb->context); + i_free(cb->error); + i_free(cb); +} + #undef sql_transaction_commit void sql_transaction_commit(struct sql_transaction_context **_ctx, sql_commit_callback_t *callback, void *context) { struct sql_transaction_context *ctx = *_ctx; - + struct sql_db *db = ctx->db; *_ctx = NULL; - ctx->db->v.transaction_commit(ctx, callback, context); + + if (ctx->db->v.transaction_commit != NULL) { + ctx->db->v.transaction_commit(ctx, callback, context); + return; + } + + struct sql_commit_result_delayed *cb = i_new(struct sql_commit_result_delayed, 1); + const char *error = NULL; + ctx->db->v.transaction_commit_s(ctx, &error); + cb->db = db; + cb->error = i_strdup(error); + cb->callback = callback; + cb->context = context; + cb->to = timeout_add_short(0, sql_commit_delayed_callback, cb); + DLLIST_PREPEND(&db->commit_delayed_list, cb); } int sql_transaction_commit_s(struct sql_transaction_context **_ctx, @@ -903,6 +995,7 @@ void sql_wait(struct sql_db *db) { if (db->v.wait != NULL) db->v.wait(db); + sql_call_delayed_callbacks(db); } diff --git a/src/lib-sql/sql-api.h b/src/lib-sql/sql-api.h index 41dd2f76e4..f61abf7b2d 100644 --- a/src/lib-sql/sql-api.h +++ b/src/lib-sql/sql-api.h @@ -127,7 +127,7 @@ const char *sql_escape_blob(struct sql_db *db, void sql_exec(struct sql_db *db, const char *query); /* Execute SQL query and return result in callback. If fields list is given, the returned fields are validated to be of correct type, and you can use - sql_result_next_row_get() */ + sql_result_next_row_get(). The callback is never called immediately. */ void sql_query(struct sql_db *db, const char *query, sql_query_callback_t *callback, void *context); #define sql_query(db, query, callback, context) \ @@ -233,7 +233,7 @@ struct sql_transaction_context *sql_transaction_begin(struct sql_db *db); /* Don't require transaction to be atomic. Currently this is implemented only with Cassandra to use UNLOGGED BATCH operations. */ void sql_transaction_set_non_atomic(struct sql_transaction_context *ctx); -/* Commit transaction. */ +/* Commit transaction. The callback is never called immediately. */ void sql_transaction_commit(struct sql_transaction_context **ctx, sql_commit_callback_t *callback, void *context); #define sql_transaction_commit(ctx, callback, context) \