]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Rework] Rework lua_tcp to allow TCP dialog
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 29 Dec 2016 15:50:29 +0000 (15:50 +0000)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Thu, 29 Dec 2016 18:58:16 +0000 (18:58 +0000)
- Now, lua_tcp has a chain of read and write events that are processed
  in order
- The old API wasn't touched, however, new style API will be possible
- Partial lua_tcp might be broken, so I need to revisit all plugins that
  use lua_tcp

Issue: #1224

src/lua/lua_tcp.c

index 7be257b51e3ed93e8530a610328a40aaff1a0096..b3276b709b084e80d0ca5a7d08a221e599b9fafa 100644 (file)
@@ -19,7 +19,6 @@
 #include "ref.h"
 #include "unix-std.h"
 
-static void lua_tcp_handler (int fd, short what, gpointer ud);
 /***
  * @module rspamd_tcp
  * Rspamd TCP module represents generic TCP asynchronous client available from LUA code.
@@ -77,32 +76,64 @@ static const struct luaL_reg tcp_libm[] = {
        {NULL, NULL}
 };
 
+struct lua_tcp_read_handler {
+       gchar *stop_pattern;
+       gint cbref;
+};
+
+struct lua_tcp_write_handler {
+       struct iovec *iov;
+       guint iovlen;
+       guint pos;
+       guint total;
+       gint cbref;
+};
+
+enum lua_tcp_handler_type {
+       LUA_WANT_WRITE = 0,
+       LUA_WANT_READ,
+};
+
+struct lua_tcp_handler {
+       union {
+               struct lua_tcp_read_handler r;
+               struct lua_tcp_write_handler w;
+       } h;
+       enum lua_tcp_handler_type type;
+};
+
+struct lua_tcp_dtor {
+       rspamd_mempool_destruct_t dtor;
+       void *data;
+       struct lua_tcp_dtor *next;
+};
+
+#define LUA_TCP_FLAG_PARTIAL (1 << 0)
+#define LUA_TCP_FLAG_SHUTDOWN (1 << 2)
+#define LUA_TCP_FLAG_CONNECTED (1 << 3)
+
 struct lua_tcp_cbdata {
        lua_State *L;
        struct rspamd_async_session *session;
        struct event_base *ev_base;
        struct timeval tv;
        rspamd_inet_addr_t *addr;
-       rspamd_mempool_t *pool;
-       struct iovec *iov;
        GByteArray *in;
-       gchar *stop_pattern;
+       GQueue *handlers;
+       gint fd;
+       gint connect_cb;
+       guint port;
+       guint flags;
        struct rspamd_async_watcher *w;
        struct event ev;
+       struct lua_tcp_dtor *dtors;
        ref_entry_t ref;
-       gint fd;
-       gint cbref;
-       gint connect_cb;
-       guint iovlen;
-       guint pos;
-       guint total;
-       guint16 port;
-       gboolean partial;
-       gboolean do_shutdown;
-       gboolean do_read;
-       gboolean connected;
 };
 
+static void lua_tcp_handler (int fd, short what, gpointer ud);
+static void lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd,
+               gboolean can_read, gboolean can_write);
+
 static const int default_tcp_timeout = 5000;
 
 static struct rspamd_dns_resolver *
@@ -117,12 +148,51 @@ lua_tcp_global_resolver (struct event_base *ev_base)
        return global_resolver;
 }
 
+static gboolean
+lua_tcp_shift_handler (struct lua_tcp_cbdata *cbd)
+{
+       struct lua_tcp_handler *hdl;
+
+       hdl = g_queue_pop_head (cbd->handlers);
+
+       if (hdl == NULL) {
+               /* We are done */
+               return FALSE;
+       }
+
+       if (hdl->type == LUA_WANT_READ) {
+               if (hdl->h.r.cbref) {
+                       luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.r.cbref);
+               }
+
+               if (hdl->h.r.stop_pattern) {
+                       g_free (hdl->h.r.stop_pattern);
+               }
+       }
+       else {
+               if (hdl->h.w.cbref) {
+                       luaL_unref (cbd->L, LUA_REGISTRYINDEX, hdl->h.w.cbref);
+               }
+
+               if (hdl->h.w.iov) {
+                       g_free (hdl->h.w.iov);
+               }
+       }
+
+       g_slice_free1 (sizeof (*hdl), hdl);
+
+       return TRUE;
+}
+
 static void
 lua_tcp_fin (gpointer arg)
 {
        struct lua_tcp_cbdata *cbd = (struct lua_tcp_cbdata *)arg;
+       struct lua_tcp_dtor *dtor, *dttmp;
 
-       luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+       if (cbd->connect_cb) {
+               luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->connect_cb);
+       }
 
        if (cbd->fd != -1) {
                event_del (&cbd->ev);
@@ -133,6 +203,14 @@ lua_tcp_fin (gpointer arg)
                rspamd_inet_address_destroy (cbd->addr);
        }
 
+       while (lua_tcp_shift_handler (cbd)) {}
+
+       LL_FOREACH_SAFE (cbd->dtors, dtor, dttmp) {
+               dtor->dtor (dtor->data);
+               g_slice_free1 (sizeof (*dtor), dtor);
+       }
+
+       g_byte_array_unref (cbd->in);
        g_slice_free1 (sizeof (struct lua_tcp_cbdata), cbd);
 }
 
@@ -161,25 +239,40 @@ lua_tcp_push_error (struct lua_tcp_cbdata *cbd, const char *err, ...)
 {
        va_list ap;
        struct lua_tcp_cbdata **pcbd;
+       struct lua_tcp_handler *hdl;
+       gint cbref;
 
-       lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
+       hdl = g_queue_peek_head (cbd->handlers);
 
-       /* Error message */
-       va_start (ap, err);
-       lua_pushvfstring (cbd->L, err, ap);
-       va_end (ap);
+       g_assert (hdl != NULL);
 
-       /* Body */
-       lua_pushnil (cbd->L);
-       /* Connection */
-       pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
-       *pcbd = cbd;
-       rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
-       REF_RETAIN (cbd);
+       if (hdl->type == LUA_WANT_READ) {
+               cbref = hdl->h.r.cbref;
+       }
+       else {
+               cbref = hdl->h.w.cbref;
+       }
 
-       if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
-               msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
-               lua_pop (cbd->L, 1);
+       if (cbref != -1) {
+               lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+
+               /* Error message */
+               va_start (ap, err);
+               lua_pushvfstring (cbd->L, err, ap);
+               va_end (ap);
+
+               /* Body */
+               lua_pushnil (cbd->L);
+               /* Connection */
+               pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+               *pcbd = cbd;
+               rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+               REF_RETAIN (cbd);
+
+               if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
+                       msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+                       lua_pop (cbd->L, 1);
+               }
        }
 
        REF_RELEASE (cbd);
@@ -190,31 +283,63 @@ lua_tcp_push_data (struct lua_tcp_cbdata *cbd, const guint8 *str, gsize len)
 {
        struct rspamd_lua_text *t;
        struct lua_tcp_cbdata **pcbd;
+       struct lua_tcp_handler *hdl;
+       gint cbref;
 
-       lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbd->cbref);
-       /* Error */
-       lua_pushnil (cbd->L);
-       /* Body */
-       t = lua_newuserdata (cbd->L, sizeof (*t));
-       rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
-       t->start = (const gchar *)str;
-       t->len = len;
-       t->flags = 0;
-       /* Connection */
-       pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
-       *pcbd = cbd;
-       rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+       hdl = g_queue_peek_head (cbd->handlers);
 
-       REF_RETAIN (cbd);
+       g_assert (hdl != NULL);
 
-       if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
-               msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
-               lua_pop (cbd->L, 1);
+       if (hdl->type == LUA_WANT_READ) {
+               cbref = hdl->h.r.cbref;
+       }
+       else {
+               cbref = hdl->h.w.cbref;
+       }
+
+       if (cbref != -1) {
+               lua_rawgeti (cbd->L, LUA_REGISTRYINDEX, cbref);
+               /* Error */
+               lua_pushnil (cbd->L);
+               /* Body */
+
+               if (hdl->type == LUA_WANT_READ) {
+                       t = lua_newuserdata (cbd->L, sizeof (*t));
+                       rspamd_lua_setclass (cbd->L, "rspamd{text}", -1);
+                       t->start = (const gchar *)str;
+                       t->len = len;
+                       t->flags = 0;
+               }
+               /* Connection */
+               pcbd = lua_newuserdata (cbd->L, sizeof (*pcbd));
+               *pcbd = cbd;
+               rspamd_lua_setclass (cbd->L, "rspamd{tcp}", -1);
+
+               REF_RETAIN (cbd);
+
+               if (lua_pcall (cbd->L, 3, 0, 0) != 0) {
+                       msg_info ("callback call failed: %s", lua_tostring (cbd->L, -1));
+                       lua_pop (cbd->L, 1);
+               }
        }
 
        REF_RELEASE (cbd);
 }
 
+static void
+lua_tcp_plan_read (struct lua_tcp_cbdata *cbd)
+{
+       event_del (&cbd->ev);
+#ifdef EV_CLOSED
+       event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST|EV_CLOSED,
+                               lua_tcp_handler, cbd);
+#else
+       event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST, lua_tcp_handler, cbd);
+#endif
+       event_base_set (cbd->ev_base, &cbd->ev);
+       event_add (&cbd->ev, &cbd->tv);
+}
+
 static void
 lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
 {
@@ -224,20 +349,27 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
        gsize remain;
        gssize r;
        struct iovec *cur_iov;
+       struct lua_tcp_handler *hdl;
+       struct lua_tcp_write_handler *wh;
        struct msghdr msg;
 
-       if (cbd->pos == cbd->total) {
+       hdl = g_queue_peek_head (cbd->handlers);
+
+       g_assert (hdl != NULL && hdl->type == LUA_WANT_WRITE);
+       wh = &hdl->h.w;
+
+       if (wh->pos == wh->total) {
                goto call_finish_handler;
        }
 
-       start = &cbd->iov[0];
-       niov = cbd->iovlen;
-       remain = cbd->pos;
+       start = &wh->iov[0];
+       niov = wh->iovlen;
+       remain = wh->pos;
        /* We know that niov is small enough for that */
        cur_iov = alloca (niov * sizeof (struct iovec));
-       memcpy (cur_iov, cbd->iov, niov * sizeof (struct iovec));
+       memcpy (cur_iov, wh->iov, niov * sizeof (struct iovec));
 
-       for (i = 0; i < cbd->iovlen && remain > 0; i++) {
+       for (i = 0; i < wh->iovlen && remain > 0; i++) {
                /* Find out the first iov required */
                start = &cur_iov[i];
                if (start->iov_len <= remain) {
@@ -264,15 +396,16 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
        if (r == -1) {
                lua_tcp_push_error (cbd, "IO write error while trying to write %d "
                                "bytes: %s", (gint)remain, strerror (errno));
-               REF_RELEASE (cbd);
+               lua_tcp_shift_handler (cbd);
+               lua_tcp_plan_handler_event (cbd, TRUE, FALSE);
 
                return;
        }
        else {
-               cbd->pos += r;
+               wh->pos += r;
        }
 
-       if (cbd->pos >= cbd->total) {
+       if (wh->pos >= wh->total) {
                goto call_finish_handler;
        }
        else {
@@ -284,93 +417,132 @@ lua_tcp_write_helper (struct lua_tcp_cbdata *cbd)
 
 call_finish_handler:
 
-       if (cbd->do_shutdown) {
+       if ((cbd->flags & LUA_TCP_FLAG_SHUTDOWN)) {
                /* Half close the connection */
                shutdown (cbd->fd, SHUT_WR);
+               cbd->flags &= ~LUA_TCP_FLAG_SHUTDOWN;
        }
 
-       if (cbd->do_read) {
-               event_del (&cbd->ev);
-#ifdef EV_CLOSED
-               event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST|EV_CLOSED,
-                                       lua_tcp_handler, cbd);
-#else
-               event_set (&cbd->ev, cbd->fd, EV_READ|EV_PERSIST, lua_tcp_handler, cbd);
-#endif
-               event_base_set (cbd->ev_base, &cbd->ev);
-               event_add (&cbd->ev, &cbd->tv);
-       }
-       else {
-               lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len);
-               REF_RELEASE (cbd);
-       }
+       lua_tcp_push_data (cbd, NULL, 0);
+       lua_tcp_shift_handler (cbd);
+       lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
 }
 
-static void
-lua_tcp_handler (int fd, short what, gpointer ud)
+static gboolean
+lua_tcp_process_read_handler (struct lua_tcp_cbdata *cbd,
+               struct lua_tcp_read_handler *rh)
 {
-       struct lua_tcp_cbdata *cbd = ud;
-       gchar inbuf[8192];
-       gssize r;
        guint slen;
-       gint so_error = 0;
-       socklen_t so_len = sizeof (so_error);
+       goffset pos;
 
-       REF_RETAIN (cbd);
+       if (rh->stop_pattern) {
+               slen = strlen (rh->stop_pattern);
 
-       if (what == EV_READ) {
-               g_assert (cbd->partial || cbd->in != NULL);
+               if (cbd->in->len >= slen) {
+                       if ((pos = rspamd_substring_search (cbd->in->data, cbd->in->len,
+                                       rh->stop_pattern, slen)) != -1) {
+                               lua_tcp_push_data (cbd, cbd->in->data, pos);
 
-               r = read (cbd->fd, inbuf, sizeof (inbuf));
+                               if (pos + slen < cbd->in->len) {
+                                       /* We have a leftover */
+                                       memmove (cbd->in->data, cbd->in->data + pos + slen,
+                                                       cbd->in->len - (pos + slen));
+                                       lua_tcp_shift_handler (cbd);
+                               }
+                               else {
+                                       lua_tcp_shift_handler (cbd);
 
-               if (r <= 0) {
-                       /*
-                        * We actually can have connection reset here, so we just check if
-                        * the cumulative buffer is not empty
-                        */
-                       if (cbd->partial) {
-                               if (r < 0) {
-                                       lua_tcp_push_error (cbd, "IO read error while trying to read %d "
-                                                                       "bytes: %s", (gint)sizeof (inbuf),
-                                                                       strerror (errno));
+                                       return TRUE;
                                }
                        }
                        else {
-                               if (cbd->in->len > 0) {
-                                       lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len);
-                               }
-                               else {
-                                       lua_tcp_push_error (cbd, "IO read error while trying to write %d "
-                                                       "bytes: %s", (gint)sizeof (inbuf),
-                                                       strerror (errno));
-                               }
+                               /* Plan new read */
+                               lua_tcp_plan_read (cbd);
                        }
+               }
+       }
 
-                       REF_RELEASE (cbd);
+       return FALSE;
+}
+
+static void
+lua_tcp_process_read (struct lua_tcp_cbdata *cbd,
+               guchar *in, gssize r)
+{
+       struct lua_tcp_handler *hdl;
+       struct lua_tcp_read_handler *rh;
+
+       hdl = g_queue_peek_head (cbd->handlers);
+
+       g_assert (hdl != NULL && hdl->type == LUA_WANT_READ);
+       rh = &hdl->h.r;
+
+       if (r > 0) {
+               if (cbd->flags & LUA_TCP_FLAG_PARTIAL) {
+                       lua_tcp_push_data (cbd, in, r);
+                       /* Plan next event */
+                       lua_tcp_shift_handler (cbd);
+                       lua_tcp_shift_handler (cbd);
                }
                else {
-                       if (cbd->partial) {
-                               lua_tcp_push_data (cbd, inbuf, r);
+                       g_byte_array_append (cbd->in, in, r);
+
+                       if (!lua_tcp_process_read_handler (cbd, rh)) {
+                               /* Plan more read */
+                               lua_tcp_plan_read (cbd);
                        }
                        else {
-                               g_byte_array_append (cbd->in, inbuf, r);
+                               /* Go towards the next handler */
+                               lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
+                       }
+               }
+       }
+       else if (r == 0) {
+               /* EOF */
+               if (cbd->in->len > 0) {
+                       /* We have some data to process */
+                       lua_tcp_process_read_handler (cbd, rh);
+               }
+               else {
+                       lua_tcp_push_error (cbd, "IO read error: connection terminated");
+               }
 
-                               if (cbd->stop_pattern) {
-                                       slen = strlen (cbd->stop_pattern);
+               lua_tcp_plan_handler_event (cbd, FALSE, TRUE);
+       }
+       else {
+               /* An error occurred */
+               if (errno == EAGAIN || errno == EINTR) {
+                       /* Restart call */
+                       lua_tcp_plan_read (cbd);
 
-                                       if (cbd->in->len >= slen) {
-                                               if (memcmp (cbd->stop_pattern, cbd->in->data +
-                                                               (cbd->in->len - slen), slen) == 0) {
-                                                       lua_tcp_push_data (cbd, cbd->in->data, cbd->in->len);
-                                                       REF_RELEASE (cbd);
-                                               }
-                                       }
-                               }
-                       }
+                       return;
                }
+
+               /* Fatal error */
+               lua_tcp_push_error (cbd, "IO read error while trying to read data: %s",
+                               strerror (errno));
+
+               REF_RELEASE (cbd);
+       }
+}
+
+static void
+lua_tcp_handler (int fd, short what, gpointer ud)
+{
+       struct lua_tcp_cbdata *cbd = ud;
+       guchar inbuf[8192];
+       gssize r;
+       gint so_error = 0;
+       socklen_t so_len = sizeof (so_error);
+
+       REF_RETAIN (cbd);
+
+       if (what == EV_READ) {
+               r = read (cbd->fd, inbuf, sizeof (inbuf));
+               lua_tcp_process_read (cbd, inbuf, r);
        }
        else if (what == EV_WRITE) {
-               if (!cbd->connected) {
+               if (!(cbd->flags & LUA_TCP_FLAG_CONNECTED)) {
                        if (getsockopt (fd, SOL_SOCKET, SO_ERROR, &so_error, &so_len) == -1) {
                                lua_tcp_push_error (cbd, "Cannot get socket error: %s",
                                                strerror (errno));
@@ -384,7 +556,7 @@ lua_tcp_handler (int fd, short what, gpointer ud)
                                goto out;
                        }
                        else {
-                               cbd->connected = TRUE;
+                               cbd->flags |= LUA_TCP_FLAG_CONNECTED;
 
                                if (cbd->connect_cb != -1) {
                                        struct lua_tcp_cbdata **pcbd;
@@ -422,6 +594,69 @@ out:
        REF_RELEASE (cbd);
 }
 
+static void
+lua_tcp_plan_handler_event (struct lua_tcp_cbdata *cbd, gboolean can_read,
+               gboolean can_write)
+{
+       struct lua_tcp_handler *hdl;
+
+       hdl = g_queue_peek_head (cbd->handlers);
+
+       if (hdl == NULL) {
+               /* We are finished with a connection */
+               REF_RELEASE (cbd);
+       }
+       else {
+               if (hdl->type == LUA_WANT_READ) {
+                       /* We need to check if we have some leftover in the buffer */
+                       if (cbd->in->len > 0) {
+                               if (lua_tcp_process_read_handler (cbd, &hdl->h.r)) {
+                                       /* We can go to the next handler */
+                                       lua_tcp_shift_handler (cbd);
+                                       lua_tcp_plan_handler_event (cbd, can_read, can_write);
+                               }
+                       }
+                       else {
+                               if (can_read) {
+                                       /* We need to plan a new event */
+                                       event_set (&cbd->ev, cbd->fd, EV_READ, lua_tcp_handler, cbd);
+                                       event_base_set (cbd->ev_base, &cbd->ev);
+                                       event_add (&cbd->ev, &cbd->tv);
+                               }
+                               else {
+                                       /* Cannot read more */
+                                       lua_tcp_push_error (cbd, "EOF, cannot read more data");
+                                       lua_tcp_shift_handler (cbd);
+                                       lua_tcp_plan_handler_event (cbd, can_read, can_write);
+                               }
+                       }
+               }
+               else {
+                       /*
+                        * We need to plan write event if there is something in the
+                        * write request
+                        */
+                       if (hdl->h.w.pos < hdl->h.w.total) {
+                               if (can_write) {
+                                       event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
+                                       event_base_set (cbd->ev_base, &cbd->ev);
+                                       event_add (&cbd->ev, &cbd->tv);
+                               }
+                               else {
+                                       /* Cannot read more */
+                                       lua_tcp_push_error (cbd, "EOF, cannot read more data");
+                                       lua_tcp_shift_handler (cbd);
+                                       lua_tcp_plan_handler_event (cbd, can_read, can_write);
+                               }
+                       }
+                       else {
+                               /* We shouldn't have empty write handlers */
+                               g_assert_not_reached ();
+                       }
+               }
+       }
+}
+
 static gboolean
 lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
 {
@@ -434,11 +669,9 @@ lua_tcp_make_connection (struct lua_tcp_cbdata *cbd)
                msg_info ("cannot connect to %s", rspamd_inet_address_to_string (cbd->addr));
                return FALSE;
        }
-       cbd->fd = fd;
 
-       event_set (&cbd->ev, cbd->fd, EV_WRITE, lua_tcp_handler, cbd);
-       event_base_set (cbd->ev_base, &cbd->ev);
-       event_add (&cbd->ev, &cbd->tv);
+       cbd->fd = fd;
+       lua_tcp_plan_handler_event (cbd, TRUE, TRUE);
 
        return TRUE;
 }
@@ -476,12 +709,13 @@ lua_tcp_dns_handler (struct rdns_reply *reply, gpointer ud)
 }
 
 static gboolean
-lua_tcp_arg_toiovec (lua_State *L, gint pos, rspamd_mempool_t *pool,
+lua_tcp_arg_toiovec (lua_State *L, gint pos, struct lua_tcp_cbdata *cbd,
                struct iovec *vec)
 {
        struct rspamd_lua_text *t;
        gsize len;
        const gchar *str;
+       struct lua_tcp_dtor *dtor;
 
        if (lua_type (L, pos) == LUA_TUSERDATA) {
                t = lua_check_text (L, pos);
@@ -493,7 +727,10 @@ lua_tcp_arg_toiovec (lua_State *L, gint pos, rspamd_mempool_t *pool,
                        if (t->flags & RSPAMD_TEXT_FLAG_OWN) {
                                /* Steal ownership */
                                t->flags = 0;
-                               rspamd_mempool_add_destructor (pool, g_free, (void *)t->start);
+                               dtor = g_slice_alloc0 (sizeof (*dtor));
+                               dtor->dtor = g_free;
+                               dtor->data = (void *)t->start;
+                               LL_PREPEND (cbd->dtors, dtor);
                        }
                }
                else {
@@ -503,7 +740,10 @@ lua_tcp_arg_toiovec (lua_State *L, gint pos, rspamd_mempool_t *pool,
        }
        else if (lua_type (L, pos) == LUA_TSTRING) {
                str = luaL_checklstring (L, pos, &len);
-               vec->iov_base = rspamd_mempool_alloc (pool, len);
+               vec->iov_base = g_malloc (len);
+               dtor = g_slice_alloc0 (sizeof (*dtor));
+               dtor->dtor = g_free;
+               dtor->data = (void *)vec->iov_base;
                memcpy (vec->iov_base, str, len);
                vec->iov_len = len;
        }
@@ -551,7 +791,6 @@ lua_tcp_request (lua_State *L)
        struct rspamd_dns_resolver *resolver;
        struct rspamd_async_session *session;
        struct rspamd_task *task = NULL;
-       rspamd_mempool_t *pool;
        struct iovec *iov = NULL;
        guint niov = 0, total_out;
        gdouble timeout = default_tcp_timeout;
@@ -585,6 +824,8 @@ lua_tcp_request (lua_State *L)
                }
                cbref = luaL_ref (L, LUA_REGISTRYINDEX);
 
+               cbd = g_slice_alloc0 (sizeof (*cbd));
+
                lua_pushstring (L, "task");
                lua_gettable (L, -2);
                if (lua_type (L, -1) == LUA_TUSERDATA) {
@@ -592,7 +833,6 @@ lua_tcp_request (lua_State *L)
                        ev_base = task->ev_base;
                        resolver = task->resolver;
                        session = task->s;
-                       pool = task->task_pool;
                }
                lua_pop (L, 1);
 
@@ -607,16 +847,6 @@ lua_tcp_request (lua_State *L)
                        }
                        lua_pop (L, 1);
 
-                       lua_pushstring (L, "pool");
-                       lua_gettable (L, -2);
-                       if (rspamd_lua_check_udata (L, -1, "rspamd{mempool}")) {
-                               pool = *(rspamd_mempool_t **)lua_touserdata (L, -1);
-                       }
-                       else {
-                               pool = NULL;
-                       }
-                       lua_pop (L, 1);
-
                        lua_pushstring (L, "resolver");
                        lua_gettable (L, -2);
                        if (rspamd_lua_check_udata_maybe (L, -1, "rspamd{resolver}")) {
@@ -638,10 +868,6 @@ lua_tcp_request (lua_State *L)
                        lua_pop (L, 1);
                }
 
-               if (pool == NULL) {
-                       return luaL_error (L, "tcp request has no memory pool associated");
-               }
-
                lua_pushstring (L, "timeout");
                lua_gettable (L, -2);
                if (lua_type (L, -1) == LUA_TNUMBER) {
@@ -652,7 +878,7 @@ lua_tcp_request (lua_State *L)
                lua_pushstring (L, "stop_pattern");
                lua_gettable (L, -2);
                if (lua_type (L, -1) == LUA_TSTRING) {
-                       stop_pattern = rspamd_mempool_strdup (pool, lua_tostring (L, -1));
+                       stop_pattern = g_strdup (lua_tostring (L, -1));
                }
                lua_pop (L, 1);
 
@@ -693,13 +919,16 @@ lua_tcp_request (lua_State *L)
 
                tp = lua_type (L, -1);
                if (tp == LUA_TSTRING || tp == LUA_TUSERDATA) {
-                       iov = rspamd_mempool_alloc (pool, sizeof (*iov));
+                       iov = g_malloc (sizeof (*iov));
                        niov = 1;
 
-                       if (!lua_tcp_arg_toiovec (L, -1, pool, iov)) {
+                       if (!lua_tcp_arg_toiovec (L, -1, cbd, iov)) {
                                lua_pop (L, 1);
                                msg_err ("tcp request has bad data argument");
                                lua_pushboolean (L, FALSE);
+                               g_free (iov);
+                               g_slice_free1 (sizeof (*cbd), cbd);
+
                                return 1;
                        }
 
@@ -713,15 +942,18 @@ lua_tcp_request (lua_State *L)
                                lua_pop (L, 1);
                        }
 
-                       iov = rspamd_mempool_alloc (pool, sizeof (*iov) * niov);
+                       iov = g_malloc (sizeof (*iov) * niov);
                        lua_pushnil (L);
                        niov = 0;
 
                        while (lua_next (L, -2) != 0) {
-                               if (!lua_tcp_arg_toiovec (L, -1, pool, &iov[niov])) {
+                               if (!lua_tcp_arg_toiovec (L, -1, cbd, &iov[niov])) {
                                        lua_pop (L, 2);
                                        msg_err ("tcp request has bad data argument at pos %d", niov);
                                        lua_pushboolean (L, FALSE);
+                                       g_free (iov);
+                                       g_slice_free1 (sizeof (*cbd), cbd);
+
                                        return 1;
                                }
 
@@ -741,27 +973,64 @@ lua_tcp_request (lua_State *L)
                return 1;
        }
 
-       cbd = g_slice_alloc0 (sizeof (*cbd));
        cbd->L = L;
-       cbd->cbref = cbref;
+
+       if (total_out > 0) {
+               struct lua_tcp_handler *wh;
+
+               wh = g_slice_alloc0 (sizeof (*wh));
+               wh->type = LUA_WANT_WRITE;
+               wh->h.w.iov = iov;
+               wh->h.w.iovlen = niov;
+               wh->h.w.total = total_out;
+               wh->h.w.pos = 0;
+               /* Cannot set write handler here */
+               wh->h.w.cbref = -1;
+
+               if (cbref != -1 && !do_read) {
+                       /* We have write only callback */
+                       wh->h.w.cbref = cbref;
+               }
+               else {
+                       /* We have simple client callback */
+                       wh->h.w.cbref = -1;
+               }
+
+               g_queue_push_tail (cbd->handlers, wh);
+       }
+
        cbd->ev_base = ev_base;
        msec_to_tv (timeout, &cbd->tv);
        cbd->fd = -1;
-       cbd->pool = pool;
-       cbd->partial = partial;
-       cbd->do_shutdown = do_shutdown;
-       cbd->iov = iov;
-       cbd->iovlen = niov;
-       cbd->total = total_out;
-       cbd->pos = 0;
        cbd->port = port;
-       cbd->stop_pattern = stop_pattern;
+
+       if (do_read) {
+               cbd->in = g_byte_array_sized_new (8192);
+       }
+       else {
+               /* Save some space... */
+               cbd->in = g_byte_array_new ();
+       }
+
+       if (partial) {
+               cbd->flags |= LUA_TCP_FLAG_PARTIAL;
+       }
+
+       if (do_shutdown) {
+               cbd->flags |= LUA_TCP_FLAG_SHUTDOWN;
+       }
+
+       if (do_read) {
+               struct lua_tcp_handler *rh;
+
+               rh = g_slice_alloc0 (sizeof (*rh));
+               rh->type = LUA_WANT_READ;
+               rh->h.r.cbref = cbref;
+               rh->h.r.stop_pattern = stop_pattern;
+               g_queue_push_tail (cbd->handlers, rh);
+       }
+
        cbd->connect_cb = conn_cbref;
-       cbd->in = g_byte_array_new ();
-       cbd->do_read = do_read;
-       rspamd_mempool_add_destructor (cbd->pool,
-                       (rspamd_mempool_destruct_t)g_byte_array_unref,
-                       cbd->in);
        REF_INIT_RETAIN (cbd, lua_tcp_maybe_free);
 
        if (session) {