]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
dict-client: Server can now send command replies in any order.
authorTimo Sirainen <timo.sirainen@dovecot.fi>
Mon, 21 Nov 2016 17:05:49 +0000 (19:05 +0200)
committerTimo Sirainen <timo.sirainen@dovecot.fi>
Mon, 21 Nov 2016 20:44:55 +0000 (22:44 +0200)
This way one slow lookup doesn't block all the other ones.

This change keeps backwards compatibility in the dict protocol for both
client and server.

src/dict/dict-commands.c
src/dict/dict-connection.h
src/lib-dict/dict-client.c
src/lib-dict/dict-client.h

index 97ea9394c7f397e5d4af7d28b0c0841dbce7f9d9..529fe857a62630988c053394c6c5728cd6c7b921 100644 (file)
@@ -14,6 +14,7 @@
 #include "main.h"
 
 #define DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION 1
+#define DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION 1
 #define DICT_OUTPUT_OPTIMAL_SIZE 1024
 
 struct dict_cmd_func {
@@ -30,7 +31,8 @@ struct dict_connection_cmd {
        struct dict_iterate_context *iter;
        enum dict_iterate_flags iter_flags;
 
-       unsigned int trans_id;
+       unsigned int async_reply_id;
+       unsigned int trans_id; /* obsolete */
 };
 
 struct dict_command_stats cmd_stats;
@@ -70,11 +72,15 @@ static void dict_connection_cmds_flush(struct dict_connection *conn)
 {
        struct dict_connection_cmd *cmd, *const *first_cmdp;
 
+       i_assert(conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION);
+
        dict_connection_ref(conn);
        while (array_count(&conn->cmds) > 0) {
                first_cmdp = array_idx(&conn->cmds, 0);
                cmd = *first_cmdp;
 
+               i_assert(cmd->async_reply_id == 0);
+
                /* we may be able to start outputting iterations now. */
                if (cmd->iter != NULL)
                        (void)cmd_iterate_flush(cmd);
@@ -90,6 +96,37 @@ static void dict_connection_cmds_flush(struct dict_connection *conn)
        dict_connection_unref_safe(conn);
 }
 
+static void dict_connection_cmd_try_flush(struct dict_connection_cmd **_cmd)
+{
+       struct dict_connection_cmd *cmd = *_cmd;
+
+       *_cmd = NULL;
+       if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION) {
+               dict_connection_cmds_flush(cmd->conn);
+               return;
+       }
+       i_assert(cmd->async_reply_id != 0);
+       i_assert(cmd->reply != NULL);
+
+       o_stream_nsend_str(cmd->conn->output, t_strdup_printf("%c%u\t%s",
+               DICT_PROTOCOL_REPLY_ASYNC_REPLY,
+               cmd->async_reply_id, cmd->reply));
+       dict_connection_cmd_remove(cmd);
+}
+
+static void dict_connection_cmd_async(struct dict_connection_cmd *cmd)
+{
+       if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION)
+               return;
+
+       i_assert(cmd->async_reply_id == 0);
+       cmd->async_reply_id = ++cmd->conn->async_id_counter;
+       if (cmd->async_reply_id == 0)
+               cmd->async_reply_id = ++cmd->conn->async_id_counter;
+       o_stream_nsend_str(cmd->conn->output, t_strdup_printf("%c%u\n",
+               DICT_PROTOCOL_REPLY_ASYNC_ID, cmd->async_reply_id));
+}
+
 static void
 cmd_stats_update(struct dict_connection_cmd *cmd, struct timing *timing)
 {
@@ -141,12 +178,13 @@ cmd_lookup_callback(const struct dict_lookup_result *result, void *context)
        str_append_c(str, '\n');
 
        cmd->reply = i_strdup(str_c(str));
-       dict_connection_cmds_flush(cmd->conn);
+       dict_connection_cmd_try_flush(&cmd);
 }
 
 static int cmd_lookup(struct dict_connection_cmd *cmd, const char *line)
 {
        /* <key> */
+       dict_connection_cmd_async(cmd);
        dict_lookup_async(cmd->conn->dict, line, cmd_lookup_callback, cmd);
        return 1;
 }
@@ -177,6 +215,10 @@ static int cmd_iterate_flush(struct dict_connection_cmd *cmd)
        str = t_str_new(256);
        while (dict_iterate(cmd->iter, &key, &value)) {
                str_truncate(str, 0);
+               if (cmd->async_reply_id != 0) {
+                       str_append_c(str, DICT_PROTOCOL_REPLY_ASYNC_REPLY);
+                       str_printfa(str, "%u\t", cmd->async_reply_id);
+               }
                str_append_c(str, DICT_PROTOCOL_REPLY_OK);
                str_append_tabescaped(str, key);
                str_append_c(str, '\t');
@@ -228,6 +270,7 @@ static int cmd_iterate(struct dict_connection_cmd *cmd, const char *line)
                i_error("dict client: ITERATE: broken input");
                return -1;
        }
+       dict_connection_cmd_async(cmd);
 
        /* <flags> <max_rows> <path> */
        flags |= DICT_ITERATE_FLAG_ASYNC;
@@ -350,7 +393,7 @@ cmd_commit_finish(struct dict_connection_cmd *cmd, int ret, bool async)
        cmd->reply = i_strdup(str_c(str));
 
        dict_connection_transaction_array_remove(cmd->conn, cmd->trans_id);
-       dict_connection_cmds_flush(cmd->conn);
+       dict_connection_cmd_try_flush(&cmd);
 }
 
 static void cmd_commit_callback(int ret, void *context)
@@ -376,6 +419,7 @@ cmd_commit(struct dict_connection_cmd *cmd, const char *line)
                return -1;
        cmd->trans_id = trans->id;
 
+       dict_connection_cmd_async(cmd);
        dict_transaction_commit_async(&trans->ctx, cmd_commit_callback, cmd);
        return 1;
 }
@@ -389,6 +433,7 @@ cmd_commit_async(struct dict_connection_cmd *cmd, const char *line)
                return -1;
        cmd->trans_id = trans->id;
 
+       dict_connection_cmd_async(cmd);
        dict_transaction_commit_async(&trans->ctx, cmd_commit_callback_async, cmd);
        return 1;
 }
@@ -532,36 +577,48 @@ int dict_command_input(struct dict_connection *conn, const char *line)
        return 0;
 }
 
-static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd)
+static bool dict_connection_cmds_try_output_more(struct dict_connection *conn)
 {
-       struct dict_connection_cmd *const *first_cmdp;
-       
-       first_cmdp = array_idx(&cmd->conn->cmds, 0);
-       if (*first_cmdp == cmd) {
-               if (cmd_iterate_flush(cmd) > 0)
-                       dict_connection_cmds_flush(cmd->conn);
+       struct dict_connection_cmd *const *cmdp, *cmd;
+
+       /* only iterators may be returning a lot of data */
+       array_foreach(&conn->cmds, cmdp) {
+               cmd = *cmdp;
+
+               if (cmd->iter == NULL) {
+                       /* not an iterator */
+               } else if (cmd_iterate_flush(cmd) == 0) {
+                       /* unfinished */
+               } else {
+                       dict_connection_cmd_try_flush(&cmd);
+                       /* cmd should be freed now, restart output */
+                       return TRUE;
+               }
+               if (conn->minor_version < DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION)
+                       break;
+               /* try to flush the rest */
        }
+       return FALSE;
 }
 
 void dict_connection_cmds_output_more(struct dict_connection *conn)
 {
-       struct dict_connection_cmd *cmd, *const *first_cmdp;
-
-       /* only iterators may be returning a lot of data */
        while (array_count(&conn->cmds) > 0) {
-               first_cmdp = array_idx(&conn->cmds, 0);
-               cmd = *first_cmdp;
-
-               if (cmd->iter == NULL)
+               if (!dict_connection_cmds_try_output_more(conn))
                        break;
+       }
+}
 
-               if (cmd_iterate_flush(cmd) == 0) {
-                       /* unfinished */
-                       break;
-               }
-               dict_connection_cmds_flush(cmd->conn);
-               /* cmd should be freed now */
+static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd)
+{
+       struct dict_connection_cmd *const *first_cmdp;
+
+       if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION) {
+               first_cmdp = array_idx(&cmd->conn->cmds, 0);
+               if (*first_cmdp != cmd)
+                       return;
        }
+       (void)dict_connection_cmds_try_output_more(cmd->conn);
 }
 
 void dict_commands_init(void)
index 6202149c5888a4b518aed5601d7ef97defefce18..bfef4ec9cf7738a0c277066ae67b469b45b193c7 100644 (file)
@@ -31,6 +31,7 @@ struct dict_connection {
           array is fast enough */
        ARRAY(struct dict_connection_transaction) transactions;
        ARRAY(struct dict_connection_cmd *) cmds;
+       unsigned int async_id_counter;
 
        unsigned int destroyed:1;
 };
index 1678a5dc685fbb4d965af42fe2ed19d5f089d237..faf8d9e0f4462c2182c738b734312661f2dc0932 100644 (file)
@@ -33,6 +33,8 @@ struct client_dict_cmd {
        struct client_dict *dict;
        struct timeval start_time;
        char *query;
+       unsigned int async_id;
+       struct timeval async_id_received_time;
 
        uint64_t start_global_ioloop_usecs;
        uint64_t start_dict_ioloop_usecs;
@@ -184,9 +186,8 @@ static void dict_post_api_callback(struct client_dict *dict)
 }
 
 static bool
-dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *line)
+dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *const *args)
 {
-       const char *const *args = t_strsplit_tabescaped(line);
        const char *value = args[0];
        enum dict_protocol_reply reply;
 
@@ -198,6 +199,7 @@ dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *line)
                value++;
                args++;
        }
+
        cmd->unfinished = FALSE;
        cmd->callback(cmd, reply, value, args, NULL, FALSE);
        return !cmd->unfinished;
@@ -442,28 +444,94 @@ static void client_dict_cmd_backgrounded(struct client_dict *dict)
        }
 }
 
+static int
+dict_conn_assign_next_async_id(struct dict_connection *conn, const char *line)
+{
+       struct client_dict_cmd *const *cmds;
+       unsigned int i, count, async_id;
+
+       i_assert(line[0] == DICT_PROTOCOL_REPLY_ASYNC_ID);
+
+       if (str_to_uint(line+1, &async_id) < 0 || async_id == 0) {
+               i_error("%s: Received invalid async-id line: %s",
+                       conn->conn.name, line);
+               return -1;
+       }
+       cmds = array_get(&conn->dict->cmds, &count);
+       for (i = 0; i < count; i++) {
+               if (cmds[i]->async_id == 0) {
+                       cmds[i]->async_id = async_id;
+                       cmds[i]->async_id_received_time = ioloop_timeval;
+                       return 0;
+               }
+       }
+       i_error("%s: Received async-id line, but all %u commands already have it: %s",
+               conn->conn.name, count, line);
+       return -1;
+}
+
+static int dict_conn_find_async_id(struct dict_connection *conn,
+                                  const char *async_arg,
+                                  const char *line, unsigned int *idx_r)
+{
+       struct client_dict_cmd *const *cmds;
+       unsigned int i, count, async_id;
+
+       i_assert(async_arg[0] == DICT_PROTOCOL_REPLY_ASYNC_REPLY);
+
+       if (str_to_uint(async_arg+1, &async_id) < 0 || async_id == 0) {
+               i_error("%s: Received invalid async-reply line: %s",
+                       conn->conn.name, line);
+               return -1;
+       }
+
+       cmds = array_get(&conn->dict->cmds, &count);
+       for (i = 0; i < count; i++) {
+               if (cmds[i]->async_id == async_id) {
+                       *idx_r = i;
+                       return 0;
+               }
+       }
+       i_error("%s: Received reply for nonexistent async-id %u: %s",
+               conn->conn.name, async_id, line);
+       return -1;
+}
+
 static int dict_conn_input_line(struct connection *_conn, const char *line)
 {
        struct dict_connection *conn = (struct dict_connection *)_conn;
        struct client_dict *dict = conn->dict;
        struct client_dict_cmd *const *cmds;
-       unsigned int count;
+       const char *const *args;
+       unsigned int i, count;
        bool finished;
 
        if (dict->to_requests != NULL)
                timeout_reset(dict->to_requests);
 
+       if (line[0] == DICT_PROTOCOL_REPLY_ASYNC_ID)
+               return dict_conn_assign_next_async_id(conn, line) < 0 ? -1 : 1;
+
        cmds = array_get(&conn->dict->cmds, &count);
        if (count == 0) {
                i_error("%s: Received reply without pending commands: %s",
                        dict->conn.conn.name, line);
                return -1;
        }
-       i_assert(!cmds[0]->no_replies);
 
-       client_dict_cmd_ref(cmds[0]);
-       finished = dict_cmd_callback_line(cmds[0], line);
-       if (!client_dict_cmd_unref(cmds[0])) {
+       args = t_strsplit_tabescaped(line);
+       if (args[0] != NULL && args[0][0] == DICT_PROTOCOL_REPLY_ASYNC_REPLY) {
+               if (dict_conn_find_async_id(conn, args[0], line, &i) < 0)
+                       return -1;
+               args++;
+       } else {
+               i = 0;
+       }
+       i_assert(!cmds[i]->no_replies);
+
+       client_dict_cmd_ref(cmds[i]);
+       finished = dict_cmd_callback_line(cmds[i], args);
+       if (!client_dict_cmd_unref(cmds[i])) {
                /* disconnected during command handling */
                return -1;
        }
@@ -471,8 +539,8 @@ static int dict_conn_input_line(struct connection *_conn, const char *line)
                /* more lines needed for this command */
                return 1;
        }
-       client_dict_cmd_unref(cmds[0]);
-       array_delete(&dict->cmds, 0, 1);
+       client_dict_cmd_unref(cmds[i]);
+       array_delete(&dict->cmds, i, 1);
 
        client_dict_add_timeout(dict);
        return 1;
@@ -594,6 +662,7 @@ static int client_dict_reconnect(struct client_dict *dict, const char *reason,
        array_foreach(&retry_cmds, cmdp) {
                cmd = *cmdp;
                cmd->reconnected = TRUE;
+               cmd->async_id = 0;
                /* if it fails again, don't retry anymore */
                cmd->retry_errors = FALSE;
                if (ret < 0) {
@@ -798,6 +867,12 @@ dict_warnings_sec(const struct client_dict_cmd *cmd, int msecs,
                str_printfa(str, ", reconnected %u.%03u secs ago",
                            reconnected_msecs/1000, reconnected_msecs%1000);
        }
+       if (cmd->async_id != 0) {
+               int async_reply_msecs =
+                       timeval_diff_msecs(&ioloop_timeval, &cmd->async_id_received_time);
+               str_printfa(str, ", async-id reply %u.%03u secs ago",
+                           async_reply_msecs/1000, async_reply_msecs%1000);
+       }
        if (str_array_length(extra_args) >= 4 &&
            str_to_time(extra_args[0], &tv_start.tv_sec) == 0 &&
            str_to_uint(extra_args[1], &tv_start_usec) == 0 &&
index 4a4a3a4c764154feb75e0062ca56a562e1669d97..cf4427fa290ba01516acdc161699e118bd891418 100644 (file)
@@ -37,6 +37,8 @@ enum dict_protocol_reply {
        DICT_PROTOCOL_REPLY_WRITE_UNCERTAIN = 'W',
        DICT_PROTOCOL_REPLY_ASYNC_COMMIT = 'A',
        DICT_PROTOCOL_REPLY_ITER_FINISHED = '\0',
+       DICT_PROTOCOL_REPLY_ASYNC_ID = '*',
+       DICT_PROTOCOL_REPLY_ASYNC_REPLY = '+',
 };
 
 const char *dict_client_escape(const char *src);