struct fuzzy_peer_request {
ev_io io_ev;
struct fuzzy_peer_cmd cmd;
+ /* ctx is set (and the request linked into ctx->pending_peer_requests) only
+ * while the write watcher is registered, so synchronous-success requests
+ * skip the list bookkeeping. */
+ struct rspamd_fuzzy_storage_ctx *ctx;
+ struct fuzzy_peer_request *prev;
+ struct fuzzy_peer_request *next;
+ gsize written; /* bytes of cmd already sent on the peer pipe */
+};
+
+enum fuzzy_peer_send_status {
+ FUZZY_PEER_SEND_DONE = 0, /* fully sent */
+ FUZZY_PEER_SEND_AGAIN, /* short write or EAGAIN/EWOULDBLOCK/EINTR */
+ FUZZY_PEER_SEND_FATAL, /* unrecoverable write error */
};
struct rspamd_updates_cbdata {
rspamd_fuzzy_write_reply(session);
}
-static gboolean
+static enum fuzzy_peer_send_status
fuzzy_peer_try_send(int fd, struct fuzzy_peer_request *up_req)
{
+ const unsigned char *p = (const unsigned char *) &up_req->cmd + up_req->written;
+ gsize remaining = sizeof(up_req->cmd) - up_req->written;
gssize r;
- r = write(fd, &up_req->cmd, sizeof(up_req->cmd));
+ r = write(fd, p, remaining);
- if (r != sizeof(up_req->cmd)) {
- return FALSE;
+ if (r > 0) {
+ up_req->written += (gsize) r;
+ if (up_req->written == sizeof(up_req->cmd)) {
+ return FUZZY_PEER_SEND_DONE;
+ }
+ /* Short write — kernel buffer probably filled; resume from up_req->written. */
+ return FUZZY_PEER_SEND_AGAIN;
}
- return TRUE;
+ if (r == 0) {
+ /* write(2) only returns 0 if asked to write 0 bytes; not expected here */
+ return FUZZY_PEER_SEND_AGAIN;
+ }
+
+ if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
+ return FUZZY_PEER_SEND_AGAIN;
+ }
+
+ return FUZZY_PEER_SEND_FATAL;
+}
+
+static void
+fuzzy_peer_request_release(struct fuzzy_peer_request *up_req)
+{
+ if (up_req->ctx != NULL) {
+ DL_DELETE(up_req->ctx->pending_peer_requests, up_req);
+ }
+ g_free(up_req);
}
static void
fuzzy_peer_send_io(EV_P_ ev_io *w, int revents)
{
struct fuzzy_peer_request *up_req = (struct fuzzy_peer_request *) w->data;
+ enum fuzzy_peer_send_status status = fuzzy_peer_try_send(w->fd, up_req);
- if (!fuzzy_peer_try_send(w->fd, up_req)) {
+ if (status == FUZZY_PEER_SEND_AGAIN) {
+ /* Leave the watcher running; libev will fire again when the pipe is writable. */
+ return;
+ }
+
+ if (status == FUZZY_PEER_SEND_FATAL) {
msg_err("cannot send update request to the peer: %s", strerror(errno));
}
ev_io_stop(EV_A_ w);
- g_free(up_req);
+ fuzzy_peer_request_release(up_req);
}
static void
sizeof(up_req->cmd.cmd.shingle.sgl));
}
- if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) {
+ enum fuzzy_peer_send_status sst = fuzzy_peer_try_send(
+ session->ctx->peer_fd, up_req);
+
+ if (sst == FUZZY_PEER_SEND_DONE) {
+ g_free(up_req);
+ }
+ else if (sst == FUZZY_PEER_SEND_AGAIN) {
+ up_req->ctx = session->ctx;
+ DL_APPEND(session->ctx->pending_peer_requests, up_req);
up_req->io_ev.data = up_req;
ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
session->ctx->peer_fd, EV_WRITE);
ev_io_start(session->ctx->event_loop, &up_req->io_ev);
}
else {
+ msg_err("cannot send update request to the peer: %s",
+ strerror(errno));
g_free(up_req);
}
}
ptr = is_shingle ? (gpointer) &up_req->cmd.cmd.shingle : (gpointer) &up_req->cmd.cmd.normal;
memcpy(ptr, cmd, up_len);
- if (!fuzzy_peer_try_send(session->ctx->peer_fd, up_req)) {
+ enum fuzzy_peer_send_status sst = fuzzy_peer_try_send(
+ session->ctx->peer_fd, up_req);
+
+ if (sst == FUZZY_PEER_SEND_DONE) {
+ g_free(up_req);
+ }
+ else if (sst == FUZZY_PEER_SEND_AGAIN) {
+ up_req->ctx = session->ctx;
+ DL_APPEND(session->ctx->pending_peer_requests, up_req);
up_req->io_ev.data = up_req;
ev_io_init(&up_req->io_ev, fuzzy_peer_send_io,
session->ctx->peer_fd, EV_WRITE);
ev_io_start(session->ctx->event_loop, &up_req->io_ev);
}
else {
+ msg_err("cannot send update request to the peer: %s",
+ strerror(errno));
g_free(up_req);
}
}
ctx->dynamic_blocked_nets = NULL;
}
+ /* Drain any peer-pipe write requests that were waiting for the write
+ * watcher to fire. The event loop has already broken out of ev_loop on
+ * non-update workers, so these watchers will never fire on their own and
+ * the up_req allocations would leak. */
+ {
+ struct fuzzy_peer_request *pr, *pr_tmp;
+ DL_FOREACH_SAFE(ctx->pending_peer_requests, pr, pr_tmp)
+ {
+ if (ev_can_stop(&pr->io_ev)) {
+ ev_io_stop(ctx->event_loop, &pr->io_ev);
+ }
+ DL_DELETE(ctx->pending_peer_requests, pr);
+ g_free(pr);
+ }
+ }
+
struct rspamd_lua_fuzzy_script *cur, *tmp;
LL_FOREACH_SAFE(ctx->lua_pre_handlers, cur, tmp)