guint16 terminated;
};
+#define LUA_REDIS_SPECIFIC_REPLIED (1 << 0)
+#define LUA_REDIS_SPECIFIC_FINISHED (1 << 1)
+#define LUA_REDIS_ASYNC (1 << 0)
+#define LUA_REDIS_TEXTDATA (1 << 1)
+#define IS_ASYNC(ctx) ((ctx)->flags & LUA_REDIS_ASYNC)
+
struct lua_redis_specific_userdata {
gint cbref;
guint nargs;
struct lua_redis_ctx *ctx;
struct lua_redis_specific_userdata *next;
struct event timeout;
- gboolean replied;
- gboolean finished;
+ guint flags;
};
struct lua_redis_ctx {
- gboolean async;
+ guint flags;
union {
struct lua_redis_userdata async;
redisContext *sync;
gboolean is_successfull = TRUE;
struct redisAsyncContext *ac;
- if (ctx->async) {
+ if (IS_ASYNC (ctx)) {
msg_debug ("desctructing %p", ctx);
ud = &ctx->d.async;
LL_FOREACH_SAFE (ud->specific, cur, tmp) {
event_del (&cur->timeout);
- if (!cur->replied) {
+ if (!(cur->flags & LUA_REDIS_SPECIFIC_REPLIED)) {
is_successfull = FALSE;
}
- cur->finished = TRUE;
+ cur->flags |= LUA_REDIS_SPECIFIC_FINISHED;
}
ud->terminated = 1;
ctx = sp_ud->ctx;
event_del (&sp_ud->timeout);
msg_debug ("finished redis query %p from session %p", sp_ud, ctx);
- sp_ud->finished = TRUE;
+ sp_ud->flags |= LUA_REDIS_SPECIFIC_FINISHED;
REDIS_RELEASE (ctx);
}
{
struct lua_redis_userdata *ud = sp_ud->c;
- if (!sp_ud->replied && !sp_ud->finished) {
+ if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
if (sp_ud->cbref != -1) {
/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
}
}
- sp_ud->replied = TRUE;
+ sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
+
if (connected && ud->s) {
rspamd_session_watcher_pop (ud->s, sp_ud->w);
rspamd_session_remove_event (ud->s, lua_redis_fin, sp_ud);
}
static void
-lua_redis_push_reply (lua_State *L, const redisReply *r)
+lua_redis_push_reply (lua_State *L, const redisReply *r, gboolean text_data)
{
guint i;
+ struct rspamd_lua_text *t;
switch (r->type) {
case REDIS_REPLY_INTEGER:
break;
case REDIS_REPLY_STRING:
case REDIS_REPLY_STATUS:
- lua_pushlstring (L, r->str, r->len);
+ if (text_data) {
+ t = lua_newuserdata (L, sizeof (*t));
+ rspamd_lua_setclass (L, "rspamd{text}", -1);
+ t->flags = 0;
+ t->start = r->str;
+ t->len = r->len;
+ }
+ else {
+ lua_pushlstring (L, r->str, r->len);
+ }
break;
case REDIS_REPLY_ARRAY:
lua_createtable (L, r->elements, 0);
for (i = 0; i < r->elements; ++i) {
- lua_redis_push_reply (L, r->element[i]);
+ lua_redis_push_reply (L, r->element[i], text_data);
lua_rawseti (L, -2, i + 1); /* Store sub-reply */
}
break;
{
struct lua_redis_userdata *ud = sp_ud->c;
- if (!sp_ud->replied && !sp_ud->finished) {
+ if (!(sp_ud->flags & (LUA_REDIS_SPECIFIC_REPLIED|LUA_REDIS_SPECIFIC_FINISHED))) {
if (sp_ud->cbref != -1) {
/* Push error */
lua_rawgeti (ud->L, LUA_REGISTRYINDEX, sp_ud->cbref);
/* Error is nil */
lua_pushnil (ud->L);
/* Data */
- lua_redis_push_reply (ud->L, r);
+ lua_redis_push_reply (ud->L, r, ctx->flags & LUA_REDIS_TEXTDATA);
if (lua_pcall (ud->L, 2, 0, 0) != 0) {
msg_info ("call to callback failed: %s", lua_tostring (ud->L, -1));
}
- sp_ud->replied = TRUE;
+ sp_ud->flags |= LUA_REDIS_SPECIFIC_REPLIED;
if (ud->s) {
rspamd_session_watcher_pop (ud->s, sp_ud->w);
REDIS_RETAIN (ctx);
/* If session is finished, we cannot call lua callbacks */
- if (!sp_ud->finished) {
+ if (!(sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED)) {
if (c->err == 0) {
if (r != NULL) {
if (reply->type != REDIS_REPLY_ERROR) {
struct lua_redis_ctx *ctx;
redisAsyncContext *ac;
- if (sp_ud->finished) {
+ if (sp_ud->flags & LUA_REDIS_SPECIFIC_FINISHED) {
return;
}
struct rspamd_async_session *session = NULL;
struct event_base *ev_base = NULL;
gboolean ret = FALSE;
+ guint flags = 0;
if (lua_istable (L, 1)) {
/* Table version */
}
lua_pop (L, 1);
+ lua_pushstring (L, "ev_base");
+ lua_gettable (L, -2);
+ if (lua_type (L, -1) == LUA_TUSERDATA) {
+ ev_base = lua_check_ev_base (L, -1);
+ }
+ lua_pop (L, 1);
+
if (cfg && ev_base) {
ret = TRUE;
}
dbname = lua_tostring (L, -1);
}
lua_pop (L, 1);
+
+ lua_pushstring (L, "opaque_data");
+ lua_gettable (L, -2);
+ if (!!lua_toboolean (L, -1)) {
+ flags |= LUA_REDIS_TEXTDATA;
+ }
+ lua_pop (L, 1);
+
lua_pop (L, 1); /* table */
if (ret && addr != NULL) {
ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
REF_INIT_RETAIN (ctx, lua_redis_dtor);
- ctx->async = TRUE;
+ ctx->flags |= flags | LUA_REDIS_ASYNC;
ud = &ctx->d.async;
ud->s = session;
ud->cfg = cfg;
lua_pop (L, 1);
ud->timeout = timeout;
+
lua_pushstring (L, "args");
lua_gettable (L, -2);
lua_redis_parse_args (L, -1, cmd, &sp_ud->args, &sp_ud->arglens,
gdouble timeout = REDIS_DEFAULT_TIMEOUT;
gchar **args = NULL;
gsize *arglens = NULL;
- guint nargs = 0;
+ guint nargs = 0, flags = 0;
redisContext *ctx;
redisReply *r;
}
lua_pop (L, 1);
+ lua_pushstring (L, "opaque_data");
+ lua_gettable (L, -2);
+ if (!!lua_toboolean (L, -1)) {
+ flags |= LUA_REDIS_TEXTDATA;
+ }
+ lua_pop (L, 1);
+
+
if (cmd) {
lua_pushstring (L, "args");
lua_gettable (L, -2);
if (r != NULL) {
if (r->type != REDIS_REPLY_ERROR) {
lua_pushboolean (L, TRUE);
- lua_redis_push_reply (L, r);
+ lua_redis_push_reply (L, r, flags & LUA_REDIS_TEXTDATA);
}
else {
lua_pushboolean (L, FALSE);
const gchar *host;
struct timeval tv;
gboolean ret = FALSE;
+ guint flags = 0;
gdouble timeout = REDIS_DEFAULT_TIMEOUT;
struct lua_redis_ctx *ctx, **pctx;
}
lua_pop (L, 1);
+ lua_pushstring (L, "opaque_data");
+ lua_gettable (L, -2);
+ if (!!lua_toboolean (L, -1)) {
+ flags |= LUA_REDIS_TEXTDATA;
+ }
+ lua_pop (L, 1);
+
if (addr) {
ret = TRUE;
}
double_to_tv (timeout, &tv);
ctx = g_slice_alloc0 (sizeof (struct lua_redis_ctx));
REF_INIT_RETAIN (ctx, lua_redis_dtor);
- ctx->async = FALSE;
+ ctx->flags = flags;
ctx->d.sync = redisConnectWithTimeout (
rspamd_inet_address_to_string (addr->addr),
rspamd_inet_address_get_port (addr->addr), tv);
if (ctx) {
- if (ctx->async) {
+ if (IS_ASYNC (ctx)) {
ud = &ctx->d.async;
/* Async version */
return 1;
}
- if (ctx->async) {
+ if (IS_ASYNC (ctx)) {
lua_pushstring (L, "Async redis pipelining is not implemented");
lua_error (L);
return 0;
if (ret == REDIS_OK) {
if (r->type != REDIS_REPLY_ERROR) {
lua_pushboolean (L, TRUE);
- lua_redis_push_reply (L, r);
+ lua_redis_push_reply (L, r,
+ ctx->flags & LUA_REDIS_TEXTDATA);
}
else {
lua_pushboolean (L, FALSE);