From: Vsevolod Stakhov Date: Mon, 18 May 2026 09:40:40 +0000 (+0100) Subject: [Fix] fuzzy_storage: peer-pipe write resume and shutdown drain X-Git-Tag: 4.1.0~50 X-Git-Url: http://git.ipfire.org/gitweb/index.cgi?a=commitdiff_plain;h=32642c94b020e02c8d86e9ef89d4041501eb40cd;p=thirdparty%2Frspamd.git [Fix] fuzzy_storage: peer-pipe write resume and shutdown drain fuzzy_peer_try_send retried short writes from byte 0 of the command instead of resuming at the offset already sent, so a partial write followed by a watcher-driven retry shoved garbage into the peer pipe. Track the bytes sent on the request and resume from there. Convert the helper to a tri-state (DONE / AGAIN / FATAL) so the watcher can keep firing on transient short writes and only stop+free on completion or a hard error. Also link pending requests into a list on the ctx so worker shutdown can drain any whose write watcher never fires (e.g. on non-update workers where the event loop has already broken out), instead of leaking the up_req allocations. --- diff --git a/src/fuzzy_storage.c b/src/fuzzy_storage.c index c87bd86f29..ece40aa2dd 100644 --- a/src/fuzzy_storage.c +++ b/src/fuzzy_storage.c @@ -131,6 +131,19 @@ struct fuzzy_tcp_session { struct fuzzy_peer_request { ev_io io_ev; struct fuzzy_peer_cmd cmd; + /* ctx is set (and the request linked into ctx->pending_peer_requests) only + * while the write watcher is registered, so synchronous-success requests + * skip the list bookkeeping. */ + struct rspamd_fuzzy_storage_ctx *ctx; + struct fuzzy_peer_request *prev; + struct fuzzy_peer_request *next; + gsize written; /* bytes of cmd already sent on the peer pipe */ +}; + +enum fuzzy_peer_send_status { + FUZZY_PEER_SEND_DONE = 0, /* fully sent */ + FUZZY_PEER_SEND_AGAIN, /* short write or EAGAIN/EWOULDBLOCK/EINTR */ + FUZZY_PEER_SEND_FATAL, /* unrecoverable write error */ }; struct rspamd_updates_cbdata { @@ -864,31 +877,62 @@ rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd *cmd, rspamd_fuzzy_write_reply(session); } -static gboolean +static enum fuzzy_peer_send_status fuzzy_peer_try_send(int fd, struct fuzzy_peer_request *up_req) { + const unsigned char *p = (const unsigned char *) &up_req->cmd + up_req->written; + gsize remaining = sizeof(up_req->cmd) - up_req->written; gssize r; - r = write(fd, &up_req->cmd, sizeof(up_req->cmd)); + r = write(fd, p, remaining); - if (r != sizeof(up_req->cmd)) { - return FALSE; + if (r > 0) { + up_req->written += (gsize) r; + if (up_req->written == sizeof(up_req->cmd)) { + return FUZZY_PEER_SEND_DONE; + } + /* Short write — kernel buffer probably filled; resume from up_req->written. */ + return FUZZY_PEER_SEND_AGAIN; } - return TRUE; + if (r == 0) { + /* write(2) only returns 0 if asked to write 0 bytes; not expected here */ + return FUZZY_PEER_SEND_AGAIN; + } + + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + return FUZZY_PEER_SEND_AGAIN; + } + + return FUZZY_PEER_SEND_FATAL; +} + +static void +fuzzy_peer_request_release(struct fuzzy_peer_request *up_req) +{ + if (up_req->ctx != NULL) { + DL_DELETE(up_req->ctx->pending_peer_requests, up_req); + } + g_free(up_req); } static void fuzzy_peer_send_io(EV_P_ ev_io *w, int revents) { struct fuzzy_peer_request *up_req = (struct fuzzy_peer_request *) w->data; + enum fuzzy_peer_send_status status = fuzzy_peer_try_send(w->fd, up_req); - if (!fuzzy_peer_try_send(w->fd, up_req)) { + if (status == FUZZY_PEER_SEND_AGAIN) { + /* Leave the watcher running; libev will fire again when the pipe is writable. */ + return; + } + + if (status == FUZZY_PEER_SEND_FATAL) { msg_err("cannot send update request to the peer: %s", strerror(errno)); } ev_io_stop(EV_A_ w); - g_free(up_req); + fuzzy_peer_request_release(up_req); } static void @@ -1107,13 +1151,23 @@ rspamd_fuzzy_check_callback(struct rspamd_fuzzy_multiflag_result *mf_result, voi sizeof(up_req->cmd.cmd.shingle.sgl)); } - if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) { + enum fuzzy_peer_send_status sst = fuzzy_peer_try_send( + session->ctx->peer_fd, up_req); + + if (sst == FUZZY_PEER_SEND_DONE) { + g_free(up_req); + } + else if (sst == FUZZY_PEER_SEND_AGAIN) { + up_req->ctx = session->ctx; + DL_APPEND(session->ctx->pending_peer_requests, up_req); up_req->io_ev.data = up_req; ev_io_init(&up_req->io_ev, fuzzy_peer_send_io, session->ctx->peer_fd, EV_WRITE); ev_io_start(session->ctx->event_loop, &up_req->io_ev); } else { + msg_err("cannot send update request to the peer: %s", + strerror(errno)); g_free(up_req); } } @@ -1465,13 +1519,23 @@ rspamd_fuzzy_process_command(struct fuzzy_session *session) ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal; memcpy(ptr, cmd, up_len); - if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) { + enum fuzzy_peer_send_status sst = fuzzy_peer_try_send( + session->ctx->peer_fd, up_req); + + if (sst == FUZZY_PEER_SEND_DONE) { + g_free(up_req); + } + else if (sst == FUZZY_PEER_SEND_AGAIN) { + up_req->ctx = session->ctx; + DL_APPEND(session->ctx->pending_peer_requests, up_req); up_req->io_ev.data = up_req; ev_io_init(&up_req->io_ev, fuzzy_peer_send_io, session->ctx->peer_fd, EV_WRITE); ev_io_start(session->ctx->event_loop, &up_req->io_ev); } else { + msg_err("cannot send update request to the peer: %s", + strerror(errno)); g_free(up_req); } } @@ -3380,6 +3444,22 @@ start_fuzzy(struct rspamd_worker *worker) ctx->dynamic_blocked_nets = NULL; } + /* Drain any peer-pipe write requests that were waiting for the write + * watcher to fire. The event loop has already broken out of ev_loop on + * non-update workers, so these watchers will never fire on their own and + * the up_req allocations would leak. */ + { + struct fuzzy_peer_request *pr, *pr_tmp; + DL_FOREACH_SAFE(ctx->pending_peer_requests, pr, pr_tmp) + { + if (ev_can_stop(&pr->io_ev)) { + ev_io_stop(ctx->event_loop, &pr->io_ev); + } + DL_DELETE(ctx->pending_peer_requests, pr); + g_free(pr); + } + } + struct rspamd_lua_fuzzy_script *cur, *tmp; LL_FOREACH_SAFE(ctx->lua_pre_handlers, cur, tmp) diff --git a/src/libserver/fuzzy_storage_internal.h b/src/libserver/fuzzy_storage_internal.h index e3fcca8034..3c491895ec 100644 --- a/src/libserver/fuzzy_storage_internal.h +++ b/src/libserver/fuzzy_storage_internal.h @@ -140,6 +140,8 @@ struct rspamd_lua_fuzzy_script { struct rspamd_lua_fuzzy_script *next; }; +struct fuzzy_peer_request; + struct rspamd_fuzzy_storage_ctx { uint64_t magic; struct ev_loop *event_loop; @@ -185,6 +187,10 @@ struct rspamd_fuzzy_storage_ctx { unsigned int updates_failed; unsigned int updates_maxfail; int peer_fd; + /* Doubly-linked list of peer requests whose write watcher is registered + * but has not yet completed. Walked at worker shutdown so pending + * up_reqs don't leak when the event loop stops before they fire. */ + struct fuzzy_peer_request *pending_peer_requests; unsigned int leaky_bucket_ttl; unsigned int leaky_bucket_mask;