]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: server: replace the pendconns-related stuff with a struct queue
authorWilly Tarreau <w@1wt.eu>
Fri, 18 Jun 2021 07:30:30 +0000 (09:30 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 22 Jun 2021 16:43:14 +0000 (18:43 +0200)
Just like for proxies, all three elements (pendconns, nbpend, queue_idx)
were moved to struct queue.

14 files changed:
include/haproxy/queue.h
include/haproxy/server-t.h
src/backend.c
src/check.c
src/flt_spoe.c
src/haproxy.c
src/lb_chash.c
src/lb_fas.c
src/lb_fwlc.c
src/lb_fwrr.c
src/lb_map.c
src/queue.c
src/server.c
src/stats.c

index 3c6ebe89b543887ba786791e53e372bfc509c235..58a0b091ca5cc82ae7656378c6b5df6f2332388d 100644 (file)
@@ -87,7 +87,7 @@ static inline int server_has_room(const struct server *s) {
  * for and if/else usage.
  */
 static inline int may_dequeue_tasks(const struct server *s, const struct proxy *p) {
-       return (s && (s->nbpend || (p->queue.length && srv_currently_usable(s))) &&
+       return (s && (s->queue.length || (p->queue.length && srv_currently_usable(s))) &&
                (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s)));
 }
 
index ef91ee9c78cbde1da9c440e090612deb7001a8cc..6772db2b040dd5cd6be1412dfc79c9e21c77feb5 100644 (file)
@@ -36,6 +36,7 @@
 #include <haproxy/listener-t.h>
 #include <haproxy/obj_type-t.h>
 #include <haproxy/openssl-compat.h>
+#include <haproxy/queue-t.h>
 #include <haproxy/resolvers-t.h>
 #include <haproxy/ssl_sock-t.h>
 #include <haproxy/stats-t.h>
@@ -271,7 +272,7 @@ struct server {
        unsigned int est_need_conns;            /* Estimate on the number of needed connections (max of curr and previous max_used) */
        unsigned int next_takeover;             /* thread ID to try to steal connections from next time */
 
-       struct eb_root pendconns;               /* pending connections */
+       struct queue queue;                     /* pending connections */
 
        /* Element below are usd by LB algorithms and must be doable in
         * parallel to other threads reusing connections above.
@@ -286,10 +287,8 @@ struct server {
        ALWAYS_ALIGN(64);
        int cur_sess;                           /* number of currently active sessions (including syn_sent) */
        int served;                             /* # of active sessions currently being served (ie not pending) */
-       int nbpend;                             /* number of pending connections */
        int consecutive_errors;                 /* current number of consecutive errors */
        struct freq_ctr sess_per_sec;           /* sessions per second on this server */
-       unsigned int queue_idx;                 /* count of pending connections which have been de-queued */
        struct be_counters counters;            /* statistics counters */
 
        /* Below are some relatively stable settings, only changed under the lock */
index 1bd40fbaa6f56448d8ab3449f8c4223e847a5ded..f6f3ef037f567164c9ca87fcff7225c23dd234cf 100644 (file)
@@ -552,7 +552,7 @@ static struct server *get_server_rnd(struct stream *s, const struct server *avoi
         * the backend's queue instead.
         */
        if (curr &&
-           (curr->nbpend || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr))))
+           (curr->queue.length || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr))))
                curr = NULL;
 
        return curr;
@@ -624,7 +624,7 @@ int assign_server(struct stream *s)
                            ((s->sess->flags & SESS_FL_PREFER_LAST) ||
                             (!s->be->max_ka_queue ||
                              server_has_room(tmpsrv) || (
-                             tmpsrv->nbpend + 1 < s->be->max_ka_queue))) &&
+                             tmpsrv->queue.length + 1 < s->be->max_ka_queue))) &&
                            srv_currently_usable(tmpsrv)) {
                                list_for_each_entry(conn, &srv_list->conn_list, session_list) {
                                        if (!(conn->flags & CO_FL_WAIT_XPRT)) {
@@ -1001,9 +1001,9 @@ int assign_server_and_queue(struct stream *s)
                 * not full, in which case we have to return FULL.
                 */
                if (srv->maxconn &&
-                   (srv->nbpend || srv->served >= srv_dynamic_maxconn(srv))) {
+                   (srv->queue.length || srv->served >= srv_dynamic_maxconn(srv))) {
 
-                       if (srv->maxqueue > 0 && srv->nbpend >= srv->maxqueue)
+                       if (srv->maxqueue > 0 && srv->queue.length >= srv->maxqueue)
                                return SRV_STATUS_FULL;
 
                        p = pendconn_add(s);
@@ -2734,7 +2734,7 @@ smp_fetch_connslots(const struct arg *args, struct sample *smp, const char *kw,
                }
 
                smp->data.u.sint += (iterator->maxconn - iterator->cur_sess)
-                                      +  (iterator->maxqueue - iterator->nbpend);
+                                      +  (iterator->maxqueue - iterator->queue.length);
        }
 
        return 1;
@@ -2981,7 +2981,7 @@ smp_fetch_srv_queue(const struct arg *args, struct sample *smp, const char *kw,
 {
        smp->flags = SMP_F_VOL_TEST;
        smp->data.type = SMP_T_SINT;
-       smp->data.u.sint = args->data.srv->nbpend;
+       smp->data.u.sint = args->data.srv->queue.length;
        return 1;
 }
 
@@ -3123,7 +3123,7 @@ sample_conv_srv_queue(const struct arg *args, struct sample *smp, void *private)
                return 0;
 
        smp->data.type = SMP_T_SINT;
-       smp->data.u.sint = srv->nbpend;
+       smp->data.u.sint = srv->queue.length;
        return 1;
 }
 
index 68cb1c3582b6735f91b1eb64b91483980ba1179c..9c885eaad48a674a22086e8f08eed642fbaba0ee 100644 (file)
@@ -994,7 +994,7 @@ int httpchk_build_status_header(struct server *s, struct buffer *buf)
                      (s->cur_eweight * s->proxy->lbprm.wmult + s->proxy->lbprm.wdiv - 1) / s->proxy->lbprm.wdiv,
                      (s->proxy->lbprm.tot_weight * s->proxy->lbprm.wmult + s->proxy->lbprm.wdiv - 1) / s->proxy->lbprm.wdiv,
                      s->cur_sess, s->proxy->beconn - s->proxy->queue.length,
-                     s->nbpend);
+                     s->queue.length);
 
        if ((s->cur_state == SRV_ST_STARTING) &&
            now.tv_sec < s->last_change + s->slowstart &&
index 743565d61110ee9a9f35dab4404819ffa103d1d0..3d68fc56df8b2142f7aa1c3d8c1fc6b896ed5ec8 100644 (file)
@@ -1728,7 +1728,7 @@ spoe_handle_processing_appctx(struct appctx *appctx)
                 */
                if (global.nbthread > 1 &&
                    (agent->b.be->queue.length ||
-                    (srv && (srv->nbpend || (srv->maxconn && srv->served >=srv_dynamic_maxconn(srv)))))) {
+                    (srv && (srv->queue.length || (srv->maxconn && srv->served >=srv_dynamic_maxconn(srv)))))) {
                        SPOE_APPCTX(appctx)->status_code = SPOE_FRM_ERR_NONE;
                        appctx->st0 = SPOE_APPCTX_ST_DISCONNECT;
                        appctx->st1 = SPOE_APPCTX_ERR_NONE;
index 82efcd23e93b7ca2bae1d29aef393fb8fbe0b6d3..de13aba0542106c752f30024f4a1994f14bf7021 100644 (file)
@@ -904,7 +904,7 @@ static void sig_dump_state(struct sig_handler *sh)
                                     "SIGHUP: Server %s/%s is %s. Conn: %d act, %d pend, %lld tot.",
                                     p->id, s->id,
                                     (s->cur_state != SRV_ST_STOPPED) ? "UP" : "DOWN",
-                                    s->cur_sess, s->nbpend, s->counters.cum_sess);
+                                    s->cur_sess, s->queue.length, s->counters.cum_sess);
                        ha_warning("%s\n", trash.area);
                        send_log(p, LOG_NOTICE, "%s\n", trash.area);
                        s = s->next;
index 23b2b12722a5e346aaf592ed62c9132fc19b9539..023219c98bf5caff6ebfbb54ac58f9468d2bf9c4 100644 (file)
@@ -443,7 +443,7 @@ struct server *chash_get_next_server(struct proxy *p, struct server *srvtoavoid)
                 * case we simply remember it for later use if needed.
                 */
                s = eb32_entry(node, struct tree_occ, node)->server;
-               if (!s->maxconn || (!s->nbpend && s->served < srv_dynamic_maxconn(s))) {
+               if (!s->maxconn || (!s->queue.length && s->served < srv_dynamic_maxconn(s))) {
                        if (s != srvtoavoid) {
                                srv = s;
                                break;
index 53bd0392d4123b97f931fc7b614e17c7c2ec9870..d90388b4016978b4ae75a4bf2b108377f66d5514 100644 (file)
@@ -322,7 +322,7 @@ struct server *fas_get_next_server(struct proxy *p, struct server *srvtoavoid)
                struct server *s;
 
                s = eb32_entry(node, struct server, lb_node);
-               if (!s->maxconn || (!s->nbpend && s->served < srv_dynamic_maxconn(s))) {
+               if (!s->maxconn || (!s->queue.length && s->served < srv_dynamic_maxconn(s))) {
                        if (s != srvtoavoid) {
                                srv = s;
                                break;
index ba1ca95ae8f5b2d84bbf316790c1a297ba9601fe..091241cc5b8704866b7179c24cc3fcb40dbab274 100644 (file)
@@ -57,7 +57,7 @@ static inline void fwlc_dequeue_srv(struct server *s)
  */
 static inline void fwlc_queue_srv(struct server *s, unsigned int eweight)
 {
-       unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->nbpend);
+       unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length);
 
        s->lb_node.key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / eweight : 0;
        eb32_insert(s->lb_tree, &s->lb_node);
@@ -70,7 +70,7 @@ static inline void fwlc_queue_srv(struct server *s, unsigned int eweight)
  */
 static void fwlc_srv_reposition(struct server *s)
 {
-       unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->nbpend);
+       unsigned int inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length);
        unsigned int new_key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / s->cur_eweight : 0;
 
        /* some calls will be made for no change (e.g connect_server() after
@@ -86,7 +86,7 @@ static void fwlc_srv_reposition(struct server *s)
                 * likely to have released a connection or taken one leading
                 * to our target value (50% of the case in measurements).
                 */
-               inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->nbpend);
+               inflight = _HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queue.length);
                new_key = inflight ? (inflight + 1) * SRV_EWGHT_MAX / s->cur_eweight : 0;
                if (!s->lb_node.node.leaf_p || s->lb_node.key != new_key) {
                        eb32_delete(&s->lb_node);
@@ -347,7 +347,7 @@ struct server *fwlc_get_next_server(struct proxy *p, struct server *srvtoavoid)
                struct server *s;
 
                s = eb32_entry(node, struct server, lb_node);
-               if (!s->maxconn || s->served + s->nbpend < srv_dynamic_maxconn(s) + s->maxqueue) {
+               if (!s->maxconn || s->served + s->queue.length < srv_dynamic_maxconn(s) + s->maxqueue) {
                        if (s != srvtoavoid) {
                                srv = s;
                                break;
index d7f618faf6b3ee35f364020ad4091a6fa0600502..74c7fb244c381885147dee2c75b1bbe3515e0621 100644 (file)
@@ -564,7 +564,7 @@ struct server *fwrr_get_next_server(struct proxy *p, struct server *srvtoavoid)
                fwrr_update_position(grp, srv);
                fwrr_dequeue_srv(srv);
                grp->curr_pos++;
-               if (!srv->maxconn || (!srv->nbpend && srv->served < srv_dynamic_maxconn(srv))) {
+               if (!srv->maxconn || (!srv->queue.length && srv->served < srv_dynamic_maxconn(srv))) {
                        /* make sure it is not the server we are trying to exclude... */
                        if (srv != srvtoavoid || avoided)
                                break;
index b735678a8c5faf2b896a7c596569a57b21760298..592df91cceb8bbfa0a2a574aac7270fb08642115 100644 (file)
@@ -230,7 +230,7 @@ struct server *map_get_server_rr(struct proxy *px, struct server *srvtoavoid)
        avoididx = 0; /* shut a gcc warning */
        do {
                srv = px->lbprm.map.srv[newidx++];
-               if (!srv->maxconn || (!srv->nbpend && srv->served < srv_dynamic_maxconn(srv))) {
+               if (!srv->maxconn || (!srv->queue.length && srv->served < srv_dynamic_maxconn(srv))) {
                        /* make sure it is not the server we are try to exclude... */
                        /* ...but remember that is was selected yet avoided */
                        avoided = srv;
index 40fabfc95ceee191d888548c8bee091ed3e2411f..c9134afb92e0f4396c03dbbf056d2270a5043e25 100644 (file)
@@ -133,7 +133,7 @@ unsigned int srv_dynamic_maxconn(const struct server *s)
  */
 static void __pendconn_unlink_srv(struct pendconn *p)
 {
-       p->strm->logs.srv_queue_pos += p->srv->queue_idx - p->queue_idx;
+       p->strm->logs.srv_queue_pos += p->srv->queue.idx - p->queue_idx;
        eb32_delete(&p->node);
 }
 
@@ -194,7 +194,7 @@ void pendconn_unlink(struct pendconn *p)
                }
                HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
                if (done) {
-                       _HA_ATOMIC_DEC(&p->srv->nbpend);
+                       _HA_ATOMIC_DEC(&p->srv->queue.length);
                        _HA_ATOMIC_DEC(&p->px->totpend);
                }
        }
@@ -273,8 +273,8 @@ static struct pendconn *pendconn_process_next_strm(struct server *srv, struct pr
                rsrv = srv;
 
        p = NULL;
-       if (srv->nbpend)
-               p = pendconn_first(&srv->pendconns);
+       if (srv->queue.length)
+               p = pendconn_first(&srv->queue.head);
 
        pp = NULL;
        if (srv_currently_usable(rsrv) && px->queue.length &&
@@ -320,9 +320,9 @@ static struct pendconn *pendconn_process_next_strm(struct server *srv, struct pr
        goto unlinked;
  use_p:
        __pendconn_unlink_srv(p);
-       _HA_ATOMIC_DEC(&srv->nbpend);
+       _HA_ATOMIC_DEC(&srv->queue.length);
        _HA_ATOMIC_DEC(&px->totpend);
-       srv->queue_idx++;
+       srv->queue.idx++;
  unlinked:
        p->strm_flags |= SF_ASSIGNED;
        p->target = srv;
@@ -416,7 +416,7 @@ struct pendconn *pendconn_add(struct stream *strm)
        if (srv) {
                unsigned int old_max, new_max;
 
-               new_max = _HA_ATOMIC_ADD_FETCH(&srv->nbpend, 1);
+               new_max = _HA_ATOMIC_ADD_FETCH(&srv->queue.length, 1);
                old_max = srv->counters.nbpend_max;
                while (new_max > old_max) {
                        if (likely(_HA_ATOMIC_CAS(&srv->counters.nbpend_max, &old_max, new_max)))
@@ -425,8 +425,8 @@ struct pendconn *pendconn_add(struct stream *strm)
                __ha_barrier_atomic_store();
 
                HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock);
-               p->queue_idx = srv->queue_idx - 1; // for increment
-               eb32_insert(&srv->pendconns, &p->node);
+               p->queue_idx = srv->queue.idx - 1; // for increment
+               eb32_insert(&srv->queue.head, &p->node);
                HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock);
        }
        else {
@@ -465,7 +465,7 @@ int pendconn_redistribute(struct server *s)
        if ((s->proxy->options & (PR_O_REDISP|PR_O_PERSIST)) != PR_O_REDISP)
                return 0;
 
-       for (node = eb32_first(&s->pendconns); node; node = nodeb) {
+       for (node = eb32_first(&s->queue.head); node; node = nodeb) {
                nodeb = eb32_next(node);
 
                p = eb32_entry(node, struct pendconn, node);
@@ -480,7 +480,7 @@ int pendconn_redistribute(struct server *s)
                xferred++;
        }
        if (xferred) {
-               _HA_ATOMIC_SUB(&s->nbpend, xferred);
+               _HA_ATOMIC_SUB(&s->queue.length, xferred);
                _HA_ATOMIC_SUB(&s->proxy->totpend, xferred);
        }
        return xferred;
index 141f1ba09fc4ac6b2c47db22f871f43c483b9e2a..5d869e6fba5640a9df4512e358b4f43e9fd86de8 100644 (file)
@@ -1370,13 +1370,13 @@ void srv_append_status(struct buffer *msg, struct server *s,
                                " %d sessions active, %d requeued, %d remaining in queue",
                                s->proxy->srv_act, s->proxy->srv_bck,
                                (s->proxy->srv_bck && !s->proxy->srv_act) ? " Running on backup." : "",
-                               s->cur_sess, xferred, s->nbpend);
+                               s->cur_sess, xferred, s->queue.length);
                else
                        chunk_appendf(msg, ". %d active and %d backup servers online.%s"
                                " %d sessions requeued, %d total in queue",
                                s->proxy->srv_act, s->proxy->srv_bck,
                                (s->proxy->srv_bck && !s->proxy->srv_act) ? " Running on backup." : "",
-                               xferred, s->nbpend);
+                               xferred, s->queue.length);
        }
 }
 
@@ -2163,7 +2163,7 @@ struct server *new_server(struct proxy *proxy)
 
        srv->obj_type = OBJ_TYPE_SERVER;
        srv->proxy = proxy;
-       srv->pendconns = EB_ROOT;
+       srv->queue.head = EB_ROOT;
        LIST_APPEND(&servers_list, &srv->global_list);
        LIST_INIT(&srv->srv_rec_item);
        LIST_INIT(&srv->ip_rec_item);
@@ -4642,7 +4642,7 @@ static int cli_parse_delete_server(char **args, char *payload, struct appctx *ap
         * cleanup function should be implemented to be used here.
         */
        if (srv->cur_sess || srv->curr_idle_conns ||
-           !eb_is_empty(&srv->pendconns)) {
+           !eb_is_empty(&srv->queue.head)) {
                cli_err(appctx, "Server still has connections attached to it, cannot remove it.");
                goto out;
        }
index c8b5fb1429b31aec11a10f1713392df5dfa8631e..3458924b786fef8546cc155822635c3db723bf7a 100644 (file)
@@ -2141,7 +2141,7 @@ int stats_fill_sv_stats(struct proxy *px, struct server *sv, int flags,
                                metric = mkf_str(FO_CONFIG|FS_SERVICE, proxy_mode_str(px->mode));
                                break;
                        case ST_F_QCUR:
-                               metric = mkf_u32(0, sv->nbpend);
+                               metric = mkf_u32(0, sv->queue.length);
                                break;
                        case ST_F_QMAX:
                                metric = mkf_u32(FN_MAX, sv->counters.nbpend_max);