From 583303c48b34054dfd1626954a46cf4e7655d17c Mon Sep 17 00:00:00 2001 From: Olivier Houchard Date: Wed, 15 Jan 2025 16:37:35 +0100 Subject: [PATCH] MINOR: proxies/servers: Calculate queueslength and use it. For both proxies and servers, properly calculates queueslength, which is the total number of element in each queues (as they currently are only using one queue, it is equivalent to the number of element of that queue), and use it instead of the queue's length. --- include/haproxy/queue.h | 2 +- src/backend.c | 14 +++++++------- src/check.c | 4 ++-- src/haproxy.c | 8 ++++---- src/hlua_fcn.c | 2 +- src/lb_chash.c | 2 +- src/lb_fas.c | 2 +- src/lb_fwlc.c | 8 ++++---- src/lb_fwrr.c | 2 +- src/lb_map.c | 2 +- src/queue.c | 10 +++++++++- src/server.c | 6 +++--- src/stats-proxy.c | 4 ++-- 13 files changed, 37 insertions(+), 29 deletions(-) diff --git a/include/haproxy/queue.h b/include/haproxy/queue.h index 5ca68cde70..2509f2b40c 100644 --- a/include/haproxy/queue.h +++ b/include/haproxy/queue.h @@ -86,7 +86,7 @@ static inline int server_has_room(const struct server *s) { * for and if/else usage. */ static inline int may_dequeue_tasks(const struct server *s, const struct proxy *p) { - return (s && (s->queue.length || (p->queue.length && srv_currently_usable(s))) && + return (s && (s->queueslength || (p->queueslength && srv_currently_usable(s))) && (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s))); } diff --git a/src/backend.c b/src/backend.c index daa1c2a1d8..f0dba4d182 100644 --- a/src/backend.c +++ b/src/backend.c @@ -584,7 +584,7 @@ struct server *get_server_rnd(struct stream *s, const struct server *avoid) * the backend's queue instead. */ if (curr && - (curr->queue.length || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr)))) + (curr->queueslength || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr)))) curr = NULL; return curr; @@ -654,7 +654,7 @@ int assign_server(struct stream *s) ((s->sess->flags & SESS_FL_PREFER_LAST) || (!s->be->max_ka_queue || server_has_room(tmpsrv) || ( - tmpsrv->queue.length + 1 < s->be->max_ka_queue))) && + tmpsrv->queueslength + 1 < s->be->max_ka_queue))) && srv_currently_usable(tmpsrv)) { list_for_each_entry(conn, &pconns->conn_list, sess_el) { if (!(conn->flags & CO_FL_WAIT_XPRT)) { @@ -681,7 +681,7 @@ int assign_server(struct stream *s) /* if there's some queue on the backend, with certain algos we * know it's because all servers are full. */ - if (s->be->queue.length && s->be->served && s->be->queue.length != s->be->beconn && + if (s->be->queueslength && s->be->served && s->be->queueslength != s->be->beconn && (((s->be->lbprm.algo & (BE_LB_KIND|BE_LB_NEED|BE_LB_PARM)) == BE_LB_ALGO_FAS)|| // first ((s->be->lbprm.algo & (BE_LB_KIND|BE_LB_NEED|BE_LB_PARM)) == BE_LB_ALGO_RR) || // roundrobin ((s->be->lbprm.algo & (BE_LB_KIND|BE_LB_NEED|BE_LB_PARM)) == BE_LB_ALGO_SRR))) { // static-rr @@ -1051,7 +1051,7 @@ int assign_server_and_queue(struct stream *s) __ha_cpu_relax()); } if (!got_it) { - if (srv->maxqueue > 0 && srv->queue.length >= srv->maxqueue) + if (srv->maxqueue > 0 && srv->queueslength >= srv->maxqueue) return SRV_STATUS_FULL; p = pendconn_add(s); @@ -3063,7 +3063,7 @@ smp_fetch_connslots(const struct arg *args, struct sample *smp, const char *kw, } smp->data.u.sint += (iterator->maxconn - iterator->cur_sess) - + (iterator->maxqueue - iterator->queue.length); + + (iterator->maxqueue - iterator->queueslength); } return 1; @@ -3340,7 +3340,7 @@ smp_fetch_srv_queue(const struct arg *args, struct sample *smp, const char *kw, { smp->flags = SMP_F_VOL_TEST; smp->data.type = SMP_T_SINT; - smp->data.u.sint = args->data.srv->queue.length; + smp->data.u.sint = args->data.srv->queueslength; return 1; } @@ -3482,7 +3482,7 @@ sample_conv_srv_queue(const struct arg *args, struct sample *smp, void *private) return 0; smp->data.type = SMP_T_SINT; - smp->data.u.sint = srv->queue.length; + smp->data.u.sint = srv->queueslength; return 1; } diff --git a/src/check.c b/src/check.c index dde1e72b09..e46fc654b1 100644 --- a/src/check.c +++ b/src/check.c @@ -1027,8 +1027,8 @@ int httpchk_build_status_header(struct server *s, struct buffer *buf) global.node, (s->cur_eweight * s->proxy->lbprm.wmult + s->proxy->lbprm.wdiv - 1) / s->proxy->lbprm.wdiv, (s->proxy->lbprm.tot_weight * s->proxy->lbprm.wmult + s->proxy->lbprm.wdiv - 1) / s->proxy->lbprm.wdiv, - s->cur_sess, s->proxy->beconn - s->proxy->queue.length, - s->queue.length); + s->cur_sess, s->proxy->beconn - s->proxy->queueslength, + s->queueslength); if ((s->cur_state == SRV_ST_STARTING) && ns_to_sec(now_ns) < s->counters.last_change + s->slowstart && diff --git a/src/haproxy.c b/src/haproxy.c index 7ee654b07d..22a17adc7c 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -748,7 +748,7 @@ static void sig_dump_state(struct sig_handler *sh) "SIGHUP: Server %s/%s is %s. Conn: %d act, %d pend, %lld tot.", p->id, s->id, (s->cur_state != SRV_ST_STOPPED) ? "UP" : "DOWN", - s->cur_sess, s->queue.length, s->counters.cum_sess); + s->cur_sess, s->queueslength, s->counters.cum_sess); ha_warning("%s\n", trash.area); send_log(p, LOG_NOTICE, "%s\n", trash.area); s = s->next; @@ -759,19 +759,19 @@ static void sig_dump_state(struct sig_handler *sh) chunk_printf(&trash, "SIGHUP: Proxy %s has no servers. Conn: act(FE+BE): %d+%d, %d pend (%d unass), tot(FE+BE): %lld+%lld.", p->id, - p->feconn, p->beconn, p->totpend, p->queue.length, p->fe_counters.cum_conn, p->be_counters.cum_sess); + p->feconn, p->beconn, p->totpend, p->queueslength, p->fe_counters.cum_conn, p->be_counters.cum_sess); } else if (p->srv_act == 0) { chunk_printf(&trash, "SIGHUP: Proxy %s %s ! Conn: act(FE+BE): %d+%d, %d pend (%d unass), tot(FE+BE): %lld+%lld.", p->id, (p->srv_bck) ? "is running on backup servers" : "has no server available", - p->feconn, p->beconn, p->totpend, p->queue.length, p->fe_counters.cum_conn, p->be_counters.cum_sess); + p->feconn, p->beconn, p->totpend, p->queueslength, p->fe_counters.cum_conn, p->be_counters.cum_sess); } else { chunk_printf(&trash, "SIGHUP: Proxy %s has %d active servers and %d backup servers available." " Conn: act(FE+BE): %d+%d, %d pend (%d unass), tot(FE+BE): %lld+%lld.", p->id, p->srv_act, p->srv_bck, - p->feconn, p->beconn, p->totpend, p->queue.length, p->fe_counters.cum_conn, p->be_counters.cum_sess); + p->feconn, p->beconn, p->totpend, p->queueslength, p->fe_counters.cum_conn, p->be_counters.cum_sess); } ha_warning("%s\n", trash.area); send_log(p, LOG_NOTICE, "%s\n", trash.area); diff --git a/src/hlua_fcn.c b/src/hlua_fcn.c index d49e073a46..f39e11af96 100644 --- a/src/hlua_fcn.c +++ b/src/hlua_fcn.c @@ -1421,7 +1421,7 @@ int hlua_server_get_pend_conn(lua_State *L) return 1; } - lua_pushinteger(L, srv->queue.length); + lua_pushinteger(L, srv->queueslength); return 1; } diff --git a/src/lb_chash.c b/src/lb_chash.c index b3e472ef1d..784a27af1f 100644 --- a/src/lb_chash.c +++ b/src/lb_chash.c @@ -521,7 +521,7 @@ struct server *chash_get_next_server(struct proxy *p, struct server *srvtoavoid) * case we simply remember it for later use if needed. */ s = eb32_entry(node, struct tree_occ, node)->server; - if (!s->maxconn || (!s->queue.length && s->served < srv_dynamic_maxconn(s))) { + if (!s->maxconn || (!s->queueslength && s->served < srv_dynamic_maxconn(s))) { if (s != srvtoavoid) { srv = s; break; diff --git a/src/lb_fas.c b/src/lb_fas.c index d90388b401..bac20d28bb 100644 --- a/src/lb_fas.c +++ b/src/lb_fas.c @@ -322,7 +322,7 @@ struct server *fas_get_next_server(struct proxy *p, struct server *srvtoavoid) struct server *s; s = eb32_entry(node, struct server, lb_node); - if (!s->maxconn || (!s->queue.length && s->served < srv_dynamic_maxconn(s))) { + if (!s->maxconn || (!s->queueslength && s->served < srv_dynamic_maxconn(s))) { if (s != srvtoavoid) { srv = s; break; diff --git a/src/lb_fwlc.c b/src/lb_fwlc.c index 8e913d4863..011c139d82 100644 --- a/src/lb_fwlc.c +++ b/src/lb_fwlc.c @@ -57,7 +57,7 @@ static inline void fwlc_dequeue_srv(struct server *s) */ static inline void fwlc_queue_srv(struct server *s, unsigned int eweight) { - unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length); + unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queueslength); s->lb_node.key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / eweight : 0; eb32_insert(s->lb_tree, &s->lb_node); @@ -70,7 +70,7 @@ static inline void fwlc_queue_srv(struct server *s, unsigned int eweight) */ static void fwlc_srv_reposition(struct server *s) { - unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length); + unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queueslength); unsigned int eweight = _HA_ATOMIC_LOAD(&s->cur_eweight); unsigned int new_key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / (eweight ? eweight : 1) : 0; @@ -87,7 +87,7 @@ static void fwlc_srv_reposition(struct server *s) * likely to have released a connection or taken one leading * to our target value (50% of the case in measurements). */ - inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length); + inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queueslength); eweight = _HA_ATOMIC_LOAD(&s->cur_eweight); new_key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / (eweight ? eweight : 1) : 0; if (!s->lb_node.node.leaf_p || s->lb_node.key != new_key) { @@ -349,7 +349,7 @@ struct server *fwlc_get_next_server(struct proxy *p, struct server *srvtoavoid) struct server *s; s = eb32_entry(node, struct server, lb_node); - if (!s->maxconn || s->served + s->queue.length < srv_dynamic_maxconn(s) + s->maxqueue) { + if (!s->maxconn || s->served + s->queueslength < srv_dynamic_maxconn(s) + s->maxqueue) { if (s != srvtoavoid) { srv = s; break; diff --git a/src/lb_fwrr.c b/src/lb_fwrr.c index a762623f33..574606dc5d 100644 --- a/src/lb_fwrr.c +++ b/src/lb_fwrr.c @@ -564,7 +564,7 @@ struct server *fwrr_get_next_server(struct proxy *p, struct server *srvtoavoid) fwrr_update_position(grp, srv); fwrr_dequeue_srv(srv); grp->curr_pos++; - if (!srv->maxconn || (!srv->queue.length && srv->served < srv_dynamic_maxconn(srv))) { + if (!srv->maxconn || (!srv->queueslength && srv->served < srv_dynamic_maxconn(srv))) { /* make sure it is not the server we are trying to exclude... */ if (srv != srvtoavoid || avoided) break; diff --git a/src/lb_map.c b/src/lb_map.c index 592df91cce..7dc27b4447 100644 --- a/src/lb_map.c +++ b/src/lb_map.c @@ -230,7 +230,7 @@ struct server *map_get_server_rr(struct proxy *px, struct server *srvtoavoid) avoididx = 0; /* shut a gcc warning */ do { srv = px->lbprm.map.srv[newidx++]; - if (!srv->maxconn || (!srv->queue.length && srv->served < srv_dynamic_maxconn(srv))) { + if (!srv->maxconn || (!srv->queueslength && srv->served < srv_dynamic_maxconn(srv))) { /* make sure it is not the server we are try to exclude... */ /* ...but remember that is was selected yet avoided */ avoided = srv; diff --git a/src/queue.c b/src/queue.c index 8e532e6e96..f5ea31de49 100644 --- a/src/queue.c +++ b/src/queue.c @@ -348,6 +348,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int _HA_ATOMIC_DEC(&px->queue.length); _HA_ATOMIC_INC(&px->queue.idx); + _HA_ATOMIC_DEC(&px->queueslength); return 1; use_p: @@ -369,6 +370,7 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int _HA_ATOMIC_DEC(&srv->queue.length); _HA_ATOMIC_INC(&srv->queue.idx); + _HA_ATOMIC_DEC(&srv->queueslength); return 1; } @@ -487,6 +489,7 @@ struct pendconn *pendconn_add(struct stream *strm) struct server *srv; struct queue *q; unsigned int *max_ptr; + unsigned int *queueslength; unsigned int old_max, new_max; p = pool_alloc(pool_head_pendconn); @@ -509,15 +512,18 @@ struct pendconn *pendconn_add(struct stream *strm) if (srv) { q = &srv->queue; max_ptr = &srv->counters.nbpend_max; + queueslength = &srv->queueslength; } else { q = &px->queue; max_ptr = &px->be_counters.nbpend_max; + queueslength = &px->queueslength; } p->queue = q; p->queue_idx = _HA_ATOMIC_LOAD(&q->idx) - 1; // for logging only - new_max = _HA_ATOMIC_ADD_FETCH(&q->length, 1); + new_max = _HA_ATOMIC_ADD_FETCH(queueslength, 1); + _HA_ATOMIC_INC(&q->length); old_max = _HA_ATOMIC_LOAD(max_ptr); while (new_max > old_max) { if (likely(_HA_ATOMIC_CAS(max_ptr, &old_max, new_max))) @@ -572,6 +578,7 @@ int pendconn_redistribute(struct server *s) if (xferred) { _HA_ATOMIC_SUB(&s->queue.length, xferred); + _HA_ATOMIC_SUB(&s->queueslength, xferred); _HA_ATOMIC_SUB(&s->proxy->totpend, xferred); } @@ -597,6 +604,7 @@ int pendconn_redistribute(struct server *s) if (px_xferred) { _HA_ATOMIC_SUB(&px->queue.length, px_xferred); + _HA_ATOMIC_SUB(&px->queueslength, px_xferred); _HA_ATOMIC_SUB(&px->totpend, px_xferred); } done: diff --git a/src/server.c b/src/server.c index b7f303325b..6d15de3714 100644 --- a/src/server.c +++ b/src/server.c @@ -2092,13 +2092,13 @@ static void srv_append_more(struct buffer *msg, struct server *s, " %d sessions active, %d requeued, %d remaining in queue", s->proxy->srv_act, s->proxy->srv_bck, (s->proxy->srv_bck && !s->proxy->srv_act) ? " Running on backup." : "", - s->cur_sess, xferred, s->queue.length); + s->cur_sess, xferred, s->queueslength); else chunk_appendf(msg, ". %d active and %d backup servers online.%s" " %d sessions requeued, %d total in queue", s->proxy->srv_act, s->proxy->srv_bck, (s->proxy->srv_bck && !s->proxy->srv_act) ? " Running on backup." : "", - xferred, s->queue.length); + xferred, s->queueslength); } } @@ -6044,7 +6044,7 @@ int srv_check_for_deletion(const char *bename, const char *svname, struct proxy /* Ensure that there is no active/pending connection on the server. */ if (srv->curr_used_conns || - !eb_is_empty(&srv->queue.head) || srv_has_streams(srv)) { + srv->queueslength || srv_has_streams(srv)) { msg = "Server still has connections attached to it, cannot remove it."; goto leave; } diff --git a/src/stats-proxy.c b/src/stats-proxy.c index d2500f7e7b..1031ae3dc7 100644 --- a/src/stats-proxy.c +++ b/src/stats-proxy.c @@ -786,7 +786,7 @@ int stats_fill_sv_line(struct proxy *px, struct server *sv, int flags, field = mkf_str(FO_CONFIG|FS_SERVICE, proxy_mode_str(px->mode)); break; case ST_I_PX_QCUR: - field = mkf_u32(0, sv->queue.length); + field = mkf_u32(0, sv->queueslength); break; case ST_I_PX_QMAX: field = mkf_u32(FN_MAX, sv->counters.nbpend_max); @@ -1165,7 +1165,7 @@ int stats_fill_be_line(struct proxy *px, int flags, struct field *line, int len, field = mkf_str(FO_CONFIG|FS_SERVICE, proxy_mode_str(px->mode)); break; case ST_I_PX_QCUR: - field = mkf_u32(0, px->queue.length); + field = mkf_u32(0, px->queueslength); break; case ST_I_PX_QMAX: field = mkf_u32(FN_MAX, px->be_counters.nbpend_max); -- 2.39.5