]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: proto: extend connection thread rebind API
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 4 Jul 2024 13:23:13 +0000 (15:23 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 4 Jul 2024 14:33:21 +0000 (16:33 +0200)
MINOR: listener: define callback for accept queue push

Extend API for connection thread rebind API by replacing single callback
set_affinity by three different ones. Each one of them is used at a
different stage of the operation :

* set_affinity1 is used similarly to previous set_affinity

* set_affinity2 is called directly from accept_queue_push_mp() when an
  entry has been found in accept ring. This operation cannot fail.

* reset_affinity is called after set_affinity1 in case of failure from
  accept_queue_push_mp() due to no space left in accept ring. This is
  necessary for protocols which must reconfigure resources before
  fallback on the current tid.

This patch does not have any functional changes. However, it will be
required to fix crashes for QUIC connections when accept queue ring is
full. As such, it must be backported with it.

include/haproxy/protocol-t.h
include/haproxy/quic_conn.h
src/listener.c
src/proto_quic.c
src/proto_rhttp.c
src/quic_conn.c

index 0c5bd9e1cbc7ec3332ce8f3d6039f05bbbdeba11..a37c9256afca07084e90e730f162af86e4c19f7c 100644 (file)
@@ -119,7 +119,14 @@ struct protocol {
        void (*ignore_events)(struct connection *conn, int event_type);  /* unsubscribe from socket events */
        int (*get_src)(struct connection *conn, struct sockaddr *, socklen_t); /* retrieve connection's source address; -1=fail */
        int (*get_dst)(struct connection *conn, struct sockaddr *, socklen_t); /* retrieve connection's dest address; -1=fail */
-       int (*set_affinity)(struct connection *conn, int new_tid);
+
+       /* functions related to thread affinity update */
+       /* prepare rebind connection on a new thread, may fail */
+       int (*set_affinity1)(struct connection *conn, int new_tid);
+       /* complete connection thread rebinding, no error possible */
+       void (*set_affinity2)(struct connection *conn);
+       /* cancel connection thread rebinding */
+       void (*reset_affinity)(struct connection *conn);
 
        /* functions acting on the receiver */
        int (*rx_suspend)(struct receiver *rx);         /* temporarily suspend this receiver for a soft restart */
index a3de4ab114acf04295791fc10a102003836f4c7d..c7005c06cd0e5ed2286e3f1fc978f165bf0748d3 100644 (file)
@@ -177,7 +177,7 @@ void qc_kill_conn(struct quic_conn *qc);
 int qc_parse_hd_form(struct quic_rx_packet *pkt,
                      unsigned char **buf, const unsigned char *end);
 
-int qc_set_tid_affinity(struct quic_conn *qc, uint new_tid, struct listener *new_li);
+int qc_set_tid_affinity1(struct quic_conn *qc, uint new_tid, struct listener *new_li);
 void qc_finalize_affinity_rebind(struct quic_conn *qc);
 int qc_handle_conn_migration(struct quic_conn *qc,
                              const struct sockaddr_storage *peer_addr,
index a3485580b389314dab7671e20f978a3550e8d8a4..5d07a6650af55e6abff61219f0250e1f7e6e5615 100644 (file)
@@ -105,11 +105,14 @@ struct connection *accept_queue_pop_sc(struct accept_queue_ring *ring)
 }
 
 
-/* tries to push a new accepted connection <conn> into ring <ring>. Returns
- * non-zero if it succeeds, or zero if the ring is full. Supports multiple
- * producers.
+/* Tries to push a new accepted connection <conn> into ring <ring>.
+ * <accept_push_cb> is called if not NULL just prior to the push operation.
+ *
+ * Returns non-zero if it succeeds, or zero if the ring is full. Supports
+ * multiple producers.
  */
-int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn)
+int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn,
+                         void (*accept_push_cb)(struct connection *))
 {
        unsigned int pos, next;
        uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx);  /* (head << 16) + tail */
@@ -124,6 +127,9 @@ int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn
                next |= (idx & 0xffff0000U);
        } while (unlikely(!_HA_ATOMIC_CAS(&ring->idx, &idx, next) && __ha_cpu_relax()));
 
+       if (accept_push_cb)
+               accept_push_cb(conn);
+
        ring->entry[pos] = conn;
        __ha_barrier_store();
        return 1;
@@ -1013,6 +1019,7 @@ static inline int listener_uses_maxconn(const struct listener *l)
  */
 void listener_accept(struct listener *l)
 {
+       void (*li_set_affinity2)(struct connection *);
        struct connection *cli_conn;
        struct proxy *p;
        unsigned int max_accept;
@@ -1023,6 +1030,7 @@ void listener_accept(struct listener *l)
        int ret;
 
        p = l->bind_conf->frontend;
+       li_set_affinity2 = l->rx.proto ? l->rx.proto->set_affinity2 : NULL;
 
        /* if l->bind_conf->maxaccept is -1, then max_accept is UINT_MAX. It is
         * not really illimited, but it is probably enough.
@@ -1461,8 +1469,8 @@ void listener_accept(struct listener *l)
                         * reservation in the target ring.
                         */
 
-                       if (l->rx.proto && l->rx.proto->set_affinity) {
-                               if (l->rx.proto->set_affinity(cli_conn, t)) {
+                       if (l->rx.proto && l->rx.proto->set_affinity1) {
+                               if (l->rx.proto->set_affinity1(cli_conn, t)) {
                                        /* Failed migration, stay on the same thread. */
                                        goto local_accept;
                                }
@@ -1475,15 +1483,19 @@ void listener_accept(struct listener *l)
                         * when processing this loop.
                         */
                        ring = &accept_queue_rings[t];
-                       if (accept_queue_push_mp(ring, cli_conn)) {
+                       if (accept_queue_push_mp(ring, cli_conn, li_set_affinity2)) {
                                _HA_ATOMIC_INC(&activity[t].accq_pushed);
                                tasklet_wakeup(ring->tasklet);
+
                                continue;
                        }
                        /* If the ring is full we do a synchronous accept on
                         * the local thread here.
                         */
                        _HA_ATOMIC_INC(&activity[t].accq_full);
+
+                       if (l->rx.proto && l->rx.proto->reset_affinity)
+                               l->rx.proto->reset_affinity(cli_conn);
                }
 #endif // USE_THREAD
 
index d03123e1e853b5503691c13f198686ef5906dd17..4b8f0c20f00d4bc884ed40a8f049e47ddcf354d2 100644 (file)
@@ -61,7 +61,7 @@ static int quic_bind_listener(struct listener *listener, char *errmsg, int errle
 static int quic_connect_server(struct connection *conn, int flags);
 static void quic_enable_listener(struct listener *listener);
 static void quic_disable_listener(struct listener *listener);
-static int quic_set_affinity(struct connection *conn, int new_tid);
+static int quic_set_affinity1(struct connection *conn, int new_tid);
 
 /* Note: must not be declared <const> as its list will be overwritten */
 struct protocol proto_quic4 = {
@@ -80,7 +80,7 @@ struct protocol proto_quic4 = {
        .get_src        = quic_sock_get_src,
        .get_dst        = quic_sock_get_dst,
        .connect        = quic_connect_server,
-       .set_affinity   = quic_set_affinity,
+       .set_affinity1  = quic_set_affinity1,
 
        /* binding layer */
        .rx_suspend     = udp_suspend_receiver,
@@ -124,7 +124,7 @@ struct protocol proto_quic6 = {
        .get_src        = quic_sock_get_src,
        .get_dst        = quic_sock_get_dst,
        .connect        = quic_connect_server,
-       .set_affinity   = quic_set_affinity,
+       .set_affinity1  = quic_set_affinity1,
 
        /* binding layer */
        .rx_suspend     = udp_suspend_receiver,
@@ -668,10 +668,10 @@ static void quic_disable_listener(struct listener *l)
  * target is a listener, and the caller is responsible for guaranteeing that
  * the listener assigned to the connection is bound to the requested thread.
  */
-static int quic_set_affinity(struct connection *conn, int new_tid)
+static int quic_set_affinity1(struct connection *conn, int new_tid)
 {
        struct quic_conn *qc = conn->handle.qc;
-       return qc_set_tid_affinity(qc, new_tid, objt_listener(conn->target));
+       return qc_set_tid_affinity1(qc, new_tid, objt_listener(conn->target));
 }
 
 static int quic_alloc_dghdlrs(void)
index a6fc955749a271d5247b5e10866ef597c7be7919..abd28d0cf0a75c4f2c6954057d35fa3dea0c18bb 100644 (file)
@@ -39,7 +39,7 @@ struct protocol proto_rhttp = {
        .unbind      = rhttp_unbind_receiver,
        .resume      = default_resume_listener,
        .accept_conn = rhttp_accept_conn,
-       .set_affinity = rhttp_set_affinity,
+       .set_affinity1 = rhttp_set_affinity,
 
        /* address family */
        .fam  = &proto_fam_rhttp,
index 46c74d9436a94a6a5a44176340f264d3a7860cff..24e52d0363212d8f522f2ecc3b5fc6dfbcccdbc6 100644 (file)
@@ -1730,7 +1730,7 @@ void qc_notify_err(struct quic_conn *qc)
  *
  * Returns 0 on success else non-zero.
  */
-int qc_set_tid_affinity(struct quic_conn *qc, uint new_tid, struct listener *new_li)
+int qc_set_tid_affinity1(struct quic_conn *qc, uint new_tid, struct listener *new_li)
 {
        struct task *t1 = NULL, *t2 = NULL;
        struct tasklet *t3 = NULL;