]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Minor] Allow to pass data transparently to lua from redis
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 18 Mar 2017 13:21:02 +0000 (13:21 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Sat, 18 Mar 2017 13:21:02 +0000 (13:21 +0000)
src/lua/lua_redis.c

index acb355faaf60e52c9dc3b188b1b43adfe88c1703..4a0a1cfd95810161c7a9a2f3d3af92a5d542e9cd 100644 (file)
@@ -106,6 +106,12 @@ struct lua_redis_userdata {
        guint16 terminated;
 };
 
+#define LUA_REDIS_SPECIFIC_REPLIED (1 << 0)
+#define LUA_REDIS_SPECIFIC_FINISHED (1 << 1)
+#define LUA_REDIS_ASYNC (1 << 0)
+#define LUA_REDIS_TEXTDATA (1 << 1)
+#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)
+
 struct lua_redis_specific_userdata {
        gint cbref;
        guint nargs;
@@ -116,12 +122,11 @@ struct lua_redis_specific_userdata {
        struct lua_redis_ctx *ctx;
        struct lua_redis_specific_userdata *next;
        struct event timeout;
-       gboolean replied;
-       gboolean finished;
+       guint flags;
 };
 
 struct lua_redis_ctx {
-       gboolean async;
+       guint flags;
        union {
                struct lua_redis_userdata async;
                redisContext *sync;
@@ -161,7 +166,7 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
        gboolean is_successfull = TRUE;
        struct redisAsyncContext *ac;
 
-       if (ctx->async) {
+       if (IS_ASYNC (ctx)) {
                msg_debug ("desctructing %p", ctx);
                ud = &ctx->d.async;
 
@@ -170,11 +175,11 @@ lua_redis_dtor (struct lua_redis_ctx *ctx)
                        LL_FOREACH_SAFE (ud->specific, cur, tmp) {
                                event_del (&cur->timeout);
 
-                               if (!cur->replied) {
+                               if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
                                        is_successfull = FALSE;
                                }
 
-                               cur->finished = TRUE;
+                               cur->flags |= LUA_REDIS_SPECIFIC_FINISHED;
                        }
 
                        ud->terminated = 1;
@@ -223,7 +228,7 @@ lua_redis_fin (void *arg)
        ctx = sp_ud->ctx;
        event_del (&sp_ud->timeout);
        msg_debug ("finished redis query %p from session %p", sp_ud, ctx);
-       sp_ud->finished = TRUE;
+       sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED;
 
        REDIS_RELEASE (ctx);
 }
@@ -241,7 +246,7 @@ lua_redis_push_error (const gchar *err,
 {
        struct lua_redis_userdata *ud = sp_ud->c;
 
-       if (!sp_ud->replied && !sp_ud->finished) {
+       if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
                if (sp_ud->cbref != -1) {
                        /* Push error */
                        lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
@@ -257,7 +262,8 @@ lua_redis_push_error (const gchar *err,
                        }
                }
 
-               sp_ud->replied = TRUE;
+               sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
+
                if (connected && ud->s) {
                        rspamd_session_watcher_pop (ud->s, sp_ud->w);
                        rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
@@ -269,9 +275,10 @@ lua_redis_push_error (const gchar *err,
 }
 
 static void
-lua_redis_push_reply (lua_State *L, const redisReply *r)
+lua_redis_push_reply (lua_State *L, const redisReply *r, gboolean text_data)
 {
        guint i;
+       struct rspamd_lua_text *t;
 
        switch (r->type) {
        case REDIS_REPLY_INTEGER:
@@ -283,12 +290,21 @@ lua_redis_push_reply (lua_State *L, const redisReply *r)
                break;
        case REDIS_REPLY_STRING:
        case REDIS_REPLY_STATUS:
-               lua_pushlstring (L, r->str, r->len);
+               if (text_data) {
+                       t = lua_newuserdata (L, sizeof (*t));
+                       rspamd_lua_setclass (L, "rspamd{text}", -1);
+                       t->flags = 0;
+                       t->start = r->str;
+                       t->len = r->len;
+               }
+               else {
+                       lua_pushlstring (L, r->str, r->len);
+               }
                break;
        case REDIS_REPLY_ARRAY:
                lua_createtable (L, r->elements, 0);
                for (i = 0; i < r->elements; ++i) {
-                       lua_redis_push_reply (L, r->element[i]);
+                       lua_redis_push_reply (L, r->element[i], text_data);
                        lua_rawseti (L, -2, i + 1); /* Store sub-reply */
                }
                break;
@@ -309,14 +325,14 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
 {
        struct lua_redis_userdata *ud = sp_ud->c;
 
-       if (!sp_ud->replied && !sp_ud->finished) {
+       if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
                if (sp_ud->cbref != -1) {
                        /* Push error */
                        lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
                        /* Error is nil */
                        lua_pushnil (ud->L);
                        /* Data */
-                       lua_redis_push_reply (ud->L, r);
+                       lua_redis_push_reply (ud->L, r, ctx->flags & LUA_REDIS_TEXTDATA);
 
                        if (lua_pcall (ud->L, 2, 0, 0) != 0) {
                                msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
@@ -325,7 +341,7 @@ lua_redis_push_data (const redisReply *r, struct lua_redis_ctx *ctx,
 
                }
 
-               sp_ud->replied = TRUE;
+               sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
 
                if (ud->s) {
                        rspamd_session_watcher_pop (ud->s, sp_ud->w);
@@ -365,7 +381,7 @@ lua_redis_callback (redisAsyncContext *c, gpointer r, gpointer priv)
        REDIS_RETAIN (ctx);
 
        /* If session is finished, we cannot call lua callbacks */
-       if (!sp_ud->finished) {
+       if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
                if (c->err == 0) {
                        if (r != NULL) {
                                if (reply->type != REDIS_REPLY_ERROR) {
@@ -412,7 +428,7 @@ lua_redis_timeout (int fd, short what, gpointer u)
        struct lua_redis_ctx *ctx;
        redisAsyncContext *ac;
 
-       if (sp_ud->finished) {
+       if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
                return;
        }
 
@@ -530,6 +546,7 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
        struct rspamd_async_session *session = NULL;
        struct event_base *ev_base = NULL;
        gboolean ret = FALSE;
+       guint flags = 0;
 
        if (lua_istable (L, 1)) {
                /* Table version */
@@ -564,6 +581,13 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
                        }
                        lua_pop (L, 1);
 
+                       lua_pushstring (L, "ev_base");
+                       lua_gettable (L, -2);
+                       if (lua_type (L, -1) == LUA_TUSERDATA) {
+                               ev_base = lua_check_ev_base (L, -1);
+                       }
+                       lua_pop (L, 1);
+
                        if (cfg && ev_base) {
                                ret = TRUE;
                        }
@@ -622,13 +646,21 @@ rspamd_lua_redis_prepare_connection (lua_State *L, gint *pcbref)
                        dbname = lua_tostring (L, -1);
                }
                lua_pop (L, 1);
+
+               lua_pushstring (L, "opaque_data");
+               lua_gettable (L, -2);
+               if (!!lua_toboolean (L, -1)) {
+                       flags |= LUA_REDIS_TEXTDATA;
+               }
+               lua_pop (L, 1);
+
                lua_pop (L, 1); /* table */
 
 
                if (ret && addr != NULL) {
                        ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
                        REF_INIT_RETAIN (ctx, lua_redis_dtor);
-                       ctx->async = TRUE;
+                       ctx->flags |= flags | LUA_REDIS_ASYNC;
                        ud = &ctx->d.async;
                        ud->s = session;
                        ud->cfg = cfg;
@@ -731,6 +763,7 @@ lua_redis_make_request (lua_State *L)
                lua_pop (L, 1);
                ud->timeout = timeout;
 
+
                lua_pushstring (L, "args");
                lua_gettable (L, -2);
                lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->arglens,
@@ -814,7 +847,7 @@ lua_redis_make_request_sync (lua_State *L)
        gdouble timeout = REDIS_DEFAULT_TIMEOUT;
        gchar **args = NULL;
        gsize *arglens = NULL;
-       guint nargs = 0;
+       guint nargs = 0, flags = 0;
        redisContext *ctx;
        redisReply *r;
 
@@ -851,6 +884,14 @@ lua_redis_make_request_sync (lua_State *L)
                }
                lua_pop (L, 1);
 
+               lua_pushstring (L, "opaque_data");
+               lua_gettable (L, -2);
+               if (!!lua_toboolean (L, -1)) {
+                       flags |= LUA_REDIS_TEXTDATA;
+               }
+               lua_pop (L, 1);
+
+
                if (cmd) {
                        lua_pushstring (L, "args");
                        lua_gettable (L, -2);
@@ -890,7 +931,7 @@ lua_redis_make_request_sync (lua_State *L)
                if (r != NULL) {
                        if (r->type != REDIS_REPLY_ERROR) {
                                lua_pushboolean (L, TRUE);
-                               lua_redis_push_reply (L, r);
+                               lua_redis_push_reply (L, r, flags & LUA_REDIS_TEXTDATA);
                        }
                        else {
                                lua_pushboolean (L, FALSE);
@@ -979,6 +1020,7 @@ lua_redis_connect_sync (lua_State *L)
        const gchar *host;
        struct timeval tv;
        gboolean ret = FALSE;
+       guint flags = 0;
        gdouble timeout = REDIS_DEFAULT_TIMEOUT;
        struct lua_redis_ctx *ctx, **pctx;
 
@@ -1008,6 +1050,13 @@ lua_redis_connect_sync (lua_State *L)
                }
                lua_pop (L, 1);
 
+               lua_pushstring (L, "opaque_data");
+               lua_gettable (L, -2);
+               if (!!lua_toboolean (L, -1)) {
+                       flags |= LUA_REDIS_TEXTDATA;
+               }
+               lua_pop (L, 1);
+
                if (addr) {
                        ret = TRUE;
                }
@@ -1017,7 +1066,7 @@ lua_redis_connect_sync (lua_State *L)
                double_to_tv (timeout, &tv);
                ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
                REF_INIT_RETAIN (ctx, lua_redis_dtor);
-               ctx->async = FALSE;
+               ctx->flags = flags;
                ctx->d.sync = redisConnectWithTimeout (
                                rspamd_inet_address_to_string (addr->addr),
                                rspamd_inet_address_get_port (addr->addr), tv);
@@ -1082,7 +1131,7 @@ lua_redis_add_cmd (lua_State *L)
 
        if (ctx) {
 
-               if (ctx->async) {
+               if (IS_ASYNC (ctx)) {
                        ud = &ctx->d.async;
 
                        /* Async version */
@@ -1202,7 +1251,7 @@ lua_redis_exec (lua_State *L)
                return 1;
        }
 
-       if (ctx->async) {
+       if (IS_ASYNC (ctx)) {
                lua_pushstring (L, "Async redis pipelining is not implemented");
                lua_error (L);
                return 0;
@@ -1220,7 +1269,8 @@ lua_redis_exec (lua_State *L)
                                if (ret == REDIS_OK) {
                                        if (r->type != REDIS_REPLY_ERROR) {
                                                lua_pushboolean (L, TRUE);
-                                               lua_redis_push_reply (L, r);
+                                               lua_redis_push_reply (L, r,
+                                                               ctx->flags & LUA_REDIS_TEXTDATA);
                                        }
                                        else {
                                                lua_pushboolean (L, FALSE);