]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Add keep-alive support 5456/head
authorVsevolod Stakhov <vsevolod@rspamd.com>
Wed, 7 May 2025 13:27:32 +0000 (14:27 +0100)
committerVsevolod Stakhov <vsevolod@rspamd.com>
Wed, 7 May 2025 13:27:32 +0000 (14:27 +0100)
src/libserver/http/http_connection.c
src/rspamd_proxy.c

index baf37a38568c721e62807a28cf6f4128eb87a9c9..d94f9835e9eb541ef433b67cff092481ba6ad56e 100644 (file)
@@ -2633,4 +2633,4 @@ void rspamd_http_connection_disable_encryption(struct rspamd_http_connection *co
                priv->peer_key = NULL;
                priv->flags &= ~RSPAMD_HTTP_CONN_FLAG_ENCRYPTED;
        }
-}
\ No newline at end of file
+}
index c8c3d5a71150e5b579a07766b478f0d49c86c4eb..8e69298e63eaa4e13a538de99edc6b8cdd1dc557 100644 (file)
@@ -97,6 +97,7 @@ struct rspamd_http_upstream {
        gboolean self_scan;
        gboolean compress;
        gboolean ssl;
+       gboolean keepalive; /* Whether to use keepalive for this upstream */
        ucl_object_t *extra_headers;
 };
 
@@ -112,6 +113,7 @@ struct rspamd_http_mirror {
        gboolean local;
        gboolean compress;
        gboolean ssl;
+       gboolean keepalive; /* Whether to use keepalive for this mirror */
        ucl_object_t *extra_headers;
 };
 
@@ -218,6 +220,7 @@ struct rspamd_proxy_session {
        enum rspamd_proxy_legacy_support legacy_support;
        int retries;
        ref_entry_t ref;
+       gboolean use_keepalive; /* Whether to use keepalive for this session */
 };
 
 static gboolean proxy_send_master_message(struct rspamd_proxy_session *session);
@@ -429,9 +432,14 @@ rspamd_proxy_parse_upstream(rspamd_mempool_t *pool,
                up->ssl = TRUE;
        }
 
-       elt = ucl_object_lookup(obj, "ssl");
+       elt = ucl_object_lookup_any(obj, "keepalive", "keep_alive", NULL);
        if (elt && ucl_object_toboolean(elt)) {
-               up->ssl = TRUE;
+               up->keepalive = TRUE;
+       }
+
+       elt = ucl_object_lookup_any(obj, "keepalive", "keep_alive", NULL);
+       if (elt && ucl_object_toboolean(elt)) {
+               up->keepalive = TRUE;
        }
 
        elt = ucl_object_lookup(obj, "hosts");
@@ -935,7 +943,11 @@ proxy_backend_close_connection(struct rspamd_proxy_backend_connection *conn)
                if (conn->backend_conn) {
                        rspamd_http_connection_reset(conn->backend_conn);
                        rspamd_http_connection_unref(conn->backend_conn);
-                       close(conn->backend_sock);
+
+                       if (!(conn->s && conn->s->use_keepalive)) {
+                               /* Only close socket if we're not using keepalive */
+                               close(conn->backend_sock);
+                       }
                }
 
                conn->flags |= RSPAMD_BACKEND_CLOSED;
@@ -1414,6 +1426,8 @@ proxy_backend_mirror_finish_handler(struct rspamd_http_connection *conn,
        struct rspamd_proxy_backend_connection *bk_conn = conn->ud;
        struct rspamd_proxy_session *session;
        const rspamd_ftok_t *orig_ct;
+       const rspamd_ftok_t *conn_hdr;
+       gboolean is_keepalive = FALSE;
 
        session = bk_conn->s;
 
@@ -1433,6 +1447,36 @@ proxy_backend_mirror_finish_handler(struct rspamd_http_connection *conn,
                                         bk_conn->name, msg->code);
        rspamd_upstream_ok(bk_conn->up);
 
+       /* Check if we can use keepalive */
+       conn_hdr = rspamd_http_message_find_header(msg, "Connection");
+       if (conn_hdr) {
+               if (rspamd_substring_search_caseless(conn_hdr->begin, conn_hdr->len,
+                                                                                        "keep-alive", 10) != -1) {
+                       is_keepalive = TRUE;
+               }
+       }
+
+       if (is_keepalive && session->use_keepalive &&
+               bk_conn->up && session->ctx->http_ctx) {
+               /* Store connection in keepalive pool */
+               const char *up_name = rspamd_upstream_name(bk_conn->up);
+               if (up_name) {
+                       rspamd_http_context_prepare_keepalive(session->ctx->http_ctx,
+                                                                                                 conn, rspamd_upstream_addr_cur(bk_conn->up),
+                                                                                                 up_name, FALSE);
+                       rspamd_http_context_push_keepalive(session->ctx->http_ctx,
+                                                                                          conn, msg, session->ctx->event_loop);
+
+                       msg_debug_session("pushed mirror connection to %s to keepalive pool",
+                                                         bk_conn->name);
+
+                       /* Mark connection as closed without actually closing it */
+                       bk_conn->flags |= RSPAMD_BACKEND_CLOSED;
+                       REF_RELEASE(bk_conn->s);
+                       return 0;
+               }
+       }
+
        proxy_backend_close_connection(bk_conn);
        REF_RELEASE(bk_conn->s);
 
@@ -1448,6 +1492,7 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session)
        struct rspamd_proxy_backend_connection *bk_conn;
        struct rspamd_http_message *msg;
        GError *err = NULL;
+       const rspamd_inet_addr_t *keepalive_addr;
 
        coin = rspamd_random_double();
 
@@ -1459,6 +1504,153 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session)
                        continue;
                }
 
+               /* Check if we can use keepalive for this mirror */
+               if (m->keepalive && session->ctx->http_ctx) {
+                       const char *up_name = NULL;
+                       unsigned int port = 0;
+
+                       /* Try to find a keepalive connection */
+                       if (m->u) {
+                               struct upstream *up = rspamd_upstream_get(m->u,
+                                                                                                                 RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
+                               if (up) {
+                                       up_name = rspamd_upstream_name(up);
+                                       port = rspamd_inet_address_get_port(rspamd_upstream_addr_cur(up));
+                               }
+                       }
+
+                       if (up_name) {
+                               keepalive_addr = rspamd_http_context_has_keepalive(
+                                       session->ctx->http_ctx, up_name, port, m->ssl);
+
+                               if (keepalive_addr) {
+                                       /* We found a keepalive connection, use it */
+                                       struct rspamd_http_connection *conn;
+
+                                       conn = rspamd_http_context_check_keepalive(
+                                               session->ctx->http_ctx,
+                                               (rspamd_inet_addr_t *) keepalive_addr,
+                                               up_name,
+                                               m->ssl);
+
+                                       if (conn) {
+                                               /* We have a keepalive connection, set it up */
+                                               bk_conn = rspamd_mempool_alloc0(session->pool, sizeof(*bk_conn));
+                                               bk_conn->s = session;
+                                               bk_conn->name = m->name;
+                                               bk_conn->timeout = m->timeout;
+                                               bk_conn->parser_from_ref = m->parser_from_ref;
+                                               bk_conn->parser_to_ref = m->parser_to_ref;
+                                               bk_conn->backend_conn = conn;
+                                               bk_conn->backend_sock = conn->fd;
+
+                                               msg = rspamd_http_connection_copy_msg(session->client_message, &err);
+
+                                               if (msg == NULL) {
+                                                       msg_err_session("cannot copy message to send to a mirror %s: %e",
+                                                                                       m->name, err);
+                                                       if (err) {
+                                                               g_error_free(err);
+                                                       }
+                                                       continue;
+                                               }
+
+                                               if (up_name) {
+                                                       rspamd_http_message_remove_header(msg, "Host");
+                                                       rspamd_http_message_add_header(msg, "Host", up_name);
+                                               }
+                                               rspamd_http_message_add_header(msg, "Connection", "keep-alive");
+
+                                               if (msg->url->len == 0) {
+                                                       msg->url = rspamd_fstring_append(msg->url, "/check", strlen("/check"));
+                                               }
+
+                                               if (m->settings_id != NULL) {
+                                                       rspamd_http_message_remove_header(msg, "Settings-ID");
+                                                       rspamd_http_message_add_header(msg, "Settings-ID", m->settings_id);
+                                               }
+
+                                               /* Add extra headers if specified */
+                                               if (m->extra_headers != NULL) {
+                                                       ucl_object_iter_t it = NULL;
+                                                       const ucl_object_t *cur;
+                                                       const char *key, *value;
+
+                                                       while ((cur = ucl_object_iterate(m->extra_headers, &it, true)) != NULL) {
+                                                               key = ucl_object_key(cur);
+                                                               value = ucl_object_tostring(cur);
+
+                                                               if (key != NULL && value != NULL) {
+                                                                       rspamd_http_message_remove_header(msg, key);
+                                                                       rspamd_http_message_add_header(msg, key, value);
+                                                               }
+                                                       }
+                                               }
+
+                                               /* Set handlers for the connection */
+                                               conn->error_handler = proxy_backend_mirror_error_handler;
+                                               conn->finish_handler = proxy_backend_mirror_finish_handler;
+                                               conn->ud = bk_conn;
+
+                                               if (m->key) {
+                                                       msg->peer_key = rspamd_pubkey_ref(m->key);
+                                               }
+
+                                               if (m->local || rspamd_inet_address_is_local(keepalive_addr)) {
+                                                       if (session->fname) {
+                                                               rspamd_http_message_add_header(msg, "File", session->fname);
+                                                       }
+
+                                                       msg->method = HTTP_GET;
+                                                       rspamd_http_connection_write_message_shared(conn,
+                                                                                                                                               msg, up_name,
+                                                                                                                                               NULL, bk_conn,
+                                                                                                                                               bk_conn->timeout);
+                                               }
+                                               else {
+                                                       if (session->fname) {
+                                                               msg->flags &= ~RSPAMD_HTTP_FLAG_SHMEM;
+                                                               rspamd_http_message_set_body(msg, session->map, session->map_len);
+                                                       }
+
+                                                       msg->method = HTTP_POST;
+
+                                                       if (m->compress) {
+                                                               proxy_request_compress(msg);
+
+                                                               if (session->client_milter_conn) {
+                                                                       rspamd_http_message_add_header(msg, "Content-Type",
+                                                                                                                                  "application/octet-stream");
+                                                               }
+                                                       }
+                                                       else {
+                                                               if (session->client_milter_conn) {
+                                                                       rspamd_http_message_add_header(msg, "Content-Type",
+                                                                                                                                  "text/plain");
+                                                               }
+                                                       }
+
+                                                       rspamd_http_connection_write_message(conn,
+                                                                                                                                msg, up_name, NULL, bk_conn,
+                                                                                                                                bk_conn->timeout);
+                                               }
+
+                                               g_ptr_array_add(session->mirror_conns, bk_conn);
+                                               REF_RETAIN(session);
+                                               msg_info_session("send request to %s (using keepalive)", m->name);
+
+                                               /*
+                                                * We have found the existing keepalive connection, so we can
+                                                * process another mirror
+                                                */
+                                               continue;
+                                       }
+                               }
+                       }
+               }
+
+               /* Non-keepalive connection */
+
                bk_conn = rspamd_mempool_alloc0(session->pool,
                                                                                sizeof(*bk_conn));
                bk_conn->s = session;
@@ -1502,7 +1694,8 @@ proxy_open_mirror_connections(struct rspamd_proxy_session *session)
                        rspamd_http_message_remove_header(msg, "Host");
                        rspamd_http_message_add_header(msg, "Host", up_name);
                }
-               rspamd_http_message_add_header(msg, "Connection", "close");
+               rspamd_http_message_add_header(msg, "Connection",
+                                                                          m->keepalive ? "keep-alive" : "close");
 
                if (msg->url->len == 0) {
                        msg->url = rspamd_fstring_append(msg->url, "/check", strlen("/check"));
@@ -1702,7 +1895,9 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
        struct rspamd_proxy_session *session, *nsession;
        rspamd_fstring_t *reply;
        const rspamd_ftok_t *orig_ct;
+       const rspamd_ftok_t *conn_hdr;
        goffset body_offset = -1;
+       gboolean is_keepalive = FALSE;
 
        session = bk_conn->s;
        rspamd_http_connection_steal_msg(session->master_conn->backend_conn);
@@ -1718,6 +1913,16 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
        rspamd_http_message_remove_header(msg, "Server");
        rspamd_http_message_remove_header(msg, "Key");
        orig_ct = rspamd_http_message_find_header(msg, "Content-Type");
+
+       /* Check if we can use keepalive */
+       conn_hdr = rspamd_http_message_find_header(msg, "Connection");
+       if (conn_hdr) {
+               if (rspamd_substring_search_caseless(conn_hdr->begin, conn_hdr->len,
+                                                                                        "keep-alive", 10) != -1) {
+                       is_keepalive = TRUE;
+               }
+       }
+
        rspamd_http_connection_reset(session->master_conn->backend_conn);
 
        if (!proxy_backend_parse_results(session, bk_conn, session->ctx->lua_state,
@@ -1750,6 +1955,22 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
 
        rspamd_upstream_ok(bk_conn->up);
 
+       /* Handle keepalive for master connection */
+       if (is_keepalive && session->use_keepalive &&
+               bk_conn->up && session->ctx->http_ctx) {
+               /* Store connection in keepalive pool */
+               const char *up_name = rspamd_upstream_name(bk_conn->up);
+               if (up_name) {
+                       rspamd_http_context_prepare_keepalive(session->ctx->http_ctx,
+                                                                                                 conn, rspamd_upstream_addr_cur(bk_conn->up),
+                                                                                                 up_name, FALSE);
+
+                       /* We'll push to keepalive pool after we're done with the response */
+                       msg_debug_session("will push master connection to %s to keepalive pool",
+                                                         up_name);
+               }
+       }
+
        if (session->client_milter_conn) {
                nsession = proxy_session_refresh(session);
 
@@ -1763,6 +1984,20 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
                        rspamd_milter_send_task_results(nsession->client_milter_conn,
                                                                                        session->master_conn->results, NULL, 0);
                }
+
+               /* Push to keepalive if needed */
+               if (is_keepalive && session->use_keepalive &&
+                       bk_conn->up && session->ctx->http_ctx) {
+                       const char *up_name = rspamd_upstream_name(bk_conn->up);
+                       if (up_name) {
+                               rspamd_http_context_push_keepalive(session->ctx->http_ctx,
+                                                                                                  conn, msg, session->ctx->event_loop);
+
+                               /* Mark connection as closed without actually closing it */
+                               bk_conn->flags |= RSPAMD_BACKEND_CLOSED;
+                       }
+               }
+
                REF_RELEASE(session);
                rspamd_http_message_free(msg);
        }
@@ -1778,6 +2013,19 @@ proxy_backend_master_finish_handler(struct rspamd_http_connection *conn,
                rspamd_http_connection_write_message(session->client_conn,
                                                                                         msg, NULL, passed_ct, session,
                                                                                         bk_conn->timeout);
+
+               /* Push to keepalive if needed */
+               if (is_keepalive && session->use_keepalive &&
+                       bk_conn->up && session->ctx->http_ctx) {
+                       const char *up_name = rspamd_upstream_name(bk_conn->up);
+                       if (up_name) {
+                               rspamd_http_context_push_keepalive(session->ctx->http_ctx,
+                                                                                                  conn, msg, session->ctx->event_loop);
+
+                               /* Mark connection as closed without actually closing it */
+                               bk_conn->flags |= RSPAMD_BACKEND_CLOSED;
+                       }
+               }
        }
 
        return 0;
@@ -2037,6 +2285,9 @@ proxy_send_master_message(struct rspamd_proxy_session *session)
        /* Remove the original `Connection` header */
        rspamd_http_message_remove_header(session->client_message, "Connection");
 
+       /* Set keepalive flag based on backend configuration */
+       session->use_keepalive = backend ? backend->keepalive : FALSE;
+
        if (backend == NULL) {
                /* No backend */
                msg_err_session("cannot find upstream for %s", host ? hostbuf : "default");
@@ -2118,7 +2369,8 @@ proxy_send_master_message(struct rspamd_proxy_session *session)
                if (up_name) {
                        rspamd_http_message_add_header(msg, "Host", up_name);
                }
-               rspamd_http_message_add_header(msg, "Connection", "close");
+               rspamd_http_message_add_header(msg, "Connection",
+                                                                          backend->keepalive ? "keep-alive" : "close");
 
                unsigned int http_opts = RSPAMD_HTTP_CLIENT_SIMPLE;