From: Timo Sirainen Date: Wed, 1 Jun 2016 21:57:17 +0000 (+0300) Subject: lib-dict: dict-client rewrite to support async operations X-Git-Tag: 2.3.0.rc1~3481 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=a7e46c05358b059aad2b90f01e271ba6732c5eee;p=thirdparty%2Fdovecot%2Fcore.git lib-dict: dict-client rewrite to support async operations --- diff --git a/src/lib-dict/dict-client.c b/src/lib-dict/dict-client.c index b62beb14ad..9b31a909e9 100644 --- a/src/lib-dict/dict-client.c +++ b/src/lib-dict/dict-client.c @@ -1,11 +1,12 @@ /* Copyright (c) 2005-2016 Dovecot authors, see the included COPYING file */ #include "lib.h" +#include "array.h" #include "llist.h" #include "str.h" #include "strescape.h" -#include "net.h" -#include "istream.h" +#include "time-util.h" +#include "connection.h" #include "ostream.h" #include "eacces-error.h" #include "dict-private.h" @@ -22,340 +23,285 @@ #define DICT_CLIENT_DEFAULT_TIMEOUT_MSECS 0 /* Abort dict lookup after this many seconds. */ -#define DICT_CLIENT_READ_TIMEOUT_SECS 30 -/* Log a warning if dict lookup takes longer than this many seconds. */ -#define DICT_CLIENT_READ_WARN_TIMEOUT_SECS 5 +#define DICT_CLIENT_REQUEST_TIMEOUT_MSECS 30000 +/* Log a warning if dict lookup takes longer than this many milliseconds. */ +#define DICT_CLIENT_REQUEST_WARN_TIMEOUT_MSECS 5000 + +struct client_dict_cmd { + int refcount; + struct client_dict *dict; + struct timeval start_time; + char *query; + + bool retry_errors; + bool no_replies; + bool unfinished; + + void (*callback)(struct client_dict_cmd *cmd, + const char *line, const char *error); + struct client_dict_iterate_context *iter; + + struct { + dict_lookup_callback_t *lookup; + dict_transaction_commit_callback_t *commit; + void *context; + } api_callback; +}; + +struct dict_connection { + struct connection conn; + struct client_dict *dict; +}; struct client_dict { struct dict dict; + struct dict_connection conn; - pool_t pool; - int fd; - const char *uri; - const char *username; - const char *path; + char *uri, *username; enum dict_data_type value_type; time_t last_failed_connect; - struct istream *input; - struct ostream *output; - struct io *io; + struct ioloop *ioloop, *prev_ioloop; + struct timeout *to_requests; struct timeout *to_idle; unsigned int idle_msecs; + struct timeval last_input; + ARRAY(struct client_dict_cmd *) cmds; struct client_dict_transaction_context *transactions; - unsigned int connect_counter; unsigned int transaction_id_counter; - unsigned int async_commits; - unsigned int iter_replies_skip; +}; - bool in_iteration:1; - bool handshaked:1; +struct client_dict_iter_result { + const char *key, *value; }; struct client_dict_iterate_context { struct dict_iterate_context ctx; char *error; - pool_t pool; + pool_t results_pool; + ARRAY(struct client_dict_iter_result) results; + unsigned int result_idx; + + bool async; bool finished; + bool deinit; }; struct client_dict_transaction_context { struct dict_transaction_context ctx; struct client_dict_transaction_context *prev, *next; - /* for async commits */ - dict_transaction_commit_callback_t *callback; - void *context; - char *error; unsigned int id; - unsigned int connect_counter; bool sent_begin:1; - bool async:1; - bool committed:1; }; +static struct connection_list *dict_connections; + static int client_dict_connect(struct client_dict *dict, const char **error_r); static void client_dict_disconnect(struct client_dict *dict, const char *reason); -static int client_dict_send_query(struct client_dict *dict, const char *query, - const char **error_r) +static struct client_dict_cmd * +client_dict_cmd_init(struct client_dict *dict, const char *query) { - if (dict->output == NULL) { - /* not connected currently */ - if (client_dict_connect(dict, error_r) < 0) - return -1; - } - - if (o_stream_send_str(dict->output, query) < 0 || - o_stream_flush(dict->output) < 0) { - /* Send failed */ - *error_r = t_strdup_printf("write(%s) failed: %s", - dict->path, o_stream_get_error(dict->output)); - if (!dict->handshaked) { - /* we're trying to send hello, don't try to reconnect */ - return -1; - } - - /* Reconnect and try again. */ - client_dict_disconnect(dict, *error_r); - if (client_dict_connect(dict, error_r) < 0) - return -1; - - if (o_stream_send_str(dict->output, query) < 0 || - o_stream_flush(dict->output) < 0) { - *error_r = t_strdup_printf("write(%s) failed: %s", - dict->path, o_stream_get_error(dict->output)); - client_dict_disconnect(dict, *error_r); - return -1; - } - } - return 0; + struct client_dict_cmd *cmd; + + cmd = i_new(struct client_dict_cmd, 1); + cmd->refcount = 1; + cmd->dict = dict; + cmd->query = i_strdup(query); + cmd->start_time = ioloop_timeval; + return cmd; } -static int -client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx, - const char **error_r) +static void client_dict_cmd_ref(struct client_dict_cmd *cmd) { - struct client_dict *dict = (struct client_dict *)ctx->ctx.dict; - const char *query; - - i_assert(ctx->error == NULL); - - query = t_strdup_printf("%c%u\n", DICT_PROTOCOL_CMD_BEGIN, ctx->id); - if (client_dict_send_query(dict, query, error_r) < 0) - return -1; - ctx->connect_counter = dict->connect_counter; - return 0; + i_assert(cmd->refcount > 0); + cmd->refcount++; } -static int -client_dict_send_transaction_query(struct client_dict_transaction_context *ctx, - const char *query, const char **error_r) +static bool client_dict_cmd_unref(struct client_dict_cmd *cmd) { - struct client_dict *dict = (struct client_dict *)ctx->ctx.dict; - - i_assert(ctx->error == NULL); + i_assert(cmd->refcount > 0); + if (--cmd->refcount > 0) + return TRUE; - if (!ctx->sent_begin) { - if (client_dict_transaction_send_begin(ctx, error_r) < 0) - return -1; - ctx->sent_begin = TRUE; - } - - if (ctx->connect_counter != dict->connect_counter) { - *error_r = "Reconnected to dict-server - transaction lost"; - return -1; - } - - if (dict->output == NULL) { - /* not connected, this'll fail */ - *error_r = "Disconnected from dict-server"; - return -1; - } + i_free(cmd->query); + i_free(cmd); + return FALSE; +} - if (o_stream_send_str(dict->output, query) < 0 || - o_stream_flush(dict->output) < 0) { - /* Send failed. Our transactions have died, so don't even try - to re-send the command */ - *error_r = t_strdup_printf("write(%s) failed: %s", - dict->path, o_stream_get_error(dict->output)); - client_dict_disconnect(dict, *error_r); - return -1; +static void dict_pre_api_callback(struct client_dict *dict) +{ + if (dict->prev_ioloop != NULL) { + /* Don't let callback see that we've created our + internal ioloop in case it wants to add some ios + or timeouts. */ + current_ioloop = dict->prev_ioloop; } - return 0; } -static void -client_dict_try_send_transaction_query(struct client_dict_transaction_context *ctx, - const char *query) +static void dict_post_api_callback(struct client_dict *dict) { - const char *error; - - if (ctx->error != NULL) - return; - if (client_dict_send_transaction_query(ctx, query, &error) < 0) - ctx->error = i_strdup(error); + if (dict->prev_ioloop != NULL) { + current_ioloop = dict->ioloop; + /* stop client_dict_wait() */ + io_loop_stop(dict->ioloop); + } } -static struct client_dict_transaction_context * -client_dict_transaction_find(struct client_dict *dict, unsigned int id) +static bool +dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *line) { - struct client_dict_transaction_context *ctx; - - for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) { - if (ctx->id == id) - return ctx; - } - return NULL; + cmd->unfinished = FALSE; + cmd->callback(cmd, line, NULL); + return !cmd->unfinished; } static void -client_dict_finish_transaction(struct client_dict *dict, unsigned int id, - const struct dict_commit_result *result) +dict_cmd_callback_error(struct client_dict_cmd *cmd, const char *error) { - struct client_dict_transaction_context *ctx; - - ctx = client_dict_transaction_find(dict, id); - if (ctx == NULL) { - i_error("dict-client: Unknown transaction id %u", id); - return; - } - if (!ctx->committed) { - /* transaction isn't committed yet, but we disconnected from - dict. mark it as failed so commit will fail later. */ - if (result->ret >= 0) { - i_error("dict-client: Received transaction reply before it was committed"); - return; - } - if (ctx->error == NULL) - ctx->error = i_strdup(result->error); - return; - } + cmd->unfinished = FALSE; + if (cmd->callback != NULL) + cmd->callback(cmd, NULL, error); + i_assert(!cmd->unfinished); +} - /* the callback may call the dict code again, so remove this - transaction before calling it */ - i_assert(dict->async_commits > 0); - if (--dict->async_commits == 0) { - if (dict->io != NULL) - io_remove(&dict->io); - } - DLLIST_REMOVE(&dict->transactions, ctx); +static void client_dict_input_timeout(struct client_dict *dict) +{ + int diff = timeval_diff_msecs(&ioloop_timeval, &dict->last_input); - if (ctx->callback != NULL) - ctx->callback(result, ctx->context); - i_free(ctx); + client_dict_disconnect(dict, t_strdup_printf( + "Timeout: No input from dict for %u.%03u secs", + diff/1000, diff%1000)); } -static ssize_t client_dict_read_timeout(struct client_dict *dict) +static int +client_dict_cmd_query_send(struct client_dict *dict, const char *query) { - time_t now, timeout; - unsigned int diff; + struct const_iovec iov[2]; ssize_t ret; - now = time(NULL); - timeout = now + DICT_CLIENT_READ_TIMEOUT_SECS; - - do { - alarm(timeout - now); - ret = i_stream_read(dict->input); - alarm(0); - if (ret != 0) - break; - - /* interrupted most likely because of timeout, - but check anyway. */ - now = time(NULL); - } while (now < timeout); - - if (ret > 0) { - diff = time(NULL) - now; - if (diff >= DICT_CLIENT_READ_WARN_TIMEOUT_SECS) { - i_warning("read(%s): dict lookup took %u seconds", - dict->path, diff); - } - } - return ret; + iov[0].iov_base = query; + iov[0].iov_len = strlen(query); + iov[1].iov_base = "\n"; + iov[1].iov_len = 1; + ret = o_stream_sendv(dict->conn.conn.output, iov, 2); + if (ret < 0) + return -1; + i_assert((size_t)ret == iov[0].iov_len + 1); + return 0; } -static int -client_dict_read_one_line_real(struct client_dict *dict, char **line_r, - const char **error_r) +static bool +client_dict_cmd_send(struct client_dict *dict, struct client_dict_cmd **_cmd, + const char **error_r) { - unsigned int id; - char *line; - ssize_t ret; + struct client_dict_cmd *cmd = *_cmd; + const char *error = NULL; + bool retry = cmd->retry_errors; + int ret; - *line_r = NULL; - while ((line = i_stream_next_line(dict->input)) == NULL) { - ret = client_dict_read_timeout(dict); - switch (ret) { - case -1: - *error_r = t_strdup_printf("read(%s) failed: %s", - dict->path, i_stream_get_disconnect_reason(dict->input)); - return -1; - case -2: - *error_r = t_strdup_printf( - "read(%s) returned too much data", dict->path); - return -1; - case 0: - *error_r = t_strdup_printf( - "read(%s) failed: Timeout after %u seconds", - dict->path, DICT_CLIENT_READ_TIMEOUT_SECS); - return -1; - default: - i_assert(ret > 0); - break; + *_cmd = NULL; + + /* we're no longer idling. even with no_replies=TRUE we're going to + wait for COMMIT/ROLLBACK. */ + if (dict->to_idle != NULL) + timeout_remove(&dict->to_idle); + + if (client_dict_connect(dict, &error) < 0) { + retry = FALSE; + ret = -1; + } else { + ret = client_dict_cmd_query_send(dict, cmd->query); + if (ret < 0) { + error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name, + o_stream_get_error(dict->conn.conn.output)); } } - if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) { - struct dict_commit_result result; - - memset(&result, 0, sizeof(result)); - switch (line[1]) { - case DICT_PROTOCOL_REPLY_OK: - result.ret = 1; - break; - case DICT_PROTOCOL_REPLY_NOTFOUND: - result.ret = 0; - break; - case DICT_PROTOCOL_REPLY_FAIL: { - const char *error = strchr(line+2, '\t'); - - result.ret = -1; - result.error = t_strdup_printf( - "dict-server returned failure: %s", - error != NULL ? t_str_tabunescape(error) : ""); - break; - } - default: - *error_r = t_strdup_printf( - "dict-client: Invalid async commit line: %s", line); - return -1; - } - if (str_to_uint(t_strcut(line+2, '\t'), &id) < 0) { - *error_r = t_strdup_printf("dict-client: Invalid ID"); - return -1; + if (ret < 0 && retry) { + /* Reconnect and try again. */ + client_dict_disconnect(dict, error); + if (client_dict_connect(dict, &error) < 0) + ; + else if (client_dict_cmd_query_send(dict, cmd->query) < 0) { + error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name, + o_stream_get_error(dict->conn.conn.output)); + } else { + ret = 0; } - client_dict_finish_transaction(dict, id, &result); - return 0; } - if (dict->iter_replies_skip > 0) { - /* called aborted the iteration before finishing it. - skip over the iteration reply */ - if (*line == DICT_PROTOCOL_REPLY_OK) - return 0; - if (*line != '\0' && *line != DICT_PROTOCOL_REPLY_FAIL) { - *error_r = t_strdup_printf( - "dict-client: Invalid iteration reply line: %s", line); - return -1; + + if (cmd->no_replies) { + /* just send and forget */ + client_dict_cmd_unref(cmd); + return TRUE; + } else if (ret < 0) { + i_assert(error != NULL); + dict_cmd_callback_error(cmd, error); + client_dict_cmd_unref(cmd); + if (error_r != NULL) + *error_r = error; + return FALSE; + } else { + if (dict->to_requests == NULL) { + dict->to_requests = + timeout_add(DICT_CLIENT_REQUEST_TIMEOUT_MSECS, + client_dict_input_timeout, dict); } - dict->iter_replies_skip--; - return 0; + array_append(&dict->cmds, &cmd, 1); + return TRUE; } - *line_r = line; - return 1; } -static int client_dict_read_one_line(struct client_dict *dict, char **line_r, - const char **error_r) +static void +client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx) { - int ret; + struct client_dict *dict = (struct client_dict *)ctx->ctx.dict; + struct client_dict_cmd *cmd; + const char *query, *error; - if ((ret = client_dict_read_one_line_real(dict, line_r, error_r)) < 0) - client_dict_disconnect(dict, *error_r); - return ret; + i_assert(ctx->error == NULL); + + ctx->sent_begin = TRUE; + + /* transactions commands don't have replies. only COMMIT has. */ + query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_BEGIN, ctx->id); + cmd = client_dict_cmd_init(dict, query); + cmd->no_replies = TRUE; + cmd->retry_errors = TRUE; + if (!client_dict_cmd_send(dict, &cmd, &error)) + ctx->error = i_strdup(error); +} + +static void +client_dict_send_transaction_query(struct client_dict_transaction_context *ctx, + const char *query) +{ + struct client_dict *dict = (struct client_dict *)ctx->ctx.dict; + struct client_dict_cmd *cmd; + const char *error; + + if (ctx->error != NULL) + return; + + if (!ctx->sent_begin) + client_dict_transaction_send_begin(ctx); + + cmd = client_dict_cmd_init(dict, query); + cmd->no_replies = TRUE; + if (!client_dict_cmd_send(dict, &cmd, &error)) + ctx->error = i_strdup(error); } static bool client_dict_is_finished(struct client_dict *dict) { - return dict->transactions == NULL && !dict->in_iteration && - dict->async_commits == 0; + return dict->transactions == NULL && array_count(&dict->cmds) == 0; } static void client_dict_timeout(struct client_dict *dict) @@ -372,107 +318,153 @@ static void client_dict_add_timeout(struct client_dict *dict) } else if (client_dict_is_finished(dict)) { dict->to_idle = timeout_add(dict->idle_msecs, client_dict_timeout, dict); + if (dict->to_requests != NULL) + timeout_remove(&dict->to_requests); } } -static int client_dict_read_line(struct client_dict *dict, - char **line_r, const char **error_r) +static int dict_conn_input_line(struct connection *_conn, const char *line) { - int ret; + struct dict_connection *conn = (struct dict_connection *)_conn; + struct client_dict *dict = conn->dict; + struct client_dict_cmd *const *cmds; + unsigned int count; + bool finished; + int diff; - while ((ret = client_dict_read_one_line(dict, line_r, error_r)) == 0) - ; - i_assert(ret < 0 || *line_r != NULL); + dict->last_input = ioloop_timeval; + if (dict->to_requests != NULL) + timeout_reset(dict->to_requests); + + cmds = array_get(&conn->dict->cmds, &count); + if (count == 0) { + i_error("%s: Received reply without pending commands: %s", + dict->conn.conn.name, line); + return -1; + } + i_assert(!cmds[0]->no_replies); + + client_dict_cmd_ref(cmds[0]); + finished = dict_cmd_callback_line(cmds[0], line); + if (!client_dict_cmd_unref(cmds[0])) { + /* disconnected during command handling */ + return -1; + } + if (!finished) { + /* more lines needed for this command */ + return 1; + } + diff = timeval_diff_msecs(&ioloop_timeval, &cmds[0]->start_time); + if (diff >= DICT_CLIENT_REQUEST_WARN_TIMEOUT_MSECS) { + i_warning("read(%s): dict lookup took %u.%03u seconds: %s", + dict->conn.conn.name, diff/1000, diff % 1000, + cmds[0]->query); + } + client_dict_cmd_unref(cmds[0]); + array_delete(&dict->cmds, 0, 1); client_dict_add_timeout(dict); - return ret < 0 ? -1 : 0; + return 1; } static int client_dict_connect(struct client_dict *dict, const char **error_r) { const char *query; + if (dict->conn.conn.fd_in != -1) + return 0; if (dict->last_failed_connect == ioloop_time) { /* Try again later */ *error_r = "Waiting until the next connect attempt"; return -1; } - dict->fd = net_connect_unix(dict->path); - if (dict->fd == -1) { + if (connection_client_connect(&dict->conn.conn) < 0) { dict->last_failed_connect = ioloop_time; if (errno == EACCES) { *error_r = eacces_error_get("net_connect_unix", - dict->path); + dict->conn.conn.name); } else { *error_r = t_strdup_printf( - "net_connect_unix(%s) failed: %m", dict->path); + "net_connect_unix(%s) failed: %m", dict->conn.conn.name); } return -1; } - /* Dictionary lookups are blocking */ - net_set_nonblock(dict->fd, FALSE); - - dict->input = i_stream_create_fd(dict->fd, (size_t)-1); - dict->output = o_stream_create_fd(dict->fd, 4096); - query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n", DICT_PROTOCOL_CMD_HELLO, DICT_CLIENT_PROTOCOL_MAJOR_VERSION, DICT_CLIENT_PROTOCOL_MINOR_VERSION, dict->value_type, dict->username, dict->uri); - if (client_dict_send_query(dict, query, error_r) < 0) { - dict->last_failed_connect = ioloop_time; - client_dict_disconnect(dict, *error_r); - return -1; - } - - dict->handshaked = TRUE; + o_stream_nsend_str(dict->conn.conn.output, query); + client_dict_add_timeout(dict); return 0; } +static void +client_dict_abort_commands(struct client_dict *dict, const char *reason) +{ + ARRAY(struct client_dict_cmd *) cmds_copy; + struct client_dict_cmd *const *cmdp; + + /* abort all commands */ + t_array_init(&cmds_copy, array_count(&dict->cmds)); + array_append_array(&cmds_copy, &dict->cmds); + array_clear(&dict->cmds); + + array_foreach(&cmds_copy, cmdp) { + dict_cmd_callback_error(*cmdp, reason); + client_dict_cmd_unref(*cmdp); + } +} + static void client_dict_disconnect(struct client_dict *dict, const char *reason) { struct client_dict_transaction_context *ctx, *next; - struct dict_commit_result result = { -1, reason }; - dict->connect_counter++; - dict->handshaked = FALSE; - dict->iter_replies_skip = 0; + client_dict_abort_commands(dict, reason); - /* abort all pending async commits */ + /* all transactions that have sent BEGIN are no longer valid */ for (ctx = dict->transactions; ctx != NULL; ctx = next) { next = ctx->next; - if (ctx->async) - client_dict_finish_transaction(dict, ctx->id, &result); + if (ctx->sent_begin && ctx->error == NULL) + ctx->error = i_strdup(reason); } if (dict->to_idle != NULL) timeout_remove(&dict->to_idle); - if (dict->io != NULL) - io_remove(&dict->io); - if (dict->input != NULL) - i_stream_destroy(&dict->input); - if (dict->output != NULL) - o_stream_destroy(&dict->output); + if (dict->to_requests != NULL) + timeout_remove(&dict->to_requests); + connection_disconnect(&dict->conn.conn); +} - if (dict->fd != -1) { - if (close(dict->fd) < 0) - i_error("close(%s) failed: %m", dict->path); - dict->fd = -1; - } +static void dict_conn_destroy(struct connection *_conn) +{ + struct dict_connection *conn = (struct dict_connection *)_conn; + + client_dict_disconnect(conn->dict, connection_disconnect_reason(_conn)); } +static const struct connection_settings dict_conn_set = { + .input_max_size = (size_t)-1, + .output_max_size = (size_t)-1, + .client = TRUE +}; + +static const struct connection_vfuncs dict_conn_vfuncs = { + .destroy = dict_conn_destroy, + .input_line = dict_conn_input_line +}; + static int client_dict_init(struct dict *driver, const char *uri, const struct dict_settings *set, struct dict **dict_r, const char **error_r) { + struct ioloop *old_ioloop = current_ioloop; struct client_dict *dict; - const char *p, *dest_uri; + const char *p, *dest_uri, *path; unsigned int idle_msecs = DICT_CLIENT_DEFAULT_TIMEOUT_MSECS; - pool_t pool; /* uri = [idle_msecs=:] [] ":" */ if (strncmp(uri, "idle_msecs=", 11) == 0) { @@ -493,29 +485,36 @@ client_dict_init(struct dict *driver, const char *uri, return -1; } - pool = pool_alloconly_create("client dict", 1024); - dict = p_new(pool, struct client_dict, 1); - dict->pool = pool; + if (dict_connections == NULL) { + dict_connections = connection_list_init(&dict_conn_set, + &dict_conn_vfuncs); + } + + dict = i_new(struct client_dict, 1); dict->dict = *driver; + dict->conn.dict = dict; dict->value_type = set->value_type; - dict->username = p_strdup(pool, set->username); + dict->username = i_strdup(set->username); dict->idle_msecs = idle_msecs; - - dict->fd = -1; + i_array_init(&dict->cmds, 32); if (uri[0] == ':') { /* default path */ - dict->path = p_strconcat(pool, set->base_dir, - "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL); + path = t_strconcat(set->base_dir, + "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL); } else if (uri[0] == '/') { /* absolute path */ - dict->path = p_strdup_until(pool, uri, dest_uri); + path = t_strdup_until(uri, dest_uri); } else { /* relative path to base_dir */ - dict->path = p_strconcat(pool, set->base_dir, "/", - p_strdup_until(pool, uri, dest_uri), NULL); + path = t_strconcat(set->base_dir, "/", + t_strdup_until(uri, dest_uri), NULL); } - dict->uri = p_strdup(pool, dest_uri + 1); + connection_init_client_unix(dict_connections, &dict->conn.conn, path); + dict->uri = i_strdup(dest_uri + 1); + + dict->ioloop = io_loop_create(); + io_loop_set_current(old_ioloop); *dict_r = &dict->dict; return 0; } @@ -523,71 +522,233 @@ client_dict_init(struct dict *driver, const char *uri, static void client_dict_deinit(struct dict *_dict) { struct client_dict *dict = (struct client_dict *)_dict; + struct ioloop *old_ioloop = current_ioloop; + + client_dict_disconnect(dict, "Deinit"); + connection_deinit(&dict->conn.conn); - client_dict_disconnect(dict, "Deinit"); i_assert(dict->transactions == NULL); - pool_unref(&dict->pool); + i_assert(array_count(&dict->cmds) == 0); + + io_loop_set_current(dict->ioloop); + io_loop_destroy(&dict->ioloop); + io_loop_set_current(old_ioloop); + + array_free(&dict->cmds); + i_free(dict->username); + i_free(dict->uri); + i_free(dict); + + if (dict_connections->connections == NULL) + connection_list_deinit(&dict_connections); } static void client_dict_wait(struct dict *_dict) { struct client_dict *dict = (struct client_dict *)_dict; - const char *error; - char *line; - int ret; - while (dict->async_commits > 0) { - if ((ret = client_dict_read_one_line(dict, &line, &error)) < 0) { - i_error("%s", error); - break; - } + if (array_count(&dict->cmds) == 0) + return; - if (ret > 0) { - const char *reason = t_strdup_printf( - "dict-client: Unexpected reply waiting waiting for async commits: %s", line); - i_error("%s", reason); - client_dict_disconnect(dict, reason); - break; - } + dict->prev_ioloop = current_ioloop; + io_loop_set_current(dict->ioloop); + + if (dict->to_idle != NULL) + dict->to_idle = io_loop_move_timeout(&dict->to_idle); + if (dict->to_requests != NULL) + dict->to_requests = io_loop_move_timeout(&dict->to_requests); + connection_switch_ioloop(&dict->conn.conn); + + while (array_count(&dict->cmds) > 0) + io_loop_run(dict->ioloop); + + io_loop_set_current(dict->prev_ioloop); + dict->prev_ioloop = NULL; + + if (dict->to_idle != NULL) + dict->to_idle = io_loop_move_timeout(&dict->to_idle); + if (dict->to_requests != NULL) + dict->to_requests = io_loop_move_timeout(&dict->to_requests); + connection_switch_ioloop(&dict->conn.conn); +} + +static void +client_dict_lookup_async_callback(struct client_dict_cmd *cmd, const char *line, + const char *error) +{ + struct client_dict *dict = cmd->dict; + struct dict_lookup_result result; + + memset(&result, 0, sizeof(result)); + if (error != NULL) { + result.ret = -1; + result.error = error; + } else switch (*line) { + case DICT_PROTOCOL_REPLY_OK: + result.value = t_str_tabunescape(line + 1); + result.ret = 1; + break; + case DICT_PROTOCOL_REPLY_NOTFOUND: + result.ret = 0; + break; + case DICT_PROTOCOL_REPLY_FAIL: + result.error = line[1] == '\0' ? "dict-server returned failure" : + t_strdup_printf("dict-server returned failure: %s", + t_str_tabunescape(line+1)); + result.ret = -1; + break; + default: + result.error = t_strdup_printf( + "dict-client: Invalid lookup '%s' reply: %s", + cmd->query, line); + client_dict_disconnect(dict, result.error); + result.ret = -1; + break; } - /* we should have aborted all the async calls if we disconnected */ - i_assert(dict->async_commits == 0); + dict_pre_api_callback(dict); + cmd->api_callback.lookup(&result, cmd->api_callback.context); + dict_post_api_callback(dict); } -static int client_dict_lookup(struct dict *_dict, pool_t pool, const char *key, - const char **value_r, const char **error_r) +static void +client_dict_lookup_async(struct dict *_dict, const char *key, + dict_lookup_callback_t *callback, void *context) { struct client_dict *dict = (struct client_dict *)_dict; + struct client_dict_cmd *cmd; const char *query; - char *line; - query = t_strdup_printf("%c%s\n", DICT_PROTOCOL_CMD_LOOKUP, + query = t_strdup_printf("%c%s", DICT_PROTOCOL_CMD_LOOKUP, str_tabescape(key)); - if (client_dict_send_query(dict, query, error_r) < 0) - return -1; + cmd = client_dict_cmd_init(dict, query); + cmd->callback = client_dict_lookup_async_callback; + cmd->api_callback.lookup = callback; + cmd->api_callback.context = context; + cmd->retry_errors = TRUE; - /* read reply */ - if (client_dict_read_line(dict, &line, error_r) < 0) - return -1; + client_dict_cmd_send(dict, &cmd, NULL); +} - switch (*line) { - case DICT_PROTOCOL_REPLY_OK: - *value_r = p_strdup(pool, t_str_tabunescape(line + 1)); - return 1; - case DICT_PROTOCOL_REPLY_NOTFOUND: +static void client_dict_lookup_callback(const struct dict_lookup_result *result, + void *context) +{ + struct dict_lookup_result *result_copy = context; + + *result_copy = *result; +} + +static int client_dict_lookup(struct dict *_dict, pool_t pool, const char *key, + const char **value_r, const char **error_r) +{ + struct dict_lookup_result result; + + memset(&result, 0, sizeof(result)); + result.ret = -2; + + client_dict_lookup_async(_dict, key, client_dict_lookup_callback, &result); + if (result.ret == -2) + client_dict_wait(_dict); + + switch (result.ret) { + case -1: + *error_r = result.error; + return -1; + case 0: *value_r = NULL; return 0; + case 1: + *value_r = p_strdup(pool, result.value); + return 1; + } + i_unreached(); +} + +static void client_dict_iterate_free(struct client_dict_iterate_context *ctx) +{ + if (!ctx->deinit || !ctx->finished) + return; + i_free(ctx->error); + i_free(ctx); +} + +static void +client_dict_iter_api_callback(struct client_dict_iterate_context *ctx, + struct client_dict *dict) +{ + if (ctx->deinit) { + /* iterator was already deinitialized */ + return; + } + if (ctx->ctx.async_callback != NULL) { + dict_pre_api_callback(dict); + ctx->ctx.async_callback(ctx->ctx.async_context); + dict_post_api_callback(dict); + } else { + /* synchronous lookup */ + io_loop_stop(dict->ioloop); + } +} + +static void +client_dict_iter_async_callback(struct client_dict_cmd *cmd, const char *line, + const char *error) +{ + struct client_dict_iterate_context *ctx = cmd->iter; + struct client_dict *dict = cmd->dict; + struct client_dict_iter_result *result; + const char *key = NULL, *value = NULL; + + if (error != NULL) { + /* failed */ + } else switch (*line) { + case '\0': + /* end of iteration */ + ctx->finished = TRUE; + client_dict_iter_api_callback(ctx, dict); + client_dict_iterate_free(ctx); + return; + case DICT_PROTOCOL_REPLY_OK: + /* key \t value */ + key = line+1; + value = strchr(key, '\t'); + break; case DICT_PROTOCOL_REPLY_FAIL: - *error_r = line[1] == '\0' ? "dict-server returned failure" : - t_strdup_printf("dict-server returned failure: %s", - t_str_tabunescape(line+1)); - return -1; + error = t_strdup_printf("dict-server returned failure: %s", line+1); + break; default: - *error_r = t_strdup_printf( - "dict-client: Invalid lookup '%s' reply: %s", key, line); - client_dict_disconnect(dict, *error_r); - return -1; + break; + } + if (value == NULL && error == NULL) { + /* broken protocol */ + error = t_strdup_printf("dict client (%s) sent broken iterate reply: %s", + dict->conn.conn.name, line); + client_dict_disconnect(dict, error); + } + + if (error != NULL) { + if (ctx->error == NULL) + ctx->error = i_strdup(error); + ctx->finished = TRUE; + if (dict->prev_ioloop != NULL) { + /* stop client_dict_wait() */ + io_loop_stop(dict->ioloop); + } + client_dict_iterate_free(ctx); + return; + } + cmd->unfinished = TRUE; + + if (ctx->deinit) { + /* iterator was already deinitialized */ + return; } + + key = t_strdup_until(key, value++); + result = array_append_space(&ctx->results); + result->key = p_strdup(ctx->results_pool, t_str_tabunescape(key)); + result->value = p_strdup(ctx->results_pool, t_str_tabunescape(value)); + + client_dict_iter_api_callback(ctx, dict); } static struct dict_iterate_context * @@ -596,26 +757,28 @@ client_dict_iterate_init(struct dict *_dict, const char *const *paths, { struct client_dict *dict = (struct client_dict *)_dict; struct client_dict_iterate_context *ctx; + struct client_dict_cmd *cmd; string_t *query = t_str_new(256); unsigned int i; - const char *error; - - if (dict->in_iteration) - i_panic("dict-client: Only one iteration supported"); - dict->in_iteration = TRUE; ctx = i_new(struct client_dict_iterate_context, 1); ctx->ctx.dict = _dict; - ctx->pool = pool_alloconly_create("client dict iteration", 512); + ctx->results_pool = pool_alloconly_create("client dict iteration", 512); + ctx->async = (flags & DICT_ITERATE_FLAG_ASYNC) != 0; + i_array_init(&ctx->results, 64); str_printfa(query, "%c%d", DICT_PROTOCOL_CMD_ITERATE, flags); for (i = 0; paths[i] != NULL; i++) { str_append_c(query, '\t'); str_append(query, str_tabescape(paths[i])); } - str_append_c(query, '\n'); - if (client_dict_send_query(dict, str_c(query), &error) < 0) - ctx->error = i_strdup(error); + + cmd = client_dict_cmd_init(dict, str_c(query)); + cmd->iter = ctx; + cmd->callback = client_dict_iter_async_callback; + cmd->retry_errors = TRUE; + + client_dict_cmd_send(dict, &cmd, NULL); return &ctx->ctx; } @@ -624,51 +787,32 @@ static bool client_dict_iterate(struct dict_iterate_context *_ctx, { struct client_dict_iterate_context *ctx = (struct client_dict_iterate_context *)_ctx; - struct client_dict *dict = (struct client_dict *)_ctx->dict; - char *line, *key, *value; - const char *error; - - if (ctx->error != NULL) - return FALSE; + const struct client_dict_iter_result *results; + unsigned int count; - /* read next reply */ - if (client_dict_read_line(dict, &line, &error) < 0) { - ctx->error = i_strdup(error); + if (ctx->error != NULL) { + ctx->ctx.has_more = FALSE; return FALSE; } - if (*line == '\0') { - /* end of iteration */ - ctx->finished = TRUE; - return FALSE; + results = array_get(&ctx->results, &count); + if (ctx->result_idx < count) { + *key_r = results[ctx->result_idx].key; + *value_r = results[ctx->result_idx].value; + ctx->ctx.has_more = TRUE; + ctx->result_idx++; + return TRUE; } + ctx->ctx.has_more = !ctx->finished; + ctx->result_idx = 0; + array_clear(&ctx->results); + p_clear(ctx->results_pool); - /* line contains key \t value */ - p_clear(ctx->pool); - - switch (*line) { - case DICT_PROTOCOL_REPLY_OK: - key = line+1; - value = strchr(key, '\t'); - break; - case DICT_PROTOCOL_REPLY_FAIL: - ctx->error = i_strdup_printf("dict-server returned failure: %s", line+1); - return FALSE; - default: - key = NULL; - value = NULL; - break; + if (!ctx->async && ctx->ctx.has_more) { + client_dict_wait(_ctx->dict); + return client_dict_iterate(_ctx, key_r, value_r); } - if (value == NULL) { - /* broken protocol */ - ctx->error = i_strdup_printf("dict client (%s) sent broken iterate reply: %s", dict->path, line); - return FALSE; - } - *value++ = '\0'; - - *key_r = p_strdup(ctx->pool, t_str_tabunescape(key)); - *value_r = p_strdup(ctx->pool, t_str_tabunescape(value)); - return TRUE; + return FALSE; } static int client_dict_iterate_deinit(struct dict_iterate_context *_ctx, @@ -679,14 +823,12 @@ static int client_dict_iterate_deinit(struct dict_iterate_context *_ctx, (struct client_dict_iterate_context *)_ctx; int ret = ctx->error != NULL ? -1 : 0; - if (!ctx->finished) - dict->iter_replies_skip++; + ctx->deinit = TRUE; *error_r = t_strdup(ctx->error); - pool_unref(&ctx->pool); - i_free(ctx->error); - i_free(ctx); - dict->in_iteration = FALSE; + array_free(&ctx->results); + pool_unref(&ctx->results_pool); + client_dict_iterate_free(ctx); client_dict_add_timeout(dict); return ret; @@ -706,27 +848,42 @@ client_dict_transaction_init(struct dict *_dict) return &ctx->ctx; } -static void dict_async_input(struct client_dict *dict) +static void +client_dict_transaction_commit_callback(struct client_dict_cmd *cmd, + const char *line, const char *error) { - const char *error; - char *line; - int ret; - - i_assert(!dict->in_iteration); - - do { - ret = client_dict_read_one_line(dict, &line, &error); - } while (ret == 0 && i_stream_get_data_size(dict->input) > 0); + struct client_dict *dict = cmd->dict; + struct dict_commit_result result = { + .ret = -1, .error = NULL + }; + + if (error != NULL) { + /* failed */ + result.error = error; + } else switch (*line) { + case DICT_PROTOCOL_REPLY_OK: + result.ret = 1; + break; + case DICT_PROTOCOL_REPLY_NOTFOUND: + result.ret = 0; + break; + case DICT_PROTOCOL_REPLY_FAIL: { + const char *error = strchr(line+1, '\t'); - if (ret < 0) { - i_error("%s", error); - io_remove(&dict->io); - } else if (ret > 0) { - const char *reason = t_strdup_printf( - "dict-client: Unexpected reply waiting waiting for async commits: %s", line); - i_error("%s", reason); - client_dict_disconnect(dict, reason); + result.error = t_strdup_printf("dict-server returned failure: %s", + error != NULL ? t_str_tabunescape(error) : ""); + break; } + default: + result.ret = -1; + result.error = t_strdup_printf( + "dict-client: Invalid commit reply: %s", line); + client_dict_disconnect(dict, result.error); + break; + } + dict_pre_api_callback(dict); + cmd->api_callback.commit(&result, cmd->api_callback.context); + dict_post_api_callback(dict); } static void @@ -738,74 +895,35 @@ client_dict_transaction_commit(struct dict_transaction_context *_ctx, struct client_dict_transaction_context *ctx = (struct client_dict_transaction_context *)_ctx; struct client_dict *dict = (struct client_dict *)_ctx->dict; - unsigned int id; - struct dict_commit_result result; + struct client_dict_cmd *cmd; + const char *query; - memset(&result, 0, sizeof(result)); - result.ret = ctx->error != NULL ? -1 : 1; - result.error = t_strdup(ctx->error); + DLLIST_REMOVE(&dict->transactions, ctx); - ctx->committed = TRUE; if (ctx->sent_begin && ctx->error == NULL) { - const char *query; - char *line; - - query = t_strdup_printf("%c%u\n", !async ? - DICT_PROTOCOL_CMD_COMMIT : - DICT_PROTOCOL_CMD_COMMIT_ASYNC, - ctx->id); - if (client_dict_send_transaction_query(ctx, query, &result.error) < 0) - result.ret = -1; - else if (async) { - ctx->callback = callback; - ctx->context = context; - ctx->async = TRUE; - if (dict->async_commits++ == 0) { - dict->io = io_add(dict->fd, IO_READ, - dict_async_input, dict); - } - return; - } else { - /* sync commit, read reply */ - if (client_dict_read_line(dict, &line, &result.error) < 0) { - result.ret = -1; - } else switch (*line) { - case DICT_PROTOCOL_REPLY_OK: - result.ret = 1; - break; - case DICT_PROTOCOL_REPLY_NOTFOUND: - result.ret = 0; - break; - case DICT_PROTOCOL_REPLY_FAIL: { - const char *error = strchr(line+1, '\t'); - - result.ret = -1; - result.error = t_strdup_printf( - "dict-server returned failure: %s", - error != NULL ? t_str_tabunescape(error) : ""); - break; - } - default: - result.ret = -1; - result.error = t_strdup_printf( - "dict-client: Invalid commit reply: %s", line); - client_dict_disconnect(dict, result.error); - line = NULL; - break; - } - if (line != NULL && - (str_to_uint(t_strcut(line+1, '\t'), &id) < 0 || ctx->id != id)) { - result.ret = -1; - result.error = t_strdup_printf( - "dict-client: Invalid commit reply, " - "expected id=%u: %s", ctx->id, line); - client_dict_disconnect(dict, result.error); - } + query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_COMMIT, ctx->id); + cmd = client_dict_cmd_init(dict, query); + cmd->callback = client_dict_transaction_commit_callback; + cmd->api_callback.commit = callback; + cmd->api_callback.context = context; + if (client_dict_cmd_send(dict, &cmd, NULL)) { + if (!async) + client_dict_wait(_ctx->dict); } + } else if (ctx->error != NULL) { + /* already failed */ + struct dict_commit_result result = { + .ret = -1, .error = ctx->error + }; + callback(&result, context); + } else { + /* nothing changed */ + struct dict_commit_result result = { + .ret = 1, .error = NULL + }; + callback(&result, context); } - DLLIST_REMOVE(&dict->transactions, ctx); - callback(&result, context); i_free(ctx->error); i_free(ctx); @@ -822,9 +940,9 @@ client_dict_transaction_rollback(struct dict_transaction_context *_ctx) if (ctx->sent_begin) { const char *query; - query = t_strdup_printf("%c%u\n", DICT_PROTOCOL_CMD_ROLLBACK, + query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_ROLLBACK, ctx->id); - client_dict_try_send_transaction_query(ctx, query); + client_dict_send_transaction_query(ctx, query); } DLLIST_REMOVE(&dict->transactions, ctx); @@ -840,11 +958,11 @@ static void client_dict_set(struct dict_transaction_context *_ctx, (struct client_dict_transaction_context *)_ctx; const char *query; - query = t_strdup_printf("%c%u\t%s\t%s\n", + query = t_strdup_printf("%c%u\t%s\t%s", DICT_PROTOCOL_CMD_SET, ctx->id, str_tabescape(key), str_tabescape(value)); - client_dict_try_send_transaction_query(ctx, query); + client_dict_send_transaction_query(ctx, query); } static void client_dict_unset(struct dict_transaction_context *_ctx, @@ -854,10 +972,10 @@ static void client_dict_unset(struct dict_transaction_context *_ctx, (struct client_dict_transaction_context *)_ctx; const char *query; - query = t_strdup_printf("%c%u\t%s\n", + query = t_strdup_printf("%c%u\t%s", DICT_PROTOCOL_CMD_UNSET, ctx->id, str_tabescape(key)); - client_dict_try_send_transaction_query(ctx, query); + client_dict_send_transaction_query(ctx, query); } static void client_dict_atomic_inc(struct dict_transaction_context *_ctx, @@ -867,10 +985,10 @@ static void client_dict_atomic_inc(struct dict_transaction_context *_ctx, (struct client_dict_transaction_context *)_ctx; const char *query; - query = t_strdup_printf("%c%u\t%s\t%lld\n", + query = t_strdup_printf("%c%u\t%s\t%lld", DICT_PROTOCOL_CMD_ATOMIC_INC, ctx->id, str_tabescape(key), diff); - client_dict_try_send_transaction_query(ctx, query); + client_dict_send_transaction_query(ctx, query); } struct dict dict_driver_client = { @@ -890,6 +1008,6 @@ struct dict dict_driver_client = { client_dict_set, client_dict_unset, client_dict_atomic_inc, - NULL + client_dict_lookup_async } };