o_stream_cork(conn->output);
while ((ret = dict_iterate(conn->iter_ctx, &key, &value)) > 0) {
str_truncate(str, 0);
- str_printfa(str, "%s\t%s\n", key, value);
+ str_printfa(str, "%c%s\t%s\n", DICT_PROTOCOL_REPLY_OK,
+ key, value);
o_stream_send(conn->output, str_data(str), str_len(str));
if (o_stream_get_buffer_used_size(conn->output) >
/* <id> */
trans = array_append_space(&conn->transactions);
trans->id = id;
+ trans->conn = conn;
trans->ctx = dict_transaction_begin(conn->dict);
return 0;
}
static int cmd_commit(struct dict_connection *conn, const char *line)
{
struct dict_connection_transaction *trans;
- const char *reply;
+ char chr;
int ret;
if (conn->iter_ctx != NULL) {
return -1;
ret = dict_transaction_commit(&trans->ctx);
- reply = t_strdup_printf("%c\n", ret == 0 ? DICT_PROTOCOL_REPLY_OK :
- DICT_PROTOCOL_REPLY_FAIL);
- o_stream_send_str(conn->output, reply);
+ switch (ret) {
+ case 1:
+ chr = DICT_PROTOCOL_REPLY_OK;
+ break;
+ case 0:
+ chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+ break;
+ default:
+ chr = DICT_PROTOCOL_REPLY_FAIL;
+ break;
+ }
+ o_stream_send_str(conn->output, t_strdup_printf("%c\n", chr));
dict_connection_transaction_array_remove(conn, trans);
return 0;
}
+static void cmd_commit_async_callback(int ret, void *context)
+{
+ struct dict_connection_transaction *trans = context;
+ const char *reply;
+ char chr;
+
+ switch (ret) {
+ case 1:
+ chr = DICT_PROTOCOL_REPLY_OK;
+ break;
+ case 0:
+ chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+ break;
+ default:
+ chr = DICT_PROTOCOL_REPLY_FAIL;
+ break;
+ }
+ reply = t_strdup_printf("%c%c%u\n", DICT_PROTOCOL_REPLY_ASYNC_COMMIT,
+ chr, trans->id);
+ o_stream_send_str(trans->conn->output, reply);
+
+ dict_connection_transaction_array_remove(trans->conn, trans);
+}
+
static int
cmd_commit_async(struct dict_connection *conn, const char *line)
{
if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
return -1;
- dict_transaction_commit_async(&trans->ctx);
- dict_connection_transaction_array_remove(conn, trans);
+ dict_transaction_commit_async(&trans->ctx, cmd_commit_async_callback,
+ trans);
return 0;
}
struct dict_connection_transaction {
unsigned int id;
+ struct dict_connection *conn;
struct dict_transaction_context *ctx;
};
/* Copyright (c) 2005-2009 Dovecot authors, see the included COPYING file */
#include "lib.h"
+#include "llist.h"
#include "str.h"
#include "network.h"
#include "istream.h"
#include "dict-private.h"
#include "dict-client.h"
+#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
time_t last_connect_try;
struct istream *input;
struct ostream *output;
+ struct io *io;
+
+ struct client_dict_transaction_context *transactions;
unsigned int connect_counter;
unsigned int transaction_id_counter;
+ unsigned int async_commits;
unsigned int in_iteration:1;
unsigned int handshaked:1;
struct client_dict_transaction_context {
struct dict_transaction_context ctx;
+ struct client_dict_transaction_context *prev, *next;
+
+ /* for async commits */
+ dict_transaction_commit_callback_t *callback;
+ void *context;
unsigned int id;
unsigned int connect_counter;
return 0;
}
-static char *client_dict_read_line(struct client_dict *dict)
+static struct client_dict_transaction_context *
+client_dict_transaction_find(struct client_dict *dict, unsigned int id)
+{
+ struct client_dict_transaction_context *ctx;
+
+ for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) {
+ if (ctx->id == id)
+ return ctx;
+ }
+ return NULL;
+}
+
+static void
+client_dict_finish_transaction(struct client_dict *dict,
+ unsigned int id, int ret)
+{
+ struct client_dict_transaction_context *ctx;
+
+ ctx = client_dict_transaction_find(dict, id);
+ if (ctx == NULL) {
+ i_error("dict-client: Unknown transaction id %u", id);
+ return;
+ }
+ if (ctx->callback != NULL)
+ ctx->callback(ret, ctx->context);
+
+ DLLIST_REMOVE(&dict->transactions, ctx);
+ i_free(ctx);
+
+ i_assert(dict->async_commits > 0);
+ if (--dict->async_commits == 0)
+ io_remove(&dict->io);
+}
+
+static int client_dict_read_one_line(struct client_dict *dict, char **line_r)
{
+ unsigned int id;
char *line;
int ret;
- line = i_stream_next_line(dict->input);
- if (line != NULL)
- return line;
-
- while ((ret = i_stream_read(dict->input)) > 0) {
- line = i_stream_next_line(dict->input);
- if (line != NULL)
- return line;
+ *line_r = NULL;
+ while ((line = i_stream_next_line(dict->input)) == NULL) {
+ ret = i_stream_read(dict->input);
+ switch (ret) {
+ case -1:
+ if (dict->input->stream_errno != 0)
+ i_error("read(%s) failed: %m", dict->path);
+ else {
+ i_error("read(%s) failed: Remote disconnected",
+ dict->path);
+ }
+ return -1;
+ case -2:
+ i_error("read(%s) returned too much data", dict->path);
+ return -1;
+ default:
+ i_assert(ret > 0);
+ break;
+ }
+ }
+ if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) {
+ switch (line[1]) {
+ case DICT_PROTOCOL_REPLY_OK:
+ ret = 1;
+ break;
+ case DICT_PROTOCOL_REPLY_NOTFOUND:
+ ret = 0;
+ break;
+ case DICT_PROTOCOL_REPLY_FAIL:
+ ret = -1;
+ break;
+ default:
+ i_error("dict-client: Invalid async commit line: %s",
+ line);
+ return 0;
+ }
+ id = strtoul(line+2, NULL, 10);
+ client_dict_finish_transaction(dict, id, ret);
+ return 0;
}
- i_assert(ret < 0);
+ *line_r = line;
+ return 1;
+}
- if (ret == -2)
- i_error("read(%s) returned too much data", dict->path);
- else if (dict->input->stream_errno == 0)
- i_error("read(%s) failed: Remote disconnected", dict->path);
- else
- i_error("read(%s) failed: %m", dict->path);
- return NULL;
+static char *client_dict_read_line(struct client_dict *dict)
+{
+ char *line;
+
+ while (client_dict_read_one_line(dict, &line) == 0)
+ ;
+ return line;
}
static int client_dict_connect(struct client_dict *dict)
dict->input->blocking = TRUE;
dict->output = o_stream_create_fd(dict->fd, 4096, FALSE);
dict->transaction_id_counter = 0;
+ dict->async_commits = 0;
query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n",
DICT_PROTOCOL_CMD_HELLO,
dict->connect_counter++;
dict->handshaked = FALSE;
+ if (dict->io != NULL)
+ io_remove(&dict->io);
if (dict->input != NULL)
i_stream_destroy(&dict->input);
if (dict->output != NULL)
pool_unref(&dict->pool);
}
+static int client_dict_wait(struct dict *_dict)
+{
+ struct client_dict *dict = (struct client_dict *)_dict;
+ char *line;
+ int ret = 0;
+
+ while (dict->async_commits > 0) {
+ if (client_dict_read_one_line(dict, &line) < 0) {
+ ret = -1;
+ break;
+ }
+ }
+ return ret;
+}
+
static int client_dict_lookup(struct dict *_dict, pool_t pool,
const char *key, const char **value_r)
{
/* line contains key \t value */
p_clear(ctx->pool);
- value = strchr(line, '\t');
+ if (*line != DICT_PROTOCOL_REPLY_OK)
+ value = NULL;
+ else
+ value = strchr(++line, '\t');
if (value == NULL) {
/* broken protocol */
i_error("dict client (%s) sent broken reply", dict->path);
ctx->ctx.dict = _dict;
ctx->id = ++dict->transaction_id_counter;
+ DLLIST_PREPEND(&dict->transactions, ctx);
return &ctx->ctx;
}
-static int client_dict_transaction_commit(struct dict_transaction_context *_ctx,
- bool async)
+static void dict_async_input(struct client_dict *dict)
+{
+ char *line;
+ size_t size;
+ int ret;
+
+ i_assert(!dict->in_iteration);
+
+ do {
+ ret = client_dict_read_one_line(dict, &line);
+ (void)i_stream_get_data(dict->input, &size);
+ } while (ret == 0 && size > 0);
+
+ if (ret < 0)
+ io_remove(&dict->io);
+}
+
+static int
+client_dict_transaction_commit(struct dict_transaction_context *_ctx,
+ bool async,
+ dict_transaction_commit_callback_t *callback,
+ void *context)
{
struct client_dict_transaction_context *ctx =
(struct client_dict_transaction_context *)_ctx;
struct client_dict *dict = (struct client_dict *)_ctx->dict;
- int ret = ctx->failed ? -1 : 0;
+ int ret = ctx->failed ? -1 : 1;
- if (ctx->sent_begin) T_BEGIN {
+ if (ctx->sent_begin && !ctx->failed) T_BEGIN {
const char *query, *line;
- query = t_strdup_printf("%c%u\n", ctx->failed ?
- DICT_PROTOCOL_CMD_ROLLBACK :
- (!async ? DICT_PROTOCOL_CMD_COMMIT :
- DICT_PROTOCOL_CMD_COMMIT_ASYNC),
+ query = t_strdup_printf("%c%u\n", !async ?
+ DICT_PROTOCOL_CMD_COMMIT :
+ DICT_PROTOCOL_CMD_COMMIT_ASYNC,
ctx->id);
if (client_dict_send_transaction_query(ctx, query) < 0)
ret = -1;
- else if (ret < 0 || async) {
- /* no reply */
+ else if (async) {
+ ctx->callback = callback;
+ ctx->context = context;
+ if (dict->async_commits++ == 0) {
+ dict->io = io_add(dict->fd, IO_READ,
+ dict_async_input, dict);
+ }
} else {
/* sync commit, read reply */
line = client_dict_read_line(dict);
- if (line == NULL || *line != DICT_PROTOCOL_REPLY_OK)
+ if (line == NULL)
+ ret = -1;
+ else if (*line == DICT_PROTOCOL_REPLY_OK)
+ ret = 1;
+ else if (*line == DICT_PROTOCOL_REPLY_NOTFOUND)
+ ret = 0;
+ else
ret = -1;
}
} T_END;
- i_free(ctx);
+ if (ret < 0 || !async) {
+ DLLIST_REMOVE(&dict->transactions, ctx);
+ i_free(ctx);
+ }
return ret;
}
{
struct client_dict_transaction_context *ctx =
(struct client_dict_transaction_context *)_ctx;
+ struct client_dict *dict = (struct client_dict *)_ctx->dict;
if (ctx->sent_begin) T_BEGIN {
const char *query;
(void)client_dict_send_transaction_query(ctx, query);
} T_END;
+ DLLIST_REMOVE(&dict->transactions, ctx);
i_free(ctx);
}
{
client_dict_init,
client_dict_deinit,
+ client_dict_wait,
client_dict_lookup,
client_dict_iterate_init,
client_dict_iterate,
};
enum {
- /* For LOOKUP command */
DICT_PROTOCOL_REPLY_OK = 'O', /* <value> */
DICT_PROTOCOL_REPLY_NOTFOUND = 'N',
- DICT_PROTOCOL_REPLY_FAIL = 'F'
+ DICT_PROTOCOL_REPLY_FAIL = 'F',
+ DICT_PROTOCOL_REPLY_ASYNC_COMMIT = 'A'
};
const char *dict_client_escape(const char *src);
return &ctx->ctx;
}
-static int db_dict_transaction_commit(struct dict_transaction_context *_ctx,
- bool async ATTR_UNUSED)
+static int
+db_dict_transaction_commit(struct dict_transaction_context *_ctx,
+ bool async ATTR_UNUSED,
+ dict_transaction_commit_callback_t *callback,
+ void *context)
{
struct db_dict_transaction_context *ctx =
(struct db_dict_transaction_context *)_ctx;
int ret;
- ret = ctx->tid->commit(ctx->tid, 0);
+ ret = ctx->tid->commit(ctx->tid, 0) < 0 ? -1 : 1;
i_free(ctx);
- return ret == 0 ? 0 : -1;
+
+ if (callback != NULL)
+ callback(ret, context);
+ return ret;
}
static void db_dict_transaction_rollback(struct dict_transaction_context *_ctx)
{
db_dict_init,
db_dict_deinit,
+ NULL,
db_dict_lookup,
db_dict_iterate_init,
db_dict_iterate,
pool_t pool;
ARRAY_DEFINE(changes, struct file_dict_change);
+
+ unsigned int atomic_inc_not_found:1;
};
static struct dotlock_settings file_dict_dotlock_settings = {
switch (changes[i].type) {
case FILE_DICT_CHANGE_TYPE_INC:
- diff = old_value == NULL ? 0 :
- strtoll(old_value, NULL, 10);
- diff += changes[i].value.diff;
+ if (old_value == NULL) {
+ ctx->atomic_inc_not_found = TRUE;
+ break;
+ }
+ diff = strtoll(old_value, NULL, 10) +
+ changes[i].value.diff;
tmp = t_strdup_printf("%lld", diff);
new_len = strlen(tmp);
if (old_value == NULL || new_len > strlen(old_value))
return 0;
}
-static int file_dict_transaction_commit(struct dict_transaction_context *_ctx,
- bool async ATTR_UNUSED)
+static int
+file_dict_transaction_commit(struct dict_transaction_context *_ctx,
+ bool async ATTR_UNUSED,
+ dict_transaction_commit_callback_t *callback,
+ void *context)
{
struct file_dict_transaction_context *ctx =
(struct file_dict_transaction_context *)_ctx;
int ret;
- ret = file_dict_write_changes(ctx);
+ if (file_dict_write_changes(ctx) < 0)
+ ret = -1;
+ else if (ctx->atomic_inc_not_found)
+ ret = 0;
+ else
+ ret = 1;
pool_unref(&ctx->pool);
+
+ if (callback != NULL)
+ callback(ret, context);
return ret;
}
{
file_dict_init,
file_dict_deinit,
+ NULL,
file_dict_lookup,
file_dict_iterate_init,
file_dict_iterate,
enum dict_data_type value_type,
const char *username, const char *base_dir);
void (*deinit)(struct dict *dict);
+ int (*wait)(struct dict *dict);
int (*lookup)(struct dict *dict, pool_t pool,
const char *key, const char **value_r);
struct dict_transaction_context *(*transaction_init)(struct dict *dict);
int (*transaction_commit)(struct dict_transaction_context *ctx,
- bool async);
+ bool async,
+ dict_transaction_commit_callback_t *callback,
+ void *context);
void (*transaction_rollback)(struct dict_transaction_context *ctx);
void (*set)(struct dict_transaction_context *ctx,
unsigned int key_prefix_len, pattern_prefix_len, next_map_idx;
};
+struct sql_dict_inc_row {
+ struct sql_dict_inc_row *prev;
+ unsigned int rows;
+};
+
struct sql_dict_transaction_context {
struct dict_transaction_context ctx;
const struct dict_sql_map *prev_inc_map;
char *prev_inc_key;
long long prev_inc_diff;
+ pool_t inc_row_pool;
+ struct sql_dict_inc_row *inc_row;
unsigned int failed:1;
unsigned int changed:1;
return &ctx->ctx;
}
-static int sql_dict_transaction_commit(struct dict_transaction_context *_ctx,
- bool async ATTR_UNUSED)
+static int
+sql_dict_transaction_commit(struct dict_transaction_context *_ctx,
+ bool async ATTR_UNUSED,
+ dict_transaction_commit_callback_t *callback,
+ void *context)
{
struct sql_dict_transaction_context *ctx =
(struct sql_dict_transaction_context *)_ctx;
const char *error;
- int ret;
+ int ret = 1;
if (ctx->prev_inc_map != NULL)
sql_dict_prev_inc_flush(ctx);
sql_transaction_rollback(&ctx->sql_ctx);
ret = -1;
} else if (_ctx->changed) {
- ret = sql_transaction_commit_s(&ctx->sql_ctx, &error);
- if (ret < 0)
+ if (sql_transaction_commit_s(&ctx->sql_ctx, &error) < 0) {
i_error("sql dict: commit failed: %s", error);
- } else {
- /* nothing to be done */
- ret = 0;
+ ret = -1;
+ } else {
+ while (ctx->inc_row != NULL) {
+ i_assert(ctx->inc_row->rows != -1UL);
+ if (ctx->inc_row->rows == 0) {
+ ret = 0;
+ break;
+ }
+ ctx->inc_row = ctx->inc_row->prev;
+ }
+ }
}
+ if (ctx->inc_row_pool != NULL)
+ pool_unref(&ctx->inc_row_pool);
+ i_free(ctx->prev_inc_key);
i_free(ctx);
+
+ if (callback != NULL)
+ callback(ret, context);
return ret;
}
if (_ctx->changed)
sql_transaction_rollback(&ctx->sql_ctx);
+
+ if (ctx->inc_row_pool != NULL)
+ pool_unref(&ctx->inc_row_pool);
i_free(ctx->prev_inc_key);
i_free(ctx);
}
fields[i].map->value_field);
if (fields[i].value[0] != '-')
str_append_c(query, '+');
- else
- str_append(query, fields[i].value);
+ str_append(query, fields[i].value);
}
sql_dict_where_build(dict, fields[0].map, build->extra_values,
} T_END;
}
+static unsigned int *
+sql_dict_next_inc_row(struct sql_dict_transaction_context *ctx)
+{
+ struct sql_dict_inc_row *row;
+
+ if (ctx->inc_row_pool == NULL) {
+ ctx->inc_row_pool =
+ pool_alloconly_create("sql dict inc rows", 128);
+ }
+ row = p_new(ctx->inc_row_pool, struct sql_dict_inc_row, 1);
+ row->prev = ctx->inc_row;
+ row->rows = -1UL;
+ ctx->inc_row = row;
+ return &row->rows;
+}
+
static void sql_dict_atomic_inc_real(struct sql_dict_transaction_context *ctx,
const char *key, long long diff)
{
T_BEGIN {
struct dict_sql_build_query build;
struct dict_sql_build_query_field field;
- const char *query;
field.map = map;
field.value = t_strdup_printf("%lld", diff);
build.key1 = key[0];
build.inc = TRUE;
- if (diff >= 0)
- query = sql_dict_set_query(&build);
- else {
- /* negative changes can't never be initial values,
- use UPDATE directly. */
- query = sql_dict_update_query(&build);
- }
- sql_update(ctx->sql_ctx, query);
+ sql_update_get_rows(ctx->sql_ctx, sql_dict_update_query(&build),
+ sql_dict_next_inc_row(ctx));
} T_END;
}
ctx->prev_inc_diff = diff;
return;
}
+
if (!sql_dict_maps_are_mergeable(dict, ctx->prev_inc_map, map,
ctx->prev_inc_key, key, &values)) {
sql_dict_prev_inc_flush(ctx);
} else T_BEGIN {
struct dict_sql_build_query build;
struct dict_sql_build_query_field *field;
- const char *query;
memset(&build, 0, sizeof(build));
build.dict = dict;
field->map = map;
field->value = t_strdup_printf("%lld", diff);
- if (diff >= 0)
- query = sql_dict_set_query(&build);
- else {
- /* negative changes can't never be initial values,
- use UPDATE directly. */
- query = sql_dict_update_query(&build);
- }
- sql_update(ctx->sql_ctx, query);
+ sql_update_get_rows(ctx->sql_ctx, sql_dict_update_query(&build),
+ sql_dict_next_inc_row(ctx));
i_free_and_null(ctx->prev_inc_key);
ctx->prev_inc_map = NULL;
{
sql_dict_init,
sql_dict_deinit,
+ NULL,
sql_dict_lookup,
sql_dict_iterate_init,
sql_dict_iterate,
dict->v.deinit(dict);
}
+int dict_wait(struct dict *dict)
+{
+ return dict->v.wait == NULL ? 1 : dict->v.wait(dict);
+}
+
static bool dict_key_prefix_is_valid(const char *key)
{
return strncmp(key, DICT_PATH_SHARED, strlen(DICT_PATH_SHARED)) == 0 ||
struct dict_transaction_context *ctx = *_ctx;
*_ctx = NULL;
- return ctx->dict->v.transaction_commit(ctx, FALSE);
+ return ctx->dict->v.transaction_commit(ctx, FALSE, NULL, NULL);
}
-void dict_transaction_commit_async(struct dict_transaction_context **_ctx)
+void dict_transaction_commit_async(struct dict_transaction_context **_ctx,
+ dict_transaction_commit_callback_t *callback,
+ void *context)
{
struct dict_transaction_context *ctx = *_ctx;
*_ctx = NULL;
- ctx->dict->v.transaction_commit(ctx, TRUE);
+ ctx->dict->v.transaction_commit(ctx, TRUE, callback, context);
}
void dict_transaction_rollback(struct dict_transaction_context **_ctx)
DICT_DATA_TYPE_UINT32
};
+typedef void dict_transaction_commit_callback_t(int ret, void *context);
+
void dict_driver_register(struct dict *driver);
void dict_driver_unregister(struct dict *driver);
const char *username, const char *base_dir);
/* Close dictionary. */
void dict_deinit(struct dict **dict);
+/* Wait for all pending asynchronous transaction commits to finish.
+ Returns 0 if ok, -1 if error. */
+int dict_wait(struct dict *dict);
/* Lookup value for key. Set it to NULL if it's not found.
Returns 1 if found, 0 if not found and -1 if lookup failed. */
/* Start a new dictionary transaction. */
struct dict_transaction_context *dict_transaction_begin(struct dict *dict);
-/* Commit the transaction. Returns 0 if ok, -1 if failed. */
+/* Commit the transaction. Returns 1 if ok, 0 if dict_atomic_inc() was used
+ on a non-existing key, -1 if failed. */
int dict_transaction_commit(struct dict_transaction_context **ctx);
-/* Commit the transaction, but don't wait to see if it finishes successfully. */
-void dict_transaction_commit_async(struct dict_transaction_context **ctx);
+/* Commit the transaction, but don't wait to see if it finishes successfully.
+ If callback isn't NULL, it's called eventually. If it's not called by the
+ time you want to deinitialize dict, call dict_flush() to wait for the
+ result. */
+void dict_transaction_commit_async(struct dict_transaction_context **ctx,
+ dict_transaction_commit_callback_t *callback,
+ void *context);
/* Rollback all changes made in transaction. */
void dict_transaction_rollback(struct dict_transaction_context **ctx);
const char *key);
/* Increase/decrease a numeric value in dictionary. Note that the value is
changed when transaction is being committed, so you can't know beforehand
- what the value will become. */
+ what the value will become. The value is updated only if it already exists,
+ otherwise commit() will return 0. */
void dict_atomic_inc(struct dict_transaction_context *ctx,
const char *key, long long diff);