]> git.ipfire.org Git - thirdparty/rspamd.git/commitdiff
[Feature] Implement retransmits for master connection
authorVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 18 Jul 2016 16:50:54 +0000 (17:50 +0100)
committerVsevolod Stakhov <vsevolod@highsecure.ru>
Mon, 18 Jul 2016 16:51:26 +0000 (17:51 +0100)
src/rspamd_proxy.c

index dcf492a24552f8d57df506a3af5c266c4b6c5ed1..a726f8f39925f18818890fa528ee0a6f185fc42d 100644 (file)
@@ -33,6 +33,7 @@
 
 /* Rotate keys each minute by default */
 #define DEFAULT_ROTATION_TIME 60.0
+#define DEFAULT_RETRIES 5
 
 #define msg_err_session(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
         session->pool->tag.tagname, session->pool->tag.uid, \
@@ -115,6 +116,8 @@ struct rspamd_proxy_ctx {
        lua_State *lua_state;
        /* Array of callback functions called on end of scan to compare results */
        GArray *cmp_refs;
+       /* Maximum count for retries */
+       guint max_retries;
 };
 
 enum rspamd_backend_flags {
@@ -149,13 +152,17 @@ struct rspamd_proxy_session {
        gpointer map;
        gpointer shmem_ref;
        struct rspamd_proxy_backend_connection *master_conn;
+       struct rspamd_http_message *client_message;
        GPtrArray *mirror_conns;
        gsize map_len;
        gint client_sock;
        gboolean is_spamc;
+       gint retries;
        ref_entry_t ref;
 };
 
+static gboolean proxy_send_master_message (struct rspamd_proxy_session *session);
+
 static GQuark
 rspamd_proxy_quark (void)
 {
@@ -621,6 +628,7 @@ init_rspamd_proxy (struct rspamd_config *cfg)
        ctx->cfg = cfg;
        ctx->lua_state = cfg->lua_state;
        ctx->cmp_refs = g_array_new (FALSE, FALSE, sizeof (gint));
+       ctx->max_retries = DEFAULT_RETRIES;
 
        rspamd_rcl_register_worker_option (cfg,
                        type,
@@ -675,6 +683,14 @@ init_rspamd_proxy (struct rspamd_config *cfg)
                        0,
                        RSPAMD_CL_FLAG_MULTIPLE,
                        "Compare script to be executed");
+       rspamd_rcl_register_worker_option (cfg,
+                       type,
+                       "timeout",
+                       rspamd_rcl_parse_struct_integer,
+                       ctx,
+                       G_STRUCT_OFFSET (struct rspamd_proxy_ctx, max_retries),
+                       RSPAMD_CL_FLAG_UINT,
+                       "Maximum number of retries for master connection");
 
        return ctx;
 }
@@ -846,6 +862,7 @@ proxy_session_dtor (struct rspamd_proxy_session *session)
 
        g_ptr_array_free (session->mirror_conns, TRUE);
        rspamd_http_message_shmem_unref (session->shmem_ref);
+       rspamd_http_message_unref (session->client_message);
        rspamd_inet_address_destroy (session->client_addr);
        close (session->client_sock);
        rspamd_mempool_delete (session->pool);
@@ -1030,7 +1047,7 @@ proxy_open_mirror_connections (struct rspamd_proxy_session *session)
                        continue;
                }
 
-               msg = rspamd_http_connection_copy_msg (session->client_conn);
+               msg = rspamd_http_connection_copy_msg (session->client_message);
 
                if (msg == NULL) {
                        msg_err_session ("cannot copy message to send to a mirror %s: %s",
@@ -1107,9 +1124,21 @@ proxy_backend_master_error_handler (struct rspamd_http_connection *conn, GError
        msg_info_session ("abnormally closing connection from backend: %s, error: %s",
                rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)),
                err->message);
-       /* Terminate session immediately */
-       proxy_client_write_error (session, err->code, err->message);
+       session->retries ++;
        proxy_backend_close_connection (session->master_conn);
+
+       if (session->ctx->max_retries &&
+                       session->retries > session->ctx->max_retries) {
+               msg_err_session ("cannot connect to upstream, maximum retries "
+                               "has been reached: %d", session->retries);
+               /* Terminate session immediately */
+               proxy_client_write_error (session, err->code, err->message);
+       }
+       else {
+               if (!proxy_send_master_message (session)) {
+                       proxy_client_write_error (session, err->code, err->message);
+               }
+       }
 }
 
 static gint
@@ -1154,6 +1183,110 @@ proxy_backend_master_finish_handler (struct rspamd_http_connection *conn,
        return 0;
 }
 
+static gboolean
+proxy_send_master_message (struct rspamd_proxy_session *session)
+{
+       struct rspamd_http_message *msg;
+       struct rspamd_http_upstream *backend = NULL;
+       const rspamd_ftok_t *host;
+       gchar hostbuf[512];
+
+       host = rspamd_http_message_find_header (session->client_message, "Host");
+
+       if (host == NULL) {
+               backend = session->ctx->default_upstream;
+       }
+       else {
+               rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf)));
+               backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf);
+
+               if (backend == NULL) {
+                       backend = session->ctx->default_upstream;
+               }
+       }
+
+       if (backend == NULL) {
+               /* No backend */
+               msg_err_session ("cannot find upstream for %s", host ? hostbuf : "default");
+               goto err;
+       }
+       else {
+               retry:
+               if (session->ctx->max_retries &&
+                               session->retries > session->ctx->max_retries) {
+                       msg_err_session ("cannot connect to upstream, maximum retries "
+                                       "has been reached: %d", session->retries);
+                       goto err;
+               }
+
+               session->master_conn->up = rspamd_upstream_get (backend->u,
+                               RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
+               session->master_conn->io_tv = &backend->io_tv;
+
+               if (session->master_conn->up == NULL) {
+                       msg_err_session ("cannot select upstream for %s",
+                                       host ? hostbuf : "default");
+                       goto err;
+               }
+
+               session->master_conn->backend_sock = rspamd_inet_address_connect (
+                               rspamd_upstream_addr (session->master_conn->up),
+                               SOCK_STREAM, TRUE);
+
+               if (session->master_conn->backend_sock == -1) {
+                       msg_err_session ("cannot connect upstream: %s(%s)",
+                                       host ? hostbuf : "default",
+                                                       rspamd_inet_address_to_string (rspamd_upstream_addr (
+                                                                       session->master_conn->up)));
+                       rspamd_upstream_fail (session->master_conn->up);
+                       session->retries ++;
+                       goto retry;
+               }
+
+               session->master_conn->backend_conn = rspamd_http_connection_new (
+                               NULL,
+                               proxy_backend_master_error_handler,
+                               proxy_backend_master_finish_handler,
+                               RSPAMD_HTTP_CLIENT_SIMPLE,
+                               RSPAMD_HTTP_CLIENT,
+                               session->ctx->keys_cache,
+                               NULL);
+               session->master_conn->parser_from_ref = backend->parser_from_ref;
+               session->master_conn->parser_to_ref = backend->parser_to_ref;
+
+               msg = rspamd_http_connection_copy_msg (session->client_message);
+               rspamd_http_connection_set_key (session->master_conn->backend_conn,
+                               session->ctx->local_key);
+               msg->peer_key = rspamd_pubkey_ref (backend->key);
+
+               if (backend->local ||
+                               rspamd_inet_address_is_local (
+                                               rspamd_upstream_addr (session->master_conn->up))) {
+                       rspamd_http_connection_write_message_shared (
+                                       session->master_conn->backend_conn,
+                                       msg, NULL, NULL, session->master_conn,
+                                       session->master_conn->backend_sock,
+                                       session->master_conn->io_tv, session->ctx->ev_base);
+               }
+               else {
+                       rspamd_http_connection_write_message (
+                                       session->master_conn->backend_conn,
+                                       msg, NULL, NULL, session->master_conn,
+                                       session->master_conn->backend_sock,
+                                       session->master_conn->io_tv, session->ctx->ev_base);
+               }
+       }
+
+       return TRUE;
+
+err:
+       rspamd_http_connection_steal_msg (session->client_conn);
+       rspamd_http_connection_reset (session->client_conn);
+       proxy_client_write_error (session, 404, "Backend not found");
+
+       return FALSE;
+}
+
 static void
 proxy_client_error_handler (struct rspamd_http_connection *conn, GError *err)
 {
@@ -1171,16 +1304,12 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
        struct rspamd_http_message *msg)
 {
        struct rspamd_proxy_session *session = conn->ud;
-       struct rspamd_http_upstream *backend = NULL;
-       const rspamd_ftok_t *host;
-       gchar hostbuf[512];
 
        if (!session->master_conn) {
                session->master_conn = rspamd_mempool_alloc0 (session->pool,
                                sizeof (*session->master_conn));
                session->master_conn->s = session;
                session->master_conn->name = "master";
-               host = rspamd_http_message_find_header (msg, "Host");
 
                /* Reset spamc legacy */
                if (msg->method >= HTTP_SYMBOLS) {
@@ -1193,88 +1322,20 @@ proxy_client_finish_handler (struct rspamd_http_connection *conn,
                        msg->url = rspamd_fstring_append (msg->url, "/check", strlen ("/check"));
                }
 
-               if (host == NULL) {
-                       backend = session->ctx->default_upstream;
-               }
-               else {
-                       rspamd_strlcpy (hostbuf, host->begin, MIN(host->len + 1, sizeof (hostbuf)));
-                       backend = g_hash_table_lookup (session->ctx->upstreams, hostbuf);
-
-                       if (backend == NULL) {
-                               backend = session->ctx->default_upstream;
-                       }
-               }
-
-               if (backend == NULL) {
-                       /* No backend */
-                       msg_err_session ("cannot find upstream for %s", host ? hostbuf : "default");
+               if (!proxy_check_file (msg, session)) {
                        goto err;
                }
-               else {
-                       session->master_conn->up = rspamd_upstream_get (backend->u,
-                                       RSPAMD_UPSTREAM_ROUND_ROBIN, NULL, 0);
-                       session->master_conn->io_tv = &backend->io_tv;
 
-                       if (session->master_conn->up == NULL) {
-                               msg_err_session ("cannot select upstream for %s", host ? hostbuf : "default");
-                               goto err;
-                       }
-
-                       session->master_conn->backend_sock = rspamd_inet_address_connect (
-                                       rspamd_upstream_addr (session->master_conn->up),
-                                       SOCK_STREAM, TRUE);
-
-                       if (session->master_conn->backend_sock == -1) {
-                               msg_err_session ("cannot connect upstream: %s(%s)",
-                                               host ? hostbuf : "default",
-                                               rspamd_inet_address_to_string (rspamd_upstream_addr (session->master_conn->up)));
-                               rspamd_upstream_fail (session->master_conn->up);
-                               goto err;
-                       }
+               session->client_message = rspamd_http_connection_steal_msg (
+                               session->client_conn);
+               rspamd_http_message_remove_header (msg, "Content-Length");
+               rspamd_http_message_remove_header (msg, "Key");
 
-                       if (!proxy_check_file (msg, session)) {
-                               goto err;
-                       }
+               proxy_open_mirror_connections (session);
+               rspamd_http_connection_reset (session->client_conn);
+               session->shmem_ref = rspamd_http_message_shmem_ref (session->client_message);
 
-                       proxy_open_mirror_connections (session);
-                       rspamd_http_connection_steal_msg (session->client_conn);
-                       rspamd_http_message_remove_header (msg, "Content-Length");
-                       rspamd_http_message_remove_header (msg, "Key");
-                       rspamd_http_connection_reset (session->client_conn);
-                       session->shmem_ref = rspamd_http_message_shmem_ref (msg);
-
-                       session->master_conn->backend_conn = rspamd_http_connection_new (
-                                       NULL,
-                                       proxy_backend_master_error_handler,
-                                       proxy_backend_master_finish_handler,
-                                       RSPAMD_HTTP_CLIENT_SIMPLE,
-                                       RSPAMD_HTTP_CLIENT,
-                                       session->ctx->keys_cache,
-                                       NULL);
-                       session->master_conn->parser_from_ref = backend->parser_from_ref;
-                       session->master_conn->parser_to_ref = backend->parser_to_ref;
-
-                       rspamd_http_connection_set_key (session->master_conn->backend_conn,
-                                       session->ctx->local_key);
-                       msg->peer_key = rspamd_pubkey_ref (backend->key);
-
-                       if (backend->local ||
-                                       rspamd_inet_address_is_local (
-                                                       rspamd_upstream_addr (session->master_conn->up))) {
-                               rspamd_http_connection_write_message_shared (
-                                               session->master_conn->backend_conn,
-                                               msg, NULL, NULL, session->master_conn,
-                                               session->master_conn->backend_sock,
-                                               session->master_conn->io_tv, session->ctx->ev_base);
-                       }
-                       else {
-                               rspamd_http_connection_write_message (
-                                               session->master_conn->backend_conn,
-                                               msg, NULL, NULL, session->master_conn,
-                                               session->master_conn->backend_sock,
-                                               session->master_conn->io_tv, session->ctx->ev_base);
-                       }
-               }
+               proxy_send_master_message (session);
        }
        else {
                msg_info_session ("finished master connection");