This way one slow lookup doesn't block all the other ones.
This change keeps backwards compatibility in the dict protocol for both
client and server.
#include "main.h"
#define DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION 1
+#define DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION 1
#define DICT_OUTPUT_OPTIMAL_SIZE 1024
struct dict_cmd_func {
struct dict_iterate_context *iter;
enum dict_iterate_flags iter_flags;
- unsigned int trans_id;
+ unsigned int async_reply_id;
+ unsigned int trans_id; /* obsolete */
};
struct dict_command_stats cmd_stats;
{
struct dict_connection_cmd *cmd, *const *first_cmdp;
+ i_assert(conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION);
+
dict_connection_ref(conn);
while (array_count(&conn->cmds) > 0) {
first_cmdp = array_idx(&conn->cmds, 0);
cmd = *first_cmdp;
+ i_assert(cmd->async_reply_id == 0);
+
/* we may be able to start outputting iterations now. */
if (cmd->iter != NULL)
(void)cmd_iterate_flush(cmd);
dict_connection_unref_safe(conn);
}
+static void dict_connection_cmd_try_flush(struct dict_connection_cmd **_cmd)
+{
+ struct dict_connection_cmd *cmd = *_cmd;
+
+ *_cmd = NULL;
+ if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION) {
+ dict_connection_cmds_flush(cmd->conn);
+ return;
+ }
+ i_assert(cmd->async_reply_id != 0);
+ i_assert(cmd->reply != NULL);
+
+ o_stream_nsend_str(cmd->conn->output, t_strdup_printf("%c%u\t%s",
+ DICT_PROTOCOL_REPLY_ASYNC_REPLY,
+ cmd->async_reply_id, cmd->reply));
+ dict_connection_cmd_remove(cmd);
+}
+
+static void dict_connection_cmd_async(struct dict_connection_cmd *cmd)
+{
+ if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_UNORDERED_MIN_VERSION)
+ return;
+
+ i_assert(cmd->async_reply_id == 0);
+ cmd->async_reply_id = ++cmd->conn->async_id_counter;
+ if (cmd->async_reply_id == 0)
+ cmd->async_reply_id = ++cmd->conn->async_id_counter;
+ o_stream_nsend_str(cmd->conn->output, t_strdup_printf("%c%u\n",
+ DICT_PROTOCOL_REPLY_ASYNC_ID, cmd->async_reply_id));
+}
+
static void
cmd_stats_update(struct dict_connection_cmd *cmd, struct timing *timing)
{
str_append_c(str, '\n');
cmd->reply = i_strdup(str_c(str));
- dict_connection_cmds_flush(cmd->conn);
+ dict_connection_cmd_try_flush(&cmd);
}
static int cmd_lookup(struct dict_connection_cmd *cmd, const char *line)
{
/* <key> */
+ dict_connection_cmd_async(cmd);
dict_lookup_async(cmd->conn->dict, line, cmd_lookup_callback, cmd);
return 1;
}
str = t_str_new(256);
while (dict_iterate(cmd->iter, &key, &value)) {
str_truncate(str, 0);
+ if (cmd->async_reply_id != 0) {
+ str_append_c(str, DICT_PROTOCOL_REPLY_ASYNC_REPLY);
+ str_printfa(str, "%u\t", cmd->async_reply_id);
+ }
str_append_c(str, DICT_PROTOCOL_REPLY_OK);
str_append_tabescaped(str, key);
str_append_c(str, '\t');
i_error("dict client: ITERATE: broken input");
return -1;
}
+ dict_connection_cmd_async(cmd);
/* <flags> <max_rows> <path> */
flags |= DICT_ITERATE_FLAG_ASYNC;
cmd->reply = i_strdup(str_c(str));
dict_connection_transaction_array_remove(cmd->conn, cmd->trans_id);
- dict_connection_cmds_flush(cmd->conn);
+ dict_connection_cmd_try_flush(&cmd);
}
static void cmd_commit_callback(int ret, void *context)
return -1;
cmd->trans_id = trans->id;
+ dict_connection_cmd_async(cmd);
dict_transaction_commit_async(&trans->ctx, cmd_commit_callback, cmd);
return 1;
}
return -1;
cmd->trans_id = trans->id;
+ dict_connection_cmd_async(cmd);
dict_transaction_commit_async(&trans->ctx, cmd_commit_callback_async, cmd);
return 1;
}
return 0;
}
-static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd)
+static bool dict_connection_cmds_try_output_more(struct dict_connection *conn)
{
- struct dict_connection_cmd *const *first_cmdp;
-
- first_cmdp = array_idx(&cmd->conn->cmds, 0);
- if (*first_cmdp == cmd) {
- if (cmd_iterate_flush(cmd) > 0)
- dict_connection_cmds_flush(cmd->conn);
+ struct dict_connection_cmd *const *cmdp, *cmd;
+
+ /* only iterators may be returning a lot of data */
+ array_foreach(&conn->cmds, cmdp) {
+ cmd = *cmdp;
+
+ if (cmd->iter == NULL) {
+ /* not an iterator */
+ } else if (cmd_iterate_flush(cmd) == 0) {
+ /* unfinished */
+ } else {
+ dict_connection_cmd_try_flush(&cmd);
+ /* cmd should be freed now, restart output */
+ return TRUE;
+ }
+ if (conn->minor_version < DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION)
+ break;
+ /* try to flush the rest */
}
+ return FALSE;
}
void dict_connection_cmds_output_more(struct dict_connection *conn)
{
- struct dict_connection_cmd *cmd, *const *first_cmdp;
-
- /* only iterators may be returning a lot of data */
while (array_count(&conn->cmds) > 0) {
- first_cmdp = array_idx(&conn->cmds, 0);
- cmd = *first_cmdp;
-
- if (cmd->iter == NULL)
+ if (!dict_connection_cmds_try_output_more(conn))
break;
+ }
+}
- if (cmd_iterate_flush(cmd) == 0) {
- /* unfinished */
- break;
- }
- dict_connection_cmds_flush(cmd->conn);
- /* cmd should be freed now */
+static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd)
+{
+ struct dict_connection_cmd *const *first_cmdp;
+
+ if (cmd->conn->minor_version < DICT_CLIENT_PROTOCOL_TIMINGS_MIN_VERSION) {
+ first_cmdp = array_idx(&cmd->conn->cmds, 0);
+ if (*first_cmdp != cmd)
+ return;
}
+ (void)dict_connection_cmds_try_output_more(cmd->conn);
}
void dict_commands_init(void)
array is fast enough */
ARRAY(struct dict_connection_transaction) transactions;
ARRAY(struct dict_connection_cmd *) cmds;
+ unsigned int async_id_counter;
unsigned int destroyed:1;
};
struct client_dict *dict;
struct timeval start_time;
char *query;
+ unsigned int async_id;
+ struct timeval async_id_received_time;
uint64_t start_global_ioloop_usecs;
uint64_t start_dict_ioloop_usecs;
}
static bool
-dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *line)
+dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *const *args)
{
- const char *const *args = t_strsplit_tabescaped(line);
const char *value = args[0];
enum dict_protocol_reply reply;
value++;
args++;
}
+
cmd->unfinished = FALSE;
cmd->callback(cmd, reply, value, args, NULL, FALSE);
return !cmd->unfinished;
}
}
+static int
+dict_conn_assign_next_async_id(struct dict_connection *conn, const char *line)
+{
+ struct client_dict_cmd *const *cmds;
+ unsigned int i, count, async_id;
+
+ i_assert(line[0] == DICT_PROTOCOL_REPLY_ASYNC_ID);
+
+ if (str_to_uint(line+1, &async_id) < 0 || async_id == 0) {
+ i_error("%s: Received invalid async-id line: %s",
+ conn->conn.name, line);
+ return -1;
+ }
+ cmds = array_get(&conn->dict->cmds, &count);
+ for (i = 0; i < count; i++) {
+ if (cmds[i]->async_id == 0) {
+ cmds[i]->async_id = async_id;
+ cmds[i]->async_id_received_time = ioloop_timeval;
+ return 0;
+ }
+ }
+ i_error("%s: Received async-id line, but all %u commands already have it: %s",
+ conn->conn.name, count, line);
+ return -1;
+}
+
+static int dict_conn_find_async_id(struct dict_connection *conn,
+ const char *async_arg,
+ const char *line, unsigned int *idx_r)
+{
+ struct client_dict_cmd *const *cmds;
+ unsigned int i, count, async_id;
+
+ i_assert(async_arg[0] == DICT_PROTOCOL_REPLY_ASYNC_REPLY);
+
+ if (str_to_uint(async_arg+1, &async_id) < 0 || async_id == 0) {
+ i_error("%s: Received invalid async-reply line: %s",
+ conn->conn.name, line);
+ return -1;
+ }
+
+ cmds = array_get(&conn->dict->cmds, &count);
+ for (i = 0; i < count; i++) {
+ if (cmds[i]->async_id == async_id) {
+ *idx_r = i;
+ return 0;
+ }
+ }
+ i_error("%s: Received reply for nonexistent async-id %u: %s",
+ conn->conn.name, async_id, line);
+ return -1;
+}
+
static int dict_conn_input_line(struct connection *_conn, const char *line)
{
struct dict_connection *conn = (struct dict_connection *)_conn;
struct client_dict *dict = conn->dict;
struct client_dict_cmd *const *cmds;
- unsigned int count;
+ const char *const *args;
+ unsigned int i, count;
bool finished;
if (dict->to_requests != NULL)
timeout_reset(dict->to_requests);
+ if (line[0] == DICT_PROTOCOL_REPLY_ASYNC_ID)
+ return dict_conn_assign_next_async_id(conn, line) < 0 ? -1 : 1;
+
cmds = array_get(&conn->dict->cmds, &count);
if (count == 0) {
i_error("%s: Received reply without pending commands: %s",
dict->conn.conn.name, line);
return -1;
}
- i_assert(!cmds[0]->no_replies);
- client_dict_cmd_ref(cmds[0]);
- finished = dict_cmd_callback_line(cmds[0], line);
- if (!client_dict_cmd_unref(cmds[0])) {
+ args = t_strsplit_tabescaped(line);
+ if (args[0] != NULL && args[0][0] == DICT_PROTOCOL_REPLY_ASYNC_REPLY) {
+ if (dict_conn_find_async_id(conn, args[0], line, &i) < 0)
+ return -1;
+ args++;
+ } else {
+ i = 0;
+ }
+ i_assert(!cmds[i]->no_replies);
+
+ client_dict_cmd_ref(cmds[i]);
+ finished = dict_cmd_callback_line(cmds[i], args);
+ if (!client_dict_cmd_unref(cmds[i])) {
/* disconnected during command handling */
return -1;
}
/* more lines needed for this command */
return 1;
}
- client_dict_cmd_unref(cmds[0]);
- array_delete(&dict->cmds, 0, 1);
+ client_dict_cmd_unref(cmds[i]);
+ array_delete(&dict->cmds, i, 1);
client_dict_add_timeout(dict);
return 1;
array_foreach(&retry_cmds, cmdp) {
cmd = *cmdp;
cmd->reconnected = TRUE;
+ cmd->async_id = 0;
/* if it fails again, don't retry anymore */
cmd->retry_errors = FALSE;
if (ret < 0) {
str_printfa(str, ", reconnected %u.%03u secs ago",
reconnected_msecs/1000, reconnected_msecs%1000);
}
+ if (cmd->async_id != 0) {
+ int async_reply_msecs =
+ timeval_diff_msecs(&ioloop_timeval, &cmd->async_id_received_time);
+ str_printfa(str, ", async-id reply %u.%03u secs ago",
+ async_reply_msecs/1000, async_reply_msecs%1000);
+ }
if (str_array_length(extra_args) >= 4 &&
str_to_time(extra_args[0], &tv_start.tv_sec) == 0 &&
str_to_uint(extra_args[1], &tv_start_usec) == 0 &&
DICT_PROTOCOL_REPLY_WRITE_UNCERTAIN = 'W',
DICT_PROTOCOL_REPLY_ASYNC_COMMIT = 'A',
DICT_PROTOCOL_REPLY_ITER_FINISHED = '\0',
+ DICT_PROTOCOL_REPLY_ASYNC_ID = '*',
+ DICT_PROTOCOL_REPLY_ASYNC_REPLY = '+',
};
const char *dict_client_escape(const char *src);