From: Vsevolod Stakhov Date: Mon, 21 Jan 2019 16:26:11 +0000 (+0000) Subject: [Project] Lua_udp: Implement fully functional client X-Git-Tag: 1.9.0~282 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=239f6045f9cfad501d55f926771a2dd7c664c5b2;p=thirdparty%2Frspamd.git [Project] Lua_udp: Implement fully functional client --- diff --git a/src/lua/lua_udp.c b/src/lua/lua_udp.c index 3b54c5e9ec..1bff702a96 100644 --- a/src/lua/lua_udp.c +++ b/src/lua/lua_udp.c @@ -54,16 +54,21 @@ static const struct luaL_reg udp_libf[] = { struct lua_udp_cbdata { struct event io; + struct timeval tv; struct event_base *ev_base; + struct rspamd_async_event *async_ev; + struct rspamd_task *task; rspamd_mempool_t *pool; rspamd_inet_addr_t *addr; struct rspamd_symcache_item *item; struct rspamd_async_session *s; struct iovec *iov; lua_State *L; + guint retransmits; guint iovlen; gint sock; gint cbref; + gboolean sent; }; #define msg_debug_udp(...) rspamd_conditional_debug_fast (NULL, cbd->addr, \ @@ -110,6 +115,26 @@ lua_udp_cbd_fin (gpointer p) if (cbd->addr) { rspamd_inet_address_free (cbd->addr); } + + if (cbd->cbref) { + luaL_unref (cbd->L, LUA_REGISTRYINDEX, cbd->cbref); + } +} + +static void +lua_udp_maybe_free (struct lua_udp_cbdata *cbd) +{ + if (cbd->item) { + rspamd_symcache_item_async_dec_check (cbd->task, cbd->item, M); + cbd->item = NULL; + } + + if (cbd->async_ev) { + rspamd_session_remove_event (cbd->s, lua_udp_cbd_fin, cbd); + } + else { + lua_udp_cbd_fin (cbd); + } } @@ -143,44 +168,186 @@ lua_try_send_request (struct lua_udp_cbdata *cbd) return RSPAMD_SENT_FAILURE; } +static void +lua_udp_maybe_push_error (struct lua_udp_cbdata *cbd, const gchar *err) +{ + if (cbd->cbref != -1) { + gint top; + lua_State *L = cbd->L; + + top = lua_gettop (L); + lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref); + + /* Error message */ + lua_pushboolean (L, false); + lua_pushstring (L, err); + + if (cbd->item) { + rspamd_symcache_set_cur_item (cbd->task, cbd->item); + } + + if (lua_pcall (L, 2, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); + } + + lua_settop (L, top); + } + + lua_udp_maybe_free (cbd); +} + +static void +lua_udp_push_data (struct lua_udp_cbdata *cbd, const gchar *data, + gssize len) +{ + if (cbd->cbref != -1) { + gint top; + lua_State *L = cbd->L; + + top = lua_gettop (L); + lua_rawgeti (L, LUA_REGISTRYINDEX, cbd->cbref); + + /* Error message */ + lua_pushboolean (L, true); + lua_pushlstring (L, data, len); + + if (cbd->item) { + rspamd_symcache_set_cur_item (cbd->task, cbd->item); + } + + if (lua_pcall (L, 2, 0, 0) != 0) { + msg_info ("callback call failed: %s", lua_tostring (L, -1)); + } + + lua_settop (L, top); + } + + lua_udp_maybe_free (cbd); +} + +static gboolean +lua_udp_maybe_register_event (struct lua_udp_cbdata *cbd) +{ + if (cbd->s) { + cbd->async_ev = rspamd_session_add_event (cbd->s, lua_udp_cbd_fin, + cbd, M); + + if (!cbd->async_ev) { + return FALSE; + } + } + + if (cbd->task) { + cbd->item = rspamd_symcache_get_cur_item (cbd->task); + rspamd_symcache_item_async_inc (cbd->task, cbd->item, M); + } + + return TRUE; +} + +static void +lua_udp_io_handler (gint fd, short what, gpointer p) +{ + struct lua_udp_cbdata *cbd = (struct lua_udp_cbdata *)p; + lua_State *L; + gssize r; + + L = cbd->L; + + if (what == EV_TIMEOUT) { + if (cbd->sent && cbd->retransmits > 0) { + r = lua_try_send_request (cbd); + + if (r == RSPAMD_SENT_OK) { + event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd); + event_base_set (cbd->ev_base, &cbd->io); + event_add (&cbd->io, &cbd->tv); + lua_udp_maybe_register_event (cbd); + } + else if (r == RSPAMD_SENT_FAILURE) { + lua_udp_maybe_push_error (cbd, "write error"); + } + else { + cbd->retransmits --; + event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd); + event_base_set (cbd->ev_base, &cbd->io); + event_add (&cbd->io, &cbd->tv); + } + } + else { + if (!cbd->sent) { + lua_udp_maybe_push_error (cbd, "sent timeout"); + } + else { + lua_udp_maybe_push_error (cbd, "read timeout"); + } + } + } + else if (what == EV_WRITE) { + r = lua_try_send_request (cbd); + + if (r == RSPAMD_SENT_OK) { + if (cbd->cbref != -1) { + event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd); + event_base_set (cbd->ev_base, &cbd->io); + event_add (&cbd->io, &cbd->tv); + cbd->sent = TRUE; + } + else { + lua_udp_maybe_free (cbd); + } + } + else if (r == RSPAMD_SENT_FAILURE) { + lua_udp_maybe_push_error (cbd, "write error"); + } + else { + cbd->retransmits --; + event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd); + event_base_set (cbd->ev_base, &cbd->io); + event_add (&cbd->io, &cbd->tv); + } + } + else if (what == EV_READ) { + guchar udpbuf[4096]; + socklen_t slen; + struct sockaddr *sa; + + sa = rspamd_inet_address_get_sa (cbd->addr, &slen); + + r = recvfrom (cbd->sock, udpbuf, sizeof (udpbuf), 0, sa, &slen); + + if (r == -1) { + lua_udp_maybe_push_error (cbd, strerror (errno)); + } + else { + lua_udp_push_data (cbd, udpbuf, r); + } + } +} /*** - * @function rspamd_tcp.request({params}) - * This function creates and sends TCP request to the specified host and port, - * resolves hostname (if needed) and invokes continuation callback upon data received - * from the remote peer. This function accepts table of arguments with the following - * attributes + * @function rspamd_udp.sendto({params}) + * This function simply sends data to an external UDP service * * - `task`: rspamd task objects (implies `pool`, `session`, `ev_base` and `resolver` arguments) * - `ev_base`: event base (if no task specified) * - `session`: events session (no task) * - `host`: IP or name of the peer (required) - * - `port`: remote port to use + * - `port`: remote port to use (if `host` has no port part this is required) * - `data`: a table of strings or `rspamd_text` objects that contains data pieces - * - `callback`: continuation function (required) - * - `on_connect`: callback called on connection success - * - `timeout`: floating point value that specifies timeout for IO operations in **seconds** - * - `partial`: boolean flag that specifies that callback should be called on any data portion received - * - `stop_pattern`: stop reading on finding a certain pattern (e.g. \r\n.\r\n for smtp) - * - `shutdown`: half-close socket after writing (boolean: default false) - * - `read`: read response after sending request (boolean: default true) - * @return {boolean} true if request has been sent + * @return {boolean} true if request has been sent (additional string if it has not) */ static gint lua_udp_sendto (lua_State *L) { LUA_TRACE_POINT; const gchar *host; guint port; - gint cbref, tp, conn_cbref = -1; struct event_base *ev_base = NULL; struct lua_udp_cbdata *cbd; struct rspamd_async_session *session = NULL; struct rspamd_task *task = NULL; rspamd_inet_addr_t *addr; rspamd_mempool_t *pool = NULL; - struct iovec *iov = NULL; - guint niov = 0, total_out; - guint64 h; gdouble timeout = default_udp_timeout; if (lua_type (L, 1) == LUA_TTABLE) { @@ -204,7 +371,9 @@ lua_udp_sendto (lua_State *L) { host = luaL_checkstring (L, -1); if (rspamd_parse_inet_address (&addr, host, 0)) { - rspamd_inet_address_set_port (addr, port); + if (port != 0) { + rspamd_inet_address_set_port (addr, port); + } } else { lua_pop (L, 1); @@ -222,6 +391,10 @@ lua_udp_sendto (lua_State *L) { } addr = rspamd_inet_address_copy (lip->addr); + + if (port != 0) { + rspamd_inet_address_set_port (addr, port); + } } else { lua_pop (L, 1); @@ -276,6 +449,14 @@ lua_udp_sendto (lua_State *L) { } lua_pop (L, 1); + if (!ev_base || !pool) { + rspamd_inet_address_free (addr); + + return luaL_error (L, "invalid arguments"); + } + + + if (!ev_base || !pool) { rspamd_inet_address_free (addr); @@ -289,6 +470,8 @@ lua_udp_sendto (lua_State *L) { cbd->addr = addr; cbd->sock = rspamd_socket_create (rspamd_inet_address_get_af (addr), SOCK_DGRAM, 0, TRUE); + cbd->cbref = -1; + double_to_tv (timeout, &cbd->tv); if (cbd->sock == -1) { rspamd_inet_address_free (addr); @@ -301,6 +484,8 @@ lua_udp_sendto (lua_State *L) { gsize data_len; lua_pushstring (L, "data"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TTABLE) { data_len = rspamd_lua_table_size (L, -1); cbd->iov = rspamd_mempool_alloc (pool, @@ -322,22 +507,65 @@ lua_udp_sendto (lua_State *L) { lua_pop (L, 1); + lua_pushstring (L, "callback"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TFUNCTION) { + cbd->cbref = luaL_ref (L, LUA_REGISTRYINDEX); + } + else { + lua_pop (L, 1); + } + + lua_pushstring (L, "retransmits"); + lua_gettable (L, -2); + if (lua_type (L, -1) == LUA_TNUMBER) { + cbd->retransmits = lua_tonumber (L, -1); + } + lua_pop (L, 1); + enum rspamd_udp_send_result r; r = lua_try_send_request (cbd); if (r == RSPAMD_SENT_OK) { + if (cbd->cbref == -1) { + lua_udp_maybe_free (cbd); + } + else { + if (!lua_udp_maybe_register_event (cbd)) { + lua_pushboolean (L, false); + lua_pushstring (L, "session error"); + lua_udp_maybe_free (cbd); + + return 2; + } + + event_set (&cbd->io, cbd->sock, EV_READ, lua_udp_io_handler, cbd); + event_base_set (cbd->ev_base, &cbd->io); + event_add (&cbd->io, &cbd->tv); + cbd->sent = TRUE; + } + lua_pushboolean (L, true); - lua_udp_cbd_fin (cbd); } else if (r == RSPAMD_SENT_FAILURE) { lua_pushboolean (L, false); lua_pushstring (L, strerror (errno)); - lua_udp_cbd_fin (cbd); + lua_udp_maybe_free (cbd); return 2; } else { - /* TODO: add waiting */ + event_set (&cbd->io, cbd->sock, EV_WRITE, lua_udp_io_handler, cbd); + event_base_set (cbd->ev_base, &cbd->io); + event_add (&cbd->io, &cbd->tv); + + if (!lua_udp_maybe_register_event (cbd)) { + lua_pushboolean (L, false); + lua_pushstring (L, "session error"); + lua_udp_maybe_free (cbd); + + return 2; + } } } else {