#define DICT_OUTPUT_OPTIMAL_SIZE 1024
-struct dict_client_cmd {
- int cmd;
- int (*func)(struct dict_connection *conn, const char *line);
+struct dict_cmd_func {
+ enum dict_protocol_cmd cmd;
+ int (*func)(struct dict_connection_cmd *cmd, const char *line);
};
-static int cmd_lookup(struct dict_connection *conn, const char *line)
+struct dict_connection_cmd {
+ const struct dict_cmd_func *cmd;
+ struct dict_connection *conn;
+ char *reply;
+
+ struct dict_iterate_context *iter;
+ enum dict_iterate_flags iter_flags;
+
+ struct dict_connection_transaction *trans;
+};
+
+static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd);
+
+static void dict_connection_cmd_free(struct dict_connection_cmd *cmd)
{
- const char *reply;
- const char *value;
- int ret;
+ if (cmd->iter != NULL)
+ (void)dict_iterate_deinit(&cmd->iter);
+ array_delete(&cmd->conn->cmds, 0, 1);
+ i_free(cmd->reply);
- if (conn->iter_ctx != NULL) {
- i_error("dict client: LOOKUP: Can't lookup while iterating");
- return -1;
+ dict_connection_continue_input(cmd->conn);
+ i_free(cmd);
+}
+
+static void dict_connection_cmd_remove(struct dict_connection_cmd *cmd)
+{
+ struct dict_connection_cmd *const *cmds;
+ unsigned int i, count;
+
+ cmds = array_get(&cmd->conn->cmds, &count);
+ for (i = 0; i < count; i++) {
+ if (cmds[i] == cmd) {
+ dict_connection_cmd_free(cmd);
+ return;
+ }
}
+ i_unreached();
+}
- /* <key> */
- ret = dict_lookup(conn->dict, pool_datastack_create(), line, &value);
- if (ret > 0) {
- reply = t_strdup_printf("%c%s\n",
- DICT_PROTOCOL_REPLY_OK, str_tabescape(value));
- o_stream_nsend_str(conn->output, reply);
+static void dict_connection_cmds_flush(struct dict_connection *conn)
+{
+ struct dict_connection_cmd *cmd, *const *first_cmdp;
+
+ while (array_count(&conn->cmds) > 0) {
+ first_cmdp = array_idx(&conn->cmds, 0);
+ cmd = *first_cmdp;
+
+ if (cmd->reply == NULL) {
+ /* command not finished yet */
+ break;
+ }
+
+ o_stream_nsend_str(conn->output, cmd->reply);
+ dict_connection_cmd_remove(cmd);
+ }
+}
+
+void dict_connection_cmds_free(struct dict_connection *conn)
+{
+ struct dict_connection_cmd *const *first_cmdp;
+
+ while (array_count(&conn->cmds) > 0) {
+ first_cmdp = array_idx(&conn->cmds, 0);
+ dict_connection_cmd_remove(*first_cmdp);
+ }
+}
+
+static void
+cmd_lookup_callback(const struct dict_lookup_result *result, void *context)
+{
+ struct dict_connection_cmd *cmd = context;
+
+ if (result->ret > 0) {
+ cmd->reply = i_strdup_printf("%c%s\n",
+ DICT_PROTOCOL_REPLY_OK, str_tabescape(result->value));
+ } else if (result->ret == 0) {
+ cmd->reply = i_strdup_printf("%c\n", DICT_PROTOCOL_REPLY_NOTFOUND);
} else {
- reply = t_strdup_printf("%c\n", ret == 0 ?
- DICT_PROTOCOL_REPLY_NOTFOUND :
- DICT_PROTOCOL_REPLY_FAIL);
- o_stream_nsend_str(conn->output, reply);
+ i_error("%s", result->error);
+ cmd->reply = i_strdup_printf("%c\n", DICT_PROTOCOL_REPLY_FAIL);
}
- return 0;
+ dict_connection_cmds_flush(cmd->conn);
}
-static int cmd_iterate_flush(struct dict_connection *conn)
+static int cmd_lookup(struct dict_connection_cmd *cmd, const char *line)
+{
+ /* <key> */
+ dict_lookup_async(cmd->conn->dict, line, cmd_lookup_callback, cmd);
+ return 1;
+}
+
+static int cmd_iterate_flush(struct dict_connection_cmd *cmd)
{
string_t *str;
const char *key, *value;
str = t_str_new(256);
- o_stream_cork(conn->output);
- while (dict_iterate(conn->iter_ctx, &key, &value)) {
+ o_stream_cork(cmd->conn->output);
+ while (dict_iterate(cmd->iter, &key, &value)) {
str_truncate(str, 0);
str_append_c(str, DICT_PROTOCOL_REPLY_OK);
str_append_tabescaped(str, key);
str_append_c(str, '\t');
- if ((conn->iter_flags & DICT_ITERATE_FLAG_NO_VALUE) == 0)
+ if ((cmd->iter_flags & DICT_ITERATE_FLAG_NO_VALUE) == 0)
str_append_tabescaped(str, value);
str_append_c(str, '\n');
- o_stream_nsend(conn->output, str_data(str), str_len(str));
+ o_stream_nsend(cmd->conn->output, str_data(str), str_len(str));
- if (o_stream_get_buffer_used_size(conn->output) >
+ if (o_stream_get_buffer_used_size(cmd->conn->output) >
DICT_OUTPUT_OPTIMAL_SIZE) {
- if (o_stream_flush(conn->output) <= 0) {
- /* continue later */
- o_stream_uncork(conn->output);
+ if (o_stream_flush(cmd->conn->output) <= 0) {
+ /* continue later when there's more space
+ in output buffer */
+ o_stream_uncork(cmd->conn->output);
+ o_stream_set_flush_pending(cmd->conn->output, TRUE);
return 0;
}
/* flushed everything, continue */
}
}
-
- /* finished iterating */
- o_stream_unset_flush_callback(conn->output);
+ if (dict_iterate_has_more(cmd->iter)) {
+ /* wait for the next iteration callback */
+ return 0;
+ }
str_truncate(str, 0);
- if (dict_iterate_deinit(&conn->iter_ctx) < 0)
+ if (dict_iterate_deinit(&cmd->iter) < 0)
str_append_c(str, DICT_PROTOCOL_REPLY_FAIL);
str_append_c(str, '\n');
- o_stream_nsend(conn->output, str_data(str), str_len(str));
- o_stream_uncork(conn->output);
+ o_stream_uncork(cmd->conn->output);
+
+ cmd->reply = i_strdup(str_c(str));
+ dict_connection_cmds_flush(cmd->conn);
return 1;
}
-static int cmd_iterate(struct dict_connection *conn, const char *line)
+static void cmd_iterate_callback(void *context)
+{
+ struct dict_connection_cmd *cmd = context;
+
+ dict_connection_cmd_output_more(cmd);
+}
+
+static int cmd_iterate(struct dict_connection_cmd *cmd, const char *line)
{
const char *const *args;
unsigned int flags;
- if (conn->iter_ctx != NULL) {
- i_error("dict client: ITERATE: Already iterating");
- return -1;
- }
-
args = t_strsplit_tabescaped(line);
if (str_array_length(args) < 2 ||
str_to_uint(args[0], &flags) < 0) {
}
/* <flags> <path> */
- conn->iter_ctx = dict_iterate_init_multiple(conn->dict, args+1, flags);
- conn->iter_flags = flags;
-
- o_stream_set_flush_callback(conn->output, cmd_iterate_flush, conn);
- (void)cmd_iterate_flush(conn);
- return 0;
+ flags |= DICT_ITERATE_FLAG_ASYNC;
+ cmd->iter = dict_iterate_init_multiple(cmd->conn->dict, args+1, flags);
+ cmd->iter_flags = flags;
+ dict_iterate_set_async_callback(cmd->iter, cmd_iterate_callback, cmd);
+ dict_connection_cmd_output_more(cmd);
+ return 1;
}
static struct dict_connection_transaction *
const struct dict_connection_transaction *transactions;
unsigned int i, count;
+ i_assert(trans->ctx == NULL);
+
transactions = array_get(&conn->transactions, &count);
for (i = 0; i < count; i++) {
if (&transactions[i] == trans) {
}
}
-static int cmd_begin(struct dict_connection *conn, const char *line)
+static int cmd_begin(struct dict_connection_cmd *cmd, const char *line)
{
struct dict_connection_transaction *trans;
unsigned int id;
i_error("dict client: Invalid transaction ID %s", line);
return -1;
}
- if (dict_connection_transaction_lookup(conn, id) != NULL) {
+ if (dict_connection_transaction_lookup(cmd->conn, id) != NULL) {
i_error("dict client: Transaction ID %u already exists", id);
return -1;
}
- if (!array_is_created(&conn->transactions))
- i_array_init(&conn->transactions, 4);
+ if (!array_is_created(&cmd->conn->transactions))
+ i_array_init(&cmd->conn->transactions, 4);
/* <id> */
- trans = array_append_space(&conn->transactions);
+ trans = array_append_space(&cmd->conn->transactions);
trans->id = id;
- trans->conn = conn;
- trans->ctx = dict_transaction_begin(conn->dict);
+ trans->conn = cmd->conn;
+ trans->ctx = dict_transaction_begin(cmd->conn->dict);
return 0;
}
return 0;
}
-static int cmd_commit(struct dict_connection *conn, const char *line)
+static void
+cmd_commit_finish(struct dict_connection_cmd *cmd, int ret, bool async)
{
- struct dict_connection_transaction *trans;
char chr;
- int ret;
-
- if (conn->iter_ctx != NULL) {
- i_error("dict client: COMMIT: Can't commit while iterating");
- return -1;
- }
-
- if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
- return -1;
- ret = dict_transaction_commit(&trans->ctx);
switch (ret) {
case 1:
chr = DICT_PROTOCOL_REPLY_OK;
chr = DICT_PROTOCOL_REPLY_FAIL;
break;
}
- o_stream_nsend_str(conn->output, t_strdup_printf("%c\n", chr));
- dict_connection_transaction_array_remove(conn, trans);
- return 0;
+ if (async) {
+ cmd->reply = i_strdup_printf("%c%c%u\n",
+ DICT_PROTOCOL_REPLY_ASYNC_COMMIT, chr, cmd->trans->id);
+ } else {
+ cmd->reply = i_strdup_printf("%c%u\n", chr, cmd->trans->id);
+ }
+ dict_connection_transaction_array_remove(cmd->conn, cmd->trans);
+ dict_connection_cmds_flush(cmd->conn);
}
-static void cmd_commit_async_callback(int ret, void *context)
+static void cmd_commit_callback(int ret, void *context)
{
- struct dict_connection_transaction *trans = context;
- const char *reply;
- char chr;
+ struct dict_connection_cmd *cmd = context;
- switch (ret) {
- case 1:
- chr = DICT_PROTOCOL_REPLY_OK;
- break;
- case 0:
- chr = DICT_PROTOCOL_REPLY_NOTFOUND;
- break;
- default:
- chr = DICT_PROTOCOL_REPLY_FAIL;
- break;
- }
- reply = t_strdup_printf("%c%c%u\n", DICT_PROTOCOL_REPLY_ASYNC_COMMIT,
- chr, trans->id);
- o_stream_nsend_str(trans->conn->output, reply);
+ cmd_commit_finish(cmd, ret, FALSE);
+}
- dict_connection_transaction_array_remove(trans->conn, trans);
+static void cmd_commit_callback_async(int ret, void *context)
+{
+ struct dict_connection_cmd *cmd = context;
+
+ cmd_commit_finish(cmd, ret, TRUE);
}
static int
-cmd_commit_async(struct dict_connection *conn, const char *line)
+cmd_commit(struct dict_connection_cmd *cmd, const char *line)
{
- struct dict_connection_transaction *trans;
-
- if (conn->iter_ctx != NULL) {
- i_error("dict client: COMMIT: Can't commit while iterating");
+ if (dict_connection_transaction_lookup_parse(cmd->conn, line, &cmd->trans) < 0)
return -1;
- }
- if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
+ dict_transaction_commit_async(&cmd->trans->ctx, cmd_commit_callback, cmd);
+ return 1;
+}
+
+static int
+cmd_commit_async(struct dict_connection_cmd *cmd, const char *line)
+{
+ if (dict_connection_transaction_lookup_parse(cmd->conn, line, &cmd->trans) < 0)
return -1;
- dict_transaction_commit_async(&trans->ctx, cmd_commit_async_callback,
- trans);
- return 0;
+ dict_transaction_commit_async(&cmd->trans->ctx, cmd_commit_callback_async, cmd);
+ return 1;
}
-static int cmd_rollback(struct dict_connection *conn, const char *line)
+static int cmd_rollback(struct dict_connection_cmd *cmd, const char *line)
{
struct dict_connection_transaction *trans;
- if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
+ if (dict_connection_transaction_lookup_parse(cmd->conn, line, &trans) < 0)
return -1;
dict_transaction_rollback(&trans->ctx);
- dict_connection_transaction_array_remove(conn, trans);
+ dict_connection_transaction_array_remove(cmd->conn, trans);
return 0;
}
-static int cmd_set(struct dict_connection *conn, const char *line)
+static int cmd_set(struct dict_connection_cmd *cmd, const char *line)
{
struct dict_connection_transaction *trans;
const char *const *args;
return -1;
}
- if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0)
+ if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
return -1;
-
dict_set(trans->ctx, args[1], args[2]);
return 0;
}
-static int cmd_unset(struct dict_connection *conn, const char *line)
+static int cmd_unset(struct dict_connection_cmd *cmd, const char *line)
{
struct dict_connection_transaction *trans;
const char *const *args;
return -1;
}
- if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0)
+ if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
return -1;
-
dict_unset(trans->ctx, args[1]);
return 0;
}
-static int cmd_append(struct dict_connection *conn, const char *line)
+static int cmd_append(struct dict_connection_cmd *cmd, const char *line)
{
struct dict_connection_transaction *trans;
const char *const *args;
return -1;
}
- if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0)
+ if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
return -1;
dict_append(trans->ctx, args[1], args[2]);
return 0;
}
-static int cmd_atomic_inc(struct dict_connection *conn, const char *line)
+static int cmd_atomic_inc(struct dict_connection_cmd *cmd, const char *line)
{
struct dict_connection_transaction *trans;
const char *const *args;
return -1;
}
- if (dict_connection_transaction_lookup_parse(conn, args[0], &trans) < 0)
+ if (dict_connection_transaction_lookup_parse(cmd->conn, args[0], &trans) < 0)
return -1;
dict_atomic_inc(trans->ctx, args[1], diff);
return 0;
}
-static struct dict_client_cmd cmds[] = {
+static const struct dict_cmd_func cmds[] = {
{ DICT_PROTOCOL_CMD_LOOKUP, cmd_lookup },
{ DICT_PROTOCOL_CMD_ITERATE, cmd_iterate },
{ DICT_PROTOCOL_CMD_BEGIN, cmd_begin },
{ 0, NULL }
};
-static struct dict_client_cmd *dict_command_find(char cmd)
+static const struct dict_cmd_func *dict_command_find(enum dict_protocol_cmd cmd)
{
unsigned int i;
int dict_command_input(struct dict_connection *conn, const char *line)
{
- struct dict_client_cmd *cmd;
+ const struct dict_cmd_func *cmd_func;
+ struct dict_connection_cmd *cmd;
+ int ret;
- cmd = dict_command_find(*line);
- if (cmd == NULL) {
+ cmd_func = dict_command_find((enum dict_protocol_cmd)*line);
+ if (cmd_func == NULL) {
i_error("dict client: Unknown command %c", *line);
return -1;
}
- return cmd->func(conn, line + 1);
+ cmd = i_new(struct dict_connection_cmd, 1);
+ cmd->conn = conn;
+ cmd->cmd = cmd_func;
+ array_append(&conn->cmds, &cmd, 1);
+ if ((ret = cmd_func->func(cmd, line + 1)) <= 0) {
+ dict_connection_cmd_remove(cmd);
+ return ret;
+ }
+ return 0;
+}
+
+static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd)
+{
+ struct dict_connection_cmd *const *first_cmdp;
+
+ first_cmdp = array_idx(&cmd->conn->cmds, 0);
+ if (*first_cmdp == cmd)
+ (void)cmd_iterate_flush(cmd);
+}
+
+void dict_connection_cmds_output_more(struct dict_connection *conn)
+{
+ struct dict_connection_cmd *cmd, *const *first_cmdp;
+
+ /* only iterators may be returning a lot of data */
+ while (array_count(&conn->cmds) > 0) {
+ first_cmdp = array_idx(&conn->cmds, 0);
+ cmd = *first_cmdp;
+
+ if (cmd->iter == NULL)
+ break;
+
+ if (cmd_iterate_flush(cmd) == 0) {
+ /* unfinished */
+ break;
+ }
+ /* cmd should be freed now */
+ }
}