From: Timo Sirainen Date: Wed, 2 Sep 2015 14:36:47 +0000 (+0300) Subject: dict: Use the new async APIs for everything. X-Git-Tag: 2.2.19.rc1~129 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=1a883718249b8db128a105ee7f03293873f1f9c5;p=thirdparty%2Fdovecot%2Fcore.git dict: Use the new async APIs for everything. If the dict backend supports async operations, this means that dict service can now be configured with client_count>1. --- diff --git a/src/dict/dict-commands.c b/src/dict/dict-commands.c index 23e8193a61..2fb43fc98e 100644 --- a/src/dict/dict-commands.c +++ b/src/dict/dict-commands.c @@ -14,87 +14,159 @@ #define DICT_OUTPUT_OPTIMAL_SIZE 1024 -struct dict_client_cmd { - int cmd; - int (*func)(struct dict_connection *conn, const char *line); +struct dict_cmd_func { + enum dict_protocol_cmd cmd; + int (*func)(struct dict_connection_cmd *cmd, const char *line); }; -static int cmd_lookup(struct dict_connection *conn, const char *line) +struct dict_connection_cmd { + const struct dict_cmd_func *cmd; + struct dict_connection *conn; + char *reply; + + struct dict_iterate_context *iter; + enum dict_iterate_flags iter_flags; + + struct dict_connection_transaction *trans; +}; + +static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd); + +static void dict_connection_cmd_free(struct dict_connection_cmd *cmd) { - const char *reply; - const char *value; - int ret; + if (cmd->iter != NULL) + (void)dict_iterate_deinit(&cmd->iter); + array_delete(&cmd->conn->cmds, 0, 1); + i_free(cmd->reply); - if (conn->iter_ctx != NULL) { - i_error("dict client: LOOKUP: Can't lookup while iterating"); - return -1; + dict_connection_continue_input(cmd->conn); + i_free(cmd); +} + +static void dict_connection_cmd_remove(struct dict_connection_cmd *cmd) +{ + struct dict_connection_cmd *const *cmds; + unsigned int i, count; + + cmds = array_get(&cmd->conn->cmds, &count); + for (i = 0; i < count; i++) { + if (cmds[i] == cmd) { + dict_connection_cmd_free(cmd); + return; + } } + i_unreached(); +} - /* */ - ret = dict_lookup(conn->dict, pool_datastack_create(), line, &value); - if (ret > 0) { - reply = t_strdup_printf("%c%s\n", - DICT_PROTOCOL_REPLY_OK, str_tabescape(value)); - o_stream_nsend_str(conn->output, reply); +static void dict_connection_cmds_flush(struct dict_connection *conn) +{ + struct dict_connection_cmd *cmd, *const *first_cmdp; + + while (array_count(&conn->cmds) > 0) { + first_cmdp = array_idx(&conn->cmds, 0); + cmd = *first_cmdp; + + if (cmd->reply == NULL) { + /* command not finished yet */ + break; + } + + o_stream_nsend_str(conn->output, cmd->reply); + dict_connection_cmd_remove(cmd); + } +} + +void dict_connection_cmds_free(struct dict_connection *conn) +{ + struct dict_connection_cmd *const *first_cmdp; + + while (array_count(&conn->cmds) > 0) { + first_cmdp = array_idx(&conn->cmds, 0); + dict_connection_cmd_remove(*first_cmdp); + } +} + +static void +cmd_lookup_callback(const struct dict_lookup_result *result, void *context) +{ + struct dict_connection_cmd *cmd = context; + + if (result->ret > 0) { + cmd->reply = i_strdup_printf("%c%s\n", + DICT_PROTOCOL_REPLY_OK, str_tabescape(result->value)); + } else if (result->ret == 0) { + cmd->reply = i_strdup_printf("%c\n", DICT_PROTOCOL_REPLY_NOTFOUND); } else { - reply = t_strdup_printf("%c\n", ret == 0 ? - DICT_PROTOCOL_REPLY_NOTFOUND : - DICT_PROTOCOL_REPLY_FAIL); - o_stream_nsend_str(conn->output, reply); + i_error("%s", result->error); + cmd->reply = i_strdup_printf("%c\n", DICT_PROTOCOL_REPLY_FAIL); } - return 0; + dict_connection_cmds_flush(cmd->conn); } -static int cmd_iterate_flush(struct dict_connection *conn) +static int cmd_lookup(struct dict_connection_cmd *cmd, const char *line) +{ + /* */ + dict_lookup_async(cmd->conn->dict, line, cmd_lookup_callback, cmd); + return 1; +} + +static int cmd_iterate_flush(struct dict_connection_cmd *cmd) { string_t *str; const char *key, *value; str = t_str_new(256); - o_stream_cork(conn->output); - while (dict_iterate(conn->iter_ctx, &key, &value)) { + o_stream_cork(cmd->conn->output); + while (dict_iterate(cmd->iter, &key, &value)) { str_truncate(str, 0); str_append_c(str, DICT_PROTOCOL_REPLY_OK); str_append_tabescaped(str, key); str_append_c(str, '\t'); - if ((conn->iter_flags & DICT_ITERATE_FLAG_NO_VALUE) == 0) + if ((cmd->iter_flags & DICT_ITERATE_FLAG_NO_VALUE) == 0) str_append_tabescaped(str, value); str_append_c(str, '\n'); - o_stream_nsend(conn->output, str_data(str), str_len(str)); + o_stream_nsend(cmd->conn->output, str_data(str), str_len(str)); - if (o_stream_get_buffer_used_size(conn->output) > + if (o_stream_get_buffer_used_size(cmd->conn->output) > DICT_OUTPUT_OPTIMAL_SIZE) { - if (o_stream_flush(conn->output) <= 0) { - /* continue later */ - o_stream_uncork(conn->output); + if (o_stream_flush(cmd->conn->output) <= 0) { + /* continue later when there's more space + in output buffer */ + o_stream_uncork(cmd->conn->output); + o_stream_set_flush_pending(cmd->conn->output, TRUE); return 0; } /* flushed everything, continue */ } } - - /* finished iterating */ - o_stream_unset_flush_callback(conn->output); + if (dict_iterate_has_more(cmd->iter)) { + /* wait for the next iteration callback */ + return 0; + } str_truncate(str, 0); - if (dict_iterate_deinit(&conn->iter_ctx) < 0) + if (dict_iterate_deinit(&cmd->iter) < 0) str_append_c(str, DICT_PROTOCOL_REPLY_FAIL); str_append_c(str, '\n'); - o_stream_nsend(conn->output, str_data(str), str_len(str)); - o_stream_uncork(conn->output); + o_stream_uncork(cmd->conn->output); + + cmd->reply = i_strdup(str_c(str)); + dict_connection_cmds_flush(cmd->conn); return 1; } -static int cmd_iterate(struct dict_connection *conn, const char *line) +static void cmd_iterate_callback(void *context) +{ + struct dict_connection_cmd *cmd = context; + + dict_connection_cmd_output_more(cmd); +} + +static int cmd_iterate(struct dict_connection_cmd *cmd, const char *line) { const char *const *args; unsigned int flags; - if (conn->iter_ctx != NULL) { - i_error("dict client: ITERATE: Already iterating"); - return -1; - } - args = t_strsplit_tabescaped(line); if (str_array_length(args) < 2 || str_to_uint(args[0], &flags) < 0) { @@ -103,12 +175,12 @@ static int cmd_iterate(struct dict_connection *conn, const char *line) } /* */ - conn->iter_ctx = dict_iterate_init_multiple(conn->dict, args+1, flags); - conn->iter_flags = flags; - - o_stream_set_flush_callback(conn->output, cmd_iterate_flush, conn); - (void)cmd_iterate_flush(conn); - return 0; + flags |= DICT_ITERATE_FLAG_ASYNC; + cmd->iter = dict_iterate_init_multiple(cmd->conn->dict, args+1, flags); + cmd->iter_flags = flags; + dict_iterate_set_async_callback(cmd->iter, cmd_iterate_callback, cmd); + dict_connection_cmd_output_more(cmd); + return 1; } static struct dict_connection_transaction * @@ -134,6 +206,8 @@ dict_connection_transaction_array_remove(struct dict_connection *conn, const struct dict_connection_transaction *transactions; unsigned int i, count; + i_assert(trans->ctx == NULL); + transactions = array_get(&conn->transactions, &count); for (i = 0; i < count; i++) { if (&transactions[i] == trans) { @@ -143,7 +217,7 @@ dict_connection_transaction_array_remove(struct dict_connection *conn, } } -static int cmd_begin(struct dict_connection *conn, const char *line) +static int cmd_begin(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; unsigned int id; @@ -152,19 +226,19 @@ static int cmd_begin(struct dict_connection *conn, const char *line) i_error("dict client: Invalid transaction ID %s", line); return -1; } - if (dict_connection_transaction_lookup(conn, id) != NULL) { + if (dict_connection_transaction_lookup(cmd->conn, id) != NULL) { i_error("dict client: Transaction ID %u already exists", id); return -1; } - if (!array_is_created(&conn->transactions)) - i_array_init(&conn->transactions, 4); + if (!array_is_created(&cmd->conn->transactions)) + i_array_init(&cmd->conn->transactions, 4); /* */ - trans = array_append_space(&conn->transactions); + trans = array_append_space(&cmd->conn->transactions); trans->id = id; - trans->conn = conn; - trans->ctx = dict_transaction_begin(conn->dict); + trans->conn = cmd->conn; + trans->ctx = dict_transaction_begin(cmd->conn->dict); return 0; } @@ -187,21 +261,11 @@ dict_connection_transaction_lookup_parse(struct dict_connection *conn, return 0; } -static int cmd_commit(struct dict_connection *conn, const char *line) +static void +cmd_commit_finish(struct dict_connection_cmd *cmd, int ret, bool async) { - struct dict_connection_transaction *trans; char chr; - int ret; - - if (conn->iter_ctx != NULL) { - i_error("dict client: COMMIT: Can't commit while iterating"); - return -1; - } - - if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0) - return -1; - ret = dict_transaction_commit(&trans->ctx); switch (ret) { case 1: chr = DICT_PROTOCOL_REPLY_OK; @@ -213,66 +277,63 @@ static int cmd_commit(struct dict_connection *conn, const char *line) chr = DICT_PROTOCOL_REPLY_FAIL; break; } - o_stream_nsend_str(conn->output, t_strdup_printf("%c\n", chr)); - dict_connection_transaction_array_remove(conn, trans); - return 0; + if (async) { + cmd->reply = i_strdup_printf("%c%c%u\n", + DICT_PROTOCOL_REPLY_ASYNC_COMMIT, chr, cmd->trans->id); + } else { + cmd->reply = i_strdup_printf("%c%u\n", chr, cmd->trans->id); + } + dict_connection_transaction_array_remove(cmd->conn, cmd->trans); + dict_connection_cmds_flush(cmd->conn); } -static void cmd_commit_async_callback(int ret, void *context) +static void cmd_commit_callback(int ret, void *context) { - struct dict_connection_transaction *trans = context; - const char *reply; - char chr; + struct dict_connection_cmd *cmd = context; - switch (ret) { - case 1: - chr = DICT_PROTOCOL_REPLY_OK; - break; - case 0: - chr = DICT_PROTOCOL_REPLY_NOTFOUND; - break; - default: - chr = DICT_PROTOCOL_REPLY_FAIL; - break; - } - reply = t_strdup_printf("%c%c%u\n", DICT_PROTOCOL_REPLY_ASYNC_COMMIT, - chr, trans->id); - o_stream_nsend_str(trans->conn->output, reply); + cmd_commit_finish(cmd, ret, FALSE); +} - dict_connection_transaction_array_remove(trans->conn, trans); +static void cmd_commit_callback_async(int ret, void *context) +{ + struct dict_connection_cmd *cmd = context; + + cmd_commit_finish(cmd, ret, TRUE); } static int -cmd_commit_async(struct dict_connection *conn, const char *line) +cmd_commit(struct dict_connection_cmd *cmd, const char *line) { - struct dict_connection_transaction *trans; - - if (conn->iter_ctx != NULL) { - i_error("dict client: COMMIT: Can't commit while iterating"); + if (dict_connection_transaction_lookup_parse(cmd->conn, line, &cmd->trans) < 0) return -1; - } - if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0) + dict_transaction_commit_async(&cmd->trans->ctx, cmd_commit_callback, cmd); + return 1; +} + +static int +cmd_commit_async(struct dict_connection_cmd *cmd, const char *line) +{ + if (dict_connection_transaction_lookup_parse(cmd->conn, line, &cmd->trans) < 0) return -1; - dict_transaction_commit_async(&trans->ctx, cmd_commit_async_callback, - trans); - return 0; + dict_transaction_commit_async(&cmd->trans->ctx, cmd_commit_callback_async, cmd); + return 1; } -static int cmd_rollback(struct dict_connection *conn, const char *line) +static int cmd_rollback(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; - if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0) + if (dict_connection_transaction_lookup_parse(cmd->conn, line, &trans) < 0) return -1; dict_transaction_rollback(&trans->ctx); - dict_connection_transaction_array_remove(conn, trans); + dict_connection_transaction_array_remove(cmd->conn, trans); return 0; } -static int cmd_set(struct dict_connection *conn, const char *line) +static int cmd_set(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; const char *const *args; @@ -284,14 +345,13 @@ static int cmd_set(struct dict_connection *conn, const char *line) return -1; } - if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0) + if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0) return -1; - dict_set(trans->ctx, args[1], args[2]); return 0; } -static int cmd_unset(struct dict_connection *conn, const char *line) +static int cmd_unset(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; const char *const *args; @@ -303,14 +363,13 @@ static int cmd_unset(struct dict_connection *conn, const char *line) return -1; } - if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0) + if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0) return -1; - dict_unset(trans->ctx, args[1]); return 0; } -static int cmd_append(struct dict_connection *conn, const char *line) +static int cmd_append(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; const char *const *args; @@ -322,14 +381,14 @@ static int cmd_append(struct dict_connection *conn, const char *line) return -1; } - if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0) + if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0) return -1; dict_append(trans->ctx, args[1], args[2]); return 0; } -static int cmd_atomic_inc(struct dict_connection *conn, const char *line) +static int cmd_atomic_inc(struct dict_connection_cmd *cmd, const char *line) { struct dict_connection_transaction *trans; const char *const *args; @@ -343,14 +402,14 @@ static int cmd_atomic_inc(struct dict_connection *conn, const char *line) return -1; } - if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0) + if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0) return -1; dict_atomic_inc(trans->ctx, args[1], diff); return 0; } -static struct dict_client_cmd cmds[] = { +static const struct dict_cmd_func cmds[] = { { DICT_PROTOCOL_CMD_LOOKUP, cmd_lookup }, { DICT_PROTOCOL_CMD_ITERATE, cmd_iterate }, { DICT_PROTOCOL_CMD_BEGIN, cmd_begin }, @@ -365,7 +424,7 @@ static struct dict_client_cmd cmds[] = { { 0, NULL } }; -static struct dict_client_cmd *dict_command_find(char cmd) +static const struct dict_cmd_func *dict_command_find(enum dict_protocol_cmd cmd) { unsigned int i; @@ -378,13 +437,52 @@ static struct dict_client_cmd *dict_command_find(char cmd) int dict_command_input(struct dict_connection *conn, const char *line) { - struct dict_client_cmd *cmd; + const struct dict_cmd_func *cmd_func; + struct dict_connection_cmd *cmd; + int ret; - cmd = dict_command_find(*line); - if (cmd == NULL) { + cmd_func = dict_command_find((enum dict_protocol_cmd)*line); + if (cmd_func == NULL) { i_error("dict client: Unknown command %c", *line); return -1; } - return cmd->func(conn, line + 1); + cmd = i_new(struct dict_connection_cmd, 1); + cmd->conn = conn; + cmd->cmd = cmd_func; + array_append(&conn->cmds, &cmd, 1); + if ((ret = cmd_func->func(cmd, line + 1)) <= 0) { + dict_connection_cmd_remove(cmd); + return ret; + } + return 0; +} + +static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd) +{ + struct dict_connection_cmd *const *first_cmdp; + + first_cmdp = array_idx(&cmd->conn->cmds, 0); + if (*first_cmdp == cmd) + (void)cmd_iterate_flush(cmd); +} + +void dict_connection_cmds_output_more(struct dict_connection *conn) +{ + struct dict_connection_cmd *cmd, *const *first_cmdp; + + /* only iterators may be returning a lot of data */ + while (array_count(&conn->cmds) > 0) { + first_cmdp = array_idx(&conn->cmds, 0); + cmd = *first_cmdp; + + if (cmd->iter == NULL) + break; + + if (cmd_iterate_flush(cmd) == 0) { + /* unfinished */ + break; + } + /* cmd should be freed now */ + } } diff --git a/src/dict/dict-commands.h b/src/dict/dict-commands.h index 9c3afa8119..6fe32b2de0 100644 --- a/src/dict/dict-commands.h +++ b/src/dict/dict-commands.h @@ -5,4 +5,7 @@ struct dict_connection; int dict_command_input(struct dict_connection *conn, const char *line); +void dict_connection_cmds_output_more(struct dict_connection *conn); +void dict_connection_cmds_free(struct dict_connection *conn); + #endif diff --git a/src/dict/dict-connection.c b/src/dict/dict-connection.c index a9860cfa1c..cd13cb643b 100644 --- a/src/dict/dict-connection.c +++ b/src/dict/dict-connection.c @@ -15,6 +15,8 @@ #include #include +#define DICT_CONN_MAX_PENDING_COMMANDS 5 + static struct dict_connection *dict_connections; static int dict_connection_parse_handshake(struct dict_connection *conn, @@ -103,6 +105,9 @@ static void dict_connection_input(struct dict_connection *conn) const char *line; int ret; + if (conn->to_input != NULL) + timeout_remove(&conn->to_input); + switch (i_stream_read(conn->input)) { case 0: return; @@ -142,7 +147,35 @@ static void dict_connection_input(struct dict_connection *conn) dict_connection_destroy(conn); break; } + if (array_count(&conn->cmds) >= DICT_CONN_MAX_PENDING_COMMANDS) { + io_remove(&conn->io); + if (conn->to_input != NULL) + timeout_remove(&conn->to_input); + } + } +} + +void dict_connection_continue_input(struct dict_connection *conn) +{ + if (conn->io != NULL) + return; + + conn->io = io_add(conn->fd, IO_READ, dict_connection_input, conn); + if (conn->to_input == NULL) + conn->to_input = timeout_add_short(0, dict_connection_input, conn); +} + +static int dict_connection_output(struct dict_connection *conn) +{ + int ret; + + if ((ret = o_stream_flush(conn->output)) < 0) { + dict_connection_destroy(conn); + return 1; } + if (ret > 0) + dict_connection_cmds_output_more(conn); + return ret; } struct dict_connection *dict_connection_create(int fd) @@ -155,7 +188,9 @@ struct dict_connection *dict_connection_create(int fd) FALSE); conn->output = o_stream_create_fd(fd, 128*1024, FALSE); o_stream_set_no_error_handling(conn->output, TRUE); + o_stream_set_flush_callback(conn->output, dict_connection_output, conn); conn->io = io_add(fd, IO_READ, dict_connection_input, conn); + i_array_init(&conn->cmds, DICT_CONN_MAX_PENDING_COMMANDS); DLLIST_PREPEND(&dict_connections, conn); return conn; } @@ -166,23 +201,30 @@ void dict_connection_destroy(struct dict_connection *conn) DLLIST_REMOVE(&dict_connections, conn); + /* deinit dict before anything else, so any pending dict operations + are aborted and their callbacks called. */ + if (conn->dict != NULL) + dict_deinit(&conn->dict); + if (array_is_created(&conn->transactions)) { array_foreach_modifiable(&conn->transactions, transaction) dict_transaction_rollback(&transaction->ctx); array_free(&conn->transactions); } - if (conn->iter_ctx != NULL) - (void)dict_iterate_deinit(&conn->iter_ctx); + /* this may end up adding conn->io back, so keep this early */ + dict_connection_cmds_free(conn); + array_free(&conn->cmds); - io_remove(&conn->io); + if (conn->to_input != NULL) + timeout_remove(&conn->to_input); + if (conn->io != NULL) + io_remove(&conn->io); i_stream_destroy(&conn->input); o_stream_destroy(&conn->output); if (close(conn->fd) < 0) i_error("close(dict client) failed: %m"); - if (conn->dict != NULL) - dict_deinit(&conn->dict); i_free(conn->name); i_free(conn->username); i_free(conn); diff --git a/src/dict/dict-connection.h b/src/dict/dict-connection.h index fb1c244074..3d437d7e49 100644 --- a/src/dict/dict-connection.h +++ b/src/dict/dict-connection.h @@ -22,18 +22,19 @@ struct dict_connection { struct io *io; struct istream *input; struct ostream *output; - - struct dict_iterate_context *iter_ctx; - enum dict_iterate_flags iter_flags; + struct timeout *to_input; /* There are only a few transactions per client, so keeping them in array is fast enough */ ARRAY(struct dict_connection_transaction) transactions; + ARRAY(struct dict_connection_cmd *) cmds; }; struct dict_connection *dict_connection_create(int fd); void dict_connection_destroy(struct dict_connection *conn); +void dict_connection_continue_input(struct dict_connection *conn); + void dict_connections_destroy_all(void); #endif