From: Timo Sirainen Date: Mon, 21 Nov 2016 17:05:49 +0000 (+0200) Subject: dict-client: Server can now send command replies in any order. X-Git-Tag: 2.2.27~85 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=4330fa04ae73ddaff6182d624ca7249cbd9767be;p=thirdparty%2Fdovecot%2Fcore.git dict-client: Server can now send command replies in any order. This way one slow lookup doesn't block all the other ones. This change keeps backwards compatibility in the dict protocol for both client and server. --- diff --git a/src/dict/dict-commands.c b/src/dict/dict-commands.c index 97ea9394c7..529fe857a6 100644 --- a/src/dict/dict-commands.c +++ b/src/dict/dict-commands.c @@ -14,6 +14,7 @@ #include "main.h" #define DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION 1 +#define DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION 1 #define DICT_OUTPUT_OPTIMAL_SIZE 1024 struct dict_cmd_func { @@ -30,7 +31,8 @@ struct dict_connection_cmd { struct dict_iterate_context *iter; enum dict_iterate_flags iter_flags; - unsigned int trans_id; + unsigned int async_reply_id; + unsigned int trans_id; /* obsolete */ }; struct dict_command_stats cmd_stats; @@ -70,11 +72,15 @@ static void dict_connection_cmds_flush(struct dict_connection *conn) { struct dict_connection_cmd *cmd, *const *first_cmdp; + i_assert(conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION); + dict_connection_ref(conn); while (array_count(&conn->cmds) > 0) { first_cmdp = array_idx(&conn->cmds, 0); cmd = *first_cmdp; + i_assert(cmd->async_reply_id == 0); + /* we may be able to start outputting iterations now. */ if (cmd->iter != NULL) (void)cmd_iterate_flush(cmd); @@ -90,6 +96,37 @@ static void dict_connection_cmds_flush(struct dict_connection *conn) dict_connection_unref_safe(conn); } +static void dict_connection_cmd_try_flush(struct dict_connection_cmd **_cmd) +{ + struct dict_connection_cmd *cmd = *_cmd; + + *_cmd = NULL; + if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION) { + dict_connection_cmds_flush(cmd->conn); + return; + } + i_assert(cmd->async_reply_id != 0); + i_assert(cmd->reply != NULL); + + o_stream_nsend_str(cmd->conn->output, t_strdup_printf("%c%u\t%s", + DICT_PROTOCOL_REPLY_ASYNC_REPLY, + cmd->async_reply_id, cmd->reply)); + dict_connection_cmd_remove(cmd); +} + +static void dict_connection_cmd_async(struct dict_connection_cmd *cmd) +{ + if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION) + return; + + i_assert(cmd->async_reply_id == 0); + cmd->async_reply_id = ++cmd->conn->async_id_counter; + if (cmd->async_reply_id == 0) + cmd->async_reply_id = ++cmd->conn->async_id_counter; + o_stream_nsend_str(cmd->conn->output, t_strdup_printf("%c%u\n", + DICT_PROTOCOL_REPLY_ASYNC_ID, cmd->async_reply_id)); +} + static void cmd_stats_update(struct dict_connection_cmd *cmd, struct timing *timing) { @@ -141,12 +178,13 @@ cmd_lookup_callback(const struct dict_lookup_result *result, void *context) str_append_c(str, '\n'); cmd->reply = i_strdup(str_c(str)); - dict_connection_cmds_flush(cmd->conn); + dict_connection_cmd_try_flush(&cmd); } static int cmd_lookup(struct dict_connection_cmd *cmd, const char *line) { /* */ + dict_connection_cmd_async(cmd); dict_lookup_async(cmd->conn->dict, line, cmd_lookup_callback, cmd); return 1; } @@ -177,6 +215,10 @@ static int cmd_iterate_flush(struct dict_connection_cmd *cmd) str = t_str_new(256); while (dict_iterate(cmd->iter, &key, &value)) { str_truncate(str, 0); + if (cmd->async_reply_id != 0) { + str_append_c(str, DICT_PROTOCOL_REPLY_ASYNC_REPLY); + str_printfa(str, "%u\t", cmd->async_reply_id); + } str_append_c(str, DICT_PROTOCOL_REPLY_OK); str_append_tabescaped(str, key); str_append_c(str, '\t'); @@ -228,6 +270,7 @@ static int cmd_iterate(struct dict_connection_cmd *cmd, const char *line) i_error("dict client: ITERATE: broken input"); return -1; } + dict_connection_cmd_async(cmd); /* */ flags |= DICT_ITERATE_FLAG_ASYNC; @@ -350,7 +393,7 @@ cmd_commit_finish(struct dict_connection_cmd *cmd, int ret, bool async) cmd->reply = i_strdup(str_c(str)); dict_connection_transaction_array_remove(cmd->conn, cmd->trans_id); - dict_connection_cmds_flush(cmd->conn); + dict_connection_cmd_try_flush(&cmd); } static void cmd_commit_callback(int ret, void *context) @@ -376,6 +419,7 @@ cmd_commit(struct dict_connection_cmd *cmd, const char *line) return -1; cmd->trans_id = trans->id; + dict_connection_cmd_async(cmd); dict_transaction_commit_async(&trans->ctx, cmd_commit_callback, cmd); return 1; } @@ -389,6 +433,7 @@ cmd_commit_async(struct dict_connection_cmd *cmd, const char *line) return -1; cmd->trans_id = trans->id; + dict_connection_cmd_async(cmd); dict_transaction_commit_async(&trans->ctx, cmd_commit_callback_async, cmd); return 1; } @@ -532,36 +577,48 @@ int dict_command_input(struct dict_connection *conn, const char *line) return 0; } -static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd) +static bool dict_connection_cmds_try_output_more(struct dict_connection *conn) { - struct dict_connection_cmd *const *first_cmdp; - - first_cmdp = array_idx(&cmd->conn->cmds, 0); - if (*first_cmdp == cmd) { - if (cmd_iterate_flush(cmd) > 0) - dict_connection_cmds_flush(cmd->conn); + struct dict_connection_cmd *const *cmdp, *cmd; + + /* only iterators may be returning a lot of data */ + array_foreach(&conn->cmds, cmdp) { + cmd = *cmdp; + + if (cmd->iter == NULL) { + /* not an iterator */ + } else if (cmd_iterate_flush(cmd) == 0) { + /* unfinished */ + } else { + dict_connection_cmd_try_flush(&cmd); + /* cmd should be freed now, restart output */ + return TRUE; + } + if (conn->minor_version < DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION) + break; + /* try to flush the rest */ } + return FALSE; } void dict_connection_cmds_output_more(struct dict_connection *conn) { - struct dict_connection_cmd *cmd, *const *first_cmdp; - - /* only iterators may be returning a lot of data */ while (array_count(&conn->cmds) > 0) { - first_cmdp = array_idx(&conn->cmds, 0); - cmd = *first_cmdp; - - if (cmd->iter == NULL) + if (!dict_connection_cmds_try_output_more(conn)) break; + } +} - if (cmd_iterate_flush(cmd) == 0) { - /* unfinished */ - break; - } - dict_connection_cmds_flush(cmd->conn); - /* cmd should be freed now */ +static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd) +{ + struct dict_connection_cmd *const *first_cmdp; + + if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION) { + first_cmdp = array_idx(&cmd->conn->cmds, 0); + if (*first_cmdp != cmd) + return; } + (void)dict_connection_cmds_try_output_more(cmd->conn); } void dict_commands_init(void) diff --git a/src/dict/dict-connection.h b/src/dict/dict-connection.h index 6202149c58..bfef4ec9cf 100644 --- a/src/dict/dict-connection.h +++ b/src/dict/dict-connection.h @@ -31,6 +31,7 @@ struct dict_connection { array is fast enough */ ARRAY(struct dict_connection_transaction) transactions; ARRAY(struct dict_connection_cmd *) cmds; + unsigned int async_id_counter; unsigned int destroyed:1; }; diff --git a/src/lib-dict/dict-client.c b/src/lib-dict/dict-client.c index 1678a5dc68..faf8d9e0f4 100644 --- a/src/lib-dict/dict-client.c +++ b/src/lib-dict/dict-client.c @@ -33,6 +33,8 @@ struct client_dict_cmd { struct client_dict *dict; struct timeval start_time; char *query; + unsigned int async_id; + struct timeval async_id_received_time; uint64_t start_global_ioloop_usecs; uint64_t start_dict_ioloop_usecs; @@ -184,9 +186,8 @@ static void dict_post_api_callback(struct client_dict *dict) } static bool -dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *line) +dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *const *args) { - const char *const *args = t_strsplit_tabescaped(line); const char *value = args[0]; enum dict_protocol_reply reply; @@ -198,6 +199,7 @@ dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *line) value++; args++; } + cmd->unfinished = FALSE; cmd->callback(cmd, reply, value, args, NULL, FALSE); return !cmd->unfinished; @@ -442,28 +444,94 @@ static void client_dict_cmd_backgrounded(struct client_dict *dict) } } +static int +dict_conn_assign_next_async_id(struct dict_connection *conn, const char *line) +{ + struct client_dict_cmd *const *cmds; + unsigned int i, count, async_id; + + i_assert(line[0] == DICT_PROTOCOL_REPLY_ASYNC_ID); + + if (str_to_uint(line+1, &async_id) < 0 || async_id == 0) { + i_error("%s: Received invalid async-id line: %s", + conn->conn.name, line); + return -1; + } + cmds = array_get(&conn->dict->cmds, &count); + for (i = 0; i < count; i++) { + if (cmds[i]->async_id == 0) { + cmds[i]->async_id = async_id; + cmds[i]->async_id_received_time = ioloop_timeval; + return 0; + } + } + i_error("%s: Received async-id line, but all %u commands already have it: %s", + conn->conn.name, count, line); + return -1; +} + +static int dict_conn_find_async_id(struct dict_connection *conn, + const char *async_arg, + const char *line, unsigned int *idx_r) +{ + struct client_dict_cmd *const *cmds; + unsigned int i, count, async_id; + + i_assert(async_arg[0] == DICT_PROTOCOL_REPLY_ASYNC_REPLY); + + if (str_to_uint(async_arg+1, &async_id) < 0 || async_id == 0) { + i_error("%s: Received invalid async-reply line: %s", + conn->conn.name, line); + return -1; + } + + cmds = array_get(&conn->dict->cmds, &count); + for (i = 0; i < count; i++) { + if (cmds[i]->async_id == async_id) { + *idx_r = i; + return 0; + } + } + i_error("%s: Received reply for nonexistent async-id %u: %s", + conn->conn.name, async_id, line); + return -1; +} + static int dict_conn_input_line(struct connection *_conn, const char *line) { struct dict_connection *conn = (struct dict_connection *)_conn; struct client_dict *dict = conn->dict; struct client_dict_cmd *const *cmds; - unsigned int count; + const char *const *args; + unsigned int i, count; bool finished; if (dict->to_requests != NULL) timeout_reset(dict->to_requests); + if (line[0] == DICT_PROTOCOL_REPLY_ASYNC_ID) + return dict_conn_assign_next_async_id(conn, line) < 0 ? -1 : 1; + cmds = array_get(&conn->dict->cmds, &count); if (count == 0) { i_error("%s: Received reply without pending commands: %s", dict->conn.conn.name, line); return -1; } - i_assert(!cmds[0]->no_replies); - client_dict_cmd_ref(cmds[0]); - finished = dict_cmd_callback_line(cmds[0], line); - if (!client_dict_cmd_unref(cmds[0])) { + args = t_strsplit_tabescaped(line); + if (args[0] != NULL && args[0][0] == DICT_PROTOCOL_REPLY_ASYNC_REPLY) { + if (dict_conn_find_async_id(conn, args[0], line, &i) < 0) + return -1; + args++; + } else { + i = 0; + } + i_assert(!cmds[i]->no_replies); + + client_dict_cmd_ref(cmds[i]); + finished = dict_cmd_callback_line(cmds[i], args); + if (!client_dict_cmd_unref(cmds[i])) { /* disconnected during command handling */ return -1; } @@ -471,8 +539,8 @@ static int dict_conn_input_line(struct connection *_conn, const char *line) /* more lines needed for this command */ return 1; } - client_dict_cmd_unref(cmds[0]); - array_delete(&dict->cmds, 0, 1); + client_dict_cmd_unref(cmds[i]); + array_delete(&dict->cmds, i, 1); client_dict_add_timeout(dict); return 1; @@ -594,6 +662,7 @@ static int client_dict_reconnect(struct client_dict *dict, const char *reason, array_foreach(&retry_cmds, cmdp) { cmd = *cmdp; cmd->reconnected = TRUE; + cmd->async_id = 0; /* if it fails again, don't retry anymore */ cmd->retry_errors = FALSE; if (ret < 0) { @@ -798,6 +867,12 @@ dict_warnings_sec(const struct client_dict_cmd *cmd, int msecs, str_printfa(str, ", reconnected %u.%03u secs ago", reconnected_msecs/1000, reconnected_msecs%1000); } + if (cmd->async_id != 0) { + int async_reply_msecs = + timeval_diff_msecs(&ioloop_timeval, &cmd->async_id_received_time); + str_printfa(str, ", async-id reply %u.%03u secs ago", + async_reply_msecs/1000, async_reply_msecs%1000); + } if (str_array_length(extra_args) >= 4 && str_to_time(extra_args[0], &tv_start.tv_sec) == 0 && str_to_uint(extra_args[1], &tv_start_usec) == 0 && diff --git a/src/lib-dict/dict-client.h b/src/lib-dict/dict-client.h index 4a4a3a4c76..cf4427fa29 100644 --- a/src/lib-dict/dict-client.h +++ b/src/lib-dict/dict-client.h @@ -37,6 +37,8 @@ enum dict_protocol_reply { DICT_PROTOCOL_REPLY_WRITE_UNCERTAIN = 'W', DICT_PROTOCOL_REPLY_ASYNC_COMMIT = 'A', DICT_PROTOCOL_REPLY_ITER_FINISHED = '\0', + DICT_PROTOCOL_REPLY_ASYNC_ID = '*', + DICT_PROTOCOL_REPLY_ASYNC_REPLY = '+', }; const char *dict_client_escape(const char *src);