/* Copyright (c) 2005-2016 Dovecot authors, see the included COPYING file */
#include "lib.h"
+#include "array.h"
#include "llist.h"
#include "str.h"
-#include "net.h"
-#include "istream.h"
+#include "strescape.h"
+#include "time-util.h"
+#include "connection.h"
#include "ostream.h"
#include "eacces-error.h"
#include "dict-private.h"
#define DICT_CLIENT_DEFAULT_TIMEOUT_MSECS 0
/* Abort dict lookup after this many seconds. */
-#define DICT_CLIENT_READ_TIMEOUT_SECS 30
-/* Log a warning if dict lookup takes longer than this many seconds. */
-#define DICT_CLIENT_READ_WARN_TIMEOUT_SECS 5
+#define DICT_CLIENT_REQUEST_TIMEOUT_MSECS 30000
+/* Log a warning if dict lookup takes longer than this many milliseconds. */
+#define DICT_CLIENT_REQUEST_WARN_TIMEOUT_MSECS 5000
+
+struct client_dict_cmd {
+ int refcount;
+ struct client_dict *dict;
+ struct timeval start_time;
+ char *query;
+
+ bool retry_errors;
+ bool no_replies;
+ bool unfinished;
+
+ void (*callback)(struct client_dict_cmd *cmd,
+ const char *line, const char *error);
+ struct client_dict_iterate_context *iter;
+
+ struct {
+ dict_lookup_callback_t *lookup;
+ dict_transaction_commit_callback_t *commit;
+ void *context;
+ } api_callback;
+};
+
+struct dict_connection {
+ struct connection conn;
+ struct client_dict *dict;
+};
struct client_dict {
struct dict dict;
+ struct dict_connection conn;
- pool_t pool;
- int fd;
- const char *uri;
- const char *username;
- const char *path;
+ char *uri, *username;
enum dict_data_type value_type;
time_t last_failed_connect;
- struct istream *input;
- struct ostream *output;
- struct io *io;
+ struct ioloop *ioloop, *prev_ioloop;
+ struct timeout *to_requests;
struct timeout *to_idle;
unsigned int idle_msecs;
+ struct timeval last_input;
+ ARRAY(struct client_dict_cmd *) cmds;
struct client_dict_transaction_context *transactions;
- unsigned int connect_counter;
unsigned int transaction_id_counter;
- unsigned int async_commits;
- unsigned int iter_replies_skip;
+};
- unsigned int in_iteration:1;
- unsigned int handshaked:1;
+struct client_dict_iter_result {
+ const char *key, *value;
};
struct client_dict_iterate_context {
struct dict_iterate_context ctx;
+ char *error;
+
+ pool_t results_pool;
+ ARRAY(struct client_dict_iter_result) results;
+ unsigned int result_idx;
- pool_t pool;
- bool failed;
+ bool async;
bool finished;
+ bool deinit;
};
struct client_dict_transaction_context {
struct dict_transaction_context ctx;
struct client_dict_transaction_context *prev, *next;
- /* for async commits */
- dict_transaction_commit_callback_t *callback;
- void *context;
+ char *error;
unsigned int id;
- unsigned int connect_counter;
- unsigned int failed:1;
- unsigned int sent_begin:1;
- unsigned int async:1;
- unsigned int committed:1;
+ bool sent_begin:1;
};
-static int client_dict_connect(struct client_dict *dict);
-static void client_dict_disconnect(struct client_dict *dict);
-
-const char *dict_client_escape(const char *src)
-{
- const char *p;
- string_t *dest;
-
- /* first do a quick lookup to see if there's anything to escape.
- probably not. */
- for (p = src; *p != '\0'; p++) {
- if (*p == '\t' || *p == '\n' || *p == '\001')
- break;
- }
-
- if (*p == '\0')
- return src;
-
- dest = t_str_new(256);
- str_append_n(dest, src, p - src);
-
- for (; *p != '\0'; p++) {
- switch (*p) {
- case '\t':
- str_append_c(dest, '\001');
- str_append_c(dest, 't');
- break;
- case '\n':
- str_append_c(dest, '\001');
- str_append_c(dest, 'n');
- break;
- case '\001':
- str_append_c(dest, '\001');
- str_append_c(dest, '1');
- break;
- default:
- str_append_c(dest, *p);
- break;
- }
- }
- return str_c(dest);
-}
-
-const char *dict_client_unescape(const char *src)
-{
- const char *p;
- string_t *dest;
-
- /* first do a quick lookup to see if there's anything to unescape.
- probably not. */
- for (p = src; *p != '\0'; p++) {
- if (*p == '\001')
- break;
- }
-
- if (*p == '\0')
- return src;
-
- dest = t_str_new(256);
- str_append_n(dest, src, p - src);
- for (; *p != '\0'; p++) {
- if (*p != '\001')
- str_append_c(dest, *p);
- else if (p[1] != '\0') {
- p++;
- switch (*p) {
- case '1':
- str_append_c(dest, '\001');
- break;
- case 't':
- str_append_c(dest, '\t');
- break;
- case 'n':
- str_append_c(dest, '\n');
- break;
- }
- }
- }
- return str_c(dest);
-}
-
-static int client_dict_send_query(struct client_dict *dict, const char *query)
-{
- if (dict->output == NULL) {
- /* not connected currently */
- if (client_dict_connect(dict) < 0)
- return -1;
- }
+static struct connection_list *dict_connections;
- if (o_stream_send_str(dict->output, query) < 0 ||
- o_stream_flush(dict->output) < 0) {
- /* Send failed */
- if (!dict->handshaked) {
- /* we're trying to send hello, don't try to reconnect */
- return -1;
- }
+static int client_dict_connect(struct client_dict *dict, const char **error_r);
+static void client_dict_disconnect(struct client_dict *dict, const char *reason);
- /* Reconnect and try again. */
- client_dict_disconnect(dict);
- if (client_dict_connect(dict) < 0)
- return -1;
-
- if (o_stream_send_str(dict->output, query) < 0 ||
- o_stream_flush(dict->output) < 0) {
- i_error("write(%s) failed: %m", dict->path);
- client_dict_disconnect(dict);
- return -1;
- }
- }
- return 0;
+static struct client_dict_cmd *
+client_dict_cmd_init(struct client_dict *dict, const char *query)
+{
+ struct client_dict_cmd *cmd;
+
+ cmd = i_new(struct client_dict_cmd, 1);
+ cmd->refcount = 1;
+ cmd->dict = dict;
+ cmd->query = i_strdup(query);
+ cmd->start_time = ioloop_timeval;
+ return cmd;
}
-static int
-client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx)
+static void client_dict_cmd_ref(struct client_dict_cmd *cmd)
{
- struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
-
- if (ctx->failed)
- return -1;
-
- T_BEGIN {
- const char *query;
-
- query = t_strdup_printf("%c%u\n", DICT_PROTOCOL_CMD_BEGIN,
- ctx->id);
- if (client_dict_send_query(dict, query) < 0)
- ctx->failed = TRUE;
- else
- ctx->connect_counter = dict->connect_counter;
- } T_END;
-
- return ctx->failed ? -1 : 0;
+ i_assert(cmd->refcount > 0);
+ cmd->refcount++;
}
-static int ATTR_NOWARN_UNUSED_RESULT
-client_dict_send_transaction_query(struct client_dict_transaction_context *ctx,
- const char *query)
+static bool client_dict_cmd_unref(struct client_dict_cmd *cmd)
{
- struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
+ i_assert(cmd->refcount > 0);
+ if (--cmd->refcount > 0)
+ return TRUE;
- if (!ctx->sent_begin) {
- if (client_dict_transaction_send_begin(ctx) < 0)
- return -1;
- ctx->sent_begin = TRUE;
- }
-
- if (ctx->connect_counter != dict->connect_counter || ctx->failed)
- return -1;
+ i_free(cmd->query);
+ i_free(cmd);
+ return FALSE;
+}
- if (dict->output == NULL) {
- /* not connected, this'll fail */
- return -1;
+static void dict_pre_api_callback(struct client_dict *dict)
+{
+ if (dict->prev_ioloop != NULL) {
+ /* Don't let callback see that we've created our
+ internal ioloop in case it wants to add some ios
+ or timeouts. */
+ current_ioloop = dict->prev_ioloop;
}
+}
- if (o_stream_send_str(dict->output, query) < 0 ||
- o_stream_flush(dict->output) < 0) {
- /* Send failed. Our transactions have died, so don't even try
- to re-send the command */
- ctx->failed = TRUE;
- client_dict_disconnect(dict);
- return -1;
+static void dict_post_api_callback(struct client_dict *dict)
+{
+ if (dict->prev_ioloop != NULL) {
+ current_ioloop = dict->ioloop;
+ /* stop client_dict_wait() */
+ io_loop_stop(dict->ioloop);
}
- return 0;
}
-static struct client_dict_transaction_context *
-client_dict_transaction_find(struct client_dict *dict, unsigned int id)
+static bool
+dict_cmd_callback_line(struct client_dict_cmd *cmd, const char *line)
{
- struct client_dict_transaction_context *ctx;
-
- for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) {
- if (ctx->id == id)
- return ctx;
- }
- return NULL;
+ cmd->unfinished = FALSE;
+ cmd->callback(cmd, line, NULL);
+ return !cmd->unfinished;
}
static void
-client_dict_finish_transaction(struct client_dict *dict,
- unsigned int id, int ret)
+dict_cmd_callback_error(struct client_dict_cmd *cmd, const char *error)
{
- struct client_dict_transaction_context *ctx;
-
- ctx = client_dict_transaction_find(dict, id);
- if (ctx == NULL) {
- i_error("dict-client: Unknown transaction id %u", id);
- return;
- }
- ctx->failed = TRUE;
- if (!ctx->committed)
- return;
+ cmd->unfinished = FALSE;
+ if (cmd->callback != NULL)
+ cmd->callback(cmd, NULL, error);
+ i_assert(!cmd->unfinished);
+}
- /* the callback may call the dict code again, so remove this
- transaction before calling it */
- i_assert(dict->async_commits > 0);
- if (--dict->async_commits == 0) {
- if (dict->io != NULL)
- io_remove(&dict->io);
- }
- DLLIST_REMOVE(&dict->transactions, ctx);
+static void client_dict_input_timeout(struct client_dict *dict)
+{
+ int diff = timeval_diff_msecs(&ioloop_timeval, &dict->last_input);
- if (ctx->callback != NULL)
- ctx->callback(ret, ctx->context);
- i_free(ctx);
+ client_dict_disconnect(dict, t_strdup_printf(
+ "Timeout: No input from dict for %u.%03u secs",
+ diff/1000, diff%1000));
}
-static ssize_t client_dict_read_timeout(struct client_dict *dict)
+static int
+client_dict_cmd_query_send(struct client_dict *dict, const char *query)
{
- time_t now, timeout;
- unsigned int diff;
+ struct const_iovec iov[2];
ssize_t ret;
- now = time(NULL);
- timeout = now + DICT_CLIENT_READ_TIMEOUT_SECS;
-
- do {
- alarm(timeout - now);
- ret = i_stream_read(dict->input);
- alarm(0);
- if (ret != 0)
- break;
-
- /* interrupted most likely because of timeout,
- but check anyway. */
- now = time(NULL);
- } while (now < timeout);
-
- if (ret > 0) {
- diff = time(NULL) - now;
- if (diff >= DICT_CLIENT_READ_WARN_TIMEOUT_SECS) {
- i_warning("read(%s): dict lookup took %u seconds",
- dict->path, diff);
- }
- }
- return ret;
+ iov[0].iov_base = query;
+ iov[0].iov_len = strlen(query);
+ iov[1].iov_base = "\n";
+ iov[1].iov_len = 1;
+ ret = o_stream_sendv(dict->conn.conn.output, iov, 2);
+ if (ret < 0)
+ return -1;
+ i_assert((size_t)ret == iov[0].iov_len + 1);
+ return 0;
}
-static int
-client_dict_read_one_line_real(struct client_dict *dict, char **line_r)
+static bool
+client_dict_cmd_send(struct client_dict *dict, struct client_dict_cmd **_cmd,
+ const char **error_r)
{
- unsigned int id;
- char *line;
- ssize_t ret;
+ struct client_dict_cmd *cmd = *_cmd;
+ const char *error = NULL;
+ bool retry = cmd->retry_errors;
+ int ret;
- *line_r = NULL;
- while ((line = i_stream_next_line(dict->input)) == NULL) {
- ret = client_dict_read_timeout(dict);
- switch (ret) {
- case -1:
- if (dict->input->stream_errno != 0)
- i_error("read(%s) failed: %m", dict->path);
- else {
- i_error("read(%s) failed: Remote disconnected",
- dict->path);
- }
- return -1;
- case -2:
- i_error("read(%s) returned too much data", dict->path);
- return -1;
- case 0:
- i_error("read(%s) failed: Timeout after %u seconds",
- dict->path, DICT_CLIENT_READ_TIMEOUT_SECS);
- return -1;
- default:
- i_assert(ret > 0);
- break;
+ *_cmd = NULL;
+
+ /* we're no longer idling. even with no_replies=TRUE we're going to
+ wait for COMMIT/ROLLBACK. */
+ if (dict->to_idle != NULL)
+ timeout_remove(&dict->to_idle);
+
+ if (client_dict_connect(dict, &error) < 0) {
+ retry = FALSE;
+ ret = -1;
+ } else {
+ ret = client_dict_cmd_query_send(dict, cmd->query);
+ if (ret < 0) {
+ error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name,
+ o_stream_get_error(dict->conn.conn.output));
}
}
- if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) {
- switch (line[1]) {
- case DICT_PROTOCOL_REPLY_OK:
- ret = 1;
- break;
- case DICT_PROTOCOL_REPLY_NOTFOUND:
+ if (ret < 0 && retry) {
+ /* Reconnect and try again. */
+ client_dict_disconnect(dict, error);
+ if (client_dict_connect(dict, &error) < 0)
+ ;
+ else if (client_dict_cmd_query_send(dict, cmd->query) < 0) {
+ error = t_strdup_printf("write(%s) failed: %s", dict->conn.conn.name,
+ o_stream_get_error(dict->conn.conn.output));
+ } else {
ret = 0;
- break;
- case DICT_PROTOCOL_REPLY_FAIL:
- ret = -1;
- break;
- default:
- i_error("dict-client: Invalid async commit line: %s",
- line);
- return -1;
- }
- if (str_to_uint(line+2, &id) < 0) {
- i_error("dict-client: Invalid ID");
- return -1;
}
- client_dict_finish_transaction(dict, id, ret);
- return 0;
}
- if (dict->iter_replies_skip > 0) {
- /* called aborted the iteration before finishing it.
- skip over the iteration reply */
- if (*line == DICT_PROTOCOL_REPLY_OK)
- return 0;
- if (*line != '\0' && *line != DICT_PROTOCOL_REPLY_FAIL) {
- i_error("dict-client: Invalid iteration reply line: %s",
- line);
- return -1;
+
+ if (cmd->no_replies) {
+ /* just send and forget */
+ client_dict_cmd_unref(cmd);
+ return TRUE;
+ } else if (ret < 0) {
+ i_assert(error != NULL);
+ dict_cmd_callback_error(cmd, error);
+ client_dict_cmd_unref(cmd);
+ if (error_r != NULL)
+ *error_r = error;
+ return FALSE;
+ } else {
+ if (dict->to_requests == NULL) {
+ dict->to_requests =
+ timeout_add(DICT_CLIENT_REQUEST_TIMEOUT_MSECS,
+ client_dict_input_timeout, dict);
}
- dict->iter_replies_skip--;
- return 0;
+ array_append(&dict->cmds, &cmd, 1);
+ return TRUE;
}
- *line_r = line;
- return 1;
}
-static int client_dict_read_one_line(struct client_dict *dict, char **line_r)
+static void
+client_dict_transaction_send_begin(struct client_dict_transaction_context *ctx)
{
- int ret;
+ struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
+ struct client_dict_cmd *cmd;
+ const char *query, *error;
- if ((ret = client_dict_read_one_line_real(dict, line_r)) < 0)
- client_dict_disconnect(dict);
- return ret;
+ i_assert(ctx->error == NULL);
+
+ ctx->sent_begin = TRUE;
+
+ /* transactions commands don't have replies. only COMMIT has. */
+ query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_BEGIN, ctx->id);
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->no_replies = TRUE;
+ cmd->retry_errors = TRUE;
+ if (!client_dict_cmd_send(dict, &cmd, &error))
+ ctx->error = i_strdup(error);
+}
+
+static void
+client_dict_send_transaction_query(struct client_dict_transaction_context *ctx,
+ const char *query)
+{
+ struct client_dict *dict = (struct client_dict *)ctx->ctx.dict;
+ struct client_dict_cmd *cmd;
+ const char *error;
+
+ if (ctx->error != NULL)
+ return;
+
+ if (!ctx->sent_begin)
+ client_dict_transaction_send_begin(ctx);
+
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->no_replies = TRUE;
+ if (!client_dict_cmd_send(dict, &cmd, &error))
+ ctx->error = i_strdup(error);
}
static bool client_dict_is_finished(struct client_dict *dict)
{
- return dict->transactions == NULL && !dict->in_iteration &&
- dict->async_commits == 0;
+ return dict->transactions == NULL && array_count(&dict->cmds) == 0;
}
static void client_dict_timeout(struct client_dict *dict)
{
if (client_dict_is_finished(dict))
- client_dict_disconnect(dict);
+ client_dict_disconnect(dict, "Idle disconnection");
}
static void client_dict_add_timeout(struct client_dict *dict)
} else if (client_dict_is_finished(dict)) {
dict->to_idle = timeout_add(dict->idle_msecs,
client_dict_timeout, dict);
+ if (dict->to_requests != NULL)
+ timeout_remove(&dict->to_requests);
}
}
-static char *client_dict_read_line(struct client_dict *dict)
+static int dict_conn_input_line(struct connection *_conn, const char *line)
{
- char *line;
+ struct dict_connection *conn = (struct dict_connection *)_conn;
+ struct client_dict *dict = conn->dict;
+ struct client_dict_cmd *const *cmds;
+ unsigned int count;
+ bool finished;
+ int diff;
+
+ dict->last_input = ioloop_timeval;
+ if (dict->to_requests != NULL)
+ timeout_reset(dict->to_requests);
+
+ cmds = array_get(&conn->dict->cmds, &count);
+ if (count == 0) {
+ i_error("%s: Received reply without pending commands: %s",
+ dict->conn.conn.name, line);
+ return -1;
+ }
+ i_assert(!cmds[0]->no_replies);
- while (client_dict_read_one_line(dict, &line) == 0)
- ;
+ client_dict_cmd_ref(cmds[0]);
+ finished = dict_cmd_callback_line(cmds[0], line);
+ if (!client_dict_cmd_unref(cmds[0])) {
+ /* disconnected during command handling */
+ return -1;
+ }
+ if (!finished) {
+ /* more lines needed for this command */
+ return 1;
+ }
+ diff = timeval_diff_msecs(&ioloop_timeval, &cmds[0]->start_time);
+ if (diff >= DICT_CLIENT_REQUEST_WARN_TIMEOUT_MSECS) {
+ i_warning("read(%s): dict lookup took %u.%03u seconds: %s",
+ dict->conn.conn.name, diff/1000, diff % 1000,
+ cmds[0]->query);
+ }
+ client_dict_cmd_unref(cmds[0]);
+ array_delete(&dict->cmds, 0, 1);
client_dict_add_timeout(dict);
- return line;
+ return 1;
}
-static int client_dict_connect(struct client_dict *dict)
+static int client_dict_connect(struct client_dict *dict, const char **error_r)
{
const char *query;
+ if (dict->conn.conn.fd_in != -1)
+ return 0;
if (dict->last_failed_connect == ioloop_time) {
/* Try again later */
+ *error_r = "Waiting until the next connect attempt";
return -1;
}
- dict->fd = net_connect_unix(dict->path);
- if (dict->fd == -1) {
+ if (connection_client_connect(&dict->conn.conn) < 0) {
dict->last_failed_connect = ioloop_time;
if (errno == EACCES) {
- i_error("%s", eacces_error_get("net_connect_unix",
- dict->path));
+ *error_r = eacces_error_get("net_connect_unix",
+ dict->conn.conn.name);
} else {
- i_error("net_connect_unix(%s) failed: %m",
- dict->path);
+ *error_r = t_strdup_printf(
+ "net_connect_unix(%s) failed: %m", dict->conn.conn.name);
}
return -1;
}
- /* Dictionary lookups are blocking */
- net_set_nonblock(dict->fd, FALSE);
-
- dict->input = i_stream_create_fd(dict->fd, (size_t)-1, FALSE);
- dict->output = o_stream_create_fd(dict->fd, 4096, FALSE);
-
query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n",
DICT_PROTOCOL_CMD_HELLO,
DICT_CLIENT_PROTOCOL_MAJOR_VERSION,
DICT_CLIENT_PROTOCOL_MINOR_VERSION,
dict->value_type, dict->username, dict->uri);
- if (client_dict_send_query(dict, query) < 0) {
- dict->last_failed_connect = ioloop_time;
- client_dict_disconnect(dict);
- return -1;
- }
-
- dict->handshaked = TRUE;
+ o_stream_nsend_str(dict->conn.conn.output, query);
+ client_dict_add_timeout(dict);
return 0;
}
-static void client_dict_disconnect(struct client_dict *dict)
+static void
+client_dict_abort_commands(struct client_dict *dict, const char *reason)
+{
+ ARRAY(struct client_dict_cmd *) cmds_copy;
+ struct client_dict_cmd *const *cmdp;
+
+ /* abort all commands */
+ t_array_init(&cmds_copy, array_count(&dict->cmds));
+ array_append_array(&cmds_copy, &dict->cmds);
+ array_clear(&dict->cmds);
+
+ array_foreach(&cmds_copy, cmdp) {
+ dict_cmd_callback_error(*cmdp, reason);
+ client_dict_cmd_unref(*cmdp);
+ }
+}
+
+static void client_dict_disconnect(struct client_dict *dict, const char *reason)
{
struct client_dict_transaction_context *ctx, *next;
- dict->connect_counter++;
- dict->handshaked = FALSE;
- dict->iter_replies_skip = 0;
+ client_dict_abort_commands(dict, reason);
- /* abort all pending async commits */
+ /* all transactions that have sent BEGIN are no longer valid */
for (ctx = dict->transactions; ctx != NULL; ctx = next) {
next = ctx->next;
- if (ctx->async)
- client_dict_finish_transaction(dict, ctx->id, -1);
+ if (ctx->sent_begin && ctx->error == NULL)
+ ctx->error = i_strdup(reason);
}
if (dict->to_idle != NULL)
timeout_remove(&dict->to_idle);
- if (dict->io != NULL)
- io_remove(&dict->io);
- if (dict->input != NULL)
- i_stream_destroy(&dict->input);
- if (dict->output != NULL)
- o_stream_destroy(&dict->output);
+ if (dict->to_requests != NULL)
+ timeout_remove(&dict->to_requests);
+ connection_disconnect(&dict->conn.conn);
+}
- if (dict->fd != -1) {
- if (close(dict->fd) < 0)
- i_error("close(%s) failed: %m", dict->path);
- dict->fd = -1;
- }
+static void dict_conn_destroy(struct connection *_conn)
+{
+ struct dict_connection *conn = (struct dict_connection *)_conn;
+
+ client_dict_disconnect(conn->dict, connection_disconnect_reason(_conn));
}
+static const struct connection_settings dict_conn_set = {
+ .input_max_size = (size_t)-1,
+ .output_max_size = (size_t)-1,
+ .client = TRUE
+};
+
+static const struct connection_vfuncs dict_conn_vfuncs = {
+ .destroy = dict_conn_destroy,
+ .input_line = dict_conn_input_line
+};
+
static int
client_dict_init(struct dict *driver, const char *uri,
const struct dict_settings *set,
struct dict **dict_r, const char **error_r)
{
+ struct ioloop *old_ioloop = current_ioloop;
struct client_dict *dict;
- const char *p, *dest_uri;
+ const char *p, *dest_uri, *path;
unsigned int idle_msecs = DICT_CLIENT_DEFAULT_TIMEOUT_MSECS;
- pool_t pool;
/* uri = [idle_msecs=<n>:] [<path>] ":" <uri> */
if (strncmp(uri, "idle_msecs=", 11) == 0) {
return -1;
}
- pool = pool_alloconly_create("client dict", 1024);
- dict = p_new(pool, struct client_dict, 1);
- dict->pool = pool;
+ if (dict_connections == NULL) {
+ dict_connections = connection_list_init(&dict_conn_set,
+ &dict_conn_vfuncs);
+ }
+
+ dict = i_new(struct client_dict, 1);
dict->dict = *driver;
+ dict->conn.dict = dict;
dict->value_type = set->value_type;
- dict->username = p_strdup(pool, set->username);
+ dict->username = i_strdup(set->username);
dict->idle_msecs = idle_msecs;
-
- dict->fd = -1;
+ i_array_init(&dict->cmds, 32);
if (uri[0] == ':') {
/* default path */
- dict->path = p_strconcat(pool, set->base_dir,
- "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL);
+ path = t_strconcat(set->base_dir,
+ "/"DEFAULT_DICT_SERVER_SOCKET_FNAME, NULL);
} else if (uri[0] == '/') {
/* absolute path */
- dict->path = p_strdup_until(pool, uri, dest_uri);
+ path = t_strdup_until(uri, dest_uri);
} else {
/* relative path to base_dir */
- dict->path = p_strconcat(pool, set->base_dir, "/",
- p_strdup_until(pool, uri, dest_uri), NULL);
+ path = t_strconcat(set->base_dir, "/",
+ t_strdup_until(uri, dest_uri), NULL);
}
- dict->uri = p_strdup(pool, dest_uri + 1);
+ connection_init_client_unix(dict_connections, &dict->conn.conn, path);
+ dict->uri = i_strdup(dest_uri + 1);
+
+ dict->ioloop = io_loop_create();
+ io_loop_set_current(old_ioloop);
*dict_r = &dict->dict;
return 0;
}
static void client_dict_deinit(struct dict *_dict)
{
struct client_dict *dict = (struct client_dict *)_dict;
+ struct ioloop *old_ioloop = current_ioloop;
+
+ client_dict_disconnect(dict, "Deinit");
+ connection_deinit(&dict->conn.conn);
- client_dict_disconnect(dict);
i_assert(dict->transactions == NULL);
- pool_unref(&dict->pool);
+ i_assert(array_count(&dict->cmds) == 0);
+
+ io_loop_set_current(dict->ioloop);
+ io_loop_destroy(&dict->ioloop);
+ io_loop_set_current(old_ioloop);
+
+ array_free(&dict->cmds);
+ i_free(dict->username);
+ i_free(dict->uri);
+ i_free(dict);
+
+ if (dict_connections->connections == NULL)
+ connection_list_deinit(&dict_connections);
}
static int client_dict_wait(struct dict *_dict)
{
struct client_dict *dict = (struct client_dict *)_dict;
- char *line;
- int ret;
- if (!dict->handshaked)
- return -1;
+ if (array_count(&dict->cmds) == 0)
+ return 0;
- while (dict->async_commits > 0) {
- if ((ret = client_dict_read_one_line(dict, &line)) < 0)
- return -1;
+ dict->prev_ioloop = current_ioloop;
+ io_loop_set_current(dict->ioloop);
- if (ret > 0) {
- i_error("dict-client: Unexpected reply waiting waiting for async commits: %s", line);
- client_dict_disconnect(dict);
- return -1;
- }
- }
+ if (dict->to_idle != NULL)
+ dict->to_idle = io_loop_move_timeout(&dict->to_idle);
+ if (dict->to_requests != NULL)
+ dict->to_requests = io_loop_move_timeout(&dict->to_requests);
+ connection_switch_ioloop(&dict->conn.conn);
+
+ while (array_count(&dict->cmds) > 0)
+ io_loop_run(dict->ioloop);
+
+ io_loop_set_current(dict->prev_ioloop);
+ dict->prev_ioloop = NULL;
+
+ if (dict->to_idle != NULL)
+ dict->to_idle = io_loop_move_timeout(&dict->to_idle);
+ if (dict->to_requests != NULL)
+ dict->to_requests = io_loop_move_timeout(&dict->to_requests);
+ connection_switch_ioloop(&dict->conn.conn);
return 0;
}
-static int client_dict_lookup(struct dict *_dict, pool_t pool,
- const char *key, const char **value_r)
+static void
+client_dict_lookup_async_callback(struct client_dict_cmd *cmd, const char *line,
+ const char *error)
+{
+ struct client_dict *dict = cmd->dict;
+ struct dict_lookup_result result;
+
+ memset(&result, 0, sizeof(result));
+ if (error != NULL) {
+ result.ret = -1;
+ result.error = error;
+ } else switch (*line) {
+ case DICT_PROTOCOL_REPLY_OK:
+ result.value = t_str_tabunescape(line + 1);
+ result.ret = 1;
+ break;
+ case DICT_PROTOCOL_REPLY_NOTFOUND:
+ result.ret = 0;
+ break;
+ case DICT_PROTOCOL_REPLY_FAIL:
+ result.error = line[1] == '\0' ? "dict-server returned failure" :
+ t_strdup_printf("dict-server returned failure: %s",
+ t_str_tabunescape(line+1));
+ result.ret = -1;
+ break;
+ default:
+ result.error = t_strdup_printf(
+ "dict-client: Invalid lookup '%s' reply: %s",
+ cmd->query, line);
+ client_dict_disconnect(dict, result.error);
+ result.ret = -1;
+ break;
+ }
+ dict_pre_api_callback(dict);
+ cmd->api_callback.lookup(&result, cmd->api_callback.context);
+ dict_post_api_callback(dict);
+}
+
+static void
+client_dict_lookup_async(struct dict *_dict, const char *key,
+ dict_lookup_callback_t *callback, void *context)
{
struct client_dict *dict = (struct client_dict *)_dict;
- const char *line;
- int ret;
+ struct client_dict_cmd *cmd;
+ const char *query;
- T_BEGIN {
- const char *query;
+ query = t_strdup_printf("%c%s", DICT_PROTOCOL_CMD_LOOKUP,
+ str_tabescape(key));
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->callback = client_dict_lookup_async_callback;
+ cmd->api_callback.lookup = callback;
+ cmd->api_callback.context = context;
+ cmd->retry_errors = TRUE;
- query = t_strdup_printf("%c%s\n", DICT_PROTOCOL_CMD_LOOKUP,
- dict_client_escape(key));
- ret = client_dict_send_query(dict, query);
- } T_END;
- if (ret < 0)
- return -1;
+ client_dict_cmd_send(dict, &cmd, NULL);
+}
- /* read reply */
- line = client_dict_read_line(dict);
- if (line == NULL)
- return -1;
+static void client_dict_lookup_callback(const struct dict_lookup_result *result,
+ void *context)
+{
+ struct dict_lookup_result *result_copy = context;
- switch (*line) {
- case DICT_PROTOCOL_REPLY_OK:
- *value_r = p_strdup(pool, dict_client_unescape(line + 1));
- return 1;
- case DICT_PROTOCOL_REPLY_NOTFOUND:
+ *result_copy = *result;
+}
+
+static int client_dict_lookup(struct dict *_dict, pool_t pool, const char *key,
+ const char **value_r)
+{
+ struct dict_lookup_result result;
+
+ memset(&result, 0, sizeof(result));
+ result.ret = -2;
+
+ client_dict_lookup_async(_dict, key, client_dict_lookup_callback, &result);
+ if (result.ret == -2)
+ client_dict_wait(_dict);
+
+ switch (result.ret) {
+ case -1:
+ i_error("dict-client: Lookup '%s' failed: %s", key, result.error);
+ return -1;
+ case 0:
*value_r = NULL;
return 0;
+ case 1:
+ *value_r = p_strdup(pool, result.value);
+ return 1;
+ }
+ i_unreached();
+}
+
+static void client_dict_iterate_free(struct client_dict_iterate_context *ctx)
+{
+ if (!ctx->deinit || !ctx->finished)
+ return;
+ i_free(ctx->error);
+ i_free(ctx);
+}
+
+static void
+client_dict_iter_api_callback(struct client_dict_iterate_context *ctx,
+ struct client_dict *dict)
+{
+ if (ctx->deinit) {
+ /* iterator was already deinitialized */
+ return;
+ }
+ if (ctx->ctx.async_callback != NULL) {
+ dict_pre_api_callback(dict);
+ ctx->ctx.async_callback(ctx->ctx.async_context);
+ dict_post_api_callback(dict);
+ } else {
+ /* synchronous lookup */
+ io_loop_stop(dict->ioloop);
+ }
+}
+
+static void
+client_dict_iter_async_callback(struct client_dict_cmd *cmd, const char *line,
+ const char *error)
+{
+ struct client_dict_iterate_context *ctx = cmd->iter;
+ struct client_dict *dict = cmd->dict;
+ struct client_dict_iter_result *result;
+ const char *key = NULL, *value = NULL;
+
+ if (error != NULL) {
+ /* failed */
+ } else switch (*line) {
+ case '\0':
+ /* end of iteration */
+ ctx->finished = TRUE;
+ client_dict_iter_api_callback(ctx, dict);
+ client_dict_iterate_free(ctx);
+ return;
+ case DICT_PROTOCOL_REPLY_OK:
+ /* key \t value */
+ key = line+1;
+ value = strchr(key, '\t');
+ break;
case DICT_PROTOCOL_REPLY_FAIL:
- return -1;
+ error = t_strdup_printf("dict-server returned failure: %s", line+1);
+ break;
default:
- i_error("dict-client: Invalid lookup '%s' reply: %s", key, line);
- client_dict_disconnect(dict);
- return -1;
+ break;
+ }
+ if (value == NULL && error == NULL) {
+ /* broken protocol */
+ error = t_strdup_printf("dict client (%s) sent broken iterate reply: %s",
+ dict->conn.conn.name, line);
+ client_dict_disconnect(dict, error);
+ }
+
+ if (error != NULL) {
+ if (ctx->error == NULL)
+ ctx->error = i_strdup(error);
+ ctx->finished = TRUE;
+ if (dict->prev_ioloop != NULL) {
+ /* stop client_dict_wait() */
+ io_loop_stop(dict->ioloop);
+ }
+ client_dict_iterate_free(ctx);
+ return;
+ }
+ cmd->unfinished = TRUE;
+
+ if (ctx->deinit) {
+ /* iterator was already deinitialized */
+ return;
}
+
+ key = t_strdup_until(key, value++);
+ result = array_append_space(&ctx->results);
+ result->key = p_strdup(ctx->results_pool, t_str_tabunescape(key));
+ result->value = p_strdup(ctx->results_pool, t_str_tabunescape(value));
+
+ client_dict_iter_api_callback(ctx, dict);
}
static struct dict_iterate_context *
{
struct client_dict *dict = (struct client_dict *)_dict;
struct client_dict_iterate_context *ctx;
-
- if (dict->in_iteration)
- i_panic("dict-client: Only one iteration supported");
- dict->in_iteration = TRUE;
+ struct client_dict_cmd *cmd;
+ string_t *query = t_str_new(256);
+ unsigned int i;
ctx = i_new(struct client_dict_iterate_context, 1);
ctx->ctx.dict = _dict;
- ctx->pool = pool_alloconly_create("client dict iteration", 512);
+ ctx->results_pool = pool_alloconly_create("client dict iteration", 512);
+ ctx->async = (flags & DICT_ITERATE_FLAG_ASYNC) != 0;
+ i_array_init(&ctx->results, 64);
+
+ str_printfa(query, "%c%d", DICT_PROTOCOL_CMD_ITERATE, flags);
+ for (i = 0; paths[i] != NULL; i++) {
+ str_append_c(query, '\t');
+ str_append(query, str_tabescape(paths[i]));
+ }
- T_BEGIN {
- string_t *query = t_str_new(256);
- unsigned int i;
+ cmd = client_dict_cmd_init(dict, str_c(query));
+ cmd->iter = ctx;
+ cmd->callback = client_dict_iter_async_callback;
+ cmd->retry_errors = TRUE;
- str_printfa(query, "%c%d", DICT_PROTOCOL_CMD_ITERATE, flags);
- for (i = 0; paths[i] != NULL; i++) {
- str_append_c(query, '\t');
- str_append(query, dict_client_escape(paths[i]));
- }
- str_append_c(query, '\n');
- if (client_dict_send_query(dict, str_c(query)) < 0)
- ctx->failed = TRUE;
- } T_END;
+ client_dict_cmd_send(dict, &cmd, NULL);
return &ctx->ctx;
}
{
struct client_dict_iterate_context *ctx =
(struct client_dict_iterate_context *)_ctx;
- struct client_dict *dict = (struct client_dict *)_ctx->dict;
- char *line, *key, *value;
-
- if (ctx->failed)
- return FALSE;
+ const struct client_dict_iter_result *results;
+ unsigned int count;
- /* read next reply */
- line = client_dict_read_line(dict);
- if (line == NULL) {
- ctx->failed = TRUE;
+ if (ctx->error != NULL) {
+ ctx->ctx.has_more = FALSE;
return FALSE;
}
- if (*line == '\0') {
- /* end of iteration */
- ctx->finished = TRUE;
- return FALSE;
+ results = array_get(&ctx->results, &count);
+ if (ctx->result_idx < count) {
+ *key_r = results[ctx->result_idx].key;
+ *value_r = results[ctx->result_idx].value;
+ ctx->ctx.has_more = TRUE;
+ ctx->result_idx++;
+ return TRUE;
}
-
- /* line contains key \t value */
- p_clear(ctx->pool);
-
- switch (*line) {
- case DICT_PROTOCOL_REPLY_OK:
- key = line+1;
- value = strchr(key, '\t');
- break;
- case DICT_PROTOCOL_REPLY_FAIL:
- ctx->failed = TRUE;
- return FALSE;
- default:
- key = NULL;
- value = NULL;
- break;
+ ctx->ctx.has_more = !ctx->finished;
+ ctx->result_idx = 0;
+ array_clear(&ctx->results);
+ p_clear(ctx->results_pool);
+
+ if (!ctx->async && ctx->ctx.has_more) {
+ client_dict_wait(_ctx->dict);
+ return client_dict_iterate(_ctx, key_r, value_r);
}
- if (value == NULL) {
- /* broken protocol */
- i_error("dict client (%s) sent broken iterate reply: %s", dict->path, line);
- ctx->failed = TRUE;
- return FALSE;
- }
- *value++ = '\0';
-
- *key_r = p_strdup(ctx->pool, dict_client_unescape(key));
- *value_r = p_strdup(ctx->pool, dict_client_unescape(value));
- return TRUE;
+ return FALSE;
}
static int client_dict_iterate_deinit(struct dict_iterate_context *_ctx)
struct client_dict *dict = (struct client_dict *)_ctx->dict;
struct client_dict_iterate_context *ctx =
(struct client_dict_iterate_context *)_ctx;
- int ret = ctx->failed ? -1 : 0;
+ int ret = ctx->error != NULL ? -1 : 0;
- if (!ctx->finished)
- dict->iter_replies_skip++;
+ ctx->deinit = TRUE;
- pool_unref(&ctx->pool);
- i_free(ctx);
- dict->in_iteration = FALSE;
+ if (ret < 0)
+ i_error("dict-client: Iteration failed: %s", ctx->error);
+ array_free(&ctx->results);
+ pool_unref(&ctx->results_pool);
+ client_dict_iterate_free(ctx);
client_dict_add_timeout(dict);
return ret;
return &ctx->ctx;
}
-static void dict_async_input(struct client_dict *dict)
+static void
+client_dict_transaction_commit_callback(struct client_dict_cmd *cmd,
+ const char *line, const char *error)
{
- char *line;
- int ret;
-
- i_assert(!dict->in_iteration);
+ struct client_dict *dict = cmd->dict;
+ int ret = -1;
- do {
- ret = client_dict_read_one_line(dict, &line);
- } while (ret == 0 && i_stream_get_data_size(dict->input) > 0);
+ if (error != NULL) {
+ /* failed */
+ i_error("dict-client: Commit failed: %s", error);
+ } else switch (*line) {
+ case DICT_PROTOCOL_REPLY_OK:
+ ret = 1;
+ break;
+ case DICT_PROTOCOL_REPLY_NOTFOUND:
+ ret = 0;
+ break;
+ case DICT_PROTOCOL_REPLY_FAIL: {
+ const char *error = strchr(line+1, '\t');
- if (ret < 0)
- io_remove(&dict->io);
- else if (ret > 0) {
- i_error("dict-client: Unexpected reply waiting waiting for async commits: %s", line);
- client_dict_disconnect(dict);
+ i_error("dict-client: server returned failure: %s",
+ error != NULL ? t_str_tabunescape(error) : "");
+ break;
+ }
+ default:
+ ret = -1;
+ error = t_strdup_printf("dict-client: Invalid commit reply: %s", line);
+ i_error("%s", error);
+ client_dict_disconnect(dict, error);
+ break;
}
+ dict_pre_api_callback(dict);
+ cmd->api_callback.commit(ret, cmd->api_callback.context);
+ dict_post_api_callback(dict);
+}
+
+static void commit_sync_callback(int ret, void *context)
+{
+ int *ret_p = context;
+ *ret_p = ret;
}
static int
struct client_dict_transaction_context *ctx =
(struct client_dict_transaction_context *)_ctx;
struct client_dict *dict = (struct client_dict *)_ctx->dict;
- unsigned int id;
- int ret = ctx->failed ? -1 : 1;
+ struct client_dict_cmd *cmd;
+ const char *query;
+ int ret = -1;
- ctx->committed = TRUE;
- if (ctx->sent_begin && !ctx->failed) T_BEGIN {
- const char *query, *line;
+ DLLIST_REMOVE(&dict->transactions, ctx);
- query = t_strdup_printf("%c%u\n", !async ?
- DICT_PROTOCOL_CMD_COMMIT :
- DICT_PROTOCOL_CMD_COMMIT_ASYNC,
- ctx->id);
- if (client_dict_send_transaction_query(ctx, query) < 0)
- ret = -1;
- else if (async) {
- ctx->callback = callback;
- ctx->context = context;
- ctx->async = TRUE;
- if (dict->async_commits++ == 0) {
- dict->io = io_add(dict->fd, IO_READ,
- dict_async_input, dict);
- }
+ if (ctx->sent_begin && ctx->error == NULL) {
+ query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_COMMIT, ctx->id);
+ cmd = client_dict_cmd_init(dict, query);
+ cmd->callback = client_dict_transaction_commit_callback;
+ if (callback != NULL) {
+ cmd->api_callback.commit = callback;
+ cmd->api_callback.context = context;
} else {
- /* sync commit, read reply */
- line = client_dict_read_line(dict);
- if (line == NULL)
- ret = -1;
- else switch (*line) {
- case DICT_PROTOCOL_REPLY_OK:
- ret = 1;
- break;
- case DICT_PROTOCOL_REPLY_NOTFOUND:
- ret = 0;
- break;
- case DICT_PROTOCOL_REPLY_FAIL:
- ret = -1;
- break;
- default:
- i_error("dict-client: Invalid commit reply: %s", line);
- client_dict_disconnect(dict);
- line = NULL;
- ret = -1;
- break;
- }
- if (line != NULL &&
- (str_to_uint(line+1, &id) < 0 || ctx->id != id)) {
- i_error("dict-client: Invalid commit reply, "
- "expected id=%u: %s", ctx->id, line);
- client_dict_disconnect(dict);
- ret = -1;
- }
+ cmd->api_callback.commit = commit_sync_callback;
+ cmd->api_callback.context = &ret;
+ }
+ if (client_dict_cmd_send(dict, &cmd, NULL)) {
+ if (!async)
+ client_dict_wait(_ctx->dict);
}
- } T_END;
+ } else if (ctx->error != NULL) {
+ /* already failed */
+ if (callback != NULL)
+ callback(-1, context);
+ ret = -1;
+ } else {
+ /* nothing changed */
+ if (callback != NULL)
+ callback(1, context);
+ ret = 1;
+ }
- if (ret < 0 || !async) {
- DLLIST_REMOVE(&dict->transactions, ctx);
- i_free(ctx);
+ i_free(ctx->error);
+ i_free(ctx);
- client_dict_add_timeout(dict);
- }
+ client_dict_add_timeout(dict);
return ret;
}
(struct client_dict_transaction_context *)_ctx;
struct client_dict *dict = (struct client_dict *)_ctx->dict;
- if (ctx->sent_begin) T_BEGIN {
+ if (ctx->sent_begin) {
const char *query;
- query = t_strdup_printf("%c%u\n", DICT_PROTOCOL_CMD_ROLLBACK,
+ query = t_strdup_printf("%c%u", DICT_PROTOCOL_CMD_ROLLBACK,
ctx->id);
client_dict_send_transaction_query(ctx, query);
- } T_END;
+ }
DLLIST_REMOVE(&dict->transactions, ctx);
i_free(ctx);
{
struct client_dict_transaction_context *ctx =
(struct client_dict_transaction_context *)_ctx;
+ const char *query;
- T_BEGIN {
- const char *query;
-
- query = t_strdup_printf("%c%u\t%s\t%s\n",
- DICT_PROTOCOL_CMD_SET, ctx->id,
- dict_client_escape(key),
- dict_client_escape(value));
- client_dict_send_transaction_query(ctx, query);
- } T_END;
+ query = t_strdup_printf("%c%u\t%s\t%s",
+ DICT_PROTOCOL_CMD_SET, ctx->id,
+ str_tabescape(key),
+ str_tabescape(value));
+ client_dict_send_transaction_query(ctx, query);
}
static void client_dict_unset(struct dict_transaction_context *_ctx,
{
struct client_dict_transaction_context *ctx =
(struct client_dict_transaction_context *)_ctx;
+ const char *query;
- T_BEGIN {
- const char *query;
-
- query = t_strdup_printf("%c%u\t%s\n",
- DICT_PROTOCOL_CMD_UNSET, ctx->id,
- dict_client_escape(key));
- client_dict_send_transaction_query(ctx, query);
- } T_END;
-}
-
-static void client_dict_append(struct dict_transaction_context *_ctx,
- const char *key, const char *value)
-{
- struct client_dict_transaction_context *ctx =
- (struct client_dict_transaction_context *)_ctx;
-
- T_BEGIN {
- const char *query;
-
- query = t_strdup_printf("%c%u\t%s\t%s\n",
- DICT_PROTOCOL_CMD_APPEND, ctx->id,
- dict_client_escape(key),
- dict_client_escape(value));
- client_dict_send_transaction_query(ctx, query);
- } T_END;
+ query = t_strdup_printf("%c%u\t%s",
+ DICT_PROTOCOL_CMD_UNSET, ctx->id,
+ str_tabescape(key));
+ client_dict_send_transaction_query(ctx, query);
}
static void client_dict_atomic_inc(struct dict_transaction_context *_ctx,
{
struct client_dict_transaction_context *ctx =
(struct client_dict_transaction_context *)_ctx;
+ const char *query;
- T_BEGIN {
- const char *query;
- query = t_strdup_printf("%c%u\t%s\t%lld\n",
- DICT_PROTOCOL_CMD_ATOMIC_INC,
- ctx->id, dict_client_escape(key), diff);
- client_dict_send_transaction_query(ctx, query);
- } T_END;
+ query = t_strdup_printf("%c%u\t%s\t%lld",
+ DICT_PROTOCOL_CMD_ATOMIC_INC,
+ ctx->id, str_tabescape(key), diff);
+ client_dict_send_transaction_query(ctx, query);
}
struct dict dict_driver_client = {
client_dict_transaction_rollback,
client_dict_set,
client_dict_unset,
- client_dict_append,
+ NULL,
client_dict_atomic_inc,
- NULL
+ client_dict_lookup_async
}
};