]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
dict: Use the new async APIs for everything.
authorTimo Sirainen <tss@iki.fi>
Wed, 2 Sep 2015 14:36:47 +0000 (17:36 +0300)
committerTimo Sirainen <tss@iki.fi>
Wed, 2 Sep 2015 14:36:47 +0000 (17:36 +0300)
If the dict backend supports async operations, this means that dict service
can now be configured with client_count>1.

src/dict/dict-commands.c
src/dict/dict-commands.h
src/dict/dict-connection.c
src/dict/dict-connection.h

index 23e8193a61b968bbd777e4f03e13c17bb4ec5ebe..2fb43fc98eb3c4fd637ee69b7bf25df60ae91b8b 100644 (file)
 
 #define DICT_OUTPUT_OPTIMAL_SIZE 1024
 
-struct dict_client_cmd {
-       int cmd;
-       int (*func)(struct dict_connection *conn, const char *line);
+struct dict_cmd_func {
+       enum dict_protocol_cmd cmd;
+       int (*func)(struct dict_connection_cmd *cmd, const char *line);
 };
 
-static int cmd_lookup(struct dict_connection *conn, const char *line)
+struct dict_connection_cmd {
+       const struct dict_cmd_func *cmd;
+       struct dict_connection *conn;
+       char *reply;
+
+       struct dict_iterate_context *iter;
+       enum dict_iterate_flags iter_flags;
+
+       struct dict_connection_transaction *trans;
+};
+
+static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd);
+
+static void dict_connection_cmd_free(struct dict_connection_cmd *cmd)
 {
-       const char *reply;
-       const char *value;
-       int ret;
+       if (cmd->iter != NULL)
+               (void)dict_iterate_deinit(&cmd->iter);
+       array_delete(&cmd->conn->cmds, 0, 1);
+       i_free(cmd->reply);
 
-       if (conn->iter_ctx != NULL) {
-               i_error("dict client: LOOKUP: Can't lookup while iterating");
-               return -1;
+       dict_connection_continue_input(cmd->conn);
+       i_free(cmd);
+}
+
+static void dict_connection_cmd_remove(struct dict_connection_cmd *cmd)
+{
+       struct dict_connection_cmd *const *cmds;
+       unsigned int i, count;
+
+       cmds = array_get(&cmd->conn->cmds, &count);
+       for (i = 0; i < count; i++) {
+               if (cmds[i] == cmd) {
+                       dict_connection_cmd_free(cmd);
+                       return;
+               }
        }
+       i_unreached();
+}
 
-       /* <key> */
-       ret = dict_lookup(conn->dict, pool_datastack_create(), line, &value);
-       if (ret > 0) {
-               reply = t_strdup_printf("%c%s\n",
-                       DICT_PROTOCOL_REPLY_OK, str_tabescape(value));
-               o_stream_nsend_str(conn->output, reply);
+static void dict_connection_cmds_flush(struct dict_connection *conn)
+{
+       struct dict_connection_cmd *cmd, *const *first_cmdp;
+
+       while (array_count(&conn->cmds) > 0) {
+               first_cmdp = array_idx(&conn->cmds, 0);
+               cmd = *first_cmdp;
+
+               if (cmd->reply == NULL) {
+                       /* command not finished yet */
+                       break;
+               }
+
+               o_stream_nsend_str(conn->output, cmd->reply);
+               dict_connection_cmd_remove(cmd);
+       }
+}
+
+void dict_connection_cmds_free(struct dict_connection *conn)
+{
+       struct dict_connection_cmd *const *first_cmdp;
+
+       while (array_count(&conn->cmds) > 0) {
+               first_cmdp = array_idx(&conn->cmds, 0);
+               dict_connection_cmd_remove(*first_cmdp);
+       }
+}
+
+static void
+cmd_lookup_callback(const struct dict_lookup_result *result, void *context)
+{
+       struct dict_connection_cmd *cmd = context;
+
+       if (result->ret > 0) {
+               cmd->reply = i_strdup_printf("%c%s\n",
+                       DICT_PROTOCOL_REPLY_OK, str_tabescape(result->value));
+       } else if (result->ret == 0) {
+               cmd->reply = i_strdup_printf("%c\n", DICT_PROTOCOL_REPLY_NOTFOUND);
        } else {
-               reply = t_strdup_printf("%c\n", ret == 0 ?
-                                       DICT_PROTOCOL_REPLY_NOTFOUND :
-                                       DICT_PROTOCOL_REPLY_FAIL);
-               o_stream_nsend_str(conn->output, reply);
+               i_error("%s", result->error);
+               cmd->reply = i_strdup_printf("%c\n", DICT_PROTOCOL_REPLY_FAIL);
        }
-       return 0;
+       dict_connection_cmds_flush(cmd->conn);
 }
 
-static int cmd_iterate_flush(struct dict_connection *conn)
+static int cmd_lookup(struct dict_connection_cmd *cmd, const char *line)
+{
+       /* <key> */
+       dict_lookup_async(cmd->conn->dict, line, cmd_lookup_callback, cmd);
+       return 1;
+}
+
+static int cmd_iterate_flush(struct dict_connection_cmd *cmd)
 {
        string_t *str;
        const char *key, *value;
 
        str = t_str_new(256);
-       o_stream_cork(conn->output);
-       while (dict_iterate(conn->iter_ctx, &key, &value)) {
+       o_stream_cork(cmd->conn->output);
+       while (dict_iterate(cmd->iter, &key, &value)) {
                str_truncate(str, 0);
                str_append_c(str, DICT_PROTOCOL_REPLY_OK);
                str_append_tabescaped(str, key);
                str_append_c(str, '\t');
-               if ((conn->iter_flags & DICT_ITERATE_FLAG_NO_VALUE) == 0)
+               if ((cmd->iter_flags & DICT_ITERATE_FLAG_NO_VALUE) == 0)
                        str_append_tabescaped(str, value);
                str_append_c(str, '\n');
-               o_stream_nsend(conn->output, str_data(str), str_len(str));
+               o_stream_nsend(cmd->conn->output, str_data(str), str_len(str));
 
-               if (o_stream_get_buffer_used_size(conn->output) >
+               if (o_stream_get_buffer_used_size(cmd->conn->output) >
                    DICT_OUTPUT_OPTIMAL_SIZE) {
-                       if (o_stream_flush(conn->output) <= 0) {
-                               /* continue later */
-                               o_stream_uncork(conn->output);
+                       if (o_stream_flush(cmd->conn->output) <= 0) {
+                               /* continue later when there's more space
+                                  in output buffer */
+                               o_stream_uncork(cmd->conn->output);
+                               o_stream_set_flush_pending(cmd->conn->output, TRUE);
                                return 0;
                        }
                        /* flushed everything, continue */
                }
        }
-
-       /* finished iterating */
-       o_stream_unset_flush_callback(conn->output);
+       if (dict_iterate_has_more(cmd->iter)) {
+               /* wait for the next iteration callback */
+               return 0;
+       }
 
        str_truncate(str, 0);
-       if (dict_iterate_deinit(&conn->iter_ctx) < 0)
+       if (dict_iterate_deinit(&cmd->iter) < 0)
                str_append_c(str, DICT_PROTOCOL_REPLY_FAIL);
        str_append_c(str, '\n');
-       o_stream_nsend(conn->output, str_data(str), str_len(str));
-       o_stream_uncork(conn->output);
+       o_stream_uncork(cmd->conn->output);
+
+       cmd->reply = i_strdup(str_c(str));
+       dict_connection_cmds_flush(cmd->conn);
        return 1;
 }
 
-static int cmd_iterate(struct dict_connection *conn, const char *line)
+static void cmd_iterate_callback(void *context)
+{
+       struct dict_connection_cmd *cmd = context;
+
+       dict_connection_cmd_output_more(cmd);
+}
+
+static int cmd_iterate(struct dict_connection_cmd *cmd, const char *line)
 {
        const char *const *args;
        unsigned int flags;
 
-       if (conn->iter_ctx != NULL) {
-               i_error("dict client: ITERATE: Already iterating");
-               return -1;
-       }
-
        args = t_strsplit_tabescaped(line);
        if (str_array_length(args) < 2 ||
            str_to_uint(args[0], &flags) < 0) {
@@ -103,12 +175,12 @@ static int cmd_iterate(struct dict_connection *conn, const char *line)
        }
 
        /* <flags> <path> */
-       conn->iter_ctx = dict_iterate_init_multiple(conn->dict, args+1, flags);
-       conn->iter_flags = flags;
-
-       o_stream_set_flush_callback(conn->output, cmd_iterate_flush, conn);
-       (void)cmd_iterate_flush(conn);
-       return 0;
+       flags |= DICT_ITERATE_FLAG_ASYNC;
+       cmd->iter = dict_iterate_init_multiple(cmd->conn->dict, args+1, flags);
+       cmd->iter_flags = flags;
+       dict_iterate_set_async_callback(cmd->iter, cmd_iterate_callback, cmd);
+       dict_connection_cmd_output_more(cmd);
+       return 1;
 }
 
 static struct dict_connection_transaction *
@@ -134,6 +206,8 @@ dict_connection_transaction_array_remove(struct dict_connection *conn,
        const struct dict_connection_transaction *transactions;
        unsigned int i, count;
 
+       i_assert(trans->ctx == NULL);
+
        transactions = array_get(&conn->transactions, &count);
        for (i = 0; i < count; i++) {
                if (&transactions[i] == trans) {
@@ -143,7 +217,7 @@ dict_connection_transaction_array_remove(struct dict_connection *conn,
        }
 }
 
-static int cmd_begin(struct dict_connection *conn, const char *line)
+static int cmd_begin(struct dict_connection_cmd *cmd, const char *line)
 {
        struct dict_connection_transaction *trans;
        unsigned int id;
@@ -152,19 +226,19 @@ static int cmd_begin(struct dict_connection *conn, const char *line)
                i_error("dict client: Invalid transaction ID %s", line);
                return -1;
        }
-       if (dict_connection_transaction_lookup(conn, id) != NULL) {
+       if (dict_connection_transaction_lookup(cmd->conn, id) != NULL) {
                i_error("dict client: Transaction ID %u already exists", id);
                return -1;
        }
 
-       if (!array_is_created(&conn->transactions))
-               i_array_init(&conn->transactions, 4);
+       if (!array_is_created(&cmd->conn->transactions))
+               i_array_init(&cmd->conn->transactions, 4);
 
        /* <id> */
-       trans = array_append_space(&conn->transactions);
+       trans = array_append_space(&cmd->conn->transactions);
        trans->id = id;
-       trans->conn = conn;
-       trans->ctx = dict_transaction_begin(conn->dict);
+       trans->conn = cmd->conn;
+       trans->ctx = dict_transaction_begin(cmd->conn->dict);
        return 0;
 }
 
@@ -187,21 +261,11 @@ dict_connection_transaction_lookup_parse(struct dict_connection *conn,
        return 0;
 }
 
-static int cmd_commit(struct dict_connection *conn, const char *line)
+static void
+cmd_commit_finish(struct dict_connection_cmd *cmd, int ret, bool async)
 {
-       struct dict_connection_transaction *trans;
        char chr;
-       int ret;
-
-       if (conn->iter_ctx != NULL) {
-               i_error("dict client: COMMIT: Can't commit while iterating");
-               return -1;
-       }
-
-       if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
-               return -1;
 
-       ret = dict_transaction_commit(&trans->ctx);
        switch (ret) {
        case 1:
                chr = DICT_PROTOCOL_REPLY_OK;
@@ -213,66 +277,63 @@ static int cmd_commit(struct dict_connection *conn, const char *line)
                chr = DICT_PROTOCOL_REPLY_FAIL;
                break;
        }
-       o_stream_nsend_str(conn->output, t_strdup_printf("%c\n", chr));
-       dict_connection_transaction_array_remove(conn, trans);
-       return 0;
+       if (async) {
+               cmd->reply = i_strdup_printf("%c%c%u\n",
+                       DICT_PROTOCOL_REPLY_ASYNC_COMMIT, chr, cmd->trans->id);
+       } else {
+               cmd->reply = i_strdup_printf("%c%u\n", chr, cmd->trans->id);
+       }
+       dict_connection_transaction_array_remove(cmd->conn, cmd->trans);
+       dict_connection_cmds_flush(cmd->conn);
 }
 
-static void cmd_commit_async_callback(int ret, void *context)
+static void cmd_commit_callback(int ret, void *context)
 {
-       struct dict_connection_transaction *trans = context;
-       const char *reply;
-       char chr;
+       struct dict_connection_cmd *cmd = context;
 
-       switch (ret) {
-       case 1:
-               chr = DICT_PROTOCOL_REPLY_OK;
-               break;
-       case 0:
-               chr = DICT_PROTOCOL_REPLY_NOTFOUND;
-               break;
-       default:
-               chr = DICT_PROTOCOL_REPLY_FAIL;
-               break;
-       }
-       reply = t_strdup_printf("%c%c%u\n", DICT_PROTOCOL_REPLY_ASYNC_COMMIT,
-                               chr, trans->id);
-       o_stream_nsend_str(trans->conn->output, reply);
+       cmd_commit_finish(cmd, ret, FALSE);
+}
 
-       dict_connection_transaction_array_remove(trans->conn, trans);
+static void cmd_commit_callback_async(int ret, void *context)
+{
+       struct dict_connection_cmd *cmd = context;
+
+       cmd_commit_finish(cmd, ret, TRUE);
 }
 
 static int
-cmd_commit_async(struct dict_connection *conn, const char *line)
+cmd_commit(struct dict_connection_cmd *cmd, const char *line)
 {
-       struct dict_connection_transaction *trans;
-
-       if (conn->iter_ctx != NULL) {
-               i_error("dict client: COMMIT: Can't commit while iterating");
+       if (dict_connection_transaction_lookup_parse(cmd->conn, line, &cmd->trans) < 0)
                return -1;
-       }
 
-       if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
+       dict_transaction_commit_async(&cmd->trans->ctx, cmd_commit_callback, cmd);
+       return 1;
+}
+
+static int
+cmd_commit_async(struct dict_connection_cmd *cmd, const char *line)
+{
+       if (dict_connection_transaction_lookup_parse(cmd->conn, line, &cmd->trans) < 0)
                return -1;
 
-       dict_transaction_commit_async(&trans->ctx, cmd_commit_async_callback,
-                                     trans);
-       return 0;
+       dict_transaction_commit_async(&cmd->trans->ctx, cmd_commit_callback_async, cmd);
+       return 1;
 }
 
-static int cmd_rollback(struct dict_connection *conn, const char *line)
+static int cmd_rollback(struct dict_connection_cmd *cmd, const char *line)
 {
        struct dict_connection_transaction *trans;
 
-       if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
+       if (dict_connection_transaction_lookup_parse(cmd->conn, line, &trans) < 0)
                return -1;
 
        dict_transaction_rollback(&trans->ctx);
-       dict_connection_transaction_array_remove(conn, trans);
+       dict_connection_transaction_array_remove(cmd->conn, trans);
        return 0;
 }
 
-static int cmd_set(struct dict_connection *conn, const char *line)
+static int cmd_set(struct dict_connection_cmd *cmd, const char *line)
 {
        struct dict_connection_transaction *trans;
        const char *const *args;
@@ -284,14 +345,13 @@ static int cmd_set(struct dict_connection *conn, const char *line)
                return -1;
        }
 
-       if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0)
+       if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
                return -1;
-
         dict_set(trans->ctx, args[1], args[2]);
        return 0;
 }
 
-static int cmd_unset(struct dict_connection *conn, const char *line)
+static int cmd_unset(struct dict_connection_cmd *cmd, const char *line)
 {
        struct dict_connection_transaction *trans;
        const char *const *args;
@@ -303,14 +363,13 @@ static int cmd_unset(struct dict_connection *conn, const char *line)
                return -1;
        }
 
-       if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0)
+       if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
                return -1;
-
         dict_unset(trans->ctx, args[1]);
        return 0;
 }
 
-static int cmd_append(struct dict_connection *conn, const char *line)
+static int cmd_append(struct dict_connection_cmd *cmd, const char *line)
 {
        struct dict_connection_transaction *trans;
        const char *const *args;
@@ -322,14 +381,14 @@ static int cmd_append(struct dict_connection *conn, const char *line)
                return -1;
        }
 
-       if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0)
+       if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
                return -1;
 
         dict_append(trans->ctx, args[1], args[2]);
        return 0;
 }
 
-static int cmd_atomic_inc(struct dict_connection *conn, const char *line)
+static int cmd_atomic_inc(struct dict_connection_cmd *cmd, const char *line)
 {
        struct dict_connection_transaction *trans;
        const char *const *args;
@@ -343,14 +402,14 @@ static int cmd_atomic_inc(struct dict_connection *conn, const char *line)
                return -1;
        }
 
-       if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0)
+       if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
                return -1;
 
         dict_atomic_inc(trans->ctx, args[1], diff);
        return 0;
 }
 
-static struct dict_client_cmd cmds[] = {
+static const struct dict_cmd_func cmds[] = {
        { DICT_PROTOCOL_CMD_LOOKUP, cmd_lookup },
        { DICT_PROTOCOL_CMD_ITERATE, cmd_iterate },
        { DICT_PROTOCOL_CMD_BEGIN, cmd_begin },
@@ -365,7 +424,7 @@ static struct dict_client_cmd cmds[] = {
        { 0, NULL }
 };
 
-static struct dict_client_cmd *dict_command_find(char cmd)
+static const struct dict_cmd_func *dict_command_find(enum dict_protocol_cmd cmd)
 {
        unsigned int i;
 
@@ -378,13 +437,52 @@ static struct dict_client_cmd *dict_command_find(char cmd)
 
 int dict_command_input(struct dict_connection *conn, const char *line)
 {
-       struct dict_client_cmd *cmd;
+       const struct dict_cmd_func *cmd_func;
+       struct dict_connection_cmd *cmd;
+       int ret;
 
-       cmd = dict_command_find(*line);
-       if (cmd == NULL) {
+       cmd_func = dict_command_find((enum dict_protocol_cmd)*line);
+       if (cmd_func == NULL) {
                i_error("dict client: Unknown command %c", *line);
                return -1;
        }
 
-       return cmd->func(conn, line + 1);
+       cmd = i_new(struct dict_connection_cmd, 1);
+       cmd->conn = conn;
+       cmd->cmd = cmd_func;
+       array_append(&conn->cmds, &cmd, 1);
+       if ((ret = cmd_func->func(cmd, line + 1)) <= 0) {
+               dict_connection_cmd_remove(cmd);
+               return ret;
+       }
+       return 0;
+}
+
+static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd)
+{
+       struct dict_connection_cmd *const *first_cmdp;
+       
+       first_cmdp = array_idx(&cmd->conn->cmds, 0);
+       if (*first_cmdp == cmd)
+               (void)cmd_iterate_flush(cmd);
+}
+
+void dict_connection_cmds_output_more(struct dict_connection *conn)
+{
+       struct dict_connection_cmd *cmd, *const *first_cmdp;
+
+       /* only iterators may be returning a lot of data */
+       while (array_count(&conn->cmds) > 0) {
+               first_cmdp = array_idx(&conn->cmds, 0);
+               cmd = *first_cmdp;
+
+               if (cmd->iter == NULL)
+                       break;
+
+               if (cmd_iterate_flush(cmd) == 0) {
+                       /* unfinished */
+                       break;
+               }
+               /* cmd should be freed now */
+       }
 }
index 9c3afa8119d3803f6679338b3a766d388d6de532..6fe32b2de0f3ae3ec5e6a4b01ad44a521fba8892 100644 (file)
@@ -5,4 +5,7 @@ struct dict_connection;
 
 int dict_command_input(struct dict_connection *conn, const char *line);
 
+void dict_connection_cmds_output_more(struct dict_connection *conn);
+void dict_connection_cmds_free(struct dict_connection *conn);
+
 #endif
index a9860cfa1c942175abf39d73935a7814f5906570..cd13cb643bfda072f4e76b1dcd8797cbf8773b9a 100644 (file)
@@ -15,6 +15,8 @@
 #include <stdlib.h>
 #include <unistd.h>
 
+#define DICT_CONN_MAX_PENDING_COMMANDS 5
+
 static struct dict_connection *dict_connections;
 
 static int dict_connection_parse_handshake(struct dict_connection *conn,
@@ -103,6 +105,9 @@ static void dict_connection_input(struct dict_connection *conn)
        const char *line;
        int ret;
 
+       if (conn->to_input != NULL)
+               timeout_remove(&conn->to_input);
+
        switch (i_stream_read(conn->input)) {
        case 0:
                return;
@@ -142,7 +147,35 @@ static void dict_connection_input(struct dict_connection *conn)
                        dict_connection_destroy(conn);
                        break;
                }
+               if (array_count(&conn->cmds) >= DICT_CONN_MAX_PENDING_COMMANDS) {
+                       io_remove(&conn->io);
+                       if (conn->to_input != NULL)
+                               timeout_remove(&conn->to_input);
+               }
+       }
+}
+
+void dict_connection_continue_input(struct dict_connection *conn)
+{
+       if (conn->io != NULL)
+               return;
+
+       conn->io = io_add(conn->fd, IO_READ, dict_connection_input, conn);
+       if (conn->to_input == NULL)
+               conn->to_input = timeout_add_short(0, dict_connection_input, conn);
+}
+
+static int dict_connection_output(struct dict_connection *conn)
+{
+       int ret;
+
+       if ((ret = o_stream_flush(conn->output)) < 0) {
+               dict_connection_destroy(conn);
+               return 1;
        }
+       if (ret > 0)
+               dict_connection_cmds_output_more(conn);
+       return ret;
 }
 
 struct dict_connection *dict_connection_create(int fd)
@@ -155,7 +188,9 @@ struct dict_connection *dict_connection_create(int fd)
                                         FALSE);
        conn->output = o_stream_create_fd(fd, 128*1024, FALSE);
        o_stream_set_no_error_handling(conn->output, TRUE);
+       o_stream_set_flush_callback(conn->output, dict_connection_output, conn);
        conn->io = io_add(fd, IO_READ, dict_connection_input, conn);
+       i_array_init(&conn->cmds, DICT_CONN_MAX_PENDING_COMMANDS);
        DLLIST_PREPEND(&dict_connections, conn);
        return conn;
 }
@@ -166,23 +201,30 @@ void dict_connection_destroy(struct dict_connection *conn)
 
        DLLIST_REMOVE(&dict_connections, conn);
 
+       /* deinit dict before anything else, so any pending dict operations
+          are aborted and their callbacks called. */
+       if (conn->dict != NULL)
+               dict_deinit(&conn->dict);
+
        if (array_is_created(&conn->transactions)) {
                array_foreach_modifiable(&conn->transactions, transaction)
                        dict_transaction_rollback(&transaction->ctx);
                array_free(&conn->transactions);
        }
 
-       if (conn->iter_ctx != NULL)
-               (void)dict_iterate_deinit(&conn->iter_ctx);
+       /* this may end up adding conn->io back, so keep this early */
+       dict_connection_cmds_free(conn);
+       array_free(&conn->cmds);
 
-       io_remove(&conn->io);
+       if (conn->to_input != NULL)
+               timeout_remove(&conn->to_input);
+       if (conn->io != NULL)
+               io_remove(&conn->io);
        i_stream_destroy(&conn->input);
        o_stream_destroy(&conn->output);
        if (close(conn->fd) < 0)
                i_error("close(dict client) failed: %m");
 
-       if (conn->dict != NULL)
-               dict_deinit(&conn->dict);
        i_free(conn->name);
        i_free(conn->username);
        i_free(conn);
index fb1c2440746477d461df31205370045af948c303..3d437d7e4997d384a6a040d8cee243c53b3f60f1 100644 (file)
@@ -22,18 +22,19 @@ struct dict_connection {
        struct io *io;
        struct istream *input;
        struct ostream *output;
-
-       struct dict_iterate_context *iter_ctx;
-       enum dict_iterate_flags iter_flags;
+       struct timeout *to_input;
 
        /* There are only a few transactions per client, so keeping them in
           array is fast enough */
        ARRAY(struct dict_connection_transaction) transactions;
+       ARRAY(struct dict_connection_cmd *) cmds;
 };
 
 struct dict_connection *dict_connection_create(int fd);
 void dict_connection_destroy(struct dict_connection *conn);
 
+void dict_connection_continue_input(struct dict_connection *conn);
+
 void dict_connections_destroy_all(void);
 
 #endif