]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
lib-dict: dict-client rewrite to support async operations
authorTimo Sirainen <timo.sirainen@dovecot.fi>
Wed, 1 Jun 2016 21:57:17 +0000 (00:57 +0300)
committerGitLab <gitlab@git.dovecot.net>
Sun, 19 Jun 2016 23:37:38 +0000 (02:37 +0300)
src/lib-dict/dict-client.c

index b62beb14ade32941cd74ae43ffcd0b1b5469e16e..9b31a909e9d754549706d894620c040f37482093 100644 (file)
@@ -1,11 +1,12 @@
 /* Copyright (c) 2005-2016 Dovecot authors, see the included COPYING file */
 
 #include "lib.h"
+#include "array.h"
 #include "llist.h"
 #include "str.h"
 #include "strescape.h"
-#include "net.h"
-#include "istream.h"
+#include "time-util.h"
+#include "connection.h"
 #include "ostream.h"
 #include "eacces-error.h"
 #include "dict-private.h"
 #define DICT_CLIENT_DEFAULT_TIMEOUT_MSECS 0
 
 /* Abort dict lookup after this many seconds. */
-#define DICT_CLIENT_READ_TIMEOUT_SECS 30
-/* Log a warning if dict lookup takes longer than this many seconds. */
-#define DICT_CLIENT_READ_WARN_TIMEOUT_SECS 5
+#define DICT_CLIENT_REQUEST_TIMEOUT_MSECS 30000
+/* Log a warning if dict lookup takes longer than this many milliseconds. */
+#define DICT_CLIENT_REQUEST_WARN_TIMEOUT_MSECS 5000
+
+struct client_dict_cmd {
+       int refcount;
+       struct client_dict *dict;
+       struct timeval start_time;
+       char *query;
+
+       bool retry_errors;
+       bool no_replies;
+       bool unfinished;
+
+       void (*callback)(struct client_dict_cmd *cmd,
+                        const char *line, const char *error);
+        struct client_dict_iterate_context *iter;
+
+       struct {
+               dict_lookup_callback_t *lookup;
+               dict_transaction_commit_callback_t *commit;
+               void *context;
+       } api_callback;
+};
+
+struct dict_connection {
+       struct connection conn;
+       struct client_dict *dict;
+};
 
 struct client_dict {
        struct dict dict;
+       struct dict_connection conn;
 
-       pool_t pool;
-       int fd;
-       const char *uri;
-       const char *username;
-       const char *path;
+       char *uri, *username;
        enum dict_data_type value_type;
 
        time_t last_failed_connect;
-       struct istream *input;
-       struct ostream *output;
-       struct io *io;
+       struct ioloop *ioloop, *prev_ioloop;
+       struct timeout *to_requests;
        struct timeout *to_idle;
        unsigned int idle_msecs;
+       struct timeval last_input;
 
+       ARRAY(struct client_dict_cmd *) cmds;
        struct client_dict_transaction_context *transactions;
 
-       unsigned int connect_counter;
        unsigned int transaction_id_counter;
-       unsigned int async_commits;
-       unsigned int iter_replies_skip;
+};
 
-       bool in_iteration:1;
-       bool handshaked:1;
+struct client_dict_iter_result {
+       const char *key, *value;
 };
 
 struct client_dict_iterate_context {
        struct dict_iterate_context ctx;
        char *error;
 
-       pool_t pool;
+       pool_t results_pool;
+       ARRAY(struct client_dict_iter_result) results;
+       unsigned int result_idx;
+
+       bool async;
        bool finished;
+       bool deinit;
 };
 
 struct client_dict_transaction_context {
        struct dict_transaction_context ctx;
        struct client_dict_transaction_context *prev, *next;
 
-       /* for async commits */
-       dict_transaction_commit_callback_t *callback;
-       void *context;
-
        char *error;
 
        unsigned int id;
-       unsigned int connect_counter;
 
        bool sent_begin:1;
-       bool async:1;
-       bool committed:1;
 };
 
+static struct connection_list *dict_connections;
+
 static int client_dict_connect(struct client_dict *dict, const char **error_r);
 static void client_dict_disconnect(struct client_dict *dict, const char *reason);
 
-static int client_dict_send_query(struct client_dict *dict, const char *query,
-                                 const char **error_r)
+static struct client_dict_cmd *
+client_dict_cmd_init(struct client_dict *dict, const char *query)
 {
-       if (dict->output == NULL) {
-               /* not connected currently */
-               if (client_dict_connect(dict, error_r) < 0)
-                       return -1;
-       }
-
-       if (o_stream_send_str(dict->output, query) < 0 ||
-           o_stream_flush(dict->output) < 0) {
-               /* Send failed */
-               *error_r = t_strdup_printf("write(%s) failed: %s",
-                       dict->path, o_stream_get_error(dict->output));
-               if (!dict->handshaked) {
-                       /* we're trying to send hello, don't try to reconnect */
-                       return -1;
-               }
-
-               /* Reconnect and try again. */
-               client_dict_disconnect(dict, *error_r);
-               if (client_dict_connect(dict, error_r) < 0)
-                       return -1;
-
-               if (o_stream_send_str(dict->output, query) < 0 ||
-                   o_stream_flush(dict->output) < 0) {
-                       *error_r = t_strdup_printf("write(%s) failed: %s",
-                               dict->path, o_stream_get_error(dict->output));
-                       client_dict_disconnect(dict, *error_r);
-                       return -1;
-               }
-       }
-       return 0;
+       struct client_dict_cmd *cmd;
+
+       cmd = i_new(struct client_dict_cmd, 1);
+       cmd->refcount = 1;
+       cmd->dict = dict;
+       cmd->query = i_strdup(query);
+       cmd->start_time = ioloop_timeval;
+       return cmd;
 }
 
-static int
-client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx,
-                                  const char **error_r)
+static void client_dict_cmd_ref(struct client_dict_cmd *cmd)
 {
-       struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
-       const char *query;
-
-       i_assert(ctx->error == NULL);
-
-       query = t_strdup_printf("%c%u\n", DICT_PROTOCOL_CMD_BEGIN, ctx->id);
-       if (client_dict_send_query(dict, query, error_r) < 0)
-               return -1;
-       ctx->connect_counter = dict->connect_counter;
-       return 0;
+       i_assert(cmd->refcount > 0);
+       cmd->refcount++;
 }
 
-static int
-client_dict_send_transaction_query(struct client_dict_transaction_context *ctx,
-                                  const char *query, const char **error_r)
+static bool client_dict_cmd_unref(struct client_dict_cmd *cmd)
 {
-       struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
-
-       i_assert(ctx->error == NULL);
+       i_assert(cmd->refcount > 0);
+       if (--cmd->refcount > 0)
+               return TRUE;
 
-       if (!ctx->sent_begin) {
-               if (client_dict_transaction_send_begin(ctx, error_r) < 0)
-                       return -1;
-               ctx->sent_begin = TRUE;
-       }
-
-       if (ctx->connect_counter != dict->connect_counter) {
-               *error_r = "Reconnected to dict-server - transaction lost";
-               return -1;
-       }
-
-       if (dict->output == NULL) {
-               /* not connected, this'll fail */
-               *error_r = "Disconnected from dict-server";
-               return -1;
-       }
+       i_free(cmd->query);
+       i_free(cmd);
+       return FALSE;
+}
 
-       if (o_stream_send_str(dict->output, query) < 0 ||
-           o_stream_flush(dict->output) < 0) {
-               /* Send failed. Our transactions have died, so don't even try
-                  to re-send the command */
-               *error_r = t_strdup_printf("write(%s) failed: %s",
-                       dict->path, o_stream_get_error(dict->output));
-               client_dict_disconnect(dict, *error_r);
-               return -1;
+static void dict_pre_api_callback(struct client_dict *dict)
+{
+       if (dict->prev_ioloop != NULL) {
+               /* Don't let callback see that we've created our
+                  internal ioloop in case it wants to add some ios
+                  or timeouts. */
+               current_ioloop = dict->prev_ioloop;
        }
-       return 0;
 }
 
-static void
-client_dict_try_send_transaction_query(struct client_dict_transaction_context *ctx,
-                                      const char *query)
+static void dict_post_api_callback(struct client_dict *dict)
 {
-       const char *error;
-
-       if (ctx->error != NULL)
-               return;
-       if (client_dict_send_transaction_query(ctx, query, &error) < 0)
-               ctx->error = i_strdup(error);
+       if (dict->prev_ioloop != NULL) {
+               current_ioloop = dict->ioloop;
+               /* stop client_dict_wait() */
+               io_loop_stop(dict->ioloop);
+       }
 }
 
-static struct client_dict_transaction_context *
-client_dict_transaction_find(struct client_dict *dict, unsigned int id)
+static bool
+dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *line)
 {
-       struct client_dict_transaction_context *ctx;
-
-       for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) {
-               if (ctx->id == id)
-                       return ctx;
-       }
-       return NULL;
+       cmd->unfinished = FALSE;
+       cmd->callback(cmd, line, NULL);
+       return !cmd->unfinished;
 }
 
 static void
-client_dict_finish_transaction(struct client_dict *dict, unsigned int id,
-                              const struct dict_commit_result *result)
+dict_cmd_callback_error(struct client_dict_cmd *cmd, const char *error)
 {
-       struct client_dict_transaction_context *ctx;
-
-       ctx = client_dict_transaction_find(dict, id);
-       if (ctx == NULL) {
-               i_error("dict-client: Unknown transaction id %u", id);
-               return;
-       }
-       if (!ctx->committed) {
-               /* transaction isn't committed yet, but we disconnected from
-                  dict. mark it as failed so commit will fail later. */
-               if (result->ret >= 0) {
-                       i_error("dict-client: Received transaction reply before it was committed");
-                       return;
-               }
-               if (ctx->error == NULL)
-                       ctx->error = i_strdup(result->error);
-               return;
-       }
+       cmd->unfinished = FALSE;
+       if (cmd->callback != NULL)
+               cmd->callback(cmd, NULL, error);
+       i_assert(!cmd->unfinished);
+}
 
-       /* the callback may call the dict code again, so remove this
-          transaction before calling it */
-       i_assert(dict->async_commits > 0);
-       if (--dict->async_commits == 0) {
-               if (dict->io != NULL)
-                       io_remove(&dict->io);
-       }
-       DLLIST_REMOVE(&dict->transactions, ctx);
+static void client_dict_input_timeout(struct client_dict *dict)
+{
+       int diff = timeval_diff_msecs(&ioloop_timeval, &dict->last_input);
 
-       if (ctx->callback != NULL)
-               ctx->callback(result, ctx->context);
-       i_free(ctx);
+       client_dict_disconnect(dict, t_strdup_printf(
+               "Timeout: No input from dict for %u.%03u secs",
+               diff/1000, diff%1000));
 }
 
-static ssize_t client_dict_read_timeout(struct client_dict *dict)
+static int
+client_dict_cmd_query_send(struct client_dict *dict, const char *query)
 {
-       time_t now, timeout;
-       unsigned int diff;
+       struct const_iovec iov[2];
        ssize_t ret;
 
-       now = time(NULL);
-       timeout = now + DICT_CLIENT_READ_TIMEOUT_SECS;
-
-       do {
-               alarm(timeout - now);
-               ret = i_stream_read(dict->input);
-               alarm(0);
-               if (ret != 0)
-                       break;
-
-               /* interrupted most likely because of timeout,
-                  but check anyway. */
-               now = time(NULL);
-       } while (now < timeout);
-
-       if (ret > 0) {
-               diff = time(NULL) - now;
-               if (diff >= DICT_CLIENT_READ_WARN_TIMEOUT_SECS) {
-                       i_warning("read(%s): dict lookup took %u seconds",
-                                 dict->path, diff);
-               }
-       }
-       return ret;
+       iov[0].iov_base = query;
+       iov[0].iov_len = strlen(query);
+       iov[1].iov_base = "\n";
+       iov[1].iov_len = 1;
+       ret = o_stream_sendv(dict->conn.conn.output, iov, 2);
+       if (ret < 0)
+               return -1;
+       i_assert((size_t)ret == iov[0].iov_len + 1);
+       return 0;
 }
 
-static int
-client_dict_read_one_line_real(struct client_dict *dict, char **line_r,
-                              const char **error_r)
+static bool
+client_dict_cmd_send(struct client_dict *dict, struct client_dict_cmd **_cmd,
+                    const char **error_r)
 {
-       unsigned int id;
-       char *line;
-       ssize_t ret;
+       struct client_dict_cmd *cmd = *_cmd;
+       const char *error = NULL;
+       bool retry = cmd->retry_errors;
+       int ret;
 
-       *line_r = NULL;
-       while ((line = i_stream_next_line(dict->input)) == NULL) {
-               ret = client_dict_read_timeout(dict);
-               switch (ret) {
-               case -1:
-                       *error_r = t_strdup_printf("read(%s) failed: %s",
-                               dict->path, i_stream_get_disconnect_reason(dict->input));
-                       return -1;
-               case -2:
-                       *error_r = t_strdup_printf(
-                               "read(%s) returned too much data", dict->path);
-                       return -1;
-               case 0:
-                       *error_r = t_strdup_printf(
-                               "read(%s) failed: Timeout after %u seconds",
-                               dict->path, DICT_CLIENT_READ_TIMEOUT_SECS);
-                       return -1;
-               default:
-                       i_assert(ret > 0);
-                       break;
+       *_cmd = NULL;
+
+       /* we're no longer idling. even with no_replies=TRUE we're going to
+          wait for COMMIT/ROLLBACK. */
+       if (dict->to_idle != NULL)
+               timeout_remove(&dict->to_idle);
+
+       if (client_dict_connect(dict, &error) < 0) {
+               retry = FALSE;
+               ret = -1;
+       } else {
+               ret = client_dict_cmd_query_send(dict, cmd->query);
+               if (ret < 0) {
+                       error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name,
+                                       o_stream_get_error(dict->conn.conn.output));
                }
        }
-       if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) {
-               struct dict_commit_result result;
-
-               memset(&result, 0, sizeof(result));
-               switch (line[1]) {
-               case DICT_PROTOCOL_REPLY_OK:
-                       result.ret = 1;
-                       break;
-               case DICT_PROTOCOL_REPLY_NOTFOUND:
-                       result.ret = 0;
-                       break;
-               case DICT_PROTOCOL_REPLY_FAIL: {
-                       const char *error = strchr(line+2, '\t');
-
-                       result.ret = -1;
-                       result.error = t_strdup_printf(
-                               "dict-server returned failure: %s",
-                               error != NULL ? t_str_tabunescape(error) : "");
-                       break;
-               }
-               default:
-                       *error_r = t_strdup_printf(
-                               "dict-client: Invalid async commit line: %s", line);
-                       return -1;
-               }
-               if (str_to_uint(t_strcut(line+2, '\t'), &id) < 0) {
-                       *error_r = t_strdup_printf("dict-client: Invalid ID");
-                       return -1;
+       if (ret < 0 && retry) {
+               /* Reconnect and try again. */
+               client_dict_disconnect(dict, error);
+               if (client_dict_connect(dict, &error) < 0)
+                       ;
+               else if (client_dict_cmd_query_send(dict, cmd->query) < 0) {
+                       error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name,
+                               o_stream_get_error(dict->conn.conn.output));
+               } else {
+                       ret = 0;
                }
-               client_dict_finish_transaction(dict, id, &result);
-               return 0;
        }
-       if (dict->iter_replies_skip > 0) {
-               /* called aborted the iteration before finishing it.
-                  skip over the iteration reply */
-               if (*line == DICT_PROTOCOL_REPLY_OK)
-                       return 0;
-               if (*line != '\0' && *line != DICT_PROTOCOL_REPLY_FAIL) {
-                       *error_r = t_strdup_printf(
-                               "dict-client: Invalid iteration reply line: %s", line);
-                       return -1;
+
+       if (cmd->no_replies) {
+               /* just send and forget */
+               client_dict_cmd_unref(cmd);
+               return TRUE;
+       } else if (ret < 0) {
+               i_assert(error != NULL);
+               dict_cmd_callback_error(cmd, error);
+               client_dict_cmd_unref(cmd);
+               if (error_r != NULL)
+                       *error_r = error;
+               return FALSE;
+       } else {
+               if (dict->to_requests == NULL) {
+                       dict->to_requests =
+                               timeout_add(DICT_CLIENT_REQUEST_TIMEOUT_MSECS,
+                                           client_dict_input_timeout, dict);
                }
-               dict->iter_replies_skip--;
-               return 0;
+               array_append(&dict->cmds, &cmd, 1);
+               return TRUE;
        }
-       *line_r = line;
-       return 1;
 }
 
-static int client_dict_read_one_line(struct client_dict *dict, char **line_r,
-                                    const char **error_r)
+static void
+client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx)
 {
-       int ret;
+       struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
+       struct client_dict_cmd *cmd;
+       const char *query, *error;
 
-       if ((ret = client_dict_read_one_line_real(dict, line_r, error_r)) < 0)
-               client_dict_disconnect(dict, *error_r);
-       return ret;
+       i_assert(ctx->error == NULL);
+
+       ctx->sent_begin = TRUE;
+
+       /* transactions commands don't have replies. only COMMIT has. */
+       query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_BEGIN, ctx->id);
+       cmd = client_dict_cmd_init(dict, query);
+       cmd->no_replies = TRUE;
+       cmd->retry_errors = TRUE;
+       if (!client_dict_cmd_send(dict, &cmd, &error))
+               ctx->error = i_strdup(error);
+}
+
+static void
+client_dict_send_transaction_query(struct client_dict_transaction_context *ctx,
+                                  const char *query)
+{
+       struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
+       struct client_dict_cmd *cmd;
+       const char *error;
+
+       if (ctx->error != NULL)
+               return;
+
+       if (!ctx->sent_begin)
+               client_dict_transaction_send_begin(ctx);
+
+       cmd = client_dict_cmd_init(dict, query);
+       cmd->no_replies = TRUE;
+       if (!client_dict_cmd_send(dict, &cmd, &error))
+               ctx->error = i_strdup(error);
 }
 
 static bool client_dict_is_finished(struct client_dict *dict)
 {
-       return dict->transactions == NULL && !dict->in_iteration &&
-               dict->async_commits == 0;
+       return dict->transactions == NULL && array_count(&dict->cmds) == 0;
 }
 
 static void client_dict_timeout(struct client_dict *dict)
@@ -372,107 +318,153 @@ static void client_dict_add_timeout(struct client_dict *dict)
        } else if (client_dict_is_finished(dict)) {
                dict->to_idle = timeout_add(dict->idle_msecs,
                                            client_dict_timeout, dict);
+               if (dict->to_requests != NULL)
+                       timeout_remove(&dict->to_requests);
        }
 }
 
-static int client_dict_read_line(struct client_dict *dict,
-                                char **line_r, const char **error_r)
+static int dict_conn_input_line(struct connection *_conn, const char *line)
 {
-       int ret;
+       struct dict_connection *conn = (struct dict_connection *)_conn;
+       struct client_dict *dict = conn->dict;
+       struct client_dict_cmd *const *cmds;
+       unsigned int count;
+       bool finished;
+       int diff;
 
-       while ((ret = client_dict_read_one_line(dict, line_r, error_r)) == 0)
-               ;
-       i_assert(ret < 0 || *line_r != NULL);
+       dict->last_input = ioloop_timeval;
+       if (dict->to_requests != NULL)
+               timeout_reset(dict->to_requests);
+
+       cmds = array_get(&conn->dict->cmds, &count);
+       if (count == 0) {
+               i_error("%s: Received reply without pending commands: %s",
+                       dict->conn.conn.name, line);
+               return -1;
+       }
+       i_assert(!cmds[0]->no_replies);
+
+       client_dict_cmd_ref(cmds[0]);
+       finished = dict_cmd_callback_line(cmds[0], line);
+       if (!client_dict_cmd_unref(cmds[0])) {
+               /* disconnected during command handling */
+               return -1;
+       }
+       if (!finished) {
+               /* more lines needed for this command */
+               return 1;
+       }
+       diff = timeval_diff_msecs(&ioloop_timeval, &cmds[0]->start_time);
+       if (diff >= DICT_CLIENT_REQUEST_WARN_TIMEOUT_MSECS) {
+               i_warning("read(%s): dict lookup took %u.%03u seconds: %s",
+                         dict->conn.conn.name, diff/1000, diff % 1000,
+                         cmds[0]->query);
+       }
+       client_dict_cmd_unref(cmds[0]);
+       array_delete(&dict->cmds, 0, 1);
 
        client_dict_add_timeout(dict);
-       return ret < 0 ? -1 : 0;
+       return 1;
 }
 
 static int client_dict_connect(struct client_dict *dict, const char **error_r)
 {
        const char *query;
 
+       if (dict->conn.conn.fd_in != -1)
+               return 0;
        if (dict->last_failed_connect == ioloop_time) {
                /* Try again later */
                *error_r = "Waiting until the next connect attempt";
                return -1;
        }
 
-       dict->fd = net_connect_unix(dict->path);
-       if (dict->fd == -1) {
+       if (connection_client_connect(&dict->conn.conn) < 0) {
                dict->last_failed_connect = ioloop_time;
                if (errno == EACCES) {
                        *error_r = eacces_error_get("net_connect_unix",
-                                                   dict->path);
+                                                   dict->conn.conn.name);
                } else {
                        *error_r = t_strdup_printf(
-                               "net_connect_unix(%s) failed: %m", dict->path);
+                               "net_connect_unix(%s) failed: %m", dict->conn.conn.name);
                }
                return -1;
        }
 
-       /* Dictionary lookups are blocking */
-       net_set_nonblock(dict->fd, FALSE);
-
-       dict->input = i_stream_create_fd(dict->fd, (size_t)-1);
-       dict->output = o_stream_create_fd(dict->fd, 4096);
-
        query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n",
                                DICT_PROTOCOL_CMD_HELLO,
                                DICT_CLIENT_PROTOCOL_MAJOR_VERSION,
                                DICT_CLIENT_PROTOCOL_MINOR_VERSION,
                                dict->value_type, dict->username, dict->uri);
-       if (client_dict_send_query(dict, query, error_r) < 0) {
-               dict->last_failed_connect = ioloop_time;
-               client_dict_disconnect(dict, *error_r);
-               return -1;
-       }
-
-       dict->handshaked = TRUE;
+       o_stream_nsend_str(dict->conn.conn.output, query);
+       client_dict_add_timeout(dict);
        return 0;
 }
 
+static void
+client_dict_abort_commands(struct client_dict *dict, const char *reason)
+{
+       ARRAY(struct client_dict_cmd *) cmds_copy;
+       struct client_dict_cmd *const *cmdp;
+
+       /* abort all commands */
+       t_array_init(&cmds_copy, array_count(&dict->cmds));
+       array_append_array(&cmds_copy, &dict->cmds);
+       array_clear(&dict->cmds);
+
+       array_foreach(&cmds_copy, cmdp) {
+               dict_cmd_callback_error(*cmdp, reason);
+               client_dict_cmd_unref(*cmdp);
+       }
+}
+
 static void client_dict_disconnect(struct client_dict *dict, const char *reason)
 {
        struct client_dict_transaction_context *ctx, *next;
-       struct dict_commit_result result = { -1, reason };
 
-       dict->connect_counter++;
-       dict->handshaked = FALSE;
-       dict->iter_replies_skip = 0;
+       client_dict_abort_commands(dict, reason);
 
-       /* abort all pending async commits */
+       /* all transactions that have sent BEGIN are no longer valid */
        for (ctx = dict->transactions; ctx != NULL; ctx = next) {
                next = ctx->next;
-               if (ctx->async)
-                       client_dict_finish_transaction(dict, ctx->id, &result);
+               if (ctx->sent_begin && ctx->error == NULL)
+                       ctx->error = i_strdup(reason);
        }
 
        if (dict->to_idle != NULL)
                timeout_remove(&dict->to_idle);
-       if (dict->io != NULL)
-               io_remove(&dict->io);
-       if (dict->input != NULL)
-               i_stream_destroy(&dict->input);
-       if (dict->output != NULL)
-               o_stream_destroy(&dict->output);
+       if (dict->to_requests != NULL)
+               timeout_remove(&dict->to_requests);
+       connection_disconnect(&dict->conn.conn);
+}
 
-       if (dict->fd != -1) {
-               if (close(dict->fd) < 0)
-                       i_error("close(%s) failed: %m", dict->path);
-               dict->fd = -1;
-       }
+static void dict_conn_destroy(struct connection *_conn)
+{
+       struct dict_connection *conn = (struct dict_connection *)_conn;
+
+       client_dict_disconnect(conn->dict, connection_disconnect_reason(_conn));
 }
 
+static const struct connection_settings dict_conn_set = {
+       .input_max_size = (size_t)-1,
+       .output_max_size = (size_t)-1,
+       .client = TRUE
+};
+
+static const struct connection_vfuncs dict_conn_vfuncs = {
+       .destroy = dict_conn_destroy,
+       .input_line = dict_conn_input_line
+};
+
 static int
 client_dict_init(struct dict *driver, const char *uri,
                 const struct dict_settings *set,
                 struct dict **dict_r, const char **error_r)
 {
+       struct ioloop *old_ioloop = current_ioloop;
        struct client_dict *dict;
-       const char *p, *dest_uri;
+       const char *p, *dest_uri, *path;
        unsigned int idle_msecs = DICT_CLIENT_DEFAULT_TIMEOUT_MSECS;
-       pool_t pool;
 
        /* uri = [idle_msecs=<n>:] [<path>] ":" <uri> */
        if (strncmp(uri, "idle_msecs=", 11) == 0) {
@@ -493,29 +485,36 @@ client_dict_init(struct dict *driver, const char *uri,
                return -1;
        }
 
-       pool = pool_alloconly_create("client dict", 1024);
-       dict = p_new(pool, struct client_dict, 1);
-       dict->pool = pool;
+       if (dict_connections == NULL) {
+               dict_connections = connection_list_init(&dict_conn_set,
+                                                       &dict_conn_vfuncs);
+       }
+
+       dict = i_new(struct client_dict, 1);
        dict->dict = *driver;
+       dict->conn.dict = dict;
        dict->value_type = set->value_type;
-       dict->username = p_strdup(pool, set->username);
+       dict->username = i_strdup(set->username);
        dict->idle_msecs = idle_msecs;
-
-       dict->fd = -1;
+       i_array_init(&dict->cmds, 32);
 
        if (uri[0] == ':') {
                /* default path */
-               dict->path = p_strconcat(pool, set->base_dir,
-                               "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL);
+               path = t_strconcat(set->base_dir,
+                       "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL);
        } else if (uri[0] == '/') {
                /* absolute path */
-               dict->path = p_strdup_until(pool, uri, dest_uri);
+               path = t_strdup_until(uri, dest_uri);
        } else {
                /* relative path to base_dir */
-               dict->path = p_strconcat(pool, set->base_dir, "/",
-                               p_strdup_until(pool, uri, dest_uri), NULL);
+               path = t_strconcat(set->base_dir, "/",
+                       t_strdup_until(uri, dest_uri), NULL);
        }
-       dict->uri = p_strdup(pool, dest_uri + 1);
+       connection_init_client_unix(dict_connections, &dict->conn.conn, path);
+       dict->uri = i_strdup(dest_uri + 1);
+
+       dict->ioloop = io_loop_create();
+       io_loop_set_current(old_ioloop);
        *dict_r = &dict->dict;
        return 0;
 }
@@ -523,71 +522,233 @@ client_dict_init(struct dict *driver, const char *uri,
 static void client_dict_deinit(struct dict *_dict)
 {
        struct client_dict *dict = (struct client_dict *)_dict;
+       struct ioloop *old_ioloop = current_ioloop;
+
+       client_dict_disconnect(dict, "Deinit");
+       connection_deinit(&dict->conn.conn);
 
-        client_dict_disconnect(dict, "Deinit");
        i_assert(dict->transactions == NULL);
-       pool_unref(&dict->pool);
+       i_assert(array_count(&dict->cmds) == 0);
+
+       io_loop_set_current(dict->ioloop);
+       io_loop_destroy(&dict->ioloop);
+       io_loop_set_current(old_ioloop);
+
+       array_free(&dict->cmds);
+       i_free(dict->username);
+       i_free(dict->uri);
+       i_free(dict);
+
+       if (dict_connections->connections == NULL)
+               connection_list_deinit(&dict_connections);
 }
 
 static void client_dict_wait(struct dict *_dict)
 {
        struct client_dict *dict = (struct client_dict *)_dict;
-       const char *error;
-       char *line;
-       int ret;
 
-       while (dict->async_commits > 0) {
-               if ((ret = client_dict_read_one_line(dict, &line, &error)) < 0) {
-                       i_error("%s", error);
-                       break;
-               }
+       if (array_count(&dict->cmds) == 0)
+               return;
 
-               if (ret > 0) {
-                       const char *reason = t_strdup_printf(
-                               "dict-client: Unexpected reply waiting waiting for async commits: %s", line);
-                       i_error("%s", reason);
-                       client_dict_disconnect(dict, reason);
-                       break;
-               }
+       dict->prev_ioloop = current_ioloop;
+       io_loop_set_current(dict->ioloop);
+
+       if (dict->to_idle != NULL)
+               dict->to_idle = io_loop_move_timeout(&dict->to_idle);
+       if (dict->to_requests != NULL)
+               dict->to_requests = io_loop_move_timeout(&dict->to_requests);
+       connection_switch_ioloop(&dict->conn.conn);
+
+       while (array_count(&dict->cmds) > 0)
+               io_loop_run(dict->ioloop);
+
+       io_loop_set_current(dict->prev_ioloop);
+       dict->prev_ioloop = NULL;
+
+       if (dict->to_idle != NULL)
+               dict->to_idle = io_loop_move_timeout(&dict->to_idle);
+       if (dict->to_requests != NULL)
+               dict->to_requests = io_loop_move_timeout(&dict->to_requests);
+       connection_switch_ioloop(&dict->conn.conn);
+}
+
+static void
+client_dict_lookup_async_callback(struct client_dict_cmd *cmd, const char *line,
+                                 const char *error)
+{
+       struct client_dict *dict = cmd->dict;
+       struct dict_lookup_result result;
+
+       memset(&result, 0, sizeof(result));
+       if (error != NULL) {
+               result.ret = -1;
+               result.error = error;
+       } else switch (*line) {
+       case DICT_PROTOCOL_REPLY_OK:
+               result.value = t_str_tabunescape(line + 1);
+               result.ret = 1;
+               break;
+       case DICT_PROTOCOL_REPLY_NOTFOUND:
+               result.ret = 0;
+               break;
+       case DICT_PROTOCOL_REPLY_FAIL:
+               result.error = line[1] == '\0' ? "dict-server returned failure" :
+                       t_strdup_printf("dict-server returned failure: %s",
+                       t_str_tabunescape(line+1));
+               result.ret = -1;
+               break;
+       default:
+               result.error = t_strdup_printf(
+                       "dict-client: Invalid lookup '%s' reply: %s",
+                       cmd->query, line);
+               client_dict_disconnect(dict, result.error);
+               result.ret = -1;
+               break;
        }
-       /* we should have aborted all the async calls if we disconnected */
-       i_assert(dict->async_commits == 0);
+       dict_pre_api_callback(dict);
+       cmd->api_callback.lookup(&result, cmd->api_callback.context);
+       dict_post_api_callback(dict);
 }
 
-static int client_dict_lookup(struct dict *_dict, pool_t pool, const char *key,
-                             const char **value_r, const char **error_r)
+static void
+client_dict_lookup_async(struct dict *_dict, const char *key,
+                        dict_lookup_callback_t *callback, void *context)
 {
        struct client_dict *dict = (struct client_dict *)_dict;
+       struct client_dict_cmd *cmd;
        const char *query;
-       char *line;
 
-       query = t_strdup_printf("%c%s\n", DICT_PROTOCOL_CMD_LOOKUP,
+       query = t_strdup_printf("%c%s", DICT_PROTOCOL_CMD_LOOKUP,
                                str_tabescape(key));
-       if (client_dict_send_query(dict, query, error_r) < 0)
-               return -1;
+       cmd = client_dict_cmd_init(dict, query);
+       cmd->callback = client_dict_lookup_async_callback;
+       cmd->api_callback.lookup = callback;
+       cmd->api_callback.context = context;
+       cmd->retry_errors = TRUE;
 
-       /* read reply */
-       if (client_dict_read_line(dict, &line, error_r) < 0)
-               return -1;
+       client_dict_cmd_send(dict, &cmd, NULL);
+}
 
-       switch (*line) {
-       case DICT_PROTOCOL_REPLY_OK:
-               *value_r = p_strdup(pool, t_str_tabunescape(line + 1));
-               return 1;
-       case DICT_PROTOCOL_REPLY_NOTFOUND:
+static void client_dict_lookup_callback(const struct dict_lookup_result *result,
+                                       void *context)
+{
+       struct dict_lookup_result *result_copy = context;
+
+       *result_copy = *result;
+}
+
+static int client_dict_lookup(struct dict *_dict, pool_t pool, const char *key,
+                             const char **value_r, const char **error_r)
+{
+       struct dict_lookup_result result;
+
+       memset(&result, 0, sizeof(result));
+       result.ret = -2;
+
+       client_dict_lookup_async(_dict, key, client_dict_lookup_callback, &result);
+       if (result.ret == -2)
+               client_dict_wait(_dict);
+
+       switch (result.ret) {
+       case -1:
+               *error_r = result.error;
+               return -1;
+       case 0:
                *value_r = NULL;
                return 0;
+       case 1:
+               *value_r = p_strdup(pool, result.value);
+               return 1;
+       }
+       i_unreached();
+}
+
+static void client_dict_iterate_free(struct client_dict_iterate_context *ctx)
+{
+       if (!ctx->deinit || !ctx->finished)
+               return;
+       i_free(ctx->error);
+       i_free(ctx);
+}
+
+static void
+client_dict_iter_api_callback(struct client_dict_iterate_context *ctx,
+                             struct client_dict *dict)
+{
+       if (ctx->deinit) {
+               /* iterator was already deinitialized */
+               return;
+       }
+       if (ctx->ctx.async_callback != NULL) {
+               dict_pre_api_callback(dict);
+               ctx->ctx.async_callback(ctx->ctx.async_context);
+               dict_post_api_callback(dict);
+       } else {
+               /* synchronous lookup */
+               io_loop_stop(dict->ioloop);
+       }
+}
+
+static void
+client_dict_iter_async_callback(struct client_dict_cmd *cmd, const char *line,
+                               const char *error)
+{
+       struct client_dict_iterate_context *ctx = cmd->iter;
+       struct client_dict *dict = cmd->dict;
+       struct client_dict_iter_result *result;
+       const char *key = NULL, *value = NULL;
+
+       if (error != NULL) {
+               /* failed */
+       } else switch (*line) {
+       case '\0':
+               /* end of iteration */
+               ctx->finished = TRUE;
+               client_dict_iter_api_callback(ctx, dict);
+               client_dict_iterate_free(ctx);
+               return;
+       case DICT_PROTOCOL_REPLY_OK:
+               /* key \t value */
+               key = line+1;
+               value = strchr(key, '\t');
+               break;
        case DICT_PROTOCOL_REPLY_FAIL:
-               *error_r = line[1] == '\0' ? "dict-server returned failure" :
-                       t_strdup_printf("dict-server returned failure: %s",
-                       t_str_tabunescape(line+1));
-               return -1;
+               error = t_strdup_printf("dict-server returned failure: %s", line+1);
+               break;
        default:
-               *error_r = t_strdup_printf(
-                       "dict-client: Invalid lookup '%s' reply: %s", key, line);
-               client_dict_disconnect(dict, *error_r);
-               return -1;
+               break;
+       }
+       if (value == NULL && error == NULL) {
+               /* broken protocol */
+               error = t_strdup_printf("dict client (%s) sent broken iterate reply: %s",
+                                       dict->conn.conn.name, line);
+               client_dict_disconnect(dict, error);
+       }
+
+       if (error != NULL) {
+               if (ctx->error == NULL)
+                       ctx->error = i_strdup(error);
+               ctx->finished = TRUE;
+               if (dict->prev_ioloop != NULL) {
+                       /* stop client_dict_wait() */
+                       io_loop_stop(dict->ioloop);
+               }
+               client_dict_iterate_free(ctx);
+               return;
+       }
+       cmd->unfinished = TRUE;
+
+       if (ctx->deinit) {
+               /* iterator was already deinitialized */
+               return;
        }
+
+       key = t_strdup_until(key, value++);
+       result = array_append_space(&ctx->results);
+       result->key = p_strdup(ctx->results_pool, t_str_tabunescape(key));
+       result->value = p_strdup(ctx->results_pool, t_str_tabunescape(value));
+
+       client_dict_iter_api_callback(ctx, dict);
 }
 
 static struct dict_iterate_context *
@@ -596,26 +757,28 @@ client_dict_iterate_init(struct dict *_dict, const char *const *paths,
 {
        struct client_dict *dict = (struct client_dict *)_dict;
         struct client_dict_iterate_context *ctx;
+       struct client_dict_cmd *cmd;
        string_t *query = t_str_new(256);
        unsigned int i;
-       const char *error;
-
-       if (dict->in_iteration)
-               i_panic("dict-client: Only one iteration supported");
-       dict->in_iteration = TRUE;
 
        ctx = i_new(struct client_dict_iterate_context, 1);
        ctx->ctx.dict = _dict;
-       ctx->pool = pool_alloconly_create("client dict iteration", 512);
+       ctx->results_pool = pool_alloconly_create("client dict iteration", 512);
+       ctx->async = (flags & DICT_ITERATE_FLAG_ASYNC) != 0;
+       i_array_init(&ctx->results, 64);
 
        str_printfa(query, "%c%d", DICT_PROTOCOL_CMD_ITERATE, flags);
        for (i = 0; paths[i] != NULL; i++) {
                str_append_c(query, '\t');
                        str_append(query, str_tabescape(paths[i]));
        }
-       str_append_c(query, '\n');
-       if (client_dict_send_query(dict, str_c(query), &error) < 0)
-               ctx->error = i_strdup(error);
+
+       cmd = client_dict_cmd_init(dict, str_c(query));
+       cmd->iter = ctx;
+       cmd->callback = client_dict_iter_async_callback;
+       cmd->retry_errors = TRUE;
+
+       client_dict_cmd_send(dict, &cmd, NULL);
        return &ctx->ctx;
 }
 
@@ -624,51 +787,32 @@ static bool client_dict_iterate(struct dict_iterate_context *_ctx,
 {
        struct client_dict_iterate_context *ctx =
                (struct client_dict_iterate_context *)_ctx;
-       struct client_dict *dict = (struct client_dict *)_ctx->dict;
-       char *line, *key, *value;
-       const char *error;
-
-       if (ctx->error != NULL)
-               return FALSE;
+       const struct client_dict_iter_result *results;
+       unsigned int count;
 
-       /* read next reply */
-       if (client_dict_read_line(dict, &line, &error) < 0) {
-               ctx->error = i_strdup(error);
+       if (ctx->error != NULL) {
+               ctx->ctx.has_more = FALSE;
                return FALSE;
        }
 
-       if (*line == '\0') {
-               /* end of iteration */
-               ctx->finished = TRUE;
-               return FALSE;
+       results = array_get(&ctx->results, &count);
+       if (ctx->result_idx < count) {
+               *key_r = results[ctx->result_idx].key;
+               *value_r = results[ctx->result_idx].value;
+               ctx->ctx.has_more = TRUE;
+               ctx->result_idx++;
+               return TRUE;
        }
+       ctx->ctx.has_more = !ctx->finished;
+       ctx->result_idx = 0;
+       array_clear(&ctx->results);
+       p_clear(ctx->results_pool);
 
-       /* line contains key \t value */
-       p_clear(ctx->pool);
-
-       switch (*line) {
-       case DICT_PROTOCOL_REPLY_OK:
-               key = line+1;
-               value = strchr(key, '\t');
-               break;
-       case DICT_PROTOCOL_REPLY_FAIL:
-               ctx->error = i_strdup_printf("dict-server returned failure: %s", line+1);
-               return FALSE;
-       default:
-               key = NULL;
-               value = NULL;
-               break;
+       if (!ctx->async && ctx->ctx.has_more) {
+               client_dict_wait(_ctx->dict);
+               return client_dict_iterate(_ctx, key_r, value_r);
        }
-       if (value == NULL) {
-               /* broken protocol */
-               ctx->error = i_strdup_printf("dict client (%s) sent broken iterate reply: %s", dict->path, line);
-               return FALSE;
-       }
-       *value++ = '\0';
-
-       *key_r = p_strdup(ctx->pool, t_str_tabunescape(key));
-       *value_r = p_strdup(ctx->pool, t_str_tabunescape(value));
-       return TRUE;
+       return FALSE;
 }
 
 static int client_dict_iterate_deinit(struct dict_iterate_context *_ctx,
@@ -679,14 +823,12 @@ static int client_dict_iterate_deinit(struct dict_iterate_context *_ctx,
                (struct client_dict_iterate_context *)_ctx;
        int ret = ctx->error != NULL ? -1 : 0;
 
-       if (!ctx->finished)
-               dict->iter_replies_skip++;
+       ctx->deinit = TRUE;
 
        *error_r = t_strdup(ctx->error);
-       pool_unref(&ctx->pool);
-       i_free(ctx->error);
-       i_free(ctx);
-       dict->in_iteration = FALSE;
+       array_free(&ctx->results);
+       pool_unref(&ctx->results_pool);
+       client_dict_iterate_free(ctx);
 
        client_dict_add_timeout(dict);
        return ret;
@@ -706,27 +848,42 @@ client_dict_transaction_init(struct dict *_dict)
        return &ctx->ctx;
 }
 
-static void dict_async_input(struct client_dict *dict)
+static void
+client_dict_transaction_commit_callback(struct client_dict_cmd *cmd,
+                                       const char *line, const char *error)
 {
-       const char *error;
-       char *line;
-       int ret;
-
-       i_assert(!dict->in_iteration);
-
-       do {
-               ret = client_dict_read_one_line(dict, &line, &error);
-       } while (ret == 0 && i_stream_get_data_size(dict->input) > 0);
+       struct client_dict *dict = cmd->dict;
+       struct dict_commit_result result = {
+               .ret = -1, .error = NULL
+       };
+
+       if (error != NULL) {
+               /* failed */
+               result.error = error;
+       } else switch (*line) {
+       case DICT_PROTOCOL_REPLY_OK:
+               result.ret = 1;
+               break;
+       case DICT_PROTOCOL_REPLY_NOTFOUND:
+               result.ret = 0;
+               break;
+       case DICT_PROTOCOL_REPLY_FAIL: {
+               const char *error = strchr(line+1, '\t');
 
-       if (ret < 0) {
-               i_error("%s", error);
-               io_remove(&dict->io);
-       } else if (ret > 0) {
-               const char *reason = t_strdup_printf(
-                       "dict-client: Unexpected reply waiting waiting for async commits: %s", line);
-               i_error("%s", reason);
-               client_dict_disconnect(dict, reason);
+               result.error = t_strdup_printf("dict-server returned failure: %s",
+                       error != NULL ? t_str_tabunescape(error) : "");
+               break;
        }
+       default:
+               result.ret = -1;
+               result.error = t_strdup_printf(
+                       "dict-client: Invalid commit reply: %s", line);
+               client_dict_disconnect(dict, result.error);
+               break;
+       }
+       dict_pre_api_callback(dict);
+       cmd->api_callback.commit(&result, cmd->api_callback.context);
+       dict_post_api_callback(dict);
 }
 
 static void
@@ -738,74 +895,35 @@ client_dict_transaction_commit(struct dict_transaction_context *_ctx,
        struct client_dict_transaction_context *ctx =
                (struct client_dict_transaction_context *)_ctx;
        struct client_dict *dict = (struct client_dict *)_ctx->dict;
-       unsigned int id;
-       struct dict_commit_result result;
+       struct client_dict_cmd *cmd;
+       const char *query;
 
-       memset(&result, 0, sizeof(result));
-       result.ret = ctx->error != NULL ? -1 : 1;
-       result.error = t_strdup(ctx->error);
+       DLLIST_REMOVE(&dict->transactions, ctx);
 
-       ctx->committed = TRUE;
        if (ctx->sent_begin && ctx->error == NULL) {
-               const char *query;
-               char *line;
-
-               query = t_strdup_printf("%c%u\n", !async ?
-                                       DICT_PROTOCOL_CMD_COMMIT :
-                                       DICT_PROTOCOL_CMD_COMMIT_ASYNC,
-                                       ctx->id);
-               if (client_dict_send_transaction_query(ctx, query, &result.error) < 0)
-                       result.ret = -1;
-               else if (async) {
-                       ctx->callback = callback;
-                       ctx->context = context;
-                       ctx->async = TRUE;
-                       if (dict->async_commits++ == 0) {
-                               dict->io = io_add(dict->fd, IO_READ,
-                                                 dict_async_input, dict);
-                       }
-                       return;
-               } else {
-                       /* sync commit, read reply */
-                       if (client_dict_read_line(dict, &line, &result.error) < 0) {
-                               result.ret = -1;
-                       } else switch (*line) {
-                       case DICT_PROTOCOL_REPLY_OK:
-                               result.ret = 1;
-                               break;
-                       case DICT_PROTOCOL_REPLY_NOTFOUND:
-                               result.ret = 0;
-                               break;
-                       case DICT_PROTOCOL_REPLY_FAIL: {
-                               const char *error = strchr(line+1, '\t');
-
-                               result.ret = -1;
-                               result.error = t_strdup_printf(
-                                       "dict-server returned failure: %s",
-                                       error != NULL ? t_str_tabunescape(error) : "");
-                               break;
-                       }
-                       default:
-                               result.ret = -1;
-                               result.error = t_strdup_printf(
-                                       "dict-client: Invalid commit reply: %s", line);
-                               client_dict_disconnect(dict, result.error);
-                               line = NULL;
-                               break;
-                       }
-                       if (line != NULL &&
-                           (str_to_uint(t_strcut(line+1, '\t'), &id) < 0 || ctx->id != id)) {
-                               result.ret = -1;
-                               result.error = t_strdup_printf(
-                                       "dict-client: Invalid commit reply, "
-                                       "expected id=%u: %s", ctx->id, line);
-                               client_dict_disconnect(dict, result.error);
-                       }
+               query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_COMMIT, ctx->id);
+               cmd = client_dict_cmd_init(dict, query);
+               cmd->callback = client_dict_transaction_commit_callback;
+               cmd->api_callback.commit = callback;
+               cmd->api_callback.context = context;
+               if (client_dict_cmd_send(dict, &cmd, NULL)) {
+                       if (!async)
+                               client_dict_wait(_ctx->dict);
                }
+       } else if (ctx->error != NULL) {
+               /* already failed */
+               struct dict_commit_result result = {
+                       .ret = -1, .error = ctx->error
+               };
+               callback(&result, context);
+       } else {
+               /* nothing changed */
+               struct dict_commit_result result = {
+                       .ret = 1, .error = NULL
+               };
+               callback(&result, context);
        }
-       DLLIST_REMOVE(&dict->transactions, ctx);
 
-       callback(&result, context);
        i_free(ctx->error);
        i_free(ctx);
 
@@ -822,9 +940,9 @@ client_dict_transaction_rollback(struct dict_transaction_context *_ctx)
        if (ctx->sent_begin) {
                const char *query;
 
-               query = t_strdup_printf("%c%u\n", DICT_PROTOCOL_CMD_ROLLBACK,
+               query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_ROLLBACK,
                                        ctx->id);
-               client_dict_try_send_transaction_query(ctx, query);
+               client_dict_send_transaction_query(ctx, query);
        }
 
        DLLIST_REMOVE(&dict->transactions, ctx);
@@ -840,11 +958,11 @@ static void client_dict_set(struct dict_transaction_context *_ctx,
                (struct client_dict_transaction_context *)_ctx;
        const char *query;
 
-       query = t_strdup_printf("%c%u\t%s\t%s\n",
+       query = t_strdup_printf("%c%u\t%s\t%s",
                                DICT_PROTOCOL_CMD_SET, ctx->id,
                                str_tabescape(key),
                                str_tabescape(value));
-       client_dict_try_send_transaction_query(ctx, query);
+       client_dict_send_transaction_query(ctx, query);
 }
 
 static void client_dict_unset(struct dict_transaction_context *_ctx,
@@ -854,10 +972,10 @@ static void client_dict_unset(struct dict_transaction_context *_ctx,
                (struct client_dict_transaction_context *)_ctx;
        const char *query;
 
-       query = t_strdup_printf("%c%u\t%s\n",
+       query = t_strdup_printf("%c%u\t%s",
                                DICT_PROTOCOL_CMD_UNSET, ctx->id,
                                str_tabescape(key));
-       client_dict_try_send_transaction_query(ctx, query);
+       client_dict_send_transaction_query(ctx, query);
 }
 
 static void client_dict_atomic_inc(struct dict_transaction_context *_ctx,
@@ -867,10 +985,10 @@ static void client_dict_atomic_inc(struct dict_transaction_context *_ctx,
                (struct client_dict_transaction_context *)_ctx;
        const char *query;
 
-       query = t_strdup_printf("%c%u\t%s\t%lld\n",
+       query = t_strdup_printf("%c%u\t%s\t%lld",
                                DICT_PROTOCOL_CMD_ATOMIC_INC,
                                ctx->id, str_tabescape(key), diff);
-       client_dict_try_send_transaction_query(ctx, query);
+       client_dict_send_transaction_query(ctx, query);
 }
 
 struct dict dict_driver_client = {
@@ -890,6 +1008,6 @@ struct dict dict_driver_client = {
                client_dict_set,
                client_dict_unset,
                client_dict_atomic_inc,
-               NULL
+               client_dict_lookup_async
        }
 };