]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Fix] fuzzy_storage: peer-pipe write resume and shutdown drain
authorVsevolod Stakhov <vsevolod@rspamd.com>
Mon, 18 May 2026 09:40:40 +0000 (10:40 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Mon, 18 May 2026 10:07:56 +0000 (11:07 +0100)
fuzzy_peer_try_send retried short writes from byte 0 of the command
instead of resuming at the offset already sent, so a partial write
followed by a watcher-driven retry shoved garbage into the peer pipe.

Track the bytes sent on the request and resume from there.  Convert
the helper to a tri-state (DONE / AGAIN / FATAL) so the watcher can
keep firing on transient short writes and only stop+free on completion
or a hard error.

Also link pending requests into a list on the ctx so worker shutdown
can drain any whose write watcher never fires (e.g. on non-update
workers where the event loop has already broken out), instead of
leaking the up_req allocations.

src/fuzzy_storage.c
src/libserver/fuzzy_storage_internal.h

index c87bd86f29099cb9453a43d0789e359eb497a1bd..ece40aa2dda0b506ab7de3dc22960f555b0aff3a 100644 (file)
@@ -131,6 +131,19 @@ struct fuzzy_tcp_session {
 struct fuzzy_peer_request {
        ev_io io_ev;
        struct fuzzy_peer_cmd cmd;
+       /* ctx is set (and the request linked into ctx->pending_peer_requests) only
+        * while the write watcher is registered, so synchronous-success requests
+        * skip the list bookkeeping. */
+       struct rspamd_fuzzy_storage_ctx *ctx;
+       struct fuzzy_peer_request *prev;
+       struct fuzzy_peer_request *next;
+       gsize written; /* bytes of cmd already sent on the peer pipe */
+};
+
+enum fuzzy_peer_send_status {
+       FUZZY_PEER_SEND_DONE = 0, /* fully sent */
+       FUZZY_PEER_SEND_AGAIN,    /* short write or EAGAIN/EWOULDBLOCK/EINTR */
+       FUZZY_PEER_SEND_FATAL,    /* unrecoverable write error */
 };
 
 struct rspamd_updates_cbdata {
@@ -864,31 +877,62 @@ rspamd_fuzzy_make_reply(struct rspamd_fuzzy_cmd *cmd,
        rspamd_fuzzy_write_reply(session);
 }
 
-static gboolean
+static enum fuzzy_peer_send_status
 fuzzy_peer_try_send(int fd, struct fuzzy_peer_request *up_req)
 {
+       const unsigned char *p = (const unsigned char *) &up_req->cmd + up_req->written;
+       gsize remaining = sizeof(up_req->cmd) - up_req->written;
        gssize r;
 
-       r = write(fd, &up_req->cmd, sizeof(up_req->cmd));
+       r = write(fd, p, remaining);
 
-       if (r != sizeof(up_req->cmd)) {
-               return FALSE;
+       if (r > 0) {
+               up_req->written += (gsize) r;
+               if (up_req->written == sizeof(up_req->cmd)) {
+                       return FUZZY_PEER_SEND_DONE;
+               }
+               /* Short write — kernel buffer probably filled; resume from up_req->written. */
+               return FUZZY_PEER_SEND_AGAIN;
        }
 
-       return TRUE;
+       if (r == 0) {
+               /* write(2) only returns 0 if asked to write 0 bytes; not expected here */
+               return FUZZY_PEER_SEND_AGAIN;
+       }
+
+       if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+               return FUZZY_PEER_SEND_AGAIN;
+       }
+
+       return FUZZY_PEER_SEND_FATAL;
+}
+
+static void
+fuzzy_peer_request_release(struct fuzzy_peer_request *up_req)
+{
+       if (up_req->ctx != NULL) {
+               DL_DELETE(up_req->ctx->pending_peer_requests, up_req);
+       }
+       g_free(up_req);
 }
 
 static void
 fuzzy_peer_send_io(EV_P_ ev_io *w, int revents)
 {
        struct fuzzy_peer_request *up_req = (struct fuzzy_peer_request *) w->data;
+       enum fuzzy_peer_send_status status = fuzzy_peer_try_send(w->fd, up_req);
 
-       if (!fuzzy_peer_try_send(w->fd, up_req)) {
+       if (status == FUZZY_PEER_SEND_AGAIN) {
+               /* Leave the watcher running; libev will fire again when the pipe is writable. */
+               return;
+       }
+
+       if (status == FUZZY_PEER_SEND_FATAL) {
                msg_err("cannot send update request to the peer: %s", strerror(errno));
        }
 
        ev_io_stop(EV_A_ w);
-       g_free(up_req);
+       fuzzy_peer_request_release(up_req);
 }
 
 static void
@@ -1107,13 +1151,23 @@ rspamd_fuzzy_check_callback(struct rspamd_fuzzy_multiflag_result *mf_result, voi
                                           sizeof(up_req->cmd.cmd.shingle.sgl));
                        }
 
-                       if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) {
+                       enum fuzzy_peer_send_status sst = fuzzy_peer_try_send(
+                               session->ctx->peer_fd, up_req);
+
+                       if (sst == FUZZY_PEER_SEND_DONE) {
+                               g_free(up_req);
+                       }
+                       else if (sst == FUZZY_PEER_SEND_AGAIN) {
+                               up_req->ctx = session->ctx;
+                               DL_APPEND(session->ctx->pending_peer_requests, up_req);
                                up_req->io_ev.data = up_req;
                                ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
                                                   session->ctx->peer_fd, EV_WRITE);
                                ev_io_start(session->ctx->event_loop, &up_req->io_ev);
                        }
                        else {
+                               msg_err("cannot send update request to the peer: %s",
+                                               strerror(errno));
                                g_free(up_req);
                        }
                }
@@ -1465,13 +1519,23 @@ rspamd_fuzzy_process_command(struct fuzzy_session *session)
                                        ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
                                        memcpy(ptr, cmd, up_len);
 
-                                       if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) {
+                                       enum fuzzy_peer_send_status sst = fuzzy_peer_try_send(
+                                               session->ctx->peer_fd, up_req);
+
+                                       if (sst == FUZZY_PEER_SEND_DONE) {
+                                               g_free(up_req);
+                                       }
+                                       else if (sst == FUZZY_PEER_SEND_AGAIN) {
+                                               up_req->ctx = session->ctx;
+                                               DL_APPEND(session->ctx->pending_peer_requests, up_req);
                                                up_req->io_ev.data = up_req;
                                                ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
                                                                   session->ctx->peer_fd, EV_WRITE);
                                                ev_io_start(session->ctx->event_loop, &up_req->io_ev);
                                        }
                                        else {
+                                               msg_err("cannot send update request to the peer: %s",
+                                                               strerror(errno));
                                                g_free(up_req);
                                        }
                                }
@@ -3380,6 +3444,22 @@ start_fuzzy(struct rspamd_worker *worker)
                ctx->dynamic_blocked_nets = NULL;
        }
 
+       /* Drain any peer-pipe write requests that were waiting for the write
+        * watcher to fire. The event loop has already broken out of ev_loop on
+        * non-update workers, so these watchers will never fire on their own and
+        * the up_req allocations would leak. */
+       {
+               struct fuzzy_peer_request *pr, *pr_tmp;
+               DL_FOREACH_SAFE(ctx->pending_peer_requests, pr, pr_tmp)
+               {
+                       if (ev_can_stop(&pr->io_ev)) {
+                               ev_io_stop(ctx->event_loop, &pr->io_ev);
+                       }
+                       DL_DELETE(ctx->pending_peer_requests, pr);
+                       g_free(pr);
+               }
+       }
+
        struct rspamd_lua_fuzzy_script *cur, *tmp;
 
        LL_FOREACH_SAFE(ctx->lua_pre_handlers, cur, tmp)
index e3fcca803447c561a95c9789e874fc99fcc02487..3c491895ec0d94ef16c15d335a8cb38373f358b1 100644 (file)
@@ -140,6 +140,8 @@ struct rspamd_lua_fuzzy_script {
        struct rspamd_lua_fuzzy_script *next;
 };
 
+struct fuzzy_peer_request;
+
 struct rspamd_fuzzy_storage_ctx {
        uint64_t magic;
        struct ev_loop *event_loop;
@@ -185,6 +187,10 @@ struct rspamd_fuzzy_storage_ctx {
        unsigned int updates_failed;
        unsigned int updates_maxfail;
        int peer_fd;
+       /* Doubly-linked list of peer requests whose write watcher is registered
+        * but has not yet completed. Walked at worker shutdown so pending
+        * up_reqs don't leak when the event loop stops before they fire. */
+       struct fuzzy_peer_request *pending_peer_requests;
 
        unsigned int leaky_bucket_ttl;
        unsigned int leaky_bucket_mask;