]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] Miltiple fixes to new lua_tcp, add debugging
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 29 Dec 2016 17:58:47 +0000 (17:58 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 29 Dec 2016 18:58:16 +0000 (18:58 +0000)
src/lua/lua_tcp.c

index d7c2a7d23f0f3b5f3b3554d424dbc2483ded610f..2b2a7ce4cf869890fc0cd0580bb12bef5666a1e6 100644 (file)
@@ -152,12 +152,18 @@ struct lua_tcp_cbdata {
        gint connect_cb;
        guint port;
        guint flags;
+       gchar tag[7];
        struct rspamd_async_watcher *w;
        struct event ev;
        struct lua_tcp_dtor *dtors;
        ref_entry_t ref;
 };
 
+#define msg_debug_tcp(...)  rspamd_default_log_function (G_LOG_LEVEL_DEBUG, \
+        "lua_tcp", cbd->tag, \
+        G_STRFUNC, \
+        __VA_ARGS__)
+
 static void lua_tcp_handler (int fd, short what, gpointer ud);
 static void lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd,
                gboolean can_read, gboolean can_write);
@@ -218,6 +224,8 @@ lua_tcp_fin (gpointer arg)
        struct lua_tcp_cbdata *cbd = (struct lua_tcp_cbdata *)arg;
        struct lua_tcp_dtor *dtor, *dttmp;
 
+       msg_debug_tcp ("finishing TCP connection");
+
        if (cbd->connect_cb) {
                luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
        }
@@ -232,6 +240,7 @@ lua_tcp_fin (gpointer arg)
        }
 
        while (lua_tcp_shift_handler (cbd)) {}
+       g_queue_free (cbd->handlers);
 
        LL_FOREACH_SAFE (cbd->dtors, dtor, dttmp) {
                dtor->dtor (dtor->data);
@@ -301,9 +310,9 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, const char *err, ...)
                        msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
                        lua_pop (cbd->L, 1);
                }
-       }
 
-       REF_RELEASE (cbd);
+               REF_RELEASE (cbd);
+       }
 }
 
 static void
@@ -312,7 +321,7 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
        struct rspamd_lua_text *t;
        struct lua_tcp_cbdata **pcbd;
        struct lua_tcp_handler *hdl;
-       gint cbref;
+       gint cbref, arg_cnt;
 
        hdl = g_queue_peek_head (cbd->handlers);
 
@@ -337,6 +346,10 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
                        t->start = (const gchar *)str;
                        t->len = len;
                        t->flags = 0;
+                       arg_cnt = 3;
+               }
+               else {
+                       arg_cnt = 2;
                }
                /* Connection */
                pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
@@ -345,13 +358,13 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
 
                REF_RETAIN (cbd);
 
-               if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
+               if (lua_pcall (cbd->L, arg_cnt, 0, 0) != 0) {
                        msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
                        lua_pop (cbd->L, 1);
                }
-       }
 
-       REF_RELEASE (cbd);
+               REF_RELEASE (cbd);
+       }
 }
 
 static void
@@ -359,10 +372,10 @@ lua_tcp_plan_read (struct lua_tcp_cbdata *cbd)
 {
        event_del (&cbd->ev);
 #ifdef EV_CLOSED
-       event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST|EV_CLOSED,
+       event_set (&cbd->ev, cbd->fd, EV_READ|EV_CLOSED,
                                lua_tcp_handler, cbd);
 #else
-       event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST, lua_tcp_handler, cbd);
+       event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd);
 #endif
        event_base_set (cbd->ev_base, &cbd->ev);
        event_add (&cbd->ev, &cbd->tv);
@@ -445,6 +458,8 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
 
 call_finish_handler:
 
+       msg_debug_tcp ("finishing TCP write");
+
        if ((cbd->flags & LUA_TCP_FLAG_SHUTDOWN)) {
                /* Half close the connection */
                shutdown (cbd->fd, SHUT_WR);
@@ -469,6 +484,7 @@ lua_tcp_process_read_handler (struct lua_tcp_cbdata *cbd,
                if (cbd->in->len >= slen) {
                        if ((pos = rspamd_substring_search (cbd->in->data, cbd->in->len,
                                        rh->stop_pattern, slen)) != -1) {
+                               msg_debug_tcp ("found TCP stop pattern");
                                lua_tcp_push_data (cbd, cbd->in->data, pos);
 
                                if (pos + slen < cbd->in->len) {
@@ -485,11 +501,13 @@ lua_tcp_process_read_handler (struct lua_tcp_cbdata *cbd,
                        }
                        else {
                                /* Plan new read */
+                               msg_debug_tcp ("NOT found TCP stop pattern");
                                lua_tcp_plan_read (cbd);
                        }
                }
        }
        else {
+               msg_debug_tcp ("read TCP partial data");
                lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len);
                lua_tcp_shift_handler (cbd);
 
@@ -570,6 +588,8 @@ lua_tcp_handler (int fd, short what, gpointer ud)
 
        REF_RETAIN (cbd);
 
+       msg_debug_tcp ("processed TCP event: %d", what);
+
        if (what == EV_READ) {
                r = read (cbd->fd, inbuf, sizeof (inbuf));
                lua_tcp_process_read (cbd, inbuf, r);
@@ -637,12 +657,15 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
 
        if (hdl == NULL) {
                /* We are finished with a connection */
+               msg_debug_tcp ("no handlers left, finish session");
                REF_RELEASE (cbd);
        }
        else {
                if (hdl->type == LUA_WANT_READ) {
+
                        /* We need to check if we have some leftover in the buffer */
                        if (cbd->in->len > 0) {
+                               msg_debug_tcp ("process read buffer leftover");
                                if (lua_tcp_process_read_handler (cbd, &hdl->h.r)) {
                                        /* We can go to the next handler */
                                        lua_tcp_shift_handler (cbd);
@@ -650,6 +673,7 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
                                }
                        }
                        else {
+                               msg_debug_tcp ("plan new read");
                                if (can_read) {
                                        /* We need to plan a new event */
                                        event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd);
@@ -669,15 +693,17 @@ lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
                         * We need to plan write event if there is something in the
                         * write request
                         */
+
                        if (hdl->h.w.pos < hdl->h.w.total) {
+                               msg_debug_tcp ("plan new write");
                                if (can_write) {
                                        event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
                                        event_base_set (cbd->ev_base, &cbd->ev);
                                        event_add (&cbd->ev, &cbd->tv);
                                }
                                else {
-                                       /* Cannot read more */
-                                       lua_tcp_push_error (cbd, "EOF, cannot read more data");
+                                       /* Cannot write more */
+                                       lua_tcp_push_error (cbd, "EOF, cannot write more data");
                                        lua_tcp_shift_handler (cbd);
                                        lua_tcp_plan_handler_event (cbd, can_read, can_write);
                                }
@@ -826,6 +852,7 @@ lua_tcp_request (lua_State *L)
        struct rspamd_task *task = NULL;
        struct iovec *iov = NULL;
        guint niov = 0, total_out;
+       guint64 h;
        gdouble timeout = default_tcp_timeout;
        gboolean partial = FALSE, do_shutdown = FALSE, do_read = TRUE;
 
@@ -1007,6 +1034,9 @@ lua_tcp_request (lua_State *L)
        }
 
        cbd->L = L;
+       h = rspamd_random_uint64_fast ();
+       rspamd_snprintf (cbd->tag, sizeof (cbd->tag), "%uxL", h);
+       cbd->handlers = g_queue_new ();
 
        if (total_out > 0) {
                struct lua_tcp_handler *wh;
@@ -1162,8 +1192,9 @@ lua_tcp_add_read (lua_State *L)
        rh->type = LUA_WANT_READ;
        rh->h.r.cbref = cbref;
        rh->h.r.stop_pattern = stop_pattern;
+       msg_debug_tcp ("added read event, cbref: %d", cbref);
+
        g_queue_push_tail (cbd->handlers, rh);
-       lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
 
        return 0;
 }
@@ -1242,19 +1273,10 @@ lua_tcp_add_write (lua_State *L)
        wh->h.w.total = total_out;
        wh->h.w.pos = 0;
        /* Cannot set write handler here */
-       wh->h.w.cbref = -1;
-
-       if (cbref != -1) {
-               /* We have write only callback */
-               wh->h.w.cbref = cbref;
-       }
-       else {
-               /* We have simple client callback */
-               wh->h.w.cbref = -1;
-       }
+       wh->h.w.cbref = cbref;
+       msg_debug_tcp ("added write event, cbref: %d", cbref);
 
        g_queue_push_tail (cbd->handlers, wh);
-       lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
        lua_pushboolean (L, TRUE);
 
        return 1;