/* Copyright (c) 2005-2016 Dovecot authors, see the included COPYING file */
#include "lib.h"
+#include "array.h"
#include "llist.h"
#include "str.h"
#include "strescape.h"
-#include "net.h"
-#include "istream.h"
+#include "time-util.h"
+#include "connection.h"
#include "ostream.h"
#include "eacces-error.h"
#include "dict-private.h"
#define DICT_CLIENT_DEFAULT_TIMEOUT_MSECS 0
/* Abort dict lookup after this many seconds. */
-#define DICT_CLIENT_READ_TIMEOUT_SECS 30
-/* Log a warning if dict lookup takes longer than this many seconds. */
-#define DICT_CLIENT_READ_WARN_TIMEOUT_SECS 5
+#define DICT_CLIENT_REQUEST_TIMEOUT_MSECS 30000
+/* Log a warning if dict lookup takes longer than this many milliseconds. */
+#define DICT_CLIENT_REQUEST_WARN_TIMEOUT_MSECS 5000
+
+struct client_dict_cmd {
+ int refcount;
+ struct client_dict *dict;
+ struct timeval start_time;
+ char *query;
+
+ bool retry_errors;
+ bool no_replies;
+ bool unfinished;
+
+ void (*callback)(struct client_dict_cmd *cmd,
+ const char *line, const char *error);
+ struct client_dict_iterate_context *iter;
+
+ struct {
+ dict_lookup_callback_t *lookup;
+ dict_transaction_commit_callback_t *commit;
+ void *context;
+ } api_callback;
+};
+
+struct dict_connection {
+ struct connection conn;
+ struct client_dict *dict;
+};
struct client_dict {
struct dict dict;
+ struct dict_connection conn;
- pool_t pool;
- int fd;
- const char *uri;
- const char *username;
- const char *path;
+ char *uri, *username;
enum dict_data_type value_type;
time_t last_failed_connect;
- struct istream *input;
- struct ostream *output;
- struct io *io;
+ struct ioloop *ioloop, *prev_ioloop;
+ struct timeout *to_requests;
struct timeout *to_idle;
unsigned int idle_msecs;
+ struct timeval last_input;
+ ARRAY(struct client_dict_cmd *) cmds;
struct client_dict_transaction_context *transactions;
- unsigned int connect_counter;
unsigned int transaction_id_counter;
- unsigned int async_commits;
- unsigned int iter_replies_skip;
+};
- bool in_iteration:1;
- bool handshaked:1;
+struct client_dict_iter_result {
+ const char *key, *value;
};
struct client_dict_iterate_context {
struct dict_iterate_context ctx;
char *error;
- pool_t pool;
+ pool_t results_pool;
+ ARRAY(struct client_dict_iter_result) results;
+ unsigned int result_idx;
+
+ bool async;
bool finished;
+ bool deinit;
};
struct client_dict_transaction_context {
struct dict_transaction_context ctx;
struct client_dict_transaction_context *prev, *next;
- /* for async commits */
- dict_transaction_commit_callback_t *callback;
- void *context;
-
char *error;
unsigned int id;
- unsigned int connect_counter;
bool sent_begin:1;
- bool async:1;
- bool committed:1;
};
+static struct connection_list *dict_connections;
+
static int client_dict_connect(struct client_dict *dict, const char **error_r);
static void client_dict_disconnect(struct client_dict *dict, const char *reason);
-static int client_dict_send_query(struct client_dict *dict, const char *query,
- const char **error_r)
+static struct client_dict_cmd *
+client_dict_cmd_init(struct client_dict *dict, const char *query)
{
- if (dict->output == NULL) {
- /* not connected currently */
- if (client_dict_connect(dict, error_r) < 0)
- return -1;
- }
-
- if (o_stream_send_str(dict->output, query) < 0 ||
- o_stream_flush(dict->output) < 0) {
- /* Send failed */
- *error_r = t_strdup_printf("write(%s) failed: %s",
- dict->path, o_stream_get_error(dict->output));
- if (!dict->handshaked) {
- /* we're trying to send hello, don't try to reconnect */
- return -1;
- }
-
- /* Reconnect and try again. */
- client_dict_disconnect(dict, *error_r);
- if (client_dict_connect(dict, error_r) < 0)
- return -1;
-
- if (o_stream_send_str(dict->output, query) < 0 ||
- o_stream_flush(dict->output) < 0) {
- *error_r = t_strdup_printf("write(%s) failed: %s",
- dict->path, o_stream_get_error(dict->output));
- client_dict_disconnect(dict, *error_r);
- return -1;
- }
- }
- return 0;
+ struct client_dict_cmd *cmd;
+
+ cmd = i_new(struct client_dict_cmd, 1);
+ cmd->refcount = 1;
+ cmd->dict = dict;
+ cmd->query = i_strdup(query);
+ cmd->start_time = ioloop_timeval;
+ return cmd;
}
-static int
-client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx,
- const char **error_r)
+static void client_dict_cmd_ref(struct client_dict_cmd *cmd)
{
- struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
- const char *query;
-
- i_assert(ctx->error == NULL);
-
- query = t_strdup_printf("%c%u\n", DICT_PROTOCOL_CMD_BEGIN, ctx->id);
- if (client_dict_send_query(dict, query, error_r) < 0)
- return -1;
- ctx->connect_counter = dict->connect_counter;
- return 0;
+ i_assert(cmd->refcount > 0);
+ cmd->refcount++;
}
-static int
-client_dict_send_transaction_query(struct client_dict_transaction_context *ctx,
- const char *query, const char **error_r)
+static bool client_dict_cmd_unref(struct client_dict_cmd *cmd)
{
- struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
-
- i_assert(ctx->error == NULL);
+ i_assert(cmd->refcount > 0);
+ if (--cmd->refcount > 0)
+ return TRUE;
- if (!ctx->sent_begin) {
- if (client_dict_transaction_send_begin(ctx, error_r) < 0)
- return -1;
- ctx->sent_begin = TRUE;
- }
-
- if (ctx->connect_counter != dict->connect_counter) {
- *error_r = "Reconnected to dict-server - transaction lost";
- return -1;
- }
-
- if (dict->output == NULL) {
- /* not connected, this'll fail */
- *error_r = "Disconnected from dict-server";
- return -1;
- }
+ i_free(cmd->query);
+ i_free(cmd);
+ return FALSE;
+}
- if (o_stream_send_str(dict->output, query) < 0 ||
- o_stream_flush(dict->output) < 0) {
- /* Send failed. Our transactions have died, so don't even try
- to re-send the command */
- *error_r = t_strdup_printf("write(%s) failed: %s",
- dict->path, o_stream_get_error(dict->output));
- client_dict_disconnect(dict, *error_r);
- return -1;
+static void dict_pre_api_callback(struct client_dict *dict)
+{
+ if (dict->prev_ioloop != NULL) {
+ /* Don't let callback see that we've created our
+ internal ioloop in case it wants to add some ios
+ or timeouts. */
+ current_ioloop = dict->prev_ioloop;
}
- return 0;
}
-static void
-client_dict_try_send_transaction_query(struct client_dict_transaction_context *ctx,
- const char *query)
+static void dict_post_api_callback(struct client_dict *dict)
{
- const char *error;
-
- if (ctx->error != NULL)
- return;
- if (client_dict_send_transaction_query(ctx, query, &error) < 0)
- ctx->error = i_strdup(error);
+ if (dict->prev_ioloop != NULL) {
+ current_ioloop = dict->ioloop;
+ /* stop client_dict_wait() */
+ io_loop_stop(dict->ioloop);
+ }
}
-static struct client_dict_transaction_context *
-client_dict_transaction_find(struct client_dict *dict, unsigned int id)
+static bool
+dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *line)
{
- struct client_dict_transaction_context *ctx;
-
- for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) {
- if (ctx->id == id)
- return ctx;
- }
- return NULL;
+ cmd->unfinished = FALSE;
+ cmd->callback(cmd, line, NULL);
+ return !cmd->unfinished;
}
static void
-client_dict_finish_transaction(struct client_dict *dict, unsigned int id,
- const struct dict_commit_result *result)
+dict_cmd_callback_error(struct client_dict_cmd *cmd, const char *error)
{
- struct client_dict_transaction_context *ctx;
-
- ctx = client_dict_transaction_find(dict, id);
- if (ctx == NULL) {
- i_error("dict-client: Unknown transaction id %u", id);
- return;
- }
- if (!ctx->committed) {
- /* transaction isn't committed yet, but we disconnected from
- dict. mark it as failed so commit will fail later. */
- if (result->ret >= 0) {
- i_error("dict-client: Received transaction reply before it was committed");
- return;
- }
- if (ctx->error == NULL)
- ctx->error = i_strdup(result->error);
- return;
- }
+ cmd->unfinished = FALSE;
+ if (cmd->callback != NULL)
+ cmd->callback(cmd, NULL, error);
+ i_assert(!cmd->unfinished);
+}
- /* the callback may call the dict code again, so remove this
- transaction before calling it */
- i_assert(dict->async_commits > 0);
- if (--dict->async_commits == 0) {
- if (dict->io != NULL)
- io_remove(&dict->io);
- }
- DLLIST_REMOVE(&dict->transactions, ctx);
+static void client_dict_input_timeout(struct client_dict *dict)
+{
+ int diff = timeval_diff_msecs(&ioloop_timeval, &dict->last_input);
- if (ctx->callback != NULL)
- ctx->callback(result, ctx->context);
- i_free(ctx);
+ client_dict_disconnect(dict, t_strdup_printf(
+ "Timeout: No input from dict for %u.%03u secs",
+ diff/1000, diff%1000));
}
-static ssize_t client_dict_read_timeout(struct client_dict *dict)
+static int
+client_dict_cmd_query_send(struct client_dict *dict, const char *query)
{
- time_t now, timeout;
- unsigned int diff;
+ struct const_iovec iov[2];
ssize_t ret;
- now = time(NULL);
- timeout = now + DICT_CLIENT_READ_TIMEOUT_SECS;
-
- do {
- alarm(timeout - now);
- ret = i_stream_read(dict->input);
- alarm(0);
- if (ret != 0)
- break;
-
- /* interrupted most likely because of timeout,
- but check anyway. */
- now = time(NULL);
- } while (now < timeout);
-
- if (ret > 0) {
- diff = time(NULL) - now;
- if (diff >= DICT_CLIENT_READ_WARN_TIMEOUT_SECS) {
- i_warning("read(%s): dict lookup took %u seconds",
- dict->path, diff);
- }
- }
- return ret;
+ iov[0].iov_base = query;
+ iov[0].iov_len = strlen(query);
+ iov[1].iov_base = "\n";
+ iov[1].iov_len = 1;
+ ret = o_stream_sendv(dict->conn.conn.output, iov, 2);
+ if (ret < 0)
+ return -1;
+ i_assert((size_t)ret == iov[0].iov_len + 1);
+ return 0;
}
-static int
-client_dict_read_one_line_real(struct client_dict *dict, char **line_r,
- const char **error_r)
+static bool
+client_dict_cmd_send(struct client_dict *dict, struct client_dict_cmd **_cmd,
+ const char **error_r)
{
- unsigned int id;
- char *line;
- ssize_t ret;
+ struct client_dict_cmd *cmd = *_cmd;
+ const char *error = NULL;
+ bool retry = cmd->retry_errors;
+ int ret;
- *line_r = NULL;
- while ((line = i_stream_next_line(dict->input)) == NULL) {
- ret = client_dict_read_timeout(dict);
- switch (ret) {
- case -1:
- *error_r = t_strdup_printf("read(%s) failed: %s",
- dict->path, i_stream_get_disconnect_reason(dict->input));
- return -1;
- case -2:
- *error_r = t_strdup_printf(
- "read(%s) returned too much data", dict->path);
- return -1;
- case 0:
- *error_r = t_strdup_printf(
- "read(%s) failed: Timeout after %u seconds",
- dict->path, DICT_CLIENT_READ_TIMEOUT_SECS);
- return -1;
- default:
- i_assert(ret > 0);
- break;
+ *_cmd = NULL;
+
+ /* we're no longer idling. even with no_replies=TRUE we're going to
+ wait for COMMIT/ROLLBACK. */
+ if (dict->to_idle != NULL)
+ timeout_remove(&dict->to_idle);
+
+ if (client_dict_connect(dict, &error) < 0) {
+ retry = FALSE;
+ ret = -1;
+ } else {
+ ret = client_dict_cmd_query_send(dict, cmd->query);
+ if (ret < 0) {
+ error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name,
+ o_stream_get_error(dict->conn.conn.output));
}
}
- if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) {
- struct dict_commit_result result;
-
- memset(&result, 0, sizeof(result));
- switch (line[1]) {
- case DICT_PROTOCOL_REPLY_OK:
- result.ret = 1;
- break;
- case DICT_PROTOCOL_REPLY_NOTFOUND:
- result.ret = 0;
- break;
- case DICT_PROTOCOL_REPLY_FAIL: {
- const char *error = strchr(line+2, '\t');
-
- result.ret = -1;
- result.error = t_strdup_printf(
- "dict-server returned failure: %s",
- error != NULL ? t_str_tabunescape(error) : "");
- break;
- }
- default:
- *error_r = t_strdup_printf(
- "dict-client: Invalid async commit line: %s", line);
- return -1;
- }
- if (str_to_uint(t_strcut(line+2, '\t'), &id) < 0) {
- *error_r = t_strdup_printf("dict-client: Invalid ID");
- return -1;
+ if (ret < 0 && retry) {
+ /* Reconnect and try again. */
+ client_dict_disconnect(dict, error);
+ if (client_dict_connect(dict, &error) < 0)
+ ;
+ else if (client_dict_cmd_query_send(dict, cmd->query) < 0) {
+ error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name,
+ o_stream_get_error(dict->conn.conn.output));
+ } else {
+ ret = 0;
}
- client_dict_finish_transaction(dict, id, &result);
- return 0;
}
- if (dict->iter_replies_skip > 0) {
- /* called aborted the iteration before finishing it.
- skip over the iteration reply */
- if (*line == DICT_PROTOCOL_REPLY_OK)
- return 0;
- if (*line != '\0' && *line != DICT_PROTOCOL_REPLY_FAIL) {
- *error_r = t_strdup_printf(
- "dict-client: Invalid iteration reply line: %s", line);
- return -1;
+
+ if (cmd->no_replies) {
+ /* just send and forget */
+ client_dict_cmd_unref(cmd);
+ return TRUE;
+ } else if (ret < 0) {
+ i_assert(error != NULL);
+ dict_cmd_callback_error(cmd, error);
+ client_dict_cmd_unref(cmd);
+ if (error_r != NULL)
+ *error_r = error;
+ return FALSE;
+ } else {
+ if (dict->to_requests == NULL) {
+ dict->to_requests =
+ timeout_add(DICT_CLIENT_REQUEST_TIMEOUT_MSECS,
+ client_dict_input_timeout, dict);
}
- dict->iter_replies_skip--;
- return 0;
+ array_append(&dict->cmds, &cmd, 1);
+ return TRUE;
}
- *line_r = line;
- return 1;
}
-static int client_dict_read_one_line(struct client_dict *dict, char **line_r,
- const char **error_r)
+static void
+client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx)
{
- int ret;
+ struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
+ struct client_dict_cmd *cmd;
+ const char *query, *error;
- if ((ret = client_dict_read_one_line_real(dict, line_r, error_r)) < 0)
- client_dict_disconnect(dict, *error_r);
- return ret;
+ i_assert(ctx->error == NULL);
+
+ ctx->sent_begin = TRUE;
+
+ /* transactions commands don't have replies. only COMMIT has. */
+ query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_BEGIN, ctx->id);
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->no_replies = TRUE;
+ cmd->retry_errors = TRUE;
+ if (!client_dict_cmd_send(dict, &cmd, &error))
+ ctx->error = i_strdup(error);
+}
+
+static void
+client_dict_send_transaction_query(struct client_dict_transaction_context *ctx,
+ const char *query)
+{
+ struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
+ struct client_dict_cmd *cmd;
+ const char *error;
+
+ if (ctx->error != NULL)
+ return;
+
+ if (!ctx->sent_begin)
+ client_dict_transaction_send_begin(ctx);
+
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->no_replies = TRUE;
+ if (!client_dict_cmd_send(dict, &cmd, &error))
+ ctx->error = i_strdup(error);
}
static bool client_dict_is_finished(struct client_dict *dict)
{
- return dict->transactions == NULL && !dict->in_iteration &&
- dict->async_commits == 0;
+ return dict->transactions == NULL && array_count(&dict->cmds) == 0;
}
static void client_dict_timeout(struct client_dict *dict)
} else if (client_dict_is_finished(dict)) {
dict->to_idle = timeout_add(dict->idle_msecs,
client_dict_timeout, dict);
+ if (dict->to_requests != NULL)
+ timeout_remove(&dict->to_requests);
}
}
-static int client_dict_read_line(struct client_dict *dict,
- char **line_r, const char **error_r)
+static int dict_conn_input_line(struct connection *_conn, const char *line)
{
- int ret;
+ struct dict_connection *conn = (struct dict_connection *)_conn;
+ struct client_dict *dict = conn->dict;
+ struct client_dict_cmd *const *cmds;
+ unsigned int count;
+ bool finished;
+ int diff;
- while ((ret = client_dict_read_one_line(dict, line_r, error_r)) == 0)
- ;
- i_assert(ret < 0 || *line_r != NULL);
+ dict->last_input = ioloop_timeval;
+ if (dict->to_requests != NULL)
+ timeout_reset(dict->to_requests);
+
+ cmds = array_get(&conn->dict->cmds, &count);
+ if (count == 0) {
+ i_error("%s: Received reply without pending commands: %s",
+ dict->conn.conn.name, line);
+ return -1;
+ }
+ i_assert(!cmds[0]->no_replies);
+
+ client_dict_cmd_ref(cmds[0]);
+ finished = dict_cmd_callback_line(cmds[0], line);
+ if (!client_dict_cmd_unref(cmds[0])) {
+ /* disconnected during command handling */
+ return -1;
+ }
+ if (!finished) {
+ /* more lines needed for this command */
+ return 1;
+ }
+ diff = timeval_diff_msecs(&ioloop_timeval, &cmds[0]->start_time);
+ if (diff >= DICT_CLIENT_REQUEST_WARN_TIMEOUT_MSECS) {
+ i_warning("read(%s): dict lookup took %u.%03u seconds: %s",
+ dict->conn.conn.name, diff/1000, diff % 1000,
+ cmds[0]->query);
+ }
+ client_dict_cmd_unref(cmds[0]);
+ array_delete(&dict->cmds, 0, 1);
client_dict_add_timeout(dict);
- return ret < 0 ? -1 : 0;
+ return 1;
}
static int client_dict_connect(struct client_dict *dict, const char **error_r)
{
const char *query;
+ if (dict->conn.conn.fd_in != -1)
+ return 0;
if (dict->last_failed_connect == ioloop_time) {
/* Try again later */
*error_r = "Waiting until the next connect attempt";
return -1;
}
- dict->fd = net_connect_unix(dict->path);
- if (dict->fd == -1) {
+ if (connection_client_connect(&dict->conn.conn) < 0) {
dict->last_failed_connect = ioloop_time;
if (errno == EACCES) {
*error_r = eacces_error_get("net_connect_unix",
- dict->path);
+ dict->conn.conn.name);
} else {
*error_r = t_strdup_printf(
- "net_connect_unix(%s) failed: %m", dict->path);
+ "net_connect_unix(%s) failed: %m", dict->conn.conn.name);
}
return -1;
}
- /* Dictionary lookups are blocking */
- net_set_nonblock(dict->fd, FALSE);
-
- dict->input = i_stream_create_fd(dict->fd, (size_t)-1);
- dict->output = o_stream_create_fd(dict->fd, 4096);
-
query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n",
DICT_PROTOCOL_CMD_HELLO,
DICT_CLIENT_PROTOCOL_MAJOR_VERSION,
DICT_CLIENT_PROTOCOL_MINOR_VERSION,
dict->value_type, dict->username, dict->uri);
- if (client_dict_send_query(dict, query, error_r) < 0) {
- dict->last_failed_connect = ioloop_time;
- client_dict_disconnect(dict, *error_r);
- return -1;
- }
-
- dict->handshaked = TRUE;
+ o_stream_nsend_str(dict->conn.conn.output, query);
+ client_dict_add_timeout(dict);
return 0;
}
+static void
+client_dict_abort_commands(struct client_dict *dict, const char *reason)
+{
+ ARRAY(struct client_dict_cmd *) cmds_copy;
+ struct client_dict_cmd *const *cmdp;
+
+ /* abort all commands */
+ t_array_init(&cmds_copy, array_count(&dict->cmds));
+ array_append_array(&cmds_copy, &dict->cmds);
+ array_clear(&dict->cmds);
+
+ array_foreach(&cmds_copy, cmdp) {
+ dict_cmd_callback_error(*cmdp, reason);
+ client_dict_cmd_unref(*cmdp);
+ }
+}
+
static void client_dict_disconnect(struct client_dict *dict, const char *reason)
{
struct client_dict_transaction_context *ctx, *next;
- struct dict_commit_result result = { -1, reason };
- dict->connect_counter++;
- dict->handshaked = FALSE;
- dict->iter_replies_skip = 0;
+ client_dict_abort_commands(dict, reason);
- /* abort all pending async commits */
+ /* all transactions that have sent BEGIN are no longer valid */
for (ctx = dict->transactions; ctx != NULL; ctx = next) {
next = ctx->next;
- if (ctx->async)
- client_dict_finish_transaction(dict, ctx->id, &result);
+ if (ctx->sent_begin && ctx->error == NULL)
+ ctx->error = i_strdup(reason);
}
if (dict->to_idle != NULL)
timeout_remove(&dict->to_idle);
- if (dict->io != NULL)
- io_remove(&dict->io);
- if (dict->input != NULL)
- i_stream_destroy(&dict->input);
- if (dict->output != NULL)
- o_stream_destroy(&dict->output);
+ if (dict->to_requests != NULL)
+ timeout_remove(&dict->to_requests);
+ connection_disconnect(&dict->conn.conn);
+}
- if (dict->fd != -1) {
- if (close(dict->fd) < 0)
- i_error("close(%s) failed: %m", dict->path);
- dict->fd = -1;
- }
+static void dict_conn_destroy(struct connection *_conn)
+{
+ struct dict_connection *conn = (struct dict_connection *)_conn;
+
+ client_dict_disconnect(conn->dict, connection_disconnect_reason(_conn));
}
+static const struct connection_settings dict_conn_set = {
+ .input_max_size = (size_t)-1,
+ .output_max_size = (size_t)-1,
+ .client = TRUE
+};
+
+static const struct connection_vfuncs dict_conn_vfuncs = {
+ .destroy = dict_conn_destroy,
+ .input_line = dict_conn_input_line
+};
+
static int
client_dict_init(struct dict *driver, const char *uri,
const struct dict_settings *set,
struct dict **dict_r, const char **error_r)
{
+ struct ioloop *old_ioloop = current_ioloop;
struct client_dict *dict;
- const char *p, *dest_uri;
+ const char *p, *dest_uri, *path;
unsigned int idle_msecs = DICT_CLIENT_DEFAULT_TIMEOUT_MSECS;
- pool_t pool;
/* uri = [idle_msecs=<n>:] [<path>] ":" <uri> */
if (strncmp(uri, "idle_msecs=", 11) == 0) {
return -1;
}
- pool = pool_alloconly_create("client dict", 1024);
- dict = p_new(pool, struct client_dict, 1);
- dict->pool = pool;
+ if (dict_connections == NULL) {
+ dict_connections = connection_list_init(&dict_conn_set,
+ &dict_conn_vfuncs);
+ }
+
+ dict = i_new(struct client_dict, 1);
dict->dict = *driver;
+ dict->conn.dict = dict;
dict->value_type = set->value_type;
- dict->username = p_strdup(pool, set->username);
+ dict->username = i_strdup(set->username);
dict->idle_msecs = idle_msecs;
-
- dict->fd = -1;
+ i_array_init(&dict->cmds, 32);
if (uri[0] == ':') {
/* default path */
- dict->path = p_strconcat(pool, set->base_dir,
- "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL);
+ path = t_strconcat(set->base_dir,
+ "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL);
} else if (uri[0] == '/') {
/* absolute path */
- dict->path = p_strdup_until(pool, uri, dest_uri);
+ path = t_strdup_until(uri, dest_uri);
} else {
/* relative path to base_dir */
- dict->path = p_strconcat(pool, set->base_dir, "/",
- p_strdup_until(pool, uri, dest_uri), NULL);
+ path = t_strconcat(set->base_dir, "/",
+ t_strdup_until(uri, dest_uri), NULL);
}
- dict->uri = p_strdup(pool, dest_uri + 1);
+ connection_init_client_unix(dict_connections, &dict->conn.conn, path);
+ dict->uri = i_strdup(dest_uri + 1);
+
+ dict->ioloop = io_loop_create();
+ io_loop_set_current(old_ioloop);
*dict_r = &dict->dict;
return 0;
}
static void client_dict_deinit(struct dict *_dict)
{
struct client_dict *dict = (struct client_dict *)_dict;
+ struct ioloop *old_ioloop = current_ioloop;
+
+ client_dict_disconnect(dict, "Deinit");
+ connection_deinit(&dict->conn.conn);
- client_dict_disconnect(dict, "Deinit");
i_assert(dict->transactions == NULL);
- pool_unref(&dict->pool);
+ i_assert(array_count(&dict->cmds) == 0);
+
+ io_loop_set_current(dict->ioloop);
+ io_loop_destroy(&dict->ioloop);
+ io_loop_set_current(old_ioloop);
+
+ array_free(&dict->cmds);
+ i_free(dict->username);
+ i_free(dict->uri);
+ i_free(dict);
+
+ if (dict_connections->connections == NULL)
+ connection_list_deinit(&dict_connections);
}
static void client_dict_wait(struct dict *_dict)
{
struct client_dict *dict = (struct client_dict *)_dict;
- const char *error;
- char *line;
- int ret;
- while (dict->async_commits > 0) {
- if ((ret = client_dict_read_one_line(dict, &line, &error)) < 0) {
- i_error("%s", error);
- break;
- }
+ if (array_count(&dict->cmds) == 0)
+ return;
- if (ret > 0) {
- const char *reason = t_strdup_printf(
- "dict-client: Unexpected reply waiting waiting for async commits: %s", line);
- i_error("%s", reason);
- client_dict_disconnect(dict, reason);
- break;
- }
+ dict->prev_ioloop = current_ioloop;
+ io_loop_set_current(dict->ioloop);
+
+ if (dict->to_idle != NULL)
+ dict->to_idle = io_loop_move_timeout(&dict->to_idle);
+ if (dict->to_requests != NULL)
+ dict->to_requests = io_loop_move_timeout(&dict->to_requests);
+ connection_switch_ioloop(&dict->conn.conn);
+
+ while (array_count(&dict->cmds) > 0)
+ io_loop_run(dict->ioloop);
+
+ io_loop_set_current(dict->prev_ioloop);
+ dict->prev_ioloop = NULL;
+
+ if (dict->to_idle != NULL)
+ dict->to_idle = io_loop_move_timeout(&dict->to_idle);
+ if (dict->to_requests != NULL)
+ dict->to_requests = io_loop_move_timeout(&dict->to_requests);
+ connection_switch_ioloop(&dict->conn.conn);
+}
+
+static void
+client_dict_lookup_async_callback(struct client_dict_cmd *cmd, const char *line,
+ const char *error)
+{
+ struct client_dict *dict = cmd->dict;
+ struct dict_lookup_result result;
+
+ memset(&result, 0, sizeof(result));
+ if (error != NULL) {
+ result.ret = -1;
+ result.error = error;
+ } else switch (*line) {
+ case DICT_PROTOCOL_REPLY_OK:
+ result.value = t_str_tabunescape(line + 1);
+ result.ret = 1;
+ break;
+ case DICT_PROTOCOL_REPLY_NOTFOUND:
+ result.ret = 0;
+ break;
+ case DICT_PROTOCOL_REPLY_FAIL:
+ result.error = line[1] == '\0' ? "dict-server returned failure" :
+ t_strdup_printf("dict-server returned failure: %s",
+ t_str_tabunescape(line+1));
+ result.ret = -1;
+ break;
+ default:
+ result.error = t_strdup_printf(
+ "dict-client: Invalid lookup '%s' reply: %s",
+ cmd->query, line);
+ client_dict_disconnect(dict, result.error);
+ result.ret = -1;
+ break;
}
- /* we should have aborted all the async calls if we disconnected */
- i_assert(dict->async_commits == 0);
+ dict_pre_api_callback(dict);
+ cmd->api_callback.lookup(&result, cmd->api_callback.context);
+ dict_post_api_callback(dict);
}
-static int client_dict_lookup(struct dict *_dict, pool_t pool, const char *key,
- const char **value_r, const char **error_r)
+static void
+client_dict_lookup_async(struct dict *_dict, const char *key,
+ dict_lookup_callback_t *callback, void *context)
{
struct client_dict *dict = (struct client_dict *)_dict;
+ struct client_dict_cmd *cmd;
const char *query;
- char *line;
- query = t_strdup_printf("%c%s\n", DICT_PROTOCOL_CMD_LOOKUP,
+ query = t_strdup_printf("%c%s", DICT_PROTOCOL_CMD_LOOKUP,
str_tabescape(key));
- if (client_dict_send_query(dict, query, error_r) < 0)
- return -1;
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->callback = client_dict_lookup_async_callback;
+ cmd->api_callback.lookup = callback;
+ cmd->api_callback.context = context;
+ cmd->retry_errors = TRUE;
- /* read reply */
- if (client_dict_read_line(dict, &line, error_r) < 0)
- return -1;
+ client_dict_cmd_send(dict, &cmd, NULL);
+}
- switch (*line) {
- case DICT_PROTOCOL_REPLY_OK:
- *value_r = p_strdup(pool, t_str_tabunescape(line + 1));
- return 1;
- case DICT_PROTOCOL_REPLY_NOTFOUND:
+static void client_dict_lookup_callback(const struct dict_lookup_result *result,
+ void *context)
+{
+ struct dict_lookup_result *result_copy = context;
+
+ *result_copy = *result;
+}
+
+static int client_dict_lookup(struct dict *_dict, pool_t pool, const char *key,
+ const char **value_r, const char **error_r)
+{
+ struct dict_lookup_result result;
+
+ memset(&result, 0, sizeof(result));
+ result.ret = -2;
+
+ client_dict_lookup_async(_dict, key, client_dict_lookup_callback, &result);
+ if (result.ret == -2)
+ client_dict_wait(_dict);
+
+ switch (result.ret) {
+ case -1:
+ *error_r = result.error;
+ return -1;
+ case 0:
*value_r = NULL;
return 0;
+ case 1:
+ *value_r = p_strdup(pool, result.value);
+ return 1;
+ }
+ i_unreached();
+}
+
+static void client_dict_iterate_free(struct client_dict_iterate_context *ctx)
+{
+ if (!ctx->deinit || !ctx->finished)
+ return;
+ i_free(ctx->error);
+ i_free(ctx);
+}
+
+static void
+client_dict_iter_api_callback(struct client_dict_iterate_context *ctx,
+ struct client_dict *dict)
+{
+ if (ctx->deinit) {
+ /* iterator was already deinitialized */
+ return;
+ }
+ if (ctx->ctx.async_callback != NULL) {
+ dict_pre_api_callback(dict);
+ ctx->ctx.async_callback(ctx->ctx.async_context);
+ dict_post_api_callback(dict);
+ } else {
+ /* synchronous lookup */
+ io_loop_stop(dict->ioloop);
+ }
+}
+
+static void
+client_dict_iter_async_callback(struct client_dict_cmd *cmd, const char *line,
+ const char *error)
+{
+ struct client_dict_iterate_context *ctx = cmd->iter;
+ struct client_dict *dict = cmd->dict;
+ struct client_dict_iter_result *result;
+ const char *key = NULL, *value = NULL;
+
+ if (error != NULL) {
+ /* failed */
+ } else switch (*line) {
+ case '\0':
+ /* end of iteration */
+ ctx->finished = TRUE;
+ client_dict_iter_api_callback(ctx, dict);
+ client_dict_iterate_free(ctx);
+ return;
+ case DICT_PROTOCOL_REPLY_OK:
+ /* key \t value */
+ key = line+1;
+ value = strchr(key, '\t');
+ break;
case DICT_PROTOCOL_REPLY_FAIL:
- *error_r = line[1] == '\0' ? "dict-server returned failure" :
- t_strdup_printf("dict-server returned failure: %s",
- t_str_tabunescape(line+1));
- return -1;
+ error = t_strdup_printf("dict-server returned failure: %s", line+1);
+ break;
default:
- *error_r = t_strdup_printf(
- "dict-client: Invalid lookup '%s' reply: %s", key, line);
- client_dict_disconnect(dict, *error_r);
- return -1;
+ break;
+ }
+ if (value == NULL && error == NULL) {
+ /* broken protocol */
+ error = t_strdup_printf("dict client (%s) sent broken iterate reply: %s",
+ dict->conn.conn.name, line);
+ client_dict_disconnect(dict, error);
+ }
+
+ if (error != NULL) {
+ if (ctx->error == NULL)
+ ctx->error = i_strdup(error);
+ ctx->finished = TRUE;
+ if (dict->prev_ioloop != NULL) {
+ /* stop client_dict_wait() */
+ io_loop_stop(dict->ioloop);
+ }
+ client_dict_iterate_free(ctx);
+ return;
+ }
+ cmd->unfinished = TRUE;
+
+ if (ctx->deinit) {
+ /* iterator was already deinitialized */
+ return;
}
+
+ key = t_strdup_until(key, value++);
+ result = array_append_space(&ctx->results);
+ result->key = p_strdup(ctx->results_pool, t_str_tabunescape(key));
+ result->value = p_strdup(ctx->results_pool, t_str_tabunescape(value));
+
+ client_dict_iter_api_callback(ctx, dict);
}
static struct dict_iterate_context *
{
struct client_dict *dict = (struct client_dict *)_dict;
struct client_dict_iterate_context *ctx;
+ struct client_dict_cmd *cmd;
string_t *query = t_str_new(256);
unsigned int i;
- const char *error;
-
- if (dict->in_iteration)
- i_panic("dict-client: Only one iteration supported");
- dict->in_iteration = TRUE;
ctx = i_new(struct client_dict_iterate_context, 1);
ctx->ctx.dict = _dict;
- ctx->pool = pool_alloconly_create("client dict iteration", 512);
+ ctx->results_pool = pool_alloconly_create("client dict iteration", 512);
+ ctx->async = (flags & DICT_ITERATE_FLAG_ASYNC) != 0;
+ i_array_init(&ctx->results, 64);
str_printfa(query, "%c%d", DICT_PROTOCOL_CMD_ITERATE, flags);
for (i = 0; paths[i] != NULL; i++) {
str_append_c(query, '\t');
str_append(query, str_tabescape(paths[i]));
}
- str_append_c(query, '\n');
- if (client_dict_send_query(dict, str_c(query), &error) < 0)
- ctx->error = i_strdup(error);
+
+ cmd = client_dict_cmd_init(dict, str_c(query));
+ cmd->iter = ctx;
+ cmd->callback = client_dict_iter_async_callback;
+ cmd->retry_errors = TRUE;
+
+ client_dict_cmd_send(dict, &cmd, NULL);
return &ctx->ctx;
}
{
struct client_dict_iterate_context *ctx =
(struct client_dict_iterate_context *)_ctx;
- struct client_dict *dict = (struct client_dict *)_ctx->dict;
- char *line, *key, *value;
- const char *error;
-
- if (ctx->error != NULL)
- return FALSE;
+ const struct client_dict_iter_result *results;
+ unsigned int count;
- /* read next reply */
- if (client_dict_read_line(dict, &line, &error) < 0) {
- ctx->error = i_strdup(error);
+ if (ctx->error != NULL) {
+ ctx->ctx.has_more = FALSE;
return FALSE;
}
- if (*line == '\0') {
- /* end of iteration */
- ctx->finished = TRUE;
- return FALSE;
+ results = array_get(&ctx->results, &count);
+ if (ctx->result_idx < count) {
+ *key_r = results[ctx->result_idx].key;
+ *value_r = results[ctx->result_idx].value;
+ ctx->ctx.has_more = TRUE;
+ ctx->result_idx++;
+ return TRUE;
}
+ ctx->ctx.has_more = !ctx->finished;
+ ctx->result_idx = 0;
+ array_clear(&ctx->results);
+ p_clear(ctx->results_pool);
- /* line contains key \t value */
- p_clear(ctx->pool);
-
- switch (*line) {
- case DICT_PROTOCOL_REPLY_OK:
- key = line+1;
- value = strchr(key, '\t');
- break;
- case DICT_PROTOCOL_REPLY_FAIL:
- ctx->error = i_strdup_printf("dict-server returned failure: %s", line+1);
- return FALSE;
- default:
- key = NULL;
- value = NULL;
- break;
+ if (!ctx->async && ctx->ctx.has_more) {
+ client_dict_wait(_ctx->dict);
+ return client_dict_iterate(_ctx, key_r, value_r);
}
- if (value == NULL) {
- /* broken protocol */
- ctx->error = i_strdup_printf("dict client (%s) sent broken iterate reply: %s", dict->path, line);
- return FALSE;
- }
- *value++ = '\0';
-
- *key_r = p_strdup(ctx->pool, t_str_tabunescape(key));
- *value_r = p_strdup(ctx->pool, t_str_tabunescape(value));
- return TRUE;
+ return FALSE;
}
static int client_dict_iterate_deinit(struct dict_iterate_context *_ctx,
(struct client_dict_iterate_context *)_ctx;
int ret = ctx->error != NULL ? -1 : 0;
- if (!ctx->finished)
- dict->iter_replies_skip++;
+ ctx->deinit = TRUE;
*error_r = t_strdup(ctx->error);
- pool_unref(&ctx->pool);
- i_free(ctx->error);
- i_free(ctx);
- dict->in_iteration = FALSE;
+ array_free(&ctx->results);
+ pool_unref(&ctx->results_pool);
+ client_dict_iterate_free(ctx);
client_dict_add_timeout(dict);
return ret;
return &ctx->ctx;
}
-static void dict_async_input(struct client_dict *dict)
+static void
+client_dict_transaction_commit_callback(struct client_dict_cmd *cmd,
+ const char *line, const char *error)
{
- const char *error;
- char *line;
- int ret;
-
- i_assert(!dict->in_iteration);
-
- do {
- ret = client_dict_read_one_line(dict, &line, &error);
- } while (ret == 0 && i_stream_get_data_size(dict->input) > 0);
+ struct client_dict *dict = cmd->dict;
+ struct dict_commit_result result = {
+ .ret = -1, .error = NULL
+ };
+
+ if (error != NULL) {
+ /* failed */
+ result.error = error;
+ } else switch (*line) {
+ case DICT_PROTOCOL_REPLY_OK:
+ result.ret = 1;
+ break;
+ case DICT_PROTOCOL_REPLY_NOTFOUND:
+ result.ret = 0;
+ break;
+ case DICT_PROTOCOL_REPLY_FAIL: {
+ const char *error = strchr(line+1, '\t');
- if (ret < 0) {
- i_error("%s", error);
- io_remove(&dict->io);
- } else if (ret > 0) {
- const char *reason = t_strdup_printf(
- "dict-client: Unexpected reply waiting waiting for async commits: %s", line);
- i_error("%s", reason);
- client_dict_disconnect(dict, reason);
+ result.error = t_strdup_printf("dict-server returned failure: %s",
+ error != NULL ? t_str_tabunescape(error) : "");
+ break;
}
+ default:
+ result.ret = -1;
+ result.error = t_strdup_printf(
+ "dict-client: Invalid commit reply: %s", line);
+ client_dict_disconnect(dict, result.error);
+ break;
+ }
+ dict_pre_api_callback(dict);
+ cmd->api_callback.commit(&result, cmd->api_callback.context);
+ dict_post_api_callback(dict);
}
static void
struct client_dict_transaction_context *ctx =
(struct client_dict_transaction_context *)_ctx;
struct client_dict *dict = (struct client_dict *)_ctx->dict;
- unsigned int id;
- struct dict_commit_result result;
+ struct client_dict_cmd *cmd;
+ const char *query;
- memset(&result, 0, sizeof(result));
- result.ret = ctx->error != NULL ? -1 : 1;
- result.error = t_strdup(ctx->error);
+ DLLIST_REMOVE(&dict->transactions, ctx);
- ctx->committed = TRUE;
if (ctx->sent_begin && ctx->error == NULL) {
- const char *query;
- char *line;
-
- query = t_strdup_printf("%c%u\n", !async ?
- DICT_PROTOCOL_CMD_COMMIT :
- DICT_PROTOCOL_CMD_COMMIT_ASYNC,
- ctx->id);
- if (client_dict_send_transaction_query(ctx, query, &result.error) < 0)
- result.ret = -1;
- else if (async) {
- ctx->callback = callback;
- ctx->context = context;
- ctx->async = TRUE;
- if (dict->async_commits++ == 0) {
- dict->io = io_add(dict->fd, IO_READ,
- dict_async_input, dict);
- }
- return;
- } else {
- /* sync commit, read reply */
- if (client_dict_read_line(dict, &line, &result.error) < 0) {
- result.ret = -1;
- } else switch (*line) {
- case DICT_PROTOCOL_REPLY_OK:
- result.ret = 1;
- break;
- case DICT_PROTOCOL_REPLY_NOTFOUND:
- result.ret = 0;
- break;
- case DICT_PROTOCOL_REPLY_FAIL: {
- const char *error = strchr(line+1, '\t');
-
- result.ret = -1;
- result.error = t_strdup_printf(
- "dict-server returned failure: %s",
- error != NULL ? t_str_tabunescape(error) : "");
- break;
- }
- default:
- result.ret = -1;
- result.error = t_strdup_printf(
- "dict-client: Invalid commit reply: %s", line);
- client_dict_disconnect(dict, result.error);
- line = NULL;
- break;
- }
- if (line != NULL &&
- (str_to_uint(t_strcut(line+1, '\t'), &id) < 0 || ctx->id != id)) {
- result.ret = -1;
- result.error = t_strdup_printf(
- "dict-client: Invalid commit reply, "
- "expected id=%u: %s", ctx->id, line);
- client_dict_disconnect(dict, result.error);
- }
+ query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_COMMIT, ctx->id);
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->callback = client_dict_transaction_commit_callback;
+ cmd->api_callback.commit = callback;
+ cmd->api_callback.context = context;
+ if (client_dict_cmd_send(dict, &cmd, NULL)) {
+ if (!async)
+ client_dict_wait(_ctx->dict);
}
+ } else if (ctx->error != NULL) {
+ /* already failed */
+ struct dict_commit_result result = {
+ .ret = -1, .error = ctx->error
+ };
+ callback(&result, context);
+ } else {
+ /* nothing changed */
+ struct dict_commit_result result = {
+ .ret = 1, .error = NULL
+ };
+ callback(&result, context);
}
- DLLIST_REMOVE(&dict->transactions, ctx);
- callback(&result, context);
i_free(ctx->error);
i_free(ctx);
if (ctx->sent_begin) {
const char *query;
- query = t_strdup_printf("%c%u\n", DICT_PROTOCOL_CMD_ROLLBACK,
+ query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_ROLLBACK,
ctx->id);
- client_dict_try_send_transaction_query(ctx, query);
+ client_dict_send_transaction_query(ctx, query);
}
DLLIST_REMOVE(&dict->transactions, ctx);
(struct client_dict_transaction_context *)_ctx;
const char *query;
- query = t_strdup_printf("%c%u\t%s\t%s\n",
+ query = t_strdup_printf("%c%u\t%s\t%s",
DICT_PROTOCOL_CMD_SET, ctx->id,
str_tabescape(key),
str_tabescape(value));
- client_dict_try_send_transaction_query(ctx, query);
+ client_dict_send_transaction_query(ctx, query);
}
static void client_dict_unset(struct dict_transaction_context *_ctx,
(struct client_dict_transaction_context *)_ctx;
const char *query;
- query = t_strdup_printf("%c%u\t%s\n",
+ query = t_strdup_printf("%c%u\t%s",
DICT_PROTOCOL_CMD_UNSET, ctx->id,
str_tabescape(key));
- client_dict_try_send_transaction_query(ctx, query);
+ client_dict_send_transaction_query(ctx, query);
}
static void client_dict_atomic_inc(struct dict_transaction_context *_ctx,
(struct client_dict_transaction_context *)_ctx;
const char *query;
- query = t_strdup_printf("%c%u\t%s\t%lld\n",
+ query = t_strdup_printf("%c%u\t%s\t%lld",
DICT_PROTOCOL_CMD_ATOMIC_INC,
ctx->id, str_tabescape(key), diff);
- client_dict_try_send_transaction_query(ctx, query);
+ client_dict_send_transaction_query(ctx, query);
}
struct dict dict_driver_client = {
client_dict_set,
client_dict_unset,
client_dict_atomic_inc,
- NULL
+ client_dict_lookup_async
}
};