From 3954326e793bdef1e94e0ad781ed6cc7e48beebb Mon Sep 17 00:00:00 2001 From: Timo Sirainen Date: Sun, 6 Sep 2009 20:51:25 -0400 Subject: [PATCH] dict: Added support for async commits. Changed dict_atomic_inc() behavior. --HG-- branch : HEAD --- src/dict/dict-commands.c | 49 +++++++-- src/dict/dict-connection.h | 1 + src/lib-dict/dict-client.c | 197 ++++++++++++++++++++++++++++++------ src/lib-dict/dict-client.h | 4 +- src/lib-dict/dict-db.c | 15 ++- src/lib-dict/dict-file.c | 29 ++++-- src/lib-dict/dict-private.h | 5 +- src/lib-dict/dict-sql.c | 85 +++++++++++----- src/lib-dict/dict.c | 13 ++- src/lib-dict/dict.h | 20 +++- 10 files changed, 333 insertions(+), 85 deletions(-) diff --git a/src/dict/dict-commands.c b/src/dict/dict-commands.c index 52a316167b..c8cd4931f6 100644 --- a/src/dict/dict-commands.c +++ b/src/dict/dict-commands.c @@ -54,7 +54,8 @@ static int cmd_iterate_flush(struct dict_connection *conn) o_stream_cork(conn->output); while ((ret = dict_iterate(conn->iter_ctx, &key, &value)) > 0) { str_truncate(str, 0); - str_printfa(str, "%s\t%s\n", key, value); + str_printfa(str, "%c%s\t%s\n", DICT_PROTOCOL_REPLY_OK, + key, value); o_stream_send(conn->output, str_data(str), str_len(str)); if (o_stream_get_buffer_used_size(conn->output) > @@ -154,6 +155,7 @@ static int cmd_begin(struct dict_connection *conn, const char *line) /* */ trans = array_append_space(&conn->transactions); trans->id = id; + trans->conn = conn; trans->ctx = dict_transaction_begin(conn->dict); return 0; } @@ -182,7 +184,7 @@ dict_connection_transaction_lookup_parse(struct dict_connection *conn, static int cmd_commit(struct dict_connection *conn, const char *line) { struct dict_connection_transaction *trans; - const char *reply; + char chr; int ret; if (conn->iter_ctx != NULL) { @@ -194,13 +196,46 @@ static int cmd_commit(struct dict_connection *conn, const char *line) return -1; ret = dict_transaction_commit(&trans->ctx); - reply = t_strdup_printf("%c\n", ret == 0 ? DICT_PROTOCOL_REPLY_OK : - DICT_PROTOCOL_REPLY_FAIL); - o_stream_send_str(conn->output, reply); + switch (ret) { + case 1: + chr = DICT_PROTOCOL_REPLY_OK; + break; + case 0: + chr = DICT_PROTOCOL_REPLY_NOTFOUND; + break; + default: + chr = DICT_PROTOCOL_REPLY_FAIL; + break; + } + o_stream_send_str(conn->output, t_strdup_printf("%c\n", chr)); dict_connection_transaction_array_remove(conn, trans); return 0; } +static void cmd_commit_async_callback(int ret, void *context) +{ + struct dict_connection_transaction *trans = context; + const char *reply; + char chr; + + switch (ret) { + case 1: + chr = DICT_PROTOCOL_REPLY_OK; + break; + case 0: + chr = DICT_PROTOCOL_REPLY_NOTFOUND; + break; + default: + chr = DICT_PROTOCOL_REPLY_FAIL; + break; + } + reply = t_strdup_printf("%c%c%u\n", DICT_PROTOCOL_REPLY_ASYNC_COMMIT, + chr, trans->id); + o_stream_send_str(trans->conn->output, reply); + + dict_connection_transaction_array_remove(trans->conn, trans); +} + static int cmd_commit_async(struct dict_connection *conn, const char *line) { @@ -214,8 +249,8 @@ cmd_commit_async(struct dict_connection *conn, const char *line) if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0) return -1; - dict_transaction_commit_async(&trans->ctx); - dict_connection_transaction_array_remove(conn, trans); + dict_transaction_commit_async(&trans->ctx, cmd_commit_async_callback, + trans); return 0; } diff --git a/src/dict/dict-connection.h b/src/dict/dict-connection.h index fcac0da841..1acf0e8d8b 100644 --- a/src/dict/dict-connection.h +++ b/src/dict/dict-connection.h @@ -5,6 +5,7 @@ struct dict_connection_transaction { unsigned int id; + struct dict_connection *conn; struct dict_transaction_context *ctx; }; diff --git a/src/lib-dict/dict-client.c b/src/lib-dict/dict-client.c index e1481eb7da..76b943b89f 100644 --- a/src/lib-dict/dict-client.c +++ b/src/lib-dict/dict-client.c @@ -1,6 +1,7 @@ /* Copyright (c) 2005-2009 Dovecot authors, see the included COPYING file */ #include "lib.h" +#include "llist.h" #include "str.h" #include "network.h" #include "istream.h" @@ -8,6 +9,7 @@ #include "dict-private.h" #include "dict-client.h" +#include #include #include @@ -24,9 +26,13 @@ struct client_dict { time_t last_connect_try; struct istream *input; struct ostream *output; + struct io *io; + + struct client_dict_transaction_context *transactions; unsigned int connect_counter; unsigned int transaction_id_counter; + unsigned int async_commits; unsigned int in_iteration:1; unsigned int handshaked:1; @@ -41,6 +47,11 @@ struct client_dict_iterate_context { struct client_dict_transaction_context { struct dict_transaction_context ctx; + struct client_dict_transaction_context *prev, *next; + + /* for async commits */ + dict_transaction_commit_callback_t *callback; + void *context; unsigned int id; unsigned int connect_counter; @@ -213,29 +224,97 @@ client_dict_send_transaction_query(struct client_dict_transaction_context *ctx, return 0; } -static char *client_dict_read_line(struct client_dict *dict) +static struct client_dict_transaction_context * +client_dict_transaction_find(struct client_dict *dict, unsigned int id) +{ + struct client_dict_transaction_context *ctx; + + for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) { + if (ctx->id == id) + return ctx; + } + return NULL; +} + +static void +client_dict_finish_transaction(struct client_dict *dict, + unsigned int id, int ret) +{ + struct client_dict_transaction_context *ctx; + + ctx = client_dict_transaction_find(dict, id); + if (ctx == NULL) { + i_error("dict-client: Unknown transaction id %u", id); + return; + } + if (ctx->callback != NULL) + ctx->callback(ret, ctx->context); + + DLLIST_REMOVE(&dict->transactions, ctx); + i_free(ctx); + + i_assert(dict->async_commits > 0); + if (--dict->async_commits == 0) + io_remove(&dict->io); +} + +static int client_dict_read_one_line(struct client_dict *dict, char **line_r) { + unsigned int id; char *line; int ret; - line = i_stream_next_line(dict->input); - if (line != NULL) - return line; - - while ((ret = i_stream_read(dict->input)) > 0) { - line = i_stream_next_line(dict->input); - if (line != NULL) - return line; + *line_r = NULL; + while ((line = i_stream_next_line(dict->input)) == NULL) { + ret = i_stream_read(dict->input); + switch (ret) { + case -1: + if (dict->input->stream_errno != 0) + i_error("read(%s) failed: %m", dict->path); + else { + i_error("read(%s) failed: Remote disconnected", + dict->path); + } + return -1; + case -2: + i_error("read(%s) returned too much data", dict->path); + return -1; + default: + i_assert(ret > 0); + break; + } + } + if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) { + switch (line[1]) { + case DICT_PROTOCOL_REPLY_OK: + ret = 1; + break; + case DICT_PROTOCOL_REPLY_NOTFOUND: + ret = 0; + break; + case DICT_PROTOCOL_REPLY_FAIL: + ret = -1; + break; + default: + i_error("dict-client: Invalid async commit line: %s", + line); + return 0; + } + id = strtoul(line+2, NULL, 10); + client_dict_finish_transaction(dict, id, ret); + return 0; } - i_assert(ret < 0); + *line_r = line; + return 1; +} - if (ret == -2) - i_error("read(%s) returned too much data", dict->path); - else if (dict->input->stream_errno == 0) - i_error("read(%s) failed: Remote disconnected", dict->path); - else - i_error("read(%s) failed: %m", dict->path); - return NULL; +static char *client_dict_read_line(struct client_dict *dict) +{ + char *line; + + while (client_dict_read_one_line(dict, &line) == 0) + ; + return line; } static int client_dict_connect(struct client_dict *dict) @@ -263,6 +342,7 @@ static int client_dict_connect(struct client_dict *dict) dict->input->blocking = TRUE; dict->output = o_stream_create_fd(dict->fd, 4096, FALSE); dict->transaction_id_counter = 0; + dict->async_commits = 0; query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n", DICT_PROTOCOL_CMD_HELLO, @@ -283,6 +363,8 @@ static void client_dict_disconnect(struct client_dict *dict) dict->connect_counter++; dict->handshaked = FALSE; + if (dict->io != NULL) + io_remove(&dict->io); if (dict->input != NULL) i_stream_destroy(&dict->input); if (dict->output != NULL) @@ -339,6 +421,21 @@ static void client_dict_deinit(struct dict *_dict) pool_unref(&dict->pool); } +static int client_dict_wait(struct dict *_dict) +{ + struct client_dict *dict = (struct client_dict *)_dict; + char *line; + int ret = 0; + + while (dict->async_commits > 0) { + if (client_dict_read_one_line(dict, &line) < 0) { + ret = -1; + break; + } + } + return ret; +} + static int client_dict_lookup(struct dict *_dict, pool_t pool, const char *key, const char **value_r) { @@ -420,7 +517,10 @@ static int client_dict_iterate(struct dict_iterate_context *_ctx, /* line contains key \t value */ p_clear(ctx->pool); - value = strchr(line, '\t'); + if (*line != DICT_PROTOCOL_REPLY_OK) + value = NULL; + else + value = strchr(++line, '\t'); if (value == NULL) { /* broken protocol */ i_error("dict client (%s) sent broken reply", dict->path); @@ -454,38 +554,72 @@ client_dict_transaction_init(struct dict *_dict) ctx->ctx.dict = _dict; ctx->id = ++dict->transaction_id_counter; + DLLIST_PREPEND(&dict->transactions, ctx); return &ctx->ctx; } -static int client_dict_transaction_commit(struct dict_transaction_context *_ctx, - bool async) +static void dict_async_input(struct client_dict *dict) +{ + char *line; + size_t size; + int ret; + + i_assert(!dict->in_iteration); + + do { + ret = client_dict_read_one_line(dict, &line); + (void)i_stream_get_data(dict->input, &size); + } while (ret == 0 && size > 0); + + if (ret < 0) + io_remove(&dict->io); +} + +static int +client_dict_transaction_commit(struct dict_transaction_context *_ctx, + bool async, + dict_transaction_commit_callback_t *callback, + void *context) { struct client_dict_transaction_context *ctx = (struct client_dict_transaction_context *)_ctx; struct client_dict *dict = (struct client_dict *)_ctx->dict; - int ret = ctx->failed ? -1 : 0; + int ret = ctx->failed ? -1 : 1; - if (ctx->sent_begin) T_BEGIN { + if (ctx->sent_begin && !ctx->failed) T_BEGIN { const char *query, *line; - query = t_strdup_printf("%c%u\n", ctx->failed ? - DICT_PROTOCOL_CMD_ROLLBACK : - (!async ? DICT_PROTOCOL_CMD_COMMIT : - DICT_PROTOCOL_CMD_COMMIT_ASYNC), + query = t_strdup_printf("%c%u\n", !async ? + DICT_PROTOCOL_CMD_COMMIT : + DICT_PROTOCOL_CMD_COMMIT_ASYNC, ctx->id); if (client_dict_send_transaction_query(ctx, query) < 0) ret = -1; - else if (ret < 0 || async) { - /* no reply */ + else if (async) { + ctx->callback = callback; + ctx->context = context; + if (dict->async_commits++ == 0) { + dict->io = io_add(dict->fd, IO_READ, + dict_async_input, dict); + } } else { /* sync commit, read reply */ line = client_dict_read_line(dict); - if (line == NULL || *line != DICT_PROTOCOL_REPLY_OK) + if (line == NULL) + ret = -1; + else if (*line == DICT_PROTOCOL_REPLY_OK) + ret = 1; + else if (*line == DICT_PROTOCOL_REPLY_NOTFOUND) + ret = 0; + else ret = -1; } } T_END; - i_free(ctx); + if (ret < 0 || !async) { + DLLIST_REMOVE(&dict->transactions, ctx); + i_free(ctx); + } return ret; } @@ -494,6 +628,7 @@ client_dict_transaction_rollback(struct dict_transaction_context *_ctx) { struct client_dict_transaction_context *ctx = (struct client_dict_transaction_context *)_ctx; + struct client_dict *dict = (struct client_dict *)_ctx->dict; if (ctx->sent_begin) T_BEGIN { const char *query; @@ -503,6 +638,7 @@ client_dict_transaction_rollback(struct dict_transaction_context *_ctx) (void)client_dict_send_transaction_query(ctx, query); } T_END; + DLLIST_REMOVE(&dict->transactions, ctx); i_free(ctx); } @@ -560,6 +696,7 @@ struct dict dict_driver_client = { { client_dict_init, client_dict_deinit, + client_dict_wait, client_dict_lookup, client_dict_iterate_init, client_dict_iterate, diff --git a/src/lib-dict/dict-client.h b/src/lib-dict/dict-client.h index 88d40ad9bb..e6156c3565 100644 --- a/src/lib-dict/dict-client.h +++ b/src/lib-dict/dict-client.h @@ -28,10 +28,10 @@ enum { }; enum { - /* For LOOKUP command */ DICT_PROTOCOL_REPLY_OK = 'O', /* */ DICT_PROTOCOL_REPLY_NOTFOUND = 'N', - DICT_PROTOCOL_REPLY_FAIL = 'F' + DICT_PROTOCOL_REPLY_FAIL = 'F', + DICT_PROTOCOL_REPLY_ASYNC_COMMIT = 'A' }; const char *dict_client_escape(const char *src); diff --git a/src/lib-dict/dict-db.c b/src/lib-dict/dict-db.c index ae3147c098..d601173d45 100644 --- a/src/lib-dict/dict-db.c +++ b/src/lib-dict/dict-db.c @@ -372,16 +372,22 @@ db_dict_transaction_init(struct dict *_dict) return &ctx->ctx; } -static int db_dict_transaction_commit(struct dict_transaction_context *_ctx, - bool async ATTR_UNUSED) +static int +db_dict_transaction_commit(struct dict_transaction_context *_ctx, + bool async ATTR_UNUSED, + dict_transaction_commit_callback_t *callback, + void *context) { struct db_dict_transaction_context *ctx = (struct db_dict_transaction_context *)_ctx; int ret; - ret = ctx->tid->commit(ctx->tid, 0); + ret = ctx->tid->commit(ctx->tid, 0) < 0 ? -1 : 1; i_free(ctx); - return ret == 0 ? 0 : -1; + + if (callback != NULL) + callback(ret, context); + return ret; } static void db_dict_transaction_rollback(struct dict_transaction_context *_ctx) @@ -447,6 +453,7 @@ struct dict dict_driver_db = { { db_dict_init, db_dict_deinit, + NULL, db_dict_lookup, db_dict_iterate_init, db_dict_iterate, diff --git a/src/lib-dict/dict-file.c b/src/lib-dict/dict-file.c index 085d651a58..84e04ddd36 100644 --- a/src/lib-dict/dict-file.c +++ b/src/lib-dict/dict-file.c @@ -54,6 +54,8 @@ struct file_dict_transaction_context { pool_t pool; ARRAY_DEFINE(changes, struct file_dict_change); + + unsigned int atomic_inc_not_found:1; }; static struct dotlock_settings file_dict_dotlock_settings = { @@ -252,9 +254,12 @@ static void file_dict_apply_changes(struct file_dict_transaction_context *ctx) switch (changes[i].type) { case FILE_DICT_CHANGE_TYPE_INC: - diff = old_value == NULL ? 0 : - strtoll(old_value, NULL, 10); - diff += changes[i].value.diff; + if (old_value == NULL) { + ctx->atomic_inc_not_found = TRUE; + break; + } + diff = strtoll(old_value, NULL, 10) + + changes[i].value.diff; tmp = t_strdup_printf("%lld", diff); new_len = strlen(tmp); if (old_value == NULL || new_len > strlen(old_value)) @@ -365,15 +370,26 @@ static int file_dict_write_changes(struct file_dict_transaction_context *ctx) return 0; } -static int file_dict_transaction_commit(struct dict_transaction_context *_ctx, - bool async ATTR_UNUSED) +static int +file_dict_transaction_commit(struct dict_transaction_context *_ctx, + bool async ATTR_UNUSED, + dict_transaction_commit_callback_t *callback, + void *context) { struct file_dict_transaction_context *ctx = (struct file_dict_transaction_context *)_ctx; int ret; - ret = file_dict_write_changes(ctx); + if (file_dict_write_changes(ctx) < 0) + ret = -1; + else if (ctx->atomic_inc_not_found) + ret = 0; + else + ret = 1; pool_unref(&ctx->pool); + + if (callback != NULL) + callback(ret, context); return ret; } @@ -429,6 +445,7 @@ struct dict dict_driver_file = { { file_dict_init, file_dict_deinit, + NULL, file_dict_lookup, file_dict_iterate_init, file_dict_iterate, diff --git a/src/lib-dict/dict-private.h b/src/lib-dict/dict-private.h index 7f423e3ede..9e29e8b3df 100644 --- a/src/lib-dict/dict-private.h +++ b/src/lib-dict/dict-private.h @@ -8,6 +8,7 @@ struct dict_vfuncs { enum dict_data_type value_type, const char *username, const char *base_dir); void (*deinit)(struct dict *dict); + int (*wait)(struct dict *dict); int (*lookup)(struct dict *dict, pool_t pool, const char *key, const char **value_r); @@ -21,7 +22,9 @@ struct dict_vfuncs { struct dict_transaction_context *(*transaction_init)(struct dict *dict); int (*transaction_commit)(struct dict_transaction_context *ctx, - bool async); + bool async, + dict_transaction_commit_callback_t *callback, + void *context); void (*transaction_rollback)(struct dict_transaction_context *ctx); void (*set)(struct dict_transaction_context *ctx, diff --git a/src/lib-dict/dict-sql.c b/src/lib-dict/dict-sql.c index 2cc332540c..47bd09272c 100644 --- a/src/lib-dict/dict-sql.c +++ b/src/lib-dict/dict-sql.c @@ -44,6 +44,11 @@ struct sql_dict_iterate_context { unsigned int key_prefix_len, pattern_prefix_len, next_map_idx; }; +struct sql_dict_inc_row { + struct sql_dict_inc_row *prev; + unsigned int rows; +}; + struct sql_dict_transaction_context { struct dict_transaction_context ctx; @@ -52,6 +57,8 @@ struct sql_dict_transaction_context { const struct dict_sql_map *prev_inc_map; char *prev_inc_key; long long prev_inc_diff; + pool_t inc_row_pool; + struct sql_dict_inc_row *inc_row; unsigned int failed:1; unsigned int changed:1; @@ -463,13 +470,16 @@ sql_dict_transaction_init(struct dict *_dict) return &ctx->ctx; } -static int sql_dict_transaction_commit(struct dict_transaction_context *_ctx, - bool async ATTR_UNUSED) +static int +sql_dict_transaction_commit(struct dict_transaction_context *_ctx, + bool async ATTR_UNUSED, + dict_transaction_commit_callback_t *callback, + void *context) { struct sql_dict_transaction_context *ctx = (struct sql_dict_transaction_context *)_ctx; const char *error; - int ret; + int ret = 1; if (ctx->prev_inc_map != NULL) sql_dict_prev_inc_flush(ctx); @@ -478,14 +488,27 @@ static int sql_dict_transaction_commit(struct dict_transaction_context *_ctx, sql_transaction_rollback(&ctx->sql_ctx); ret = -1; } else if (_ctx->changed) { - ret = sql_transaction_commit_s(&ctx->sql_ctx, &error); - if (ret < 0) + if (sql_transaction_commit_s(&ctx->sql_ctx, &error) < 0) { i_error("sql dict: commit failed: %s", error); - } else { - /* nothing to be done */ - ret = 0; + ret = -1; + } else { + while (ctx->inc_row != NULL) { + i_assert(ctx->inc_row->rows != -1UL); + if (ctx->inc_row->rows == 0) { + ret = 0; + break; + } + ctx->inc_row = ctx->inc_row->prev; + } + } } + if (ctx->inc_row_pool != NULL) + pool_unref(&ctx->inc_row_pool); + i_free(ctx->prev_inc_key); i_free(ctx); + + if (callback != NULL) + callback(ret, context); return ret; } @@ -496,6 +519,9 @@ static void sql_dict_transaction_rollback(struct dict_transaction_context *_ctx) if (_ctx->changed) sql_transaction_rollback(&ctx->sql_ctx); + + if (ctx->inc_row_pool != NULL) + pool_unref(&ctx->inc_row_pool); i_free(ctx->prev_inc_key); i_free(ctx); } @@ -603,8 +629,7 @@ sql_dict_update_query(const struct dict_sql_build_query *build) fields[i].map->value_field); if (fields[i].value[0] != '-') str_append_c(query, '+'); - else - str_append(query, fields[i].value); + str_append(query, fields[i].value); } sql_dict_where_build(dict, fields[0].map, build->extra_values, @@ -680,6 +705,22 @@ static void sql_dict_unset(struct dict_transaction_context *_ctx, } T_END; } +static unsigned int * +sql_dict_next_inc_row(struct sql_dict_transaction_context *ctx) +{ + struct sql_dict_inc_row *row; + + if (ctx->inc_row_pool == NULL) { + ctx->inc_row_pool = + pool_alloconly_create("sql dict inc rows", 128); + } + row = p_new(ctx->inc_row_pool, struct sql_dict_inc_row, 1); + row->prev = ctx->inc_row; + row->rows = -1UL; + ctx->inc_row = row; + return &row->rows; +} + static void sql_dict_atomic_inc_real(struct sql_dict_transaction_context *ctx, const char *key, long long diff) { @@ -693,7 +734,6 @@ static void sql_dict_atomic_inc_real(struct sql_dict_transaction_context *ctx, T_BEGIN { struct dict_sql_build_query build; struct dict_sql_build_query_field field; - const char *query; field.map = map; field.value = t_strdup_printf("%lld", diff); @@ -706,14 +746,8 @@ static void sql_dict_atomic_inc_real(struct sql_dict_transaction_context *ctx, build.key1 = key[0]; build.inc = TRUE; - if (diff >= 0) - query = sql_dict_set_query(&build); - else { - /* negative changes can't never be initial values, - use UPDATE directly. */ - query = sql_dict_update_query(&build); - } - sql_update(ctx->sql_ctx, query); + sql_update_get_rows(ctx->sql_ctx, sql_dict_update_query(&build), + sql_dict_next_inc_row(ctx)); } T_END; } @@ -784,6 +818,7 @@ static void sql_dict_atomic_inc(struct dict_transaction_context *_ctx, ctx->prev_inc_diff = diff; return; } + if (!sql_dict_maps_are_mergeable(dict, ctx->prev_inc_map, map, ctx->prev_inc_key, key, &values)) { sql_dict_prev_inc_flush(ctx); @@ -791,7 +826,6 @@ static void sql_dict_atomic_inc(struct dict_transaction_context *_ctx, } else T_BEGIN { struct dict_sql_build_query build; struct dict_sql_build_query_field *field; - const char *query; memset(&build, 0, sizeof(build)); build.dict = dict; @@ -807,14 +841,8 @@ static void sql_dict_atomic_inc(struct dict_transaction_context *_ctx, field->map = map; field->value = t_strdup_printf("%lld", diff); - if (diff >= 0) - query = sql_dict_set_query(&build); - else { - /* negative changes can't never be initial values, - use UPDATE directly. */ - query = sql_dict_update_query(&build); - } - sql_update(ctx->sql_ctx, query); + sql_update_get_rows(ctx->sql_ctx, sql_dict_update_query(&build), + sql_dict_next_inc_row(ctx)); i_free_and_null(ctx->prev_inc_key); ctx->prev_inc_map = NULL; @@ -827,6 +855,7 @@ static struct dict sql_dict = { { sql_dict_init, sql_dict_deinit, + NULL, sql_dict_lookup, sql_dict_iterate_init, sql_dict_iterate, diff --git a/src/lib-dict/dict.c b/src/lib-dict/dict.c index 38f05666d4..6d7d9435b0 100644 --- a/src/lib-dict/dict.c +++ b/src/lib-dict/dict.c @@ -96,6 +96,11 @@ void dict_deinit(struct dict **_dict) dict->v.deinit(dict); } +int dict_wait(struct dict *dict) +{ + return dict->v.wait == NULL ? 1 : dict->v.wait(dict); +} + static bool dict_key_prefix_is_valid(const char *key) { return strncmp(key, DICT_PATH_SHARED, strlen(DICT_PATH_SHARED)) == 0 || @@ -141,15 +146,17 @@ int dict_transaction_commit(struct dict_transaction_context **_ctx) struct dict_transaction_context *ctx = *_ctx; *_ctx = NULL; - return ctx->dict->v.transaction_commit(ctx, FALSE); + return ctx->dict->v.transaction_commit(ctx, FALSE, NULL, NULL); } -void dict_transaction_commit_async(struct dict_transaction_context **_ctx) +void dict_transaction_commit_async(struct dict_transaction_context **_ctx, + dict_transaction_commit_callback_t *callback, + void *context) { struct dict_transaction_context *ctx = *_ctx; *_ctx = NULL; - ctx->dict->v.transaction_commit(ctx, TRUE); + ctx->dict->v.transaction_commit(ctx, TRUE, callback, context); } void dict_transaction_rollback(struct dict_transaction_context **_ctx) diff --git a/src/lib-dict/dict.h b/src/lib-dict/dict.h index d5dc8a615a..540658175b 100644 --- a/src/lib-dict/dict.h +++ b/src/lib-dict/dict.h @@ -17,6 +17,8 @@ enum dict_data_type { DICT_DATA_TYPE_UINT32 }; +typedef void dict_transaction_commit_callback_t(int ret, void *context); + void dict_driver_register(struct dict *driver); void dict_driver_unregister(struct dict *driver); @@ -32,6 +34,9 @@ struct dict *dict_init(const char *uri, enum dict_data_type value_type, const char *username, const char *base_dir); /* Close dictionary. */ void dict_deinit(struct dict **dict); +/* Wait for all pending asynchronous transaction commits to finish. + Returns 0 if ok, -1 if error. */ +int dict_wait(struct dict *dict); /* Lookup value for key. Set it to NULL if it's not found. Returns 1 if found, 0 if not found and -1 if lookup failed. */ @@ -50,10 +55,16 @@ void dict_iterate_deinit(struct dict_iterate_context **ctx); /* Start a new dictionary transaction. */ struct dict_transaction_context *dict_transaction_begin(struct dict *dict); -/* Commit the transaction. Returns 0 if ok, -1 if failed. */ +/* Commit the transaction. Returns 1 if ok, 0 if dict_atomic_inc() was used + on a non-existing key, -1 if failed. */ int dict_transaction_commit(struct dict_transaction_context **ctx); -/* Commit the transaction, but don't wait to see if it finishes successfully. */ -void dict_transaction_commit_async(struct dict_transaction_context **ctx); +/* Commit the transaction, but don't wait to see if it finishes successfully. + If callback isn't NULL, it's called eventually. If it's not called by the + time you want to deinitialize dict, call dict_flush() to wait for the + result. */ +void dict_transaction_commit_async(struct dict_transaction_context **ctx, + dict_transaction_commit_callback_t *callback, + void *context); /* Rollback all changes made in transaction. */ void dict_transaction_rollback(struct dict_transaction_context **ctx); @@ -65,7 +76,8 @@ void dict_unset(struct dict_transaction_context *ctx, const char *key); /* Increase/decrease a numeric value in dictionary. Note that the value is changed when transaction is being committed, so you can't know beforehand - what the value will become. */ + what the value will become. The value is updated only if it already exists, + otherwise commit() will return 0. */ void dict_atomic_inc(struct dict_transaction_context *ctx, const char *key, long long diff); -- 2.47.3