From: Timo Sirainen Date: Wed, 1 Jun 2016 21:57:17 +0000 (+0300) Subject: lib-dict: dict-client rewrite to support async operations X-Git-Tag: 2.2.25.rc1~130 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=d05c72b4547763ed0a7f859e6690d12aa2ec1ba8;p=thirdparty%2Fdovecot%2Fcore.git lib-dict: dict-client rewrite to support async operations --- diff --git a/src/lib-dict/dict-client.c b/src/lib-dict/dict-client.c index b4ceb038f2..e1716b8f9c 100644 --- a/src/lib-dict/dict-client.c +++ b/src/lib-dict/dict-client.c @@ -1,10 +1,12 @@ /* Copyright (c) 2005-2016 Dovecot authors, see the included COPYING file */ #include "lib.h" +#include "array.h" #include "llist.h" #include "str.h" -#include "net.h" -#include "istream.h" +#include "strescape.h" +#include "time-util.h" +#include "connection.h" #include "ostream.h" #include "eacces-error.h" #include "dict-private.h" @@ -21,390 +23,291 @@ #define DICT_CLIENT_DEFAULT_TIMEOUT_MSECS 0 /* Abort dict lookup after this many seconds. */ -#define DICT_CLIENT_READ_TIMEOUT_SECS 30 -/* Log a warning if dict lookup takes longer than this many seconds. */ -#define DICT_CLIENT_READ_WARN_TIMEOUT_SECS 5 +#define DICT_CLIENT_REQUEST_TIMEOUT_MSECS 30000 +/* Log a warning if dict lookup takes longer than this many milliseconds. */ +#define DICT_CLIENT_REQUEST_WARN_TIMEOUT_MSECS 5000 + +struct client_dict_cmd { + int refcount; + struct client_dict *dict; + struct timeval start_time; + char *query; + + bool retry_errors; + bool no_replies; + bool unfinished; + + void (*callback)(struct client_dict_cmd *cmd, + const char *line, const char *error); + struct client_dict_iterate_context *iter; + + struct { + dict_lookup_callback_t *lookup; + dict_transaction_commit_callback_t *commit; + void *context; + } api_callback; +}; + +struct dict_connection { + struct connection conn; + struct client_dict *dict; +}; struct client_dict { struct dict dict; + struct dict_connection conn; - pool_t pool; - int fd; - const char *uri; - const char *username; - const char *path; + char *uri, *username; enum dict_data_type value_type; time_t last_failed_connect; - struct istream *input; - struct ostream *output; - struct io *io; + struct ioloop *ioloop, *prev_ioloop; + struct timeout *to_requests; struct timeout *to_idle; unsigned int idle_msecs; + struct timeval last_input; + ARRAY(struct client_dict_cmd *) cmds; struct client_dict_transaction_context *transactions; - unsigned int connect_counter; unsigned int transaction_id_counter; - unsigned int async_commits; - unsigned int iter_replies_skip; +}; - unsigned int in_iteration:1; - unsigned int handshaked:1; +struct client_dict_iter_result { + const char *key, *value; }; struct client_dict_iterate_context { struct dict_iterate_context ctx; + char *error; + + pool_t results_pool; + ARRAY(struct client_dict_iter_result) results; + unsigned int result_idx; - pool_t pool; - bool failed; + bool async; bool finished; + bool deinit; }; struct client_dict_transaction_context { struct dict_transaction_context ctx; struct client_dict_transaction_context *prev, *next; - /* for async commits */ - dict_transaction_commit_callback_t *callback; - void *context; + char *error; unsigned int id; - unsigned int connect_counter; - unsigned int failed:1; - unsigned int sent_begin:1; - unsigned int async:1; - unsigned int committed:1; + bool sent_begin:1; }; -static int client_dict_connect(struct client_dict *dict); -static void client_dict_disconnect(struct client_dict *dict); - -const char *dict_client_escape(const char *src) -{ - const char *p; - string_t *dest; - - /* first do a quick lookup to see if there's anything to escape. - probably not. */ - for (p = src; *p != '\0'; p++) { - if (*p == '\t' || *p == '\n' || *p == '\001') - break; - } - - if (*p == '\0') - return src; - - dest = t_str_new(256); - str_append_n(dest, src, p - src); - - for (; *p != '\0'; p++) { - switch (*p) { - case '\t': - str_append_c(dest, '\001'); - str_append_c(dest, 't'); - break; - case '\n': - str_append_c(dest, '\001'); - str_append_c(dest, 'n'); - break; - case '\001': - str_append_c(dest, '\001'); - str_append_c(dest, '1'); - break; - default: - str_append_c(dest, *p); - break; - } - } - return str_c(dest); -} - -const char *dict_client_unescape(const char *src) -{ - const char *p; - string_t *dest; - - /* first do a quick lookup to see if there's anything to unescape. - probably not. */ - for (p = src; *p != '\0'; p++) { - if (*p == '\001') - break; - } - - if (*p == '\0') - return src; - - dest = t_str_new(256); - str_append_n(dest, src, p - src); - for (; *p != '\0'; p++) { - if (*p != '\001') - str_append_c(dest, *p); - else if (p[1] != '\0') { - p++; - switch (*p) { - case '1': - str_append_c(dest, '\001'); - break; - case 't': - str_append_c(dest, '\t'); - break; - case 'n': - str_append_c(dest, '\n'); - break; - } - } - } - return str_c(dest); -} - -static int client_dict_send_query(struct client_dict *dict, const char *query) -{ - if (dict->output == NULL) { - /* not connected currently */ - if (client_dict_connect(dict) < 0) - return -1; - } +static struct connection_list *dict_connections; - if (o_stream_send_str(dict->output, query) < 0 || - o_stream_flush(dict->output) < 0) { - /* Send failed */ - if (!dict->handshaked) { - /* we're trying to send hello, don't try to reconnect */ - return -1; - } +static int client_dict_connect(struct client_dict *dict, const char **error_r); +static void client_dict_disconnect(struct client_dict *dict, const char *reason); - /* Reconnect and try again. */ - client_dict_disconnect(dict); - if (client_dict_connect(dict) < 0) - return -1; - - if (o_stream_send_str(dict->output, query) < 0 || - o_stream_flush(dict->output) < 0) { - i_error("write(%s) failed: %m", dict->path); - client_dict_disconnect(dict); - return -1; - } - } - return 0; +static struct client_dict_cmd * +client_dict_cmd_init(struct client_dict *dict, const char *query) +{ + struct client_dict_cmd *cmd; + + cmd = i_new(struct client_dict_cmd, 1); + cmd->refcount = 1; + cmd->dict = dict; + cmd->query = i_strdup(query); + cmd->start_time = ioloop_timeval; + return cmd; } -static int -client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx) +static void client_dict_cmd_ref(struct client_dict_cmd *cmd) { - struct client_dict *dict = (struct client_dict *)ctx->ctx.dict; - - if (ctx->failed) - return -1; - - T_BEGIN { - const char *query; - - query = t_strdup_printf("%c%u\n", DICT_PROTOCOL_CMD_BEGIN, - ctx->id); - if (client_dict_send_query(dict, query) < 0) - ctx->failed = TRUE; - else - ctx->connect_counter = dict->connect_counter; - } T_END; - - return ctx->failed ? -1 : 0; + i_assert(cmd->refcount > 0); + cmd->refcount++; } -static int ATTR_NOWARN_UNUSED_RESULT -client_dict_send_transaction_query(struct client_dict_transaction_context *ctx, - const char *query) +static bool client_dict_cmd_unref(struct client_dict_cmd *cmd) { - struct client_dict *dict = (struct client_dict *)ctx->ctx.dict; + i_assert(cmd->refcount > 0); + if (--cmd->refcount > 0) + return TRUE; - if (!ctx->sent_begin) { - if (client_dict_transaction_send_begin(ctx) < 0) - return -1; - ctx->sent_begin = TRUE; - } - - if (ctx->connect_counter != dict->connect_counter || ctx->failed) - return -1; + i_free(cmd->query); + i_free(cmd); + return FALSE; +} - if (dict->output == NULL) { - /* not connected, this'll fail */ - return -1; +static void dict_pre_api_callback(struct client_dict *dict) +{ + if (dict->prev_ioloop != NULL) { + /* Don't let callback see that we've created our + internal ioloop in case it wants to add some ios + or timeouts. */ + current_ioloop = dict->prev_ioloop; } +} - if (o_stream_send_str(dict->output, query) < 0 || - o_stream_flush(dict->output) < 0) { - /* Send failed. Our transactions have died, so don't even try - to re-send the command */ - ctx->failed = TRUE; - client_dict_disconnect(dict); - return -1; +static void dict_post_api_callback(struct client_dict *dict) +{ + if (dict->prev_ioloop != NULL) { + current_ioloop = dict->ioloop; + /* stop client_dict_wait() */ + io_loop_stop(dict->ioloop); } - return 0; } -static struct client_dict_transaction_context * -client_dict_transaction_find(struct client_dict *dict, unsigned int id) +static bool +dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *line) { - struct client_dict_transaction_context *ctx; - - for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) { - if (ctx->id == id) - return ctx; - } - return NULL; + cmd->unfinished = FALSE; + cmd->callback(cmd, line, NULL); + return !cmd->unfinished; } static void -client_dict_finish_transaction(struct client_dict *dict, - unsigned int id, int ret) +dict_cmd_callback_error(struct client_dict_cmd *cmd, const char *error) { - struct client_dict_transaction_context *ctx; - - ctx = client_dict_transaction_find(dict, id); - if (ctx == NULL) { - i_error("dict-client: Unknown transaction id %u", id); - return; - } - ctx->failed = TRUE; - if (!ctx->committed) - return; + cmd->unfinished = FALSE; + if (cmd->callback != NULL) + cmd->callback(cmd, NULL, error); + i_assert(!cmd->unfinished); +} - /* the callback may call the dict code again, so remove this - transaction before calling it */ - i_assert(dict->async_commits > 0); - if (--dict->async_commits == 0) { - if (dict->io != NULL) - io_remove(&dict->io); - } - DLLIST_REMOVE(&dict->transactions, ctx); +static void client_dict_input_timeout(struct client_dict *dict) +{ + int diff = timeval_diff_msecs(&ioloop_timeval, &dict->last_input); - if (ctx->callback != NULL) - ctx->callback(ret, ctx->context); - i_free(ctx); + client_dict_disconnect(dict, t_strdup_printf( + "Timeout: No input from dict for %u.%03u secs", + diff/1000, diff%1000)); } -static ssize_t client_dict_read_timeout(struct client_dict *dict) +static int +client_dict_cmd_query_send(struct client_dict *dict, const char *query) { - time_t now, timeout; - unsigned int diff; + struct const_iovec iov[2]; ssize_t ret; - now = time(NULL); - timeout = now + DICT_CLIENT_READ_TIMEOUT_SECS; - - do { - alarm(timeout - now); - ret = i_stream_read(dict->input); - alarm(0); - if (ret != 0) - break; - - /* interrupted most likely because of timeout, - but check anyway. */ - now = time(NULL); - } while (now < timeout); - - if (ret > 0) { - diff = time(NULL) - now; - if (diff >= DICT_CLIENT_READ_WARN_TIMEOUT_SECS) { - i_warning("read(%s): dict lookup took %u seconds", - dict->path, diff); - } - } - return ret; + iov[0].iov_base = query; + iov[0].iov_len = strlen(query); + iov[1].iov_base = "\n"; + iov[1].iov_len = 1; + ret = o_stream_sendv(dict->conn.conn.output, iov, 2); + if (ret < 0) + return -1; + i_assert((size_t)ret == iov[0].iov_len + 1); + return 0; } -static int -client_dict_read_one_line_real(struct client_dict *dict, char **line_r) +static bool +client_dict_cmd_send(struct client_dict *dict, struct client_dict_cmd **_cmd, + const char **error_r) { - unsigned int id; - char *line; - ssize_t ret; + struct client_dict_cmd *cmd = *_cmd; + const char *error = NULL; + bool retry = cmd->retry_errors; + int ret; - *line_r = NULL; - while ((line = i_stream_next_line(dict->input)) == NULL) { - ret = client_dict_read_timeout(dict); - switch (ret) { - case -1: - if (dict->input->stream_errno != 0) - i_error("read(%s) failed: %m", dict->path); - else { - i_error("read(%s) failed: Remote disconnected", - dict->path); - } - return -1; - case -2: - i_error("read(%s) returned too much data", dict->path); - return -1; - case 0: - i_error("read(%s) failed: Timeout after %u seconds", - dict->path, DICT_CLIENT_READ_TIMEOUT_SECS); - return -1; - default: - i_assert(ret > 0); - break; + *_cmd = NULL; + + /* we're no longer idling. even with no_replies=TRUE we're going to + wait for COMMIT/ROLLBACK. */ + if (dict->to_idle != NULL) + timeout_remove(&dict->to_idle); + + if (client_dict_connect(dict, &error) < 0) { + retry = FALSE; + ret = -1; + } else { + ret = client_dict_cmd_query_send(dict, cmd->query); + if (ret < 0) { + error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name, + o_stream_get_error(dict->conn.conn.output)); } } - if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) { - switch (line[1]) { - case DICT_PROTOCOL_REPLY_OK: - ret = 1; - break; - case DICT_PROTOCOL_REPLY_NOTFOUND: + if (ret < 0 && retry) { + /* Reconnect and try again. */ + client_dict_disconnect(dict, error); + if (client_dict_connect(dict, &error) < 0) + ; + else if (client_dict_cmd_query_send(dict, cmd->query) < 0) { + error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name, + o_stream_get_error(dict->conn.conn.output)); + } else { ret = 0; - break; - case DICT_PROTOCOL_REPLY_FAIL: - ret = -1; - break; - default: - i_error("dict-client: Invalid async commit line: %s", - line); - return -1; - } - if (str_to_uint(line+2, &id) < 0) { - i_error("dict-client: Invalid ID"); - return -1; } - client_dict_finish_transaction(dict, id, ret); - return 0; } - if (dict->iter_replies_skip > 0) { - /* called aborted the iteration before finishing it. - skip over the iteration reply */ - if (*line == DICT_PROTOCOL_REPLY_OK) - return 0; - if (*line != '\0' && *line != DICT_PROTOCOL_REPLY_FAIL) { - i_error("dict-client: Invalid iteration reply line: %s", - line); - return -1; + + if (cmd->no_replies) { + /* just send and forget */ + client_dict_cmd_unref(cmd); + return TRUE; + } else if (ret < 0) { + i_assert(error != NULL); + dict_cmd_callback_error(cmd, error); + client_dict_cmd_unref(cmd); + if (error_r != NULL) + *error_r = error; + return FALSE; + } else { + if (dict->to_requests == NULL) { + dict->to_requests = + timeout_add(DICT_CLIENT_REQUEST_TIMEOUT_MSECS, + client_dict_input_timeout, dict); } - dict->iter_replies_skip--; - return 0; + array_append(&dict->cmds, &cmd, 1); + return TRUE; } - *line_r = line; - return 1; } -static int client_dict_read_one_line(struct client_dict *dict, char **line_r) +static void +client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx) { - int ret; + struct client_dict *dict = (struct client_dict *)ctx->ctx.dict; + struct client_dict_cmd *cmd; + const char *query, *error; - if ((ret = client_dict_read_one_line_real(dict, line_r)) < 0) - client_dict_disconnect(dict); - return ret; + i_assert(ctx->error == NULL); + + ctx->sent_begin = TRUE; + + /* transactions commands don't have replies. only COMMIT has. */ + query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_BEGIN, ctx->id); + cmd = client_dict_cmd_init(dict, query); + cmd->no_replies = TRUE; + cmd->retry_errors = TRUE; + if (!client_dict_cmd_send(dict, &cmd, &error)) + ctx->error = i_strdup(error); +} + +static void +client_dict_send_transaction_query(struct client_dict_transaction_context *ctx, + const char *query) +{ + struct client_dict *dict = (struct client_dict *)ctx->ctx.dict; + struct client_dict_cmd *cmd; + const char *error; + + if (ctx->error != NULL) + return; + + if (!ctx->sent_begin) + client_dict_transaction_send_begin(ctx); + + cmd = client_dict_cmd_init(dict, query); + cmd->no_replies = TRUE; + if (!client_dict_cmd_send(dict, &cmd, &error)) + ctx->error = i_strdup(error); } static bool client_dict_is_finished(struct client_dict *dict) { - return dict->transactions == NULL && !dict->in_iteration && - dict->async_commits == 0; + return dict->transactions == NULL && array_count(&dict->cmds) == 0; } static void client_dict_timeout(struct client_dict *dict) { if (client_dict_is_finished(dict)) - client_dict_disconnect(dict); + client_dict_disconnect(dict, "Idle disconnection"); } static void client_dict_add_timeout(struct client_dict *dict) @@ -415,103 +318,153 @@ static void client_dict_add_timeout(struct client_dict *dict) } else if (client_dict_is_finished(dict)) { dict->to_idle = timeout_add(dict->idle_msecs, client_dict_timeout, dict); + if (dict->to_requests != NULL) + timeout_remove(&dict->to_requests); } } -static char *client_dict_read_line(struct client_dict *dict) +static int dict_conn_input_line(struct connection *_conn, const char *line) { - char *line; + struct dict_connection *conn = (struct dict_connection *)_conn; + struct client_dict *dict = conn->dict; + struct client_dict_cmd *const *cmds; + unsigned int count; + bool finished; + int diff; + + dict->last_input = ioloop_timeval; + if (dict->to_requests != NULL) + timeout_reset(dict->to_requests); + + cmds = array_get(&conn->dict->cmds, &count); + if (count == 0) { + i_error("%s: Received reply without pending commands: %s", + dict->conn.conn.name, line); + return -1; + } + i_assert(!cmds[0]->no_replies); - while (client_dict_read_one_line(dict, &line) == 0) - ; + client_dict_cmd_ref(cmds[0]); + finished = dict_cmd_callback_line(cmds[0], line); + if (!client_dict_cmd_unref(cmds[0])) { + /* disconnected during command handling */ + return -1; + } + if (!finished) { + /* more lines needed for this command */ + return 1; + } + diff = timeval_diff_msecs(&ioloop_timeval, &cmds[0]->start_time); + if (diff >= DICT_CLIENT_REQUEST_WARN_TIMEOUT_MSECS) { + i_warning("read(%s): dict lookup took %u.%03u seconds: %s", + dict->conn.conn.name, diff/1000, diff % 1000, + cmds[0]->query); + } + client_dict_cmd_unref(cmds[0]); + array_delete(&dict->cmds, 0, 1); client_dict_add_timeout(dict); - return line; + return 1; } -static int client_dict_connect(struct client_dict *dict) +static int client_dict_connect(struct client_dict *dict, const char **error_r) { const char *query; + if (dict->conn.conn.fd_in != -1) + return 0; if (dict->last_failed_connect == ioloop_time) { /* Try again later */ + *error_r = "Waiting until the next connect attempt"; return -1; } - dict->fd = net_connect_unix(dict->path); - if (dict->fd == -1) { + if (connection_client_connect(&dict->conn.conn) < 0) { dict->last_failed_connect = ioloop_time; if (errno == EACCES) { - i_error("%s", eacces_error_get("net_connect_unix", - dict->path)); + *error_r = eacces_error_get("net_connect_unix", + dict->conn.conn.name); } else { - i_error("net_connect_unix(%s) failed: %m", - dict->path); + *error_r = t_strdup_printf( + "net_connect_unix(%s) failed: %m", dict->conn.conn.name); } return -1; } - /* Dictionary lookups are blocking */ - net_set_nonblock(dict->fd, FALSE); - - dict->input = i_stream_create_fd(dict->fd, (size_t)-1, FALSE); - dict->output = o_stream_create_fd(dict->fd, 4096, FALSE); - query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n", DICT_PROTOCOL_CMD_HELLO, DICT_CLIENT_PROTOCOL_MAJOR_VERSION, DICT_CLIENT_PROTOCOL_MINOR_VERSION, dict->value_type, dict->username, dict->uri); - if (client_dict_send_query(dict, query) < 0) { - dict->last_failed_connect = ioloop_time; - client_dict_disconnect(dict); - return -1; - } - - dict->handshaked = TRUE; + o_stream_nsend_str(dict->conn.conn.output, query); + client_dict_add_timeout(dict); return 0; } -static void client_dict_disconnect(struct client_dict *dict) +static void +client_dict_abort_commands(struct client_dict *dict, const char *reason) +{ + ARRAY(struct client_dict_cmd *) cmds_copy; + struct client_dict_cmd *const *cmdp; + + /* abort all commands */ + t_array_init(&cmds_copy, array_count(&dict->cmds)); + array_append_array(&cmds_copy, &dict->cmds); + array_clear(&dict->cmds); + + array_foreach(&cmds_copy, cmdp) { + dict_cmd_callback_error(*cmdp, reason); + client_dict_cmd_unref(*cmdp); + } +} + +static void client_dict_disconnect(struct client_dict *dict, const char *reason) { struct client_dict_transaction_context *ctx, *next; - dict->connect_counter++; - dict->handshaked = FALSE; - dict->iter_replies_skip = 0; + client_dict_abort_commands(dict, reason); - /* abort all pending async commits */ + /* all transactions that have sent BEGIN are no longer valid */ for (ctx = dict->transactions; ctx != NULL; ctx = next) { next = ctx->next; - if (ctx->async) - client_dict_finish_transaction(dict, ctx->id, -1); + if (ctx->sent_begin && ctx->error == NULL) + ctx->error = i_strdup(reason); } if (dict->to_idle != NULL) timeout_remove(&dict->to_idle); - if (dict->io != NULL) - io_remove(&dict->io); - if (dict->input != NULL) - i_stream_destroy(&dict->input); - if (dict->output != NULL) - o_stream_destroy(&dict->output); + if (dict->to_requests != NULL) + timeout_remove(&dict->to_requests); + connection_disconnect(&dict->conn.conn); +} - if (dict->fd != -1) { - if (close(dict->fd) < 0) - i_error("close(%s) failed: %m", dict->path); - dict->fd = -1; - } +static void dict_conn_destroy(struct connection *_conn) +{ + struct dict_connection *conn = (struct dict_connection *)_conn; + + client_dict_disconnect(conn->dict, connection_disconnect_reason(_conn)); } +static const struct connection_settings dict_conn_set = { + .input_max_size = (size_t)-1, + .output_max_size = (size_t)-1, + .client = TRUE +}; + +static const struct connection_vfuncs dict_conn_vfuncs = { + .destroy = dict_conn_destroy, + .input_line = dict_conn_input_line +}; + static int client_dict_init(struct dict *driver, const char *uri, const struct dict_settings *set, struct dict **dict_r, const char **error_r) { + struct ioloop *old_ioloop = current_ioloop; struct client_dict *dict; - const char *p, *dest_uri; + const char *p, *dest_uri, *path; unsigned int idle_msecs = DICT_CLIENT_DEFAULT_TIMEOUT_MSECS; - pool_t pool; /* uri = [idle_msecs=:] [] ":" */ if (strncmp(uri, "idle_msecs=", 11) == 0) { @@ -532,29 +485,36 @@ client_dict_init(struct dict *driver, const char *uri, return -1; } - pool = pool_alloconly_create("client dict", 1024); - dict = p_new(pool, struct client_dict, 1); - dict->pool = pool; + if (dict_connections == NULL) { + dict_connections = connection_list_init(&dict_conn_set, + &dict_conn_vfuncs); + } + + dict = i_new(struct client_dict, 1); dict->dict = *driver; + dict->conn.dict = dict; dict->value_type = set->value_type; - dict->username = p_strdup(pool, set->username); + dict->username = i_strdup(set->username); dict->idle_msecs = idle_msecs; - - dict->fd = -1; + i_array_init(&dict->cmds, 32); if (uri[0] == ':') { /* default path */ - dict->path = p_strconcat(pool, set->base_dir, - "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL); + path = t_strconcat(set->base_dir, + "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL); } else if (uri[0] == '/') { /* absolute path */ - dict->path = p_strdup_until(pool, uri, dest_uri); + path = t_strdup_until(uri, dest_uri); } else { /* relative path to base_dir */ - dict->path = p_strconcat(pool, set->base_dir, "/", - p_strdup_until(pool, uri, dest_uri), NULL); + path = t_strconcat(set->base_dir, "/", + t_strdup_until(uri, dest_uri), NULL); } - dict->uri = p_strdup(pool, dest_uri + 1); + connection_init_client_unix(dict_connections, &dict->conn.conn, path); + dict->uri = i_strdup(dest_uri + 1); + + dict->ioloop = io_loop_create(); + io_loop_set_current(old_ioloop); *dict_r = &dict->dict; return 0; } @@ -562,70 +522,234 @@ client_dict_init(struct dict *driver, const char *uri, static void client_dict_deinit(struct dict *_dict) { struct client_dict *dict = (struct client_dict *)_dict; + struct ioloop *old_ioloop = current_ioloop; + + client_dict_disconnect(dict, "Deinit"); + connection_deinit(&dict->conn.conn); - client_dict_disconnect(dict); i_assert(dict->transactions == NULL); - pool_unref(&dict->pool); + i_assert(array_count(&dict->cmds) == 0); + + io_loop_set_current(dict->ioloop); + io_loop_destroy(&dict->ioloop); + io_loop_set_current(old_ioloop); + + array_free(&dict->cmds); + i_free(dict->username); + i_free(dict->uri); + i_free(dict); + + if (dict_connections->connections == NULL) + connection_list_deinit(&dict_connections); } static int client_dict_wait(struct dict *_dict) { struct client_dict *dict = (struct client_dict *)_dict; - char *line; - int ret; - if (!dict->handshaked) - return -1; + if (array_count(&dict->cmds) == 0) + return 0; - while (dict->async_commits > 0) { - if ((ret = client_dict_read_one_line(dict, &line)) < 0) - return -1; + dict->prev_ioloop = current_ioloop; + io_loop_set_current(dict->ioloop); - if (ret > 0) { - i_error("dict-client: Unexpected reply waiting waiting for async commits: %s", line); - client_dict_disconnect(dict); - return -1; - } - } + if (dict->to_idle != NULL) + dict->to_idle = io_loop_move_timeout(&dict->to_idle); + if (dict->to_requests != NULL) + dict->to_requests = io_loop_move_timeout(&dict->to_requests); + connection_switch_ioloop(&dict->conn.conn); + + while (array_count(&dict->cmds) > 0) + io_loop_run(dict->ioloop); + + io_loop_set_current(dict->prev_ioloop); + dict->prev_ioloop = NULL; + + if (dict->to_idle != NULL) + dict->to_idle = io_loop_move_timeout(&dict->to_idle); + if (dict->to_requests != NULL) + dict->to_requests = io_loop_move_timeout(&dict->to_requests); + connection_switch_ioloop(&dict->conn.conn); return 0; } -static int client_dict_lookup(struct dict *_dict, pool_t pool, - const char *key, const char **value_r) +static void +client_dict_lookup_async_callback(struct client_dict_cmd *cmd, const char *line, + const char *error) +{ + struct client_dict *dict = cmd->dict; + struct dict_lookup_result result; + + memset(&result, 0, sizeof(result)); + if (error != NULL) { + result.ret = -1; + result.error = error; + } else switch (*line) { + case DICT_PROTOCOL_REPLY_OK: + result.value = t_str_tabunescape(line + 1); + result.ret = 1; + break; + case DICT_PROTOCOL_REPLY_NOTFOUND: + result.ret = 0; + break; + case DICT_PROTOCOL_REPLY_FAIL: + result.error = line[1] == '\0' ? "dict-server returned failure" : + t_strdup_printf("dict-server returned failure: %s", + t_str_tabunescape(line+1)); + result.ret = -1; + break; + default: + result.error = t_strdup_printf( + "dict-client: Invalid lookup '%s' reply: %s", + cmd->query, line); + client_dict_disconnect(dict, result.error); + result.ret = -1; + break; + } + dict_pre_api_callback(dict); + cmd->api_callback.lookup(&result, cmd->api_callback.context); + dict_post_api_callback(dict); +} + +static void +client_dict_lookup_async(struct dict *_dict, const char *key, + dict_lookup_callback_t *callback, void *context) { struct client_dict *dict = (struct client_dict *)_dict; - const char *line; - int ret; + struct client_dict_cmd *cmd; + const char *query; - T_BEGIN { - const char *query; + query = t_strdup_printf("%c%s", DICT_PROTOCOL_CMD_LOOKUP, + str_tabescape(key)); + cmd = client_dict_cmd_init(dict, query); + cmd->callback = client_dict_lookup_async_callback; + cmd->api_callback.lookup = callback; + cmd->api_callback.context = context; + cmd->retry_errors = TRUE; - query = t_strdup_printf("%c%s\n", DICT_PROTOCOL_CMD_LOOKUP, - dict_client_escape(key)); - ret = client_dict_send_query(dict, query); - } T_END; - if (ret < 0) - return -1; + client_dict_cmd_send(dict, &cmd, NULL); +} - /* read reply */ - line = client_dict_read_line(dict); - if (line == NULL) - return -1; +static void client_dict_lookup_callback(const struct dict_lookup_result *result, + void *context) +{ + struct dict_lookup_result *result_copy = context; - switch (*line) { - case DICT_PROTOCOL_REPLY_OK: - *value_r = p_strdup(pool, dict_client_unescape(line + 1)); - return 1; - case DICT_PROTOCOL_REPLY_NOTFOUND: + *result_copy = *result; +} + +static int client_dict_lookup(struct dict *_dict, pool_t pool, const char *key, + const char **value_r) +{ + struct dict_lookup_result result; + + memset(&result, 0, sizeof(result)); + result.ret = -2; + + client_dict_lookup_async(_dict, key, client_dict_lookup_callback, &result); + if (result.ret == -2) + client_dict_wait(_dict); + + switch (result.ret) { + case -1: + i_error("dict-client: Lookup '%s' failed: %s", key, result.error); + return -1; + case 0: *value_r = NULL; return 0; + case 1: + *value_r = p_strdup(pool, result.value); + return 1; + } + i_unreached(); +} + +static void client_dict_iterate_free(struct client_dict_iterate_context *ctx) +{ + if (!ctx->deinit || !ctx->finished) + return; + i_free(ctx->error); + i_free(ctx); +} + +static void +client_dict_iter_api_callback(struct client_dict_iterate_context *ctx, + struct client_dict *dict) +{ + if (ctx->deinit) { + /* iterator was already deinitialized */ + return; + } + if (ctx->ctx.async_callback != NULL) { + dict_pre_api_callback(dict); + ctx->ctx.async_callback(ctx->ctx.async_context); + dict_post_api_callback(dict); + } else { + /* synchronous lookup */ + io_loop_stop(dict->ioloop); + } +} + +static void +client_dict_iter_async_callback(struct client_dict_cmd *cmd, const char *line, + const char *error) +{ + struct client_dict_iterate_context *ctx = cmd->iter; + struct client_dict *dict = cmd->dict; + struct client_dict_iter_result *result; + const char *key = NULL, *value = NULL; + + if (error != NULL) { + /* failed */ + } else switch (*line) { + case '\0': + /* end of iteration */ + ctx->finished = TRUE; + client_dict_iter_api_callback(ctx, dict); + client_dict_iterate_free(ctx); + return; + case DICT_PROTOCOL_REPLY_OK: + /* key \t value */ + key = line+1; + value = strchr(key, '\t'); + break; case DICT_PROTOCOL_REPLY_FAIL: - return -1; + error = t_strdup_printf("dict-server returned failure: %s", line+1); + break; default: - i_error("dict-client: Invalid lookup '%s' reply: %s", key, line); - client_dict_disconnect(dict); - return -1; + break; + } + if (value == NULL && error == NULL) { + /* broken protocol */ + error = t_strdup_printf("dict client (%s) sent broken iterate reply: %s", + dict->conn.conn.name, line); + client_dict_disconnect(dict, error); + } + + if (error != NULL) { + if (ctx->error == NULL) + ctx->error = i_strdup(error); + ctx->finished = TRUE; + if (dict->prev_ioloop != NULL) { + /* stop client_dict_wait() */ + io_loop_stop(dict->ioloop); + } + client_dict_iterate_free(ctx); + return; + } + cmd->unfinished = TRUE; + + if (ctx->deinit) { + /* iterator was already deinitialized */ + return; } + + key = t_strdup_until(key, value++); + result = array_append_space(&ctx->results); + result->key = p_strdup(ctx->results_pool, t_str_tabunescape(key)); + result->value = p_strdup(ctx->results_pool, t_str_tabunescape(value)); + + client_dict_iter_api_callback(ctx, dict); } static struct dict_iterate_context * @@ -634,28 +758,28 @@ client_dict_iterate_init(struct dict *_dict, const char *const *paths, { struct client_dict *dict = (struct client_dict *)_dict; struct client_dict_iterate_context *ctx; - - if (dict->in_iteration) - i_panic("dict-client: Only one iteration supported"); - dict->in_iteration = TRUE; + struct client_dict_cmd *cmd; + string_t *query = t_str_new(256); + unsigned int i; ctx = i_new(struct client_dict_iterate_context, 1); ctx->ctx.dict = _dict; - ctx->pool = pool_alloconly_create("client dict iteration", 512); + ctx->results_pool = pool_alloconly_create("client dict iteration", 512); + ctx->async = (flags & DICT_ITERATE_FLAG_ASYNC) != 0; + i_array_init(&ctx->results, 64); + + str_printfa(query, "%c%d", DICT_PROTOCOL_CMD_ITERATE, flags); + for (i = 0; paths[i] != NULL; i++) { + str_append_c(query, '\t'); + str_append(query, str_tabescape(paths[i])); + } - T_BEGIN { - string_t *query = t_str_new(256); - unsigned int i; + cmd = client_dict_cmd_init(dict, str_c(query)); + cmd->iter = ctx; + cmd->callback = client_dict_iter_async_callback; + cmd->retry_errors = TRUE; - str_printfa(query, "%c%d", DICT_PROTOCOL_CMD_ITERATE, flags); - for (i = 0; paths[i] != NULL; i++) { - str_append_c(query, '\t'); - str_append(query, dict_client_escape(paths[i])); - } - str_append_c(query, '\n'); - if (client_dict_send_query(dict, str_c(query)) < 0) - ctx->failed = TRUE; - } T_END; + client_dict_cmd_send(dict, &cmd, NULL); return &ctx->ctx; } @@ -664,52 +788,32 @@ static bool client_dict_iterate(struct dict_iterate_context *_ctx, { struct client_dict_iterate_context *ctx = (struct client_dict_iterate_context *)_ctx; - struct client_dict *dict = (struct client_dict *)_ctx->dict; - char *line, *key, *value; - - if (ctx->failed) - return FALSE; + const struct client_dict_iter_result *results; + unsigned int count; - /* read next reply */ - line = client_dict_read_line(dict); - if (line == NULL) { - ctx->failed = TRUE; + if (ctx->error != NULL) { + ctx->ctx.has_more = FALSE; return FALSE; } - if (*line == '\0') { - /* end of iteration */ - ctx->finished = TRUE; - return FALSE; + results = array_get(&ctx->results, &count); + if (ctx->result_idx < count) { + *key_r = results[ctx->result_idx].key; + *value_r = results[ctx->result_idx].value; + ctx->ctx.has_more = TRUE; + ctx->result_idx++; + return TRUE; } - - /* line contains key \t value */ - p_clear(ctx->pool); - - switch (*line) { - case DICT_PROTOCOL_REPLY_OK: - key = line+1; - value = strchr(key, '\t'); - break; - case DICT_PROTOCOL_REPLY_FAIL: - ctx->failed = TRUE; - return FALSE; - default: - key = NULL; - value = NULL; - break; + ctx->ctx.has_more = !ctx->finished; + ctx->result_idx = 0; + array_clear(&ctx->results); + p_clear(ctx->results_pool); + + if (!ctx->async && ctx->ctx.has_more) { + client_dict_wait(_ctx->dict); + return client_dict_iterate(_ctx, key_r, value_r); } - if (value == NULL) { - /* broken protocol */ - i_error("dict client (%s) sent broken iterate reply: %s", dict->path, line); - ctx->failed = TRUE; - return FALSE; - } - *value++ = '\0'; - - *key_r = p_strdup(ctx->pool, dict_client_unescape(key)); - *value_r = p_strdup(ctx->pool, dict_client_unescape(value)); - return TRUE; + return FALSE; } static int client_dict_iterate_deinit(struct dict_iterate_context *_ctx) @@ -717,14 +821,15 @@ static int client_dict_iterate_deinit(struct dict_iterate_context *_ctx) struct client_dict *dict = (struct client_dict *)_ctx->dict; struct client_dict_iterate_context *ctx = (struct client_dict_iterate_context *)_ctx; - int ret = ctx->failed ? -1 : 0; + int ret = ctx->error != NULL ? -1 : 0; - if (!ctx->finished) - dict->iter_replies_skip++; + ctx->deinit = TRUE; - pool_unref(&ctx->pool); - i_free(ctx); - dict->in_iteration = FALSE; + if (ret < 0) + i_error("dict-client: Iteration failed: %s", ctx->error); + array_free(&ctx->results); + pool_unref(&ctx->results_pool); + client_dict_iterate_free(ctx); client_dict_add_timeout(dict); return ret; @@ -744,23 +849,46 @@ client_dict_transaction_init(struct dict *_dict) return &ctx->ctx; } -static void dict_async_input(struct client_dict *dict) +static void +client_dict_transaction_commit_callback(struct client_dict_cmd *cmd, + const char *line, const char *error) { - char *line; - int ret; - - i_assert(!dict->in_iteration); + struct client_dict *dict = cmd->dict; + int ret = -1; - do { - ret = client_dict_read_one_line(dict, &line); - } while (ret == 0 && i_stream_get_data_size(dict->input) > 0); + if (error != NULL) { + /* failed */ + i_error("dict-client: Commit failed: %s", error); + } else switch (*line) { + case DICT_PROTOCOL_REPLY_OK: + ret = 1; + break; + case DICT_PROTOCOL_REPLY_NOTFOUND: + ret = 0; + break; + case DICT_PROTOCOL_REPLY_FAIL: { + const char *error = strchr(line+1, '\t'); - if (ret < 0) - io_remove(&dict->io); - else if (ret > 0) { - i_error("dict-client: Unexpected reply waiting waiting for async commits: %s", line); - client_dict_disconnect(dict); + i_error("dict-client: server returned failure: %s", + error != NULL ? t_str_tabunescape(error) : ""); + break; + } + default: + ret = -1; + error = t_strdup_printf("dict-client: Invalid commit reply: %s", line); + i_error("%s", error); + client_dict_disconnect(dict, error); + break; } + dict_pre_api_callback(dict); + cmd->api_callback.commit(ret, cmd->api_callback.context); + dict_post_api_callback(dict); +} + +static void commit_sync_callback(int ret, void *context) +{ + int *ret_p = context; + *ret_p = ret; } static int @@ -772,65 +900,43 @@ client_dict_transaction_commit(struct dict_transaction_context *_ctx, struct client_dict_transaction_context *ctx = (struct client_dict_transaction_context *)_ctx; struct client_dict *dict = (struct client_dict *)_ctx->dict; - unsigned int id; - int ret = ctx->failed ? -1 : 1; + struct client_dict_cmd *cmd; + const char *query; + int ret = -1; - ctx->committed = TRUE; - if (ctx->sent_begin && !ctx->failed) T_BEGIN { - const char *query, *line; + DLLIST_REMOVE(&dict->transactions, ctx); - query = t_strdup_printf("%c%u\n", !async ? - DICT_PROTOCOL_CMD_COMMIT : - DICT_PROTOCOL_CMD_COMMIT_ASYNC, - ctx->id); - if (client_dict_send_transaction_query(ctx, query) < 0) - ret = -1; - else if (async) { - ctx->callback = callback; - ctx->context = context; - ctx->async = TRUE; - if (dict->async_commits++ == 0) { - dict->io = io_add(dict->fd, IO_READ, - dict_async_input, dict); - } + if (ctx->sent_begin && ctx->error == NULL) { + query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_COMMIT, ctx->id); + cmd = client_dict_cmd_init(dict, query); + cmd->callback = client_dict_transaction_commit_callback; + if (callback != NULL) { + cmd->api_callback.commit = callback; + cmd->api_callback.context = context; } else { - /* sync commit, read reply */ - line = client_dict_read_line(dict); - if (line == NULL) - ret = -1; - else switch (*line) { - case DICT_PROTOCOL_REPLY_OK: - ret = 1; - break; - case DICT_PROTOCOL_REPLY_NOTFOUND: - ret = 0; - break; - case DICT_PROTOCOL_REPLY_FAIL: - ret = -1; - break; - default: - i_error("dict-client: Invalid commit reply: %s", line); - client_dict_disconnect(dict); - line = NULL; - ret = -1; - break; - } - if (line != NULL && - (str_to_uint(line+1, &id) < 0 || ctx->id != id)) { - i_error("dict-client: Invalid commit reply, " - "expected id=%u: %s", ctx->id, line); - client_dict_disconnect(dict); - ret = -1; - } + cmd->api_callback.commit = commit_sync_callback; + cmd->api_callback.context = &ret; + } + if (client_dict_cmd_send(dict, &cmd, NULL)) { + if (!async) + client_dict_wait(_ctx->dict); } - } T_END; + } else if (ctx->error != NULL) { + /* already failed */ + if (callback != NULL) + callback(-1, context); + ret = -1; + } else { + /* nothing changed */ + if (callback != NULL) + callback(1, context); + ret = 1; + } - if (ret < 0 || !async) { - DLLIST_REMOVE(&dict->transactions, ctx); - i_free(ctx); + i_free(ctx->error); + i_free(ctx); - client_dict_add_timeout(dict); - } + client_dict_add_timeout(dict); return ret; } @@ -841,13 +947,13 @@ client_dict_transaction_rollback(struct dict_transaction_context *_ctx) (struct client_dict_transaction_context *)_ctx; struct client_dict *dict = (struct client_dict *)_ctx->dict; - if (ctx->sent_begin) T_BEGIN { + if (ctx->sent_begin) { const char *query; - query = t_strdup_printf("%c%u\n", DICT_PROTOCOL_CMD_ROLLBACK, + query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_ROLLBACK, ctx->id); client_dict_send_transaction_query(ctx, query); - } T_END; + } DLLIST_REMOVE(&dict->transactions, ctx); i_free(ctx); @@ -860,16 +966,13 @@ static void client_dict_set(struct dict_transaction_context *_ctx, { struct client_dict_transaction_context *ctx = (struct client_dict_transaction_context *)_ctx; + const char *query; - T_BEGIN { - const char *query; - - query = t_strdup_printf("%c%u\t%s\t%s\n", - DICT_PROTOCOL_CMD_SET, ctx->id, - dict_client_escape(key), - dict_client_escape(value)); - client_dict_send_transaction_query(ctx, query); - } T_END; + query = t_strdup_printf("%c%u\t%s\t%s", + DICT_PROTOCOL_CMD_SET, ctx->id, + str_tabescape(key), + str_tabescape(value)); + client_dict_send_transaction_query(ctx, query); } static void client_dict_unset(struct dict_transaction_context *_ctx, @@ -877,32 +980,12 @@ static void client_dict_unset(struct dict_transaction_context *_ctx, { struct client_dict_transaction_context *ctx = (struct client_dict_transaction_context *)_ctx; + const char *query; - T_BEGIN { - const char *query; - - query = t_strdup_printf("%c%u\t%s\n", - DICT_PROTOCOL_CMD_UNSET, ctx->id, - dict_client_escape(key)); - client_dict_send_transaction_query(ctx, query); - } T_END; -} - -static void client_dict_append(struct dict_transaction_context *_ctx, - const char *key, const char *value) -{ - struct client_dict_transaction_context *ctx = - (struct client_dict_transaction_context *)_ctx; - - T_BEGIN { - const char *query; - - query = t_strdup_printf("%c%u\t%s\t%s\n", - DICT_PROTOCOL_CMD_APPEND, ctx->id, - dict_client_escape(key), - dict_client_escape(value)); - client_dict_send_transaction_query(ctx, query); - } T_END; + query = t_strdup_printf("%c%u\t%s", + DICT_PROTOCOL_CMD_UNSET, ctx->id, + str_tabescape(key)); + client_dict_send_transaction_query(ctx, query); } static void client_dict_atomic_inc(struct dict_transaction_context *_ctx, @@ -910,14 +993,12 @@ static void client_dict_atomic_inc(struct dict_transaction_context *_ctx, { struct client_dict_transaction_context *ctx = (struct client_dict_transaction_context *)_ctx; + const char *query; - T_BEGIN { - const char *query; - query = t_strdup_printf("%c%u\t%s\t%lld\n", - DICT_PROTOCOL_CMD_ATOMIC_INC, - ctx->id, dict_client_escape(key), diff); - client_dict_send_transaction_query(ctx, query); - } T_END; + query = t_strdup_printf("%c%u\t%s\t%lld", + DICT_PROTOCOL_CMD_ATOMIC_INC, + ctx->id, str_tabescape(key), diff); + client_dict_send_transaction_query(ctx, query); } struct dict dict_driver_client = { @@ -936,8 +1017,8 @@ struct dict dict_driver_client = { client_dict_transaction_rollback, client_dict_set, client_dict_unset, - client_dict_append, + NULL, client_dict_atomic_inc, - NULL + client_dict_lookup_async } };