]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
dict: Added support for async commits. Changed dict_atomic_inc() behavior.
authorTimo Sirainen <tss@iki.fi>
Mon, 7 Sep 2009 00:51:25 +0000 (20:51 -0400)
committerTimo Sirainen <tss@iki.fi>
Mon, 7 Sep 2009 00:51:25 +0000 (20:51 -0400)
--HG--
branch : HEAD

src/dict/dict-commands.c
src/dict/dict-connection.h
src/lib-dict/dict-client.c
src/lib-dict/dict-client.h
src/lib-dict/dict-db.c
src/lib-dict/dict-file.c
src/lib-dict/dict-private.h
src/lib-dict/dict-sql.c
src/lib-dict/dict.c
src/lib-dict/dict.h

index 52a316167b346cdd989ff5819c54714104b70608..c8cd4931f6577f639f50c7a6128170bba8259667 100644 (file)
@@ -54,7 +54,8 @@ static int cmd_iterate_flush(struct dict_connection *conn)
        o_stream_cork(conn->output);
        while ((ret = dict_iterate(conn->iter_ctx, &key, &value)) > 0) {
                str_truncate(str, 0);
-               str_printfa(str, "%s\t%s\n", key, value);
+               str_printfa(str, "%c%s\t%s\n", DICT_PROTOCOL_REPLY_OK,
+                           key, value);
                o_stream_send(conn->output, str_data(str), str_len(str));
 
                if (o_stream_get_buffer_used_size(conn->output) >
@@ -154,6 +155,7 @@ static int cmd_begin(struct dict_connection *conn, const char *line)
        /* <id> */
        trans = array_append_space(&conn->transactions);
        trans->id = id;
+       trans->conn = conn;
        trans->ctx = dict_transaction_begin(conn->dict);
        return 0;
 }
@@ -182,7 +184,7 @@ dict_connection_transaction_lookup_parse(struct dict_connection *conn,
 static int cmd_commit(struct dict_connection *conn, const char *line)
 {
        struct dict_connection_transaction *trans;
-       const char *reply;
+       char chr;
        int ret;
 
        if (conn->iter_ctx != NULL) {
@@ -194,13 +196,46 @@ static int cmd_commit(struct dict_connection *conn, const char *line)
                return -1;
 
        ret = dict_transaction_commit(&trans->ctx);
-       reply = t_strdup_printf("%c\n", ret == 0 ? DICT_PROTOCOL_REPLY_OK :
-                               DICT_PROTOCOL_REPLY_FAIL);
-       o_stream_send_str(conn->output, reply);
+       switch (ret) {
+       case 1:
+               chr = DICT_PROTOCOL_REPLY_OK;
+               break;
+       case 0:
+               chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+               break;
+       default:
+               chr = DICT_PROTOCOL_REPLY_FAIL;
+               break;
+       }
+       o_stream_send_str(conn->output, t_strdup_printf("%c\n", chr));
        dict_connection_transaction_array_remove(conn, trans);
        return 0;
 }
 
+static void cmd_commit_async_callback(int ret, void *context)
+{
+       struct dict_connection_transaction *trans = context;
+       const char *reply;
+       char chr;
+
+       switch (ret) {
+       case 1:
+               chr = DICT_PROTOCOL_REPLY_OK;
+               break;
+       case 0:
+               chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+               break;
+       default:
+               chr = DICT_PROTOCOL_REPLY_FAIL;
+               break;
+       }
+       reply = t_strdup_printf("%c%c%u\n", DICT_PROTOCOL_REPLY_ASYNC_COMMIT,
+                               chr, trans->id);
+       o_stream_send_str(trans->conn->output, reply);
+
+       dict_connection_transaction_array_remove(trans->conn, trans);
+}
+
 static int
 cmd_commit_async(struct dict_connection *conn, const char *line)
 {
@@ -214,8 +249,8 @@ cmd_commit_async(struct dict_connection *conn, const char *line)
        if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
                return -1;
 
-       dict_transaction_commit_async(&trans->ctx);
-       dict_connection_transaction_array_remove(conn, trans);
+       dict_transaction_commit_async(&trans->ctx, cmd_commit_async_callback,
+                                     trans);
        return 0;
 }
 
index fcac0da841eff85e2dcdf4950120e304473b5c66..1acf0e8d8bce1d2f77323858123b0a3dd6e83d70 100644 (file)
@@ -5,6 +5,7 @@
 
 struct dict_connection_transaction {
        unsigned int id;
+       struct dict_connection *conn;
        struct dict_transaction_context *ctx;
 };
 
index e1481eb7daab79ee3a73914de51ceb2b74b86abb..76b943b89f82b90175e1abaf0aaf144b6907da1b 100644 (file)
@@ -1,6 +1,7 @@
 /* Copyright (c) 2005-2009 Dovecot authors, see the included COPYING file */
 
 #include "lib.h"
+#include "llist.h"
 #include "str.h"
 #include "network.h"
 #include "istream.h"
@@ -8,6 +9,7 @@
 #include "dict-private.h"
 #include "dict-client.h"
 
+#include <stdlib.h>
 #include <unistd.h>
 #include <fcntl.h>
 
@@ -24,9 +26,13 @@ struct client_dict {
        time_t last_connect_try;
        struct istream *input;
        struct ostream *output;
+       struct io *io;
+
+       struct client_dict_transaction_context *transactions;
 
        unsigned int connect_counter;
        unsigned int transaction_id_counter;
+       unsigned int async_commits;
 
        unsigned int in_iteration:1;
        unsigned int handshaked:1;
@@ -41,6 +47,11 @@ struct client_dict_iterate_context {
 
 struct client_dict_transaction_context {
        struct dict_transaction_context ctx;
+       struct client_dict_transaction_context *prev, *next;
+
+       /* for async commits */
+       dict_transaction_commit_callback_t *callback;
+       void *context;
 
        unsigned int id;
        unsigned int connect_counter;
@@ -213,29 +224,97 @@ client_dict_send_transaction_query(struct client_dict_transaction_context *ctx,
        return 0;
 }
 
-static char *client_dict_read_line(struct client_dict *dict)
+static struct client_dict_transaction_context *
+client_dict_transaction_find(struct client_dict *dict, unsigned int id)
+{
+       struct client_dict_transaction_context *ctx;
+
+       for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) {
+               if (ctx->id == id)
+                       return ctx;
+       }
+       return NULL;
+}
+
+static void
+client_dict_finish_transaction(struct client_dict *dict,
+                              unsigned int id, int ret)
+{
+       struct client_dict_transaction_context *ctx;
+
+       ctx = client_dict_transaction_find(dict, id);
+       if (ctx == NULL) {
+               i_error("dict-client: Unknown transaction id %u", id);
+               return;
+       }
+       if (ctx->callback != NULL)
+               ctx->callback(ret, ctx->context);
+
+       DLLIST_REMOVE(&dict->transactions, ctx);
+       i_free(ctx);
+
+       i_assert(dict->async_commits > 0);
+       if (--dict->async_commits == 0)
+               io_remove(&dict->io);
+}
+
+static int client_dict_read_one_line(struct client_dict *dict, char **line_r)
 {
+       unsigned int id;
        char *line;
        int ret;
 
-       line = i_stream_next_line(dict->input);
-       if (line != NULL)
-               return line;
-
-       while ((ret = i_stream_read(dict->input)) > 0) {
-               line = i_stream_next_line(dict->input);
-               if (line != NULL)
-                       return line;
+       *line_r = NULL;
+       while ((line = i_stream_next_line(dict->input)) == NULL) {
+               ret = i_stream_read(dict->input);
+               switch (ret) {
+               case -1:
+                       if (dict->input->stream_errno != 0)
+                               i_error("read(%s) failed: %m", dict->path);
+                       else {
+                               i_error("read(%s) failed: Remote disconnected",
+                                       dict->path);
+                       }
+                       return -1;
+               case -2:
+                       i_error("read(%s) returned too much data", dict->path);
+                       return -1;
+               default:
+                       i_assert(ret > 0);
+                       break;
+               }
+       }
+       if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) {
+               switch (line[1]) {
+               case DICT_PROTOCOL_REPLY_OK:
+                       ret = 1;
+                       break;
+               case DICT_PROTOCOL_REPLY_NOTFOUND:
+                       ret = 0;
+                       break;
+               case DICT_PROTOCOL_REPLY_FAIL:
+                       ret = -1;
+                       break;
+               default:
+                       i_error("dict-client: Invalid async commit line: %s",
+                               line);
+                       return 0;
+               }
+               id = strtoul(line+2, NULL, 10);
+               client_dict_finish_transaction(dict, id, ret);
+               return 0;
        }
-       i_assert(ret < 0);
+       *line_r = line;
+       return 1;
+}
 
-       if (ret == -2)
-               i_error("read(%s) returned too much data", dict->path);
-       else if (dict->input->stream_errno == 0)
-               i_error("read(%s) failed: Remote disconnected", dict->path);
-       else
-               i_error("read(%s) failed: %m", dict->path);
-       return NULL;
+static char *client_dict_read_line(struct client_dict *dict)
+{
+       char *line;
+
+       while (client_dict_read_one_line(dict, &line) == 0)
+               ;
+       return line;
 }
 
 static int client_dict_connect(struct client_dict *dict)
@@ -263,6 +342,7 @@ static int client_dict_connect(struct client_dict *dict)
        dict->input->blocking = TRUE;
        dict->output = o_stream_create_fd(dict->fd, 4096, FALSE);
        dict->transaction_id_counter = 0;
+       dict->async_commits = 0;
 
        query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n",
                                DICT_PROTOCOL_CMD_HELLO,
@@ -283,6 +363,8 @@ static void client_dict_disconnect(struct client_dict *dict)
        dict->connect_counter++;
        dict->handshaked = FALSE;
 
+       if (dict->io != NULL)
+               io_remove(&dict->io);
        if (dict->input != NULL)
                i_stream_destroy(&dict->input);
        if (dict->output != NULL)
@@ -339,6 +421,21 @@ static void client_dict_deinit(struct dict *_dict)
        pool_unref(&dict->pool);
 }
 
+static int client_dict_wait(struct dict *_dict)
+{
+       struct client_dict *dict = (struct client_dict *)_dict;
+       char *line;
+       int ret = 0;
+
+       while (dict->async_commits > 0) {
+               if (client_dict_read_one_line(dict, &line) < 0) {
+                       ret = -1;
+                       break;
+               }
+       }
+       return ret;
+}
+
 static int client_dict_lookup(struct dict *_dict, pool_t pool,
                              const char *key, const char **value_r)
 {
@@ -420,7 +517,10 @@ static int client_dict_iterate(struct dict_iterate_context *_ctx,
        /* line contains key \t value */
        p_clear(ctx->pool);
 
-       value = strchr(line, '\t');
+       if (*line != DICT_PROTOCOL_REPLY_OK)
+               value = NULL;
+       else
+               value = strchr(++line, '\t');
        if (value == NULL) {
                /* broken protocol */
                i_error("dict client (%s) sent broken reply", dict->path);
@@ -454,38 +554,72 @@ client_dict_transaction_init(struct dict *_dict)
        ctx->ctx.dict = _dict;
        ctx->id = ++dict->transaction_id_counter;
 
+       DLLIST_PREPEND(&dict->transactions, ctx);
        return &ctx->ctx;
 }
 
-static int client_dict_transaction_commit(struct dict_transaction_context *_ctx,
-                                         bool async)
+static void dict_async_input(struct client_dict *dict)
+{
+       char *line;
+       size_t size;
+       int ret;
+
+       i_assert(!dict->in_iteration);
+
+       do {
+               ret = client_dict_read_one_line(dict, &line);
+               (void)i_stream_get_data(dict->input, &size);
+       } while (ret == 0 && size > 0);
+
+       if (ret < 0)
+               io_remove(&dict->io);
+}
+
+static int
+client_dict_transaction_commit(struct dict_transaction_context *_ctx,
+                              bool async,
+                              dict_transaction_commit_callback_t *callback,
+                              void *context)
 {
        struct client_dict_transaction_context *ctx =
                (struct client_dict_transaction_context *)_ctx;
        struct client_dict *dict = (struct client_dict *)_ctx->dict;
-       int ret = ctx->failed ? -1 : 0;
+       int ret = ctx->failed ? -1 : 1;
 
-       if (ctx->sent_begin) T_BEGIN {
+       if (ctx->sent_begin && !ctx->failed) T_BEGIN {
                const char *query, *line;
 
-               query = t_strdup_printf("%c%u\n", ctx->failed ?
-                                       DICT_PROTOCOL_CMD_ROLLBACK :
-                                       (!async ? DICT_PROTOCOL_CMD_COMMIT :
-                                        DICT_PROTOCOL_CMD_COMMIT_ASYNC),
+               query = t_strdup_printf("%c%u\n", !async ?
+                                       DICT_PROTOCOL_CMD_COMMIT :
+                                       DICT_PROTOCOL_CMD_COMMIT_ASYNC,
                                        ctx->id);
                if (client_dict_send_transaction_query(ctx, query) < 0)
                        ret = -1;
-               else if (ret < 0 || async) {
-                       /* no reply */
+               else if (async) {
+                       ctx->callback = callback;
+                       ctx->context = context;
+                       if (dict->async_commits++ == 0) {
+                               dict->io = io_add(dict->fd, IO_READ,
+                                                 dict_async_input, dict);
+                       }
                } else {
                        /* sync commit, read reply */
                        line = client_dict_read_line(dict);
-                       if (line == NULL || *line != DICT_PROTOCOL_REPLY_OK)
+                       if (line == NULL)
+                               ret = -1;
+                       else if (*line == DICT_PROTOCOL_REPLY_OK)
+                               ret = 1;
+                       else if (*line == DICT_PROTOCOL_REPLY_NOTFOUND)
+                               ret = 0;
+                       else
                                ret = -1;
                }
        } T_END;
 
-       i_free(ctx);
+       if (ret < 0 || !async) {
+               DLLIST_REMOVE(&dict->transactions, ctx);
+               i_free(ctx);
+       }
        return ret;
 }
 
@@ -494,6 +628,7 @@ client_dict_transaction_rollback(struct dict_transaction_context *_ctx)
 {
        struct client_dict_transaction_context *ctx =
                (struct client_dict_transaction_context *)_ctx;
+       struct client_dict *dict = (struct client_dict *)_ctx->dict;
 
        if (ctx->sent_begin) T_BEGIN {
                const char *query;
@@ -503,6 +638,7 @@ client_dict_transaction_rollback(struct dict_transaction_context *_ctx)
                (void)client_dict_send_transaction_query(ctx, query);
        } T_END;
 
+       DLLIST_REMOVE(&dict->transactions, ctx);
        i_free(ctx);
 }
 
@@ -560,6 +696,7 @@ struct dict dict_driver_client = {
        {
                client_dict_init,
                client_dict_deinit,
+               client_dict_wait,
                client_dict_lookup,
                client_dict_iterate_init,
                client_dict_iterate,
index 88d40ad9bbb6cd0bba138c057dbbfc85df41c02c..e6156c3565a9dcbc515016074ac246e2d1114456 100644 (file)
@@ -28,10 +28,10 @@ enum {
 };
 
 enum {
-       /* For LOOKUP command */
        DICT_PROTOCOL_REPLY_OK = 'O', /* <value> */
        DICT_PROTOCOL_REPLY_NOTFOUND = 'N',
-       DICT_PROTOCOL_REPLY_FAIL = 'F'
+       DICT_PROTOCOL_REPLY_FAIL = 'F',
+       DICT_PROTOCOL_REPLY_ASYNC_COMMIT = 'A'
 };
 
 const char *dict_client_escape(const char *src);
index ae3147c098a705843995f45181d22ffefdad27f8..d601173d45ff5573ba64cb602fb7f4b9ccfc65e7 100644 (file)
@@ -372,16 +372,22 @@ db_dict_transaction_init(struct dict *_dict)
        return &ctx->ctx;
 }
 
-static int db_dict_transaction_commit(struct dict_transaction_context *_ctx,
-                                     bool async ATTR_UNUSED)
+static int
+db_dict_transaction_commit(struct dict_transaction_context *_ctx,
+                          bool async ATTR_UNUSED,
+                          dict_transaction_commit_callback_t *callback,
+                          void *context)
 {
        struct db_dict_transaction_context *ctx =
                (struct db_dict_transaction_context *)_ctx;
        int ret;
 
-       ret = ctx->tid->commit(ctx->tid, 0);
+       ret = ctx->tid->commit(ctx->tid, 0) < 0 ? -1 : 1;
        i_free(ctx);
-       return ret == 0 ? 0 : -1;
+
+       if (callback != NULL)
+               callback(ret, context);
+       return ret;
 }
 
 static void db_dict_transaction_rollback(struct dict_transaction_context *_ctx)
@@ -447,6 +453,7 @@ struct dict dict_driver_db = {
        {
                db_dict_init,
                db_dict_deinit,
+               NULL,
                db_dict_lookup,
                db_dict_iterate_init,
                db_dict_iterate,
index 085d651a5839c017d9057869b720985a1cf29d8c..84e04ddd366c654e21d0717fbba6b967cb986e8e 100644 (file)
@@ -54,6 +54,8 @@ struct file_dict_transaction_context {
 
        pool_t pool;
        ARRAY_DEFINE(changes, struct file_dict_change);
+
+       unsigned int atomic_inc_not_found:1;
 };
 
 static struct dotlock_settings file_dict_dotlock_settings = {
@@ -252,9 +254,12 @@ static void file_dict_apply_changes(struct file_dict_transaction_context *ctx)
 
                switch (changes[i].type) {
                case FILE_DICT_CHANGE_TYPE_INC:
-                       diff = old_value == NULL ? 0 :
-                               strtoll(old_value, NULL, 10);
-                       diff += changes[i].value.diff;
+                       if (old_value == NULL) {
+                               ctx->atomic_inc_not_found = TRUE;
+                               break;
+                       }
+                       diff = strtoll(old_value, NULL, 10) +
+                               changes[i].value.diff;
                        tmp = t_strdup_printf("%lld", diff);
                        new_len = strlen(tmp);
                        if (old_value == NULL || new_len > strlen(old_value))
@@ -365,15 +370,26 @@ static int file_dict_write_changes(struct file_dict_transaction_context *ctx)
        return 0;
 }
 
-static int file_dict_transaction_commit(struct dict_transaction_context *_ctx,
-                                       bool async ATTR_UNUSED)
+static int
+file_dict_transaction_commit(struct dict_transaction_context *_ctx,
+                            bool async ATTR_UNUSED,
+                            dict_transaction_commit_callback_t *callback,
+                            void *context)
 {
        struct file_dict_transaction_context *ctx =
                (struct file_dict_transaction_context *)_ctx;
        int ret;
 
-       ret = file_dict_write_changes(ctx);
+       if (file_dict_write_changes(ctx) < 0)
+               ret = -1;
+       else if (ctx->atomic_inc_not_found)
+               ret = 0;
+       else
+               ret = 1;
        pool_unref(&ctx->pool);
+
+       if (callback != NULL)
+               callback(ret, context);
        return ret;
 }
 
@@ -429,6 +445,7 @@ struct dict dict_driver_file = {
        {
                file_dict_init,
                file_dict_deinit,
+               NULL,
                file_dict_lookup,
                file_dict_iterate_init,
                file_dict_iterate,
index 7f423e3ede4d8cbda40fa6ea83a4e46d6bc29600..9e29e8b3dfa0545457f0f120a7fbaaea8ae63f1a 100644 (file)
@@ -8,6 +8,7 @@ struct dict_vfuncs {
                             enum dict_data_type value_type,
                             const char *username, const char *base_dir);
        void (*deinit)(struct dict *dict);
+       int (*wait)(struct dict *dict);
 
        int (*lookup)(struct dict *dict, pool_t pool,
                      const char *key, const char **value_r);
@@ -21,7 +22,9 @@ struct dict_vfuncs {
 
        struct dict_transaction_context *(*transaction_init)(struct dict *dict);
        int (*transaction_commit)(struct dict_transaction_context *ctx,
-                                 bool async);
+                                 bool async,
+                                 dict_transaction_commit_callback_t *callback,
+                                 void *context);
        void (*transaction_rollback)(struct dict_transaction_context *ctx);
 
        void (*set)(struct dict_transaction_context *ctx,
index 2cc332540c53d08198e741fef700e19aa3ef7423..47bd09272c6df63a93a8f3f4de5d36c54ec0c0af 100644 (file)
@@ -44,6 +44,11 @@ struct sql_dict_iterate_context {
        unsigned int key_prefix_len, pattern_prefix_len, next_map_idx;
 };
 
+struct sql_dict_inc_row {
+       struct sql_dict_inc_row *prev;
+       unsigned int rows;
+};
+
 struct sql_dict_transaction_context {
        struct dict_transaction_context ctx;
 
@@ -52,6 +57,8 @@ struct sql_dict_transaction_context {
        const struct dict_sql_map *prev_inc_map;
        char *prev_inc_key;
        long long prev_inc_diff;
+       pool_t inc_row_pool;
+       struct sql_dict_inc_row *inc_row;
 
        unsigned int failed:1;
        unsigned int changed:1;
@@ -463,13 +470,16 @@ sql_dict_transaction_init(struct dict *_dict)
        return &ctx->ctx;
 }
 
-static int sql_dict_transaction_commit(struct dict_transaction_context *_ctx,
-                                      bool async ATTR_UNUSED)
+static int
+sql_dict_transaction_commit(struct dict_transaction_context *_ctx,
+                           bool async ATTR_UNUSED,
+                           dict_transaction_commit_callback_t *callback,
+                           void *context)
 {
        struct sql_dict_transaction_context *ctx =
                (struct sql_dict_transaction_context *)_ctx;
        const char *error;
-       int ret;
+       int ret = 1;
 
        if (ctx->prev_inc_map != NULL)
                sql_dict_prev_inc_flush(ctx);
@@ -478,14 +488,27 @@ static int sql_dict_transaction_commit(struct dict_transaction_context *_ctx,
                sql_transaction_rollback(&ctx->sql_ctx);
                ret = -1;
        } else if (_ctx->changed) {
-               ret = sql_transaction_commit_s(&ctx->sql_ctx, &error);
-               if (ret < 0)
+               if (sql_transaction_commit_s(&ctx->sql_ctx, &error) < 0) {
                        i_error("sql dict: commit failed: %s", error);
-       } else {
-               /* nothing to be done */
-               ret = 0;
+                       ret = -1;
+               } else {
+                       while (ctx->inc_row != NULL) {
+                               i_assert(ctx->inc_row->rows != -1UL);
+                               if (ctx->inc_row->rows == 0) {
+                                       ret = 0;
+                                       break;
+                               }
+                               ctx->inc_row = ctx->inc_row->prev;
+                       }
+               }
        }
+       if (ctx->inc_row_pool != NULL)
+               pool_unref(&ctx->inc_row_pool);
+       i_free(ctx->prev_inc_key);
        i_free(ctx);
+
+       if (callback != NULL)
+               callback(ret, context);
        return ret;
 }
 
@@ -496,6 +519,9 @@ static void sql_dict_transaction_rollback(struct dict_transaction_context *_ctx)
 
        if (_ctx->changed)
                sql_transaction_rollback(&ctx->sql_ctx);
+
+       if (ctx->inc_row_pool != NULL)
+               pool_unref(&ctx->inc_row_pool);
        i_free(ctx->prev_inc_key);
        i_free(ctx);
 }
@@ -603,8 +629,7 @@ sql_dict_update_query(const struct dict_sql_build_query *build)
                            fields[i].map->value_field);
                if (fields[i].value[0] != '-')
                        str_append_c(query, '+');
-               else
-                       str_append(query, fields[i].value);
+               str_append(query, fields[i].value);
        }
 
        sql_dict_where_build(dict, fields[0].map, build->extra_values,
@@ -680,6 +705,22 @@ static void sql_dict_unset(struct dict_transaction_context *_ctx,
        } T_END;
 }
 
+static unsigned int *
+sql_dict_next_inc_row(struct sql_dict_transaction_context *ctx)
+{
+       struct sql_dict_inc_row *row;
+
+       if (ctx->inc_row_pool == NULL) {
+               ctx->inc_row_pool =
+                       pool_alloconly_create("sql dict inc rows", 128);
+       }
+       row = p_new(ctx->inc_row_pool, struct sql_dict_inc_row, 1);
+       row->prev = ctx->inc_row;
+       row->rows = -1UL;
+       ctx->inc_row = row;
+       return &row->rows;
+}
+
 static void sql_dict_atomic_inc_real(struct sql_dict_transaction_context *ctx,
                                     const char *key, long long diff)
 {
@@ -693,7 +734,6 @@ static void sql_dict_atomic_inc_real(struct sql_dict_transaction_context *ctx,
        T_BEGIN {
                struct dict_sql_build_query build;
                struct dict_sql_build_query_field field;
-               const char *query;
 
                field.map = map;
                field.value = t_strdup_printf("%lld", diff);
@@ -706,14 +746,8 @@ static void sql_dict_atomic_inc_real(struct sql_dict_transaction_context *ctx,
                build.key1 = key[0];
                build.inc = TRUE;
 
-               if (diff >= 0)
-                       query = sql_dict_set_query(&build);
-               else {
-                       /* negative changes can't never be initial values,
-                          use UPDATE directly. */
-                       query = sql_dict_update_query(&build);
-               }
-               sql_update(ctx->sql_ctx, query);
+               sql_update_get_rows(ctx->sql_ctx, sql_dict_update_query(&build),
+                                   sql_dict_next_inc_row(ctx));
        } T_END;
 }
 
@@ -784,6 +818,7 @@ static void sql_dict_atomic_inc(struct dict_transaction_context *_ctx,
                ctx->prev_inc_diff = diff;
                return;
        }
+
        if (!sql_dict_maps_are_mergeable(dict, ctx->prev_inc_map, map,
                                         ctx->prev_inc_key, key, &values)) {
                sql_dict_prev_inc_flush(ctx);
@@ -791,7 +826,6 @@ static void sql_dict_atomic_inc(struct dict_transaction_context *_ctx,
        } else T_BEGIN {
                struct dict_sql_build_query build;
                struct dict_sql_build_query_field *field;
-               const char *query;
 
                memset(&build, 0, sizeof(build));
                build.dict = dict;
@@ -807,14 +841,8 @@ static void sql_dict_atomic_inc(struct dict_transaction_context *_ctx,
                field->map = map;
                field->value = t_strdup_printf("%lld", diff);
 
-               if (diff >= 0)
-                       query = sql_dict_set_query(&build);
-               else {
-                       /* negative changes can't never be initial values,
-                          use UPDATE directly. */
-                       query = sql_dict_update_query(&build);
-               }
-               sql_update(ctx->sql_ctx, query);
+               sql_update_get_rows(ctx->sql_ctx, sql_dict_update_query(&build),
+                                   sql_dict_next_inc_row(ctx));
 
                i_free_and_null(ctx->prev_inc_key);
                ctx->prev_inc_map = NULL;
@@ -827,6 +855,7 @@ static struct dict sql_dict = {
        {
                sql_dict_init,
                sql_dict_deinit,
+               NULL,
                sql_dict_lookup,
                sql_dict_iterate_init,
                sql_dict_iterate,
index 38f05666d4cd92294360c372144f7f2c3cf5310e..6d7d9435b08d19c7d0e7dd232a6add9313e650e8 100644 (file)
@@ -96,6 +96,11 @@ void dict_deinit(struct dict **_dict)
        dict->v.deinit(dict);
 }
 
+int dict_wait(struct dict *dict)
+{
+       return dict->v.wait == NULL ? 1 : dict->v.wait(dict);
+}
+
 static bool dict_key_prefix_is_valid(const char *key)
 {
        return strncmp(key, DICT_PATH_SHARED, strlen(DICT_PATH_SHARED)) == 0 ||
@@ -141,15 +146,17 @@ int dict_transaction_commit(struct dict_transaction_context **_ctx)
        struct dict_transaction_context *ctx = *_ctx;
 
        *_ctx = NULL;
-       return ctx->dict->v.transaction_commit(ctx, FALSE);
+       return ctx->dict->v.transaction_commit(ctx, FALSE, NULL, NULL);
 }
 
-void dict_transaction_commit_async(struct dict_transaction_context **_ctx)
+void dict_transaction_commit_async(struct dict_transaction_context **_ctx,
+                                  dict_transaction_commit_callback_t *callback,
+                                  void *context)
 {
        struct dict_transaction_context *ctx = *_ctx;
 
        *_ctx = NULL;
-       ctx->dict->v.transaction_commit(ctx, TRUE);
+       ctx->dict->v.transaction_commit(ctx, TRUE, callback, context);
 }
 
 void dict_transaction_rollback(struct dict_transaction_context **_ctx)
index d5dc8a615ac3fb6233c8bb164997aec72a755c76..540658175b37df9634a63b580be6a824514735ba 100644 (file)
@@ -17,6 +17,8 @@ enum dict_data_type {
        DICT_DATA_TYPE_UINT32
 };
 
+typedef void dict_transaction_commit_callback_t(int ret, void *context);
+
 void dict_driver_register(struct dict *driver);
 void dict_driver_unregister(struct dict *driver);
 
@@ -32,6 +34,9 @@ struct dict *dict_init(const char *uri, enum dict_data_type value_type,
                       const char *username, const char *base_dir);
 /* Close dictionary. */
 void dict_deinit(struct dict **dict);
+/* Wait for all pending asynchronous transaction commits to finish.
+   Returns 0 if ok, -1 if error. */
+int dict_wait(struct dict *dict);
 
 /* Lookup value for key. Set it to NULL if it's not found.
    Returns 1 if found, 0 if not found and -1 if lookup failed. */
@@ -50,10 +55,16 @@ void dict_iterate_deinit(struct dict_iterate_context **ctx);
 
 /* Start a new dictionary transaction. */
 struct dict_transaction_context *dict_transaction_begin(struct dict *dict);
-/* Commit the transaction. Returns 0 if ok, -1 if failed. */
+/* Commit the transaction. Returns 1 if ok, 0 if dict_atomic_inc() was used
+   on a non-existing key, -1 if failed. */
 int dict_transaction_commit(struct dict_transaction_context **ctx);
-/* Commit the transaction, but don't wait to see if it finishes successfully. */
-void dict_transaction_commit_async(struct dict_transaction_context **ctx);
+/* Commit the transaction, but don't wait to see if it finishes successfully.
+   If callback isn't NULL, it's called eventually. If it's not called by the
+   time you want to deinitialize dict, call dict_flush() to wait for the
+   result. */
+void dict_transaction_commit_async(struct dict_transaction_context **ctx,
+                                  dict_transaction_commit_callback_t *callback,
+                                  void *context);
 /* Rollback all changes made in transaction. */
 void dict_transaction_rollback(struct dict_transaction_context **ctx);
 
@@ -65,7 +76,8 @@ void dict_unset(struct dict_transaction_context *ctx,
                const char *key);
 /* Increase/decrease a numeric value in dictionary. Note that the value is
    changed when transaction is being committed, so you can't know beforehand
-   what the value will become. */
+   what the value will become. The value is updated only if it already exists,
+   otherwise commit() will return 0. */
 void dict_atomic_inc(struct dict_transaction_context *ctx,
                     const char *key, long long diff);