void (*ignore_events)(struct connection *conn, int event_type); /* unsubscribe from socket events */
int (*get_src)(struct connection *conn, struct sockaddr *, socklen_t); /* retrieve connection's source address; -1=fail */
int (*get_dst)(struct connection *conn, struct sockaddr *, socklen_t); /* retrieve connection's dest address; -1=fail */
- int (*set_affinity)(struct connection *conn, int new_tid);
+
+ /* functions related to thread affinity updates (see the sketch after
+  * this hunk) */
+ /* prepare to rebind the connection to a new thread; may fail */
+ int (*set_affinity1)(struct connection *conn, int new_tid);
+ /* complete the connection's thread rebinding; cannot fail */
+ void (*set_affinity2)(struct connection *conn);
+ /* cancel an in-progress connection thread rebinding */
+ void (*reset_affinity)(struct connection *conn);
/* functions acting on the receiver */
int (*rx_suspend)(struct receiver *rx); /* temporarily suspend this receiver for a soft restart */
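/*
 * Editor's sketch, not part of this patch: the intended calling sequence
 * for the three affinity callbacks. rebind_conn_sketch() and
 * migrate_to_thread() are hypothetical names; in listener_accept() below,
 * the hand-off is accept_queue_push_mp(), which runs set_affinity2 itself
 * just before publishing the entry.
 */
static int rebind_conn_sketch(const struct protocol *proto,
                              struct connection *conn, int new_tid)
{
	/* phase 1: prepare the rebind; on failure the connection must
	 * stay on the current thread */
	if (proto->set_affinity1(conn, new_tid))
		return 0;

	/* hand-off: on success the migration is committed by calling
	 * set_affinity2(conn) from the producer side */
	if (!migrate_to_thread(conn, new_tid, proto->set_affinity2)) {
		/* hand-off failed: cancel the prepared rebind */
		proto->reset_affinity(conn);
		return 0;
	}
	return 1;
}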
int qc_parse_hd_form(struct quic_rx_packet *pkt,
unsigned char **buf, const unsigned char *end);
-int qc_set_tid_affinity(struct quic_conn *qc, uint new_tid, struct listener *new_li);
+int qc_set_tid_affinity1(struct quic_conn *qc, uint new_tid, struct listener *new_li);
void qc_finalize_affinity_rebind(struct quic_conn *qc);
int qc_handle_conn_migration(struct quic_conn *qc,
const struct sockaddr_storage *peer_addr,
}
-/* tries to push a new accepted connection <conn> into ring <ring>. Returns
- * non-zero if it succeeds, or zero if the ring is full. Supports multiple
- * producers.
+/* Tries to push a new accepted connection <conn> into ring <ring>. If
+ * <accept_push_cb> is not NULL, it is called just before the push operation.
+ *
+ * Returns non-zero on success, or zero if the ring is full. Supports
+ * multiple producers.
*/
-int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn)
+int accept_queue_push_mp(struct accept_queue_ring *ring, struct connection *conn,
+ void (*accept_push_cb)(struct connection *))
{
unsigned int pos, next;
uint32_t idx = _HA_ATOMIC_LOAD(&ring->idx); /* (head << 16) + tail */
next |= (idx & 0xffff0000U); /* keep the head in the upper 16 bits */
} while (unlikely(!_HA_ATOMIC_CAS(&ring->idx, &idx, next) && __ha_cpu_relax()));
+ /* give the producer a chance to commit the migration (e.g. via
+  * set_affinity2()) before the entry is published to the consumer */
+ if (accept_push_cb)
+ accept_push_cb(conn);
+
ring->entry[pos] = conn;
__ha_barrier_store();
return 1;
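/*
 * Editor's note: producers that need no pre-publication hook keep the
 * previous behaviour by passing a NULL callback, e.g.:
 *
 *	if (!accept_queue_push_mp(ring, conn, NULL)) {
 *		... ring full: fall back to a synchronous local accept ...
 *	}
 */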
*/
void listener_accept(struct listener *l)
{
+ void (*li_set_affinity2)(struct connection *);
struct connection *cli_conn;
struct proxy *p;
unsigned int max_accept;
int ret;
p = l->bind_conf->frontend;
+ li_set_affinity2 = l->rx.proto ? l->rx.proto->set_affinity2 : NULL;
/* if l->bind_conf->maxaccept is -1, then max_accept is UINT_MAX. It is
 * not really unlimited, but it is probably enough.
* reservation in the target ring.
*/
- if (l->rx.proto && l->rx.proto->set_affinity) {
- if (l->rx.proto->set_affinity(cli_conn, t)) {
+ if (l->rx.proto && l->rx.proto->set_affinity1) {
+ if (l->rx.proto->set_affinity1(cli_conn, t)) {
/* Failed migration, stay on the same thread. */
goto local_accept;
}
* when processing this loop.
*/
ring = &accept_queue_rings[t];
- if (accept_queue_push_mp(ring, cli_conn)) {
+ if (accept_queue_push_mp(ring, cli_conn, li_set_affinity2)) {
_HA_ATOMIC_INC(&activity[t].accq_pushed);
tasklet_wakeup(ring->tasklet);
+
continue;
}
/* If the ring is full we do a synchronous accept on
* the local thread here.
*/
_HA_ATOMIC_INC(&activity[t].accq_full);
+
+ if (l->rx.proto && l->rx.proto->reset_affinity)
+ l->rx.proto->reset_affinity(cli_conn);
}
#endif // USE_THREAD
static int quic_connect_server(struct connection *conn, int flags);
static void quic_enable_listener(struct listener *listener);
static void quic_disable_listener(struct listener *listener);
-static int quic_set_affinity(struct connection *conn, int new_tid);
+static int quic_set_affinity1(struct connection *conn, int new_tid);
/* Note: must not be declared <const> as its list will be overwritten */
struct protocol proto_quic4 = {
.get_src = quic_sock_get_src,
.get_dst = quic_sock_get_dst,
.connect = quic_connect_server,
- .set_affinity = quic_set_affinity,
+ .set_affinity1 = quic_set_affinity1,
/* binding layer */
.rx_suspend = udp_suspend_receiver,
.get_src = quic_sock_get_src,
.get_dst = quic_sock_get_dst,
.connect = quic_connect_server,
- .set_affinity = quic_set_affinity,
+ .set_affinity1 = quic_set_affinity1,
/* binding layer */
.rx_suspend = udp_suspend_receiver,
* target is a listener, and the caller is responsible for guaranteeing that
* the listener assigned to the connection is bound to the requested thread.
*/
-static int quic_set_affinity(struct connection *conn, int new_tid)
+static int quic_set_affinity1(struct connection *conn, int new_tid)
{
struct quic_conn *qc = conn->handle.qc;
- return qc_set_tid_affinity(qc, new_tid, objt_listener(conn->target));
+ return qc_set_tid_affinity1(qc, new_tid, objt_listener(conn->target));
}
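
/*
 * Editor's sketch, not shown in this hunk: a plausible commit-side
 * counterpart, wired as .set_affinity2 in the protocol structs above,
 * delegating to qc_finalize_affinity_rebind() declared earlier. The
 * function name is hypothetical.
 */
static void quic_set_affinity2_sketch(struct connection *conn)
{
	struct quic_conn *qc = conn->handle.qc;

	qc_finalize_affinity_rebind(qc);
}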
static int quic_alloc_dghdlrs(void)
.unbind = rhttp_unbind_receiver,
.resume = default_resume_listener,
.accept_conn = rhttp_accept_conn,
- .set_affinity = rhttp_set_affinity,
+ .set_affinity1 = rhttp_set_affinity,
/* address family */
.fam = &proto_fam_rhttp,
*
* Returns 0 on success, non-zero otherwise.
*/
-int qc_set_tid_affinity(struct quic_conn *qc, uint new_tid, struct listener *new_li)
+int qc_set_tid_affinity1(struct quic_conn *qc, uint new_tid, struct listener *new_li)
{
struct task *t1 = NULL, *t2 = NULL;
struct tasklet *t3 = NULL;