]> git.ipfire.org Git - thirdparty/dovecot/core.git/commitdiff
dict-sql: Added support for async operations.
authorTimo Sirainen <tss@iki.fi>
Wed, 2 Sep 2015 14:37:16 +0000 (17:37 +0300)
committerTimo Sirainen <tss@iki.fi>
Wed, 2 Sep 2015 14:37:16 +0000 (17:37 +0300)
src/lib-dict/dict-sql.c

index 3a0b7b00b0eb221cf9a6ac6b463839125f975db3..614316a2ecfa52119842de7d0fc417cabc410cfc 100644 (file)
@@ -63,6 +63,9 @@ struct sql_dict_transaction_context {
        pool_t inc_row_pool;
        struct sql_dict_inc_row *inc_row;
 
+       dict_transaction_commit_callback_t *async_callback;
+       void *async_context;
+
        unsigned int failed:1;
 };
 
@@ -106,6 +109,12 @@ static void sql_dict_deinit(struct dict *_dict)
        pool_unref(&dict->pool);
 }
 
+static int sql_dict_wait(struct dict *dict ATTR_UNUSED)
+{
+       /* FIXME: lib-sql doesn't support this yet */
+       return 0;
+}
+
 static bool
 dict_sql_map_match(const struct dict_sql_map *map, const char *path,
                   ARRAY_TYPE(const_string) *values, unsigned int *pat_len_r,
@@ -266,32 +275,44 @@ sql_dict_where_build(struct sql_dict *dict, const struct dict_sql_map *map,
        }
 }
 
-static int sql_dict_lookup(struct dict *_dict, pool_t pool,
-                          const char *key, const char **value_r)
+static int sql_lookup_get_query(struct sql_dict *dict, const char *key,
+                               string_t *query)
 {
-       struct sql_dict *dict = (struct sql_dict *)_dict;
        const struct dict_sql_map *map;
        ARRAY_TYPE(const_string) values;
-       struct sql_result *result;
-       int ret;
 
        map = sql_dict_find_map(dict, key, &values);
        if (map == NULL) {
                i_error("sql dict lookup: Invalid/unmapped key: %s", key);
-               *value_r = NULL;
-               return 0;
+               return -1;
        }
+       str_printfa(query, "SELECT %s FROM %s",
+                   map->value_field, map->table);
+       sql_dict_where_build(dict, map, &values, key[0],
+                            SQL_DICT_RECURSE_NONE, query);
+       return 0;
+}
+
+static int sql_dict_lookup(struct dict *_dict, pool_t pool,
+                          const char *key, const char **value_r)
+{
+       struct sql_dict *dict = (struct sql_dict *)_dict;
+       struct sql_result *result = NULL;
+       int ret;
 
        T_BEGIN {
                string_t *query = t_str_new(256);
 
-               str_printfa(query, "SELECT %s FROM %s",
-                           map->value_field, map->table);
-               sql_dict_where_build(dict, map, &values, key[0],
-                                    SQL_DICT_RECURSE_NONE, query);
-               result = sql_query_s(dict->db, str_c(query));
+               ret = sql_lookup_get_query(dict, key, query);
+               if (ret == 0)
+                       result = sql_query_s(dict->db, str_c(query));
        } T_END;
 
+       if (ret < 0) {
+               *value_r = NULL;
+               return -1;
+       }
+
        ret = sql_result_next_row(result);
        if (ret <= 0) {
                if (ret < 0) {
@@ -308,6 +329,48 @@ static int sql_dict_lookup(struct dict *_dict, pool_t pool,
        return ret;
 }
 
+struct sql_dict_lookup_context {
+       dict_lookup_callback_t *callback;
+       void *context;
+};
+
+static void
+sql_dict_lookup_async_callback(struct sql_result *sql_result,
+                              struct sql_dict_lookup_context *ctx)
+{
+       struct dict_lookup_result result;
+
+       memset(&result, 0, sizeof(result));
+       result.ret = sql_result_next_row(sql_result);
+       if (result.ret < 0)
+               result.error = sql_result_get_error(sql_result);
+       else if (result.ret > 0)
+               result.value = sql_result_get_field_value(sql_result, 0);
+       ctx->callback(&result, ctx->context);
+
+       i_free(ctx);
+}
+
+static void
+sql_dict_lookup_async(struct dict *_dict, const char *key,
+                     dict_lookup_callback_t *callback, void *context)
+{
+       struct sql_dict *dict = (struct sql_dict *)_dict;
+       struct sql_dict_lookup_context *ctx;
+
+       T_BEGIN {
+               string_t *query = t_str_new(256);
+
+               if (sql_lookup_get_query(dict, key, query) == 0) {
+                       ctx = i_new(struct sql_dict_lookup_context, 1);
+                       ctx->callback = callback;
+                       ctx->context = context;
+                       sql_query(dict->db, str_c(query),
+                                 sql_dict_lookup_async_callback, ctx);
+               }
+       } T_END;
+}
+
 static const struct dict_sql_map *
 sql_dict_iterate_find_next_map(struct sql_dict_iterate_context *ctx,
                               ARRAY_TYPE(const_string) *values)
@@ -356,8 +419,10 @@ sql_dict_iterate_build_next_query(struct sql_dict_iterate_context *ctx,
        if (map == NULL)
                return FALSE;
 
-       if (ctx->result != NULL)
+       if (ctx->result != NULL) {
                sql_result_unref(ctx->result);
+               ctx->result = NULL;
+       }
 
        str_append(query, "SELECT ");
        if ((ctx->flags & DICT_ITERATE_FLAG_NO_VALUE) == 0)
@@ -402,6 +467,15 @@ sql_dict_iterate_build_next_query(struct sql_dict_iterate_context *ctx,
        return TRUE;
 }
 
+static void sql_dict_iterate_callback(struct sql_result *result,
+                                     struct sql_dict_iterate_context *ctx)
+{
+       sql_result_ref(result);
+       ctx->result = result;
+       if (ctx->ctx.async_callback != NULL)
+               ctx->ctx.async_callback(ctx->ctx.async_context);
+}
+
 static bool sql_dict_iterate_next_query(struct sql_dict_iterate_context *ctx)
 {
        struct sql_dict *dict = (struct sql_dict *)ctx->ctx.dict;
@@ -413,14 +487,17 @@ static bool sql_dict_iterate_next_query(struct sql_dict_iterate_context *ctx)
                ret = sql_dict_iterate_build_next_query(ctx, query);
                if (!ret) {
                        /* failed */
-               } else {
+               } else if ((ctx->flags & DICT_ITERATE_FLAG_ASYNC) == 0) {
                        ctx->result = sql_query_s(dict->db, str_c(query));
+               } else {
+                       i_assert(ctx->result == NULL);
+                       sql_query(dict->db, str_c(query),
+                                 sql_dict_iterate_callback, ctx);
                }
        } T_END;
        return ret;
 }
 
-
 static struct dict_iterate_context *
 sql_dict_iterate_init(struct dict *_dict, const char *const *paths,
                      enum dict_iterate_flags flags)
@@ -445,6 +522,7 @@ sql_dict_iterate_init(struct dict *_dict, const char *const *paths,
                i_error("sql dict iterate: Invalid/unmapped path: %s",
                        paths[0]);
                ctx->result = NULL;
+               ctx->failed = TRUE;
                return &ctx->ctx;
        }
        return &ctx->ctx;
@@ -459,12 +537,21 @@ static bool sql_dict_iterate(struct dict_iterate_context *_ctx,
        unsigned int i, count;
        int ret;
 
-       if (ctx->result == NULL) {
-               ctx->failed = TRUE;
+       _ctx->has_more = FALSE;
+       if (ctx->failed)
                return FALSE;
-       }
 
-       while ((ret = sql_result_next_row(ctx->result)) == 0) {
+       for (;;) {
+               if (ctx->result == NULL) {
+                       /* wait for async lookup to finish */
+                       i_assert((ctx->flags & DICT_ITERATE_FLAG_ASYNC) != 0);
+                       _ctx->has_more = TRUE;
+                       return FALSE;
+               }
+
+               ret = sql_result_next_row(ctx->result);
+               if (ret != 0)
+                       break;
                /* see if there are more results in the next map.
                   don't do it if we're looking for an exact match, since we
                   already should have handled it. */
@@ -550,9 +637,26 @@ sql_dict_transaction_has_nonexistent(struct sql_dict_transaction_context *ctx)
        return FALSE;
 }
 
+static void
+sql_dict_transaction_commit_callback(const char *error,
+                                    struct sql_dict_transaction_context *ctx)
+{
+       int ret;
+
+       if (error == NULL)
+               ret = sql_dict_transaction_has_nonexistent(ctx) ? 0 : 1;
+       else {
+               i_error("sql dict: commit failed: %s", error);
+               ret = -1;
+       }
+
+       if (ctx->async_callback != NULL)
+               ctx->async_callback(ret, ctx->async_context);
+       sql_dict_transaction_free(ctx);
+}
+
 static int
-sql_dict_transaction_commit(struct dict_transaction_context *_ctx,
-                           bool async ATTR_UNUSED,
+sql_dict_transaction_commit(struct dict_transaction_context *_ctx, bool async,
                            dict_transaction_commit_callback_t *callback,
                            void *context)
 {
@@ -570,6 +674,12 @@ sql_dict_transaction_commit(struct dict_transaction_context *_ctx,
        } else if (!_ctx->changed) {
                /* nothing changed, no need to commit */
                sql_transaction_rollback(&ctx->sql_ctx);
+       } else if (async) {
+               ctx->async_callback = callback;
+               ctx->async_context = context;
+               sql_transaction_commit(&ctx->sql_ctx,
+                       sql_dict_transaction_commit_callback, ctx);
+               return 1;
        } else {
                if (sql_transaction_commit_s(&ctx->sql_ctx, &error) < 0) {
                        i_error("sql dict: commit failed: %s", error);
@@ -935,7 +1045,7 @@ static struct dict sql_dict = {
        {
                sql_dict_init,
                sql_dict_deinit,
-               NULL,
+               sql_dict_wait,
                sql_dict_lookup,
                sql_dict_iterate_init,
                sql_dict_iterate,
@@ -947,7 +1057,7 @@ static struct dict sql_dict = {
                sql_dict_unset,
                sql_dict_append,
                sql_dict_atomic_inc,
-               NULL
+               sql_dict_lookup_async
        }
 };