]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Minor] Added coroutine support to HTTP module
authorMikhail Galanin <mgalanin@mimecast.com>
Thu, 23 Aug 2018 12:45:32 +0000 (13:45 +0100)
committerMikhail Galanin <mgalanin@mimecast.com>
Thu, 23 Aug 2018 12:45:32 +0000 (13:45 +0100)
src/lua/lua_common.c
src/lua/lua_config.c
src/lua/lua_http.c
src/lua/lua_thread_pool.c
src/lua/lua_thread_pool.h

index f446ec9e80737ba29366954ebb4985a3ecee96cd..4f3c8046d9456cf85240e715e33fe7501ff0033d 100644 (file)
@@ -1507,7 +1507,7 @@ GString *
 rspamd_lua_get_traceback_string (lua_State *L)
 {
        GString *tb;
-       const gchar *msg = lua_tostring (L, 1);
+       const gchar *msg = lua_tostring (L, -1);
 
        tb = g_string_sized_new (100);
        g_string_append_printf (tb, "%s; trace:", msg);
index 629653e851b9fed04a93d7140279fd9f78693479..bf7c68ac53a5c143b296fc942aaee37c595ea165 100644 (file)
@@ -1209,7 +1209,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
        thread_entry->cd = cd;
 
        lua_State *thread = thread_entry->lua_state;
-       cd->stack_level = lua_gettop (cd->L);
+       cd->stack_level = lua_gettop (thread);
 
        if (cd->cb_is_ref) {
                lua_rawgeti (thread, LUA_REGISTRYINDEX, cd->callback.ref);
@@ -1226,7 +1226,7 @@ lua_metric_symbol_callback (struct rspamd_task *task, gpointer ud)
        thread_entry->error_callback = lua_metric_symbol_callback_error;
        thread_entry->task = task;
 
-       lua_thread_call (task->cfg->lua_thread_pool, thread_entry, 1);
+       lua_thread_call (thread_entry, 1);
 }
 
 gint
index 64617be9b8e75dcb8a149d98a4122c15f64feb83..3b00305064d8ecb510c8551b5fd26a0989779f9f 100644 (file)
@@ -75,6 +75,7 @@ struct lua_http_cbdata {
        gint flags;
        gint fd;
        gint cbref;
+       struct thread_entry *thread;
 };
 
 static const int default_http_timeout = 5000;
@@ -96,7 +97,10 @@ lua_http_fin (gpointer arg)
 {
        struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg;
 
-       luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
+       if (cbd->cbref != -1) {
+               luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
+       }
+
        if (cbd->conn) {
                /* Here we already have a connection, so we need to unref it */
                rspamd_http_connection_unref (cbd->conn);
@@ -170,12 +174,19 @@ lua_http_push_error (struct lua_http_cbdata *cbd, const char *err)
        lua_thread_pool_restore_callback (&lcbd);
 }
 
+static void lua_http_resume_handler (struct rspamd_http_connection *conn,
+                                                struct rspamd_http_message *msg, const char *err);
+
 static void
 lua_http_error_handler (struct rspamd_http_connection *conn, GError *err)
 {
        struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;
-
-       lua_http_push_error (cbd, err->message);
+       if (cbd->cbref == -1) {
+               lua_http_resume_handler (conn, NULL, err->message);
+       }
+       else {
+               lua_http_push_error (cbd, err->message);
+       }
        lua_http_maybe_free (cbd);
 }
 
@@ -191,6 +202,11 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
        struct lua_callback_state lcbd;
        lua_State *L;
 
+       if (cbd->cbref == -1) {
+               lua_http_resume_handler (conn, msg, NULL);
+               lua_http_maybe_free (cbd);
+               return 0;
+       }
        lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);
 
        L = lcbd.L;
@@ -245,6 +261,84 @@ lua_http_finish_handler (struct rspamd_http_connection *conn,
        return 0;
 }
 
+/*
+ * resumes yielded thread
+ */
+static void
+lua_http_resume_handler (struct rspamd_http_connection *conn,
+                                                struct rspamd_http_message *msg, const char *err)
+{
+       struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;
+       lua_State *L = cbd->thread->lua_state;
+       const gchar *body;
+       gsize body_len;
+       struct rspamd_http_header *h, *htmp;
+
+       msg_info ("T=%p, L=%p, status=%d, err=%s", cbd->thread, cbd->thread->lua_state, lua_status (cbd->thread->lua_state), err);
+       if (err) {
+               lua_pushstring (L, err);
+               lua_pushnil (L);
+       }
+       else {
+               /*
+                * 1 - nil (error)
+                * 2 - table:
+                *   code (int)
+                *   content (string)
+                *   headers (table: header -> value)
+                */
+               lua_pushnil (L); // error code
+
+               lua_createtable (L, 0, 3);
+
+               /* code */
+               lua_pushliteral (L, "code");
+               lua_pushinteger (L, msg->code);
+               lua_settable (L, -3);
+
+               /* content */
+               lua_pushliteral (L, "content");
+
+               body = rspamd_http_message_get_body (msg, &body_len);
+               if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) {
+                       struct rspamd_lua_text *t;
+
+                       t = lua_newuserdata (L, sizeof (*t));
+                       rspamd_lua_setclass (L, "rspamd{text}", -1);
+                       t->start = body;
+                       t->len = body_len;
+                       t->flags = 0;
+               }
+               else {
+                       if (body_len > 0) {
+                               lua_pushlstring (L, body, body_len);
+                       }
+                       else {
+                               lua_pushnil (L);
+                       }
+               }
+               lua_settable (L, -3);
+
+               /* headers */
+               lua_pushliteral (L, "headers");
+               lua_newtable (L);
+
+               HASH_ITER (hh, msg->headers, h, htmp) {
+                       /*
+                        * Lowercase header name, as Lua cannot search in caseless matter
+                        */
+                       rspamd_str_lc (h->combined->str, h->name.len);
+                       lua_pushlstring (L, h->name.begin, h->name.len);
+                       lua_pushlstring (L, h->value.begin, h->value.len);
+                       lua_settable (L, -3);
+               }
+
+               lua_settable (L, -3);
+       }
+
+       lua_resume_thread (cbd->thread, 2);
+}
+
 static gboolean
 lua_http_make_connection (struct lua_http_cbdata *cbd)
 {
@@ -404,7 +498,7 @@ lua_http_request (lua_State *L)
        const gchar *url, *lua_body;
        rspamd_fstring_t *body = NULL;
        gchar *to_resolve;
-       gint cbref;
+       gint cbref = -1;
        gsize bodylen;
        gdouble timeout = default_http_timeout;
        gint flags = 0;
@@ -464,11 +558,9 @@ lua_http_request (lua_State *L)
                lua_gettable (L, 1);
                if (url == NULL || lua_type (L, -1) != LUA_TFUNCTION) {
                        lua_pop (L, 1);
-                       msg_err ("http request has bad params");
-                       lua_pushboolean (L, FALSE);
-                       return 1;
+               } else {
+                       cbref = luaL_ref (L, LUA_REGISTRYINDEX);
                }
-               cbref = luaL_ref (L, LUA_REGISTRYINDEX);
 
                lua_pushstring (L, "task");
                lua_gettable (L, 1);
@@ -802,8 +894,13 @@ lua_http_request (lua_State *L)
                }
        }
 
-       lua_pushboolean (L, TRUE);
-       return 1;
+       if (cbd->cbref == -1) {
+               cbd->thread = lua_thread_pool_get_running_entry (cfg->lua_thread_pool);
+               return lua_yield_thread (cbd->thread, 0);
+       } else {
+               lua_pushboolean (L, TRUE);
+               return 1;
+       }
 }
 
 static gint
index 3525879ddcdb41680abc58dd97208ba83e9a8d3f..4c1681f960ec2cbe32a0d9ee97293be50cb3c852 100644 (file)
@@ -92,6 +92,7 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa
        if (g_queue_get_length (pool->available_items) <= pool->max_items) {
                thread_entry->cd = NULL;
                thread_entry->finish_callback = NULL;
+               thread_entry->error_callback = NULL;
                thread_entry->task = NULL;
                thread_entry->cfg = NULL;
 
@@ -102,7 +103,7 @@ lua_thread_pool_return (struct lua_thread_pool *pool, struct thread_entry *threa
        }
 }
 
-void
+static void
 lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry)
 {
        struct thread_entry *ent = NULL;
@@ -164,10 +165,10 @@ lua_do_resume (lua_State *L, gint narg)
 static void lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg);
 
 void
-lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg)
+lua_thread_call (struct thread_entry *thread_entry, int narg)
 {
        g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */
-       g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call running/yielded thread */
+       g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */
 
        lua_resume_thread_internal (thread_entry, narg);
 }
@@ -178,7 +179,7 @@ lua_resume_thread (struct thread_entry *thread_entry, gint narg)
        /*
         * The only state where we can resume from is LUA_YIELD
         * Another acceptable status is OK (0) but in that case we should push function on stack
-        * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function.
+        * to start the thread from, which is happening in lua_thread_call(), not in this function.
         */
        g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);
 
index e5b2f287333bdbed30524e0d9dbb52609af3f9f4..c77f774554321a52954f9e88c0b4f056ae42fd1c 100644 (file)
@@ -70,15 +70,6 @@ lua_thread_pool_get(struct lua_thread_pool *pool);
 void
 lua_thread_pool_return(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
 
-/**
- * Removes thread from Lua state. It should be done to dead (which ended with an error) threads only
- *
- * @param pool
- * @param thread_entry
- */
-void
-lua_thread_pool_terminate_entry(struct lua_thread_pool *pool, struct thread_entry *thread_entry);
-
 /**
  * Currently running thread. Typically needed in yielding point - to fill-up continuation.
  *
@@ -115,8 +106,15 @@ void
 lua_thread_pool_restore_callback (struct lua_callback_state *cbs);
 
 
+/**
+ * Acts like lua_call but the tread is able to suspend execution.
+ * As soon as the call is over, call either thread_entry::finish_callback or thread_entry::error_callback.
+ *
+ * @param thread_entry
+ * @param narg
+ */
 void
-lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg);
+lua_thread_call (struct thread_entry *thread_entry, int narg);
 
 #endif /* LUA_THREAD_POOL_H_ */