gint flags;
gint fd;
gint cbref;
+ struct thread_entry *thread;
};
static const int default_http_timeout = 5000;
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)arg;
- luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
+ if (cbd->cbref != -1) {
+ luaL_unref (cbd->cfg->lua_state, LUA_REGISTRYINDEX, cbd->cbref);
+ }
+
if (cbd->conn) {
/* Here we already have a connection, so we need to unref it */
rspamd_http_connection_unref (cbd->conn);
lua_thread_pool_restore_callback (&lcbd);
}
+static void lua_http_resume_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg, const char *err);
+
static void
lua_http_error_handler (struct rspamd_http_connection *conn, GError *err)
{
struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;
-
- lua_http_push_error (cbd, err->message);
+ if (cbd->cbref == -1) {
+ lua_http_resume_handler (conn, NULL, err->message);
+ }
+ else {
+ lua_http_push_error (cbd, err->message);
+ }
lua_http_maybe_free (cbd);
}
struct lua_callback_state lcbd;
lua_State *L;
+ if (cbd->cbref == -1) {
+ lua_http_resume_handler (conn, msg, NULL);
+ lua_http_maybe_free (cbd);
+ return 0;
+ }
lua_thread_pool_prepare_callback (cbd->cfg->lua_thread_pool, &lcbd);
L = lcbd.L;
return 0;
}
+/*
+ * resumes yielded thread
+ */
+static void
+lua_http_resume_handler (struct rspamd_http_connection *conn,
+ struct rspamd_http_message *msg, const char *err)
+{
+ struct lua_http_cbdata *cbd = (struct lua_http_cbdata *)conn->ud;
+ lua_State *L = cbd->thread->lua_state;
+ const gchar *body;
+ gsize body_len;
+ struct rspamd_http_header *h, *htmp;
+
+ msg_info ("T=%p, L=%p, status=%d, err=%s", cbd->thread, cbd->thread->lua_state, lua_status (cbd->thread->lua_state), err);
+ if (err) {
+ lua_pushstring (L, err);
+ lua_pushnil (L);
+ }
+ else {
+ /*
+ * 1 - nil (error)
+ * 2 - table:
+ * code (int)
+ * content (string)
+ * headers (table: header -> value)
+ */
+ lua_pushnil (L); // error code
+
+ lua_createtable (L, 0, 3);
+
+ /* code */
+ lua_pushliteral (L, "code");
+ lua_pushinteger (L, msg->code);
+ lua_settable (L, -3);
+
+ /* content */
+ lua_pushliteral (L, "content");
+
+ body = rspamd_http_message_get_body (msg, &body_len);
+ if (cbd->flags & RSPAMD_LUA_HTTP_FLAG_TEXT) {
+ struct rspamd_lua_text *t;
+
+ t = lua_newuserdata (L, sizeof (*t));
+ rspamd_lua_setclass (L, "rspamd{text}", -1);
+ t->start = body;
+ t->len = body_len;
+ t->flags = 0;
+ }
+ else {
+ if (body_len > 0) {
+ lua_pushlstring (L, body, body_len);
+ }
+ else {
+ lua_pushnil (L);
+ }
+ }
+ lua_settable (L, -3);
+
+ /* headers */
+ lua_pushliteral (L, "headers");
+ lua_newtable (L);
+
+ HASH_ITER (hh, msg->headers, h, htmp) {
+ /*
+ * Lowercase header name, as Lua cannot search in caseless matter
+ */
+ rspamd_str_lc (h->combined->str, h->name.len);
+ lua_pushlstring (L, h->name.begin, h->name.len);
+ lua_pushlstring (L, h->value.begin, h->value.len);
+ lua_settable (L, -3);
+ }
+
+ lua_settable (L, -3);
+ }
+
+ lua_resume_thread (cbd->thread, 2);
+}
+
static gboolean
lua_http_make_connection (struct lua_http_cbdata *cbd)
{
const gchar *url, *lua_body;
rspamd_fstring_t *body = NULL;
gchar *to_resolve;
- gint cbref;
+ gint cbref = -1;
gsize bodylen;
gdouble timeout = default_http_timeout;
gint flags = 0;
lua_gettable (L, 1);
if (url == NULL || lua_type (L, -1) != LUA_TFUNCTION) {
lua_pop (L, 1);
- msg_err ("http request has bad params");
- lua_pushboolean (L, FALSE);
- return 1;
+ } else {
+ cbref = luaL_ref (L, LUA_REGISTRYINDEX);
}
- cbref = luaL_ref (L, LUA_REGISTRYINDEX);
lua_pushstring (L, "task");
lua_gettable (L, 1);
}
}
- lua_pushboolean (L, TRUE);
- return 1;
+ if (cbd->cbref == -1) {
+ cbd->thread = lua_thread_pool_get_running_entry (cfg->lua_thread_pool);
+ return lua_yield_thread (cbd->thread, 0);
+ } else {
+ lua_pushboolean (L, TRUE);
+ return 1;
+ }
}
static gint
if (g_queue_get_length (pool->available_items) <= pool->max_items) {
thread_entry->cd = NULL;
thread_entry->finish_callback = NULL;
+ thread_entry->error_callback = NULL;
thread_entry->task = NULL;
thread_entry->cfg = NULL;
}
}
-void
+static void
lua_thread_pool_terminate_entry (struct lua_thread_pool *pool, struct thread_entry *thread_entry)
{
struct thread_entry *ent = NULL;
static void lua_resume_thread_internal (struct thread_entry *thread_entry, gint narg);
void
-lua_thread_call (struct lua_thread_pool *pool, struct thread_entry *thread_entry, int narg)
+lua_thread_call (struct thread_entry *thread_entry, int narg)
{
g_assert (lua_status (thread_entry->lua_state) == 0); /* we can't call running/yielded thread */
- g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call running/yielded thread */
+ g_assert (thread_entry->task != NULL || thread_entry->cfg != NULL); /* we can't call without pool */
lua_resume_thread_internal (thread_entry, narg);
}
/*
* The only state where we can resume from is LUA_YIELD
* Another acceptable status is OK (0) but in that case we should push function on stack
- * to start the thread from, which is happening in lua_metric_symbol_callback(), not in this function.
+ * to start the thread from, which is happening in lua_thread_call(), not in this function.
*/
g_assert (lua_status (thread_entry->lua_state) == LUA_YIELD);