]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: threads/server: Add a lock per server and atomically update server vars
authorChristopher Faulet <cfaulet@haproxy.com>
Thu, 8 Jun 2017 12:04:45 +0000 (14:04 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:31 +0000 (13:58 +0100)
The server's lock is use, among other things, to lock acces to the active
connection list of a server.

12 files changed:
include/common/hathreads.h
include/proto/server.h
include/proto/stream.h
include/types/server.h
src/backend.c
src/checks.c
src/haproxy.c
src/proto_http.c
src/queue.c
src/server.c
src/stats.c
src/stream.c

index 80a4b24431d6687399f5c802a6ebf5100a464c2c..5003d51e37b17f25e7059946cb0a10ab0364258e 100644 (file)
@@ -148,6 +148,7 @@ enum lock_label {
        LISTENER_LOCK,
        LISTENER_QUEUE_LOCK,
        PROXY_LOCK,
+       SERVER_LOCK,
        SIGNALS_LOCK,
        LOCK_LABELS
 };
@@ -233,7 +234,8 @@ static inline void show_lock_stats()
 {
        const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
                                           "TASK_RQ", "TASK_WQ", "POOL",
-                                          "LISTENER", "LISTENER_QUEUE", "PROXY", "SIGNALS" };
+                                          "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER",
+                                          "SIGNALS" };
        int lbl;
 
        for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
index b101b3e2a142e01ff02792ff2c7b574c37b9133c..ff4ec77caa9f5d74b3d753962038d46bb7df8d2e 100644 (file)
@@ -64,10 +64,9 @@ struct server *snr_check_ip_callback(struct server *srv, void *ip, unsigned char
 /* increase the number of cumulated connections on the designated server */
 static void inline srv_inc_sess_ctr(struct server *s)
 {
-       s->counters.cum_sess++;
-       update_freq_ctr(&s->sess_per_sec, 1);
-       if (s->sess_per_sec.curr_ctr > s->counters.sps_max)
-               s->counters.sps_max = s->sess_per_sec.curr_ctr;
+       HA_ATOMIC_ADD(&s->counters.cum_sess, 1);
+       HA_ATOMIC_UPDATE_MAX(&s->counters.sps_max,
+                            update_freq_ctr(&s->sess_per_sec, 1));
 }
 
 /* set the time of last session on the designated server */
index 00f452c163f10571bb1b7df22e3efc58f8d00e63..aae7d345dc1270f3f3007f45c7c86d56311d77c7 100644 (file)
@@ -262,17 +262,23 @@ static void inline stream_inc_http_err_ctr(struct stream *s)
 
 static void inline stream_add_srv_conn(struct stream *sess, struct server *srv)
 {
+       SPIN_LOCK(SERVER_LOCK, &srv->lock);
        sess->srv_conn = srv;
        LIST_ADD(&srv->actconns, &sess->by_srv);
+       SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
 }
 
 static void inline stream_del_srv_conn(struct stream *sess)
 {
-       if (!sess->srv_conn)
+       struct server *srv = sess->srv_conn;
+
+       if (!srv)
                return;
 
+       SPIN_LOCK(SERVER_LOCK, &srv->lock);
        sess->srv_conn = NULL;
        LIST_DEL(&sess->by_srv);
+       SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
 }
 
 static void inline stream_init_srv_conn(struct stream *sess)
index ecf04d7c9efe13d170078cb779053b37be1b8236..31fb76fb7ef0f4d9a711021f1a7d6a8092565b67 100644 (file)
@@ -32,6 +32,8 @@
 
 #include <common/config.h>
 #include <common/mini-clist.h>
+#include <common/hathreads.h>
+
 #include <eb32tree.h>
 
 #include <types/connection.h>
@@ -283,6 +285,10 @@ struct server {
                struct sample_expr *sni;        /* sample expression for SNI */
        } ssl_ctx;
 #endif
+
+#ifdef USE_THREAD
+       HA_SPINLOCK_T lock;
+#endif
        struct {
                const char *file;               /* file where the section appears */
                int line;                       /* line where the section appears */
index 5e51f39eafc85edc23db2c0eec4efb70558d21d9..d17635ff5ff30157596e9d8693a9df9b3b01dc79 100644 (file)
@@ -702,7 +702,7 @@ int assign_server(struct stream *s)
                }
                else if (srv != prev_srv) {
                        HA_ATOMIC_ADD(&s->be->be_counters.cum_lbconn, 1);
-                       srv->counters.cum_lbconn++;
+                       HA_ATOMIC_ADD(&srv->counters.cum_lbconn, 1);
                }
                s->target = &srv->obj_type;
        }
@@ -879,10 +879,10 @@ int assign_server_and_queue(struct stream *s)
                                        s->txn->flags |= TX_CK_DOWN;
                                }
                                s->flags |= SF_REDISP;
-                               prev_srv->counters.redispatches++;
+                               HA_ATOMIC_ADD(&prev_srv->counters.redispatches, 1);
                                HA_ATOMIC_ADD(&s->be->be_counters.redispatches, 1);
                        } else {
-                               prev_srv->counters.retries++;
+                               HA_ATOMIC_ADD(&prev_srv->counters.retries, 1);
                                HA_ATOMIC_ADD(&s->be->be_counters.retries, 1);
                        }
                }
@@ -1197,10 +1197,11 @@ int connect_server(struct stream *s)
        s->si[1].exp = tick_add_ifset(now_ms, s->be->timeout.connect);
 
        if (srv) {
+               int count;
+
                s->flags |= SF_CURR_SESS;
-               srv->cur_sess++;
-               if (srv->cur_sess > srv->counters.cur_sess_max)
-                       srv->counters.cur_sess_max = srv->cur_sess;
+               count = HA_ATOMIC_ADD(&srv->cur_sess, 1);
+               HA_ATOMIC_UPDATE_MAX(&srv->counters.cur_sess_max, count);
                if (s->be->lbprm.server_take_conn)
                        s->be->lbprm.server_take_conn(srv);
 
@@ -1277,7 +1278,7 @@ int srv_redispatch_connect(struct stream *s)
                        s->si[1].err_type = SI_ET_QUEUE_ERR;
                }
 
-               srv->counters.failed_conns++;
+               HA_ATOMIC_ADD(&srv->counters.failed_conns, 1);
                HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
                return 1;
 
@@ -1307,7 +1308,7 @@ int srv_redispatch_connect(struct stream *s)
                if (srv)
                        srv_set_sess_last(srv);
                if (srv)
-                       srv->counters.failed_conns++;
+                       HA_ATOMIC_ADD(&srv->counters.failed_conns, 1);
                HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
 
                /* release other streams waiting for this server */
index 3d60237724923a1f82e5d7a7a8ac8d2001c8b296..ed99bb56ac124208a98b6274df5ef533cd96d2bc 100644 (file)
@@ -242,7 +242,7 @@ static void set_server_check_status(struct check *check, short status, const cha
                if ((!(check->state & CHK_ST_AGENT) ||
                    (check->status >= HCHK_STATUS_L57DATA)) &&
                    (check->health >= check->rise)) {
-                       s->counters.failed_checks++;
+                       HA_ATOMIC_ADD(&s->counters.failed_checks, 1);
                        report = 1;
                        check->health--;
                        if (check->health < check->rise)
@@ -410,7 +410,7 @@ void __health_adjust(struct server *s, short status)
                return;
        }
 
-       s->consecutive_errors++;
+       HA_ATOMIC_ADD(&s->consecutive_errors, 1);
 
        if (s->consecutive_errors < s->consecutive_errors_limit)
                return;
@@ -449,7 +449,7 @@ void __health_adjust(struct server *s, short status)
        }
 
        s->consecutive_errors = 0;
-       s->counters.failed_hana++;
+       HA_ATOMIC_ADD(&s->counters.failed_hana, 1);
 
        if (s->check.fastinter) {
                expire = tick_add(now_ms, MS_TO_TICKS(s->check.fastinter));
index f5eebbe851b9e1ed64ebef942394f62ff3522b16..f905473ee9d81e98f98dd9937973f79ced70a356 100644 (file)
@@ -2080,6 +2080,7 @@ void deinit(void)
                                if (xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->destroy_srv)
                                        xprt_get(XPRT_SSL)->destroy_srv(s);
                        }
+                       SPIN_DESTROY(&s->lock);
                        free(s);
                        s = s_next;
                }/* end while(s) */
index cf1cc7af2bd51b8cd465a79985aef3adad1c429f..f60f8ed1c7b19a3dce2b4771c5c178be594fa29f 100644 (file)
@@ -4277,7 +4277,7 @@ void http_end_txn_clean_session(struct stream *s)
        if (objt_server(s->target)) {
                if (s->flags & SF_CURR_SESS) {
                        s->flags &= ~SF_CURR_SESS;
-                       objt_server(s->target)->cur_sess--;
+                       HA_ATOMIC_SUB(&objt_server(s->target)->cur_sess, 1);
                }
                if (may_dequeue_tasks(objt_server(s->target), be))
                        process_srv_queue(objt_server(s->target));
@@ -4605,7 +4605,7 @@ int http_sync_res_state(struct stream *s)
                        txn->rsp.msg_state = HTTP_MSG_ERROR;
                        HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
                        if (objt_server(s->target))
-                               objt_server(s->target)->counters.cli_aborts++;
+                               HA_ATOMIC_ADD(&objt_server(s->target)->counters.cli_aborts, 1);
                }
                goto wait_other_side;
        }
@@ -4881,7 +4881,7 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit)
                HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
                HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
                if (objt_server(s->target))
-                       objt_server(s->target)->counters.cli_aborts++;
+                       HA_ATOMIC_ADD(&objt_server(s->target)->counters.cli_aborts, 1);
 
                goto return_bad_req_stats_ok;
        }
@@ -4953,7 +4953,7 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit)
        HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
        HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
        if (objt_server(s->target))
-               objt_server(s->target)->counters.srv_aborts++;
+               HA_ATOMIC_ADD(&objt_server(s->target)->counters.srv_aborts, 1);
 
        if (!(s->flags & SF_ERR_MASK))
                s->flags |= SF_ERR_SRVCL;
@@ -5079,7 +5079,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
 
                        HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
                        if (objt_server(s->target)) {
-                               objt_server(s->target)->counters.failed_resp++;
+                               HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1);
                                health_adjust(objt_server(s->target), HANA_STATUS_HTTP_HDRRSP);
                        }
                abort_response:
@@ -5114,7 +5114,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
 
                        HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
                        if (objt_server(s->target)) {
-                               objt_server(s->target)->counters.failed_resp++;
+                               HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1);
                                health_adjust(objt_server(s->target), HANA_STATUS_HTTP_READ_ERROR);
                        }
 
@@ -5139,7 +5139,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
 
                        HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
                        if (objt_server(s->target)) {
-                               objt_server(s->target)->counters.failed_resp++;
+                               HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1);
                                health_adjust(objt_server(s->target), HANA_STATUS_HTTP_READ_TIMEOUT);
                        }
 
@@ -5162,7 +5162,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
                        HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
                        HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
                        if (objt_server(s->target))
-                               objt_server(s->target)->counters.cli_aborts++;
+                               HA_ATOMIC_ADD(&objt_server(s->target)->counters.cli_aborts, 1);
 
                        rep->analysers &= AN_RES_FLT_END;
                        channel_auto_close(rep);
@@ -5189,7 +5189,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
 
                        HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
                        if (objt_server(s->target)) {
-                               objt_server(s->target)->counters.failed_resp++;
+                               HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1);
                                health_adjust(objt_server(s->target), HANA_STATUS_HTTP_BROKEN_PIPE);
                        }
 
@@ -5255,7 +5255,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
                stream_inc_http_err_ctr(s);
 
        if (objt_server(s->target))
-               objt_server(s->target)->counters.p.http.rsp[n]++;
+               HA_ATOMIC_ADD(&objt_server(s->target)->counters.p.http.rsp[n], 1);
 
        /* RFC7230#2.6 has enforced the format of the HTTP version string to be
         * exactly one digit "." one digit. This check may be disabled using
@@ -5673,7 +5673,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
                        if (apply_filters_to_response(s, rep, rule_set) < 0) {
                        return_bad_resp:
                                if (objt_server(s->target)) {
-                                       objt_server(s->target)->counters.failed_resp++;
+                                       HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1);
                                        health_adjust(objt_server(s->target), HANA_STATUS_HTTP_RSP);
                                }
                                HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
@@ -5695,7 +5695,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
                /* has the response been denied ? */
                if (txn->flags & TX_SVDENY) {
                        if (objt_server(s->target))
-                               objt_server(s->target)->counters.failed_secu++;
+                               HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_secu, 1);
 
                        HA_ATOMIC_ADD(&s->be->be_counters.denied_resp, 1);
                        HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_resp, 1);
@@ -5845,7 +5845,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
                 * the 'checkcache' option, and send an alert.
                 */
                if (objt_server(s->target))
-                       objt_server(s->target)->counters.failed_secu++;
+                       HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_secu, 1);
 
                HA_ATOMIC_ADD(&s->be->be_counters.denied_resp, 1);
                HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_resp, 1);
@@ -6038,7 +6038,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit
                                s->flags |= SF_ERR_SRVCL;
                        HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
                        if (objt_server(s->target))
-                               objt_server(s->target)->counters.srv_aborts++;
+                               HA_ATOMIC_ADD(&objt_server(s->target)->counters.srv_aborts, 1);
                        goto return_bad_res_stats_ok;
                }
        }
@@ -6076,7 +6076,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit
  return_bad_res: /* let's centralize all bad responses */
        HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
        if (objt_server(s->target))
-               objt_server(s->target)->counters.failed_resp++;
+               HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1);
 
  return_bad_res_stats_ok:
        txn->rsp.err_state = txn->rsp.msg_state;
@@ -6105,7 +6105,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit
        HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
        HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
        if (objt_server(s->target))
-               objt_server(s->target)->counters.cli_aborts++;
+               HA_ATOMIC_ADD(&objt_server(s->target)->counters.cli_aborts, 1);
 
        if (!(s->flags & SF_ERR_MASK))
                s->flags |= SF_ERR_CLICL;
index ba2a9f7f5826e6e2ecc33f14f4e22e95f1995b06..95b8edafb132fe7702b97e1a6fcd38bc4e956df0 100644 (file)
@@ -122,7 +122,7 @@ static struct stream *pendconn_get_next_strm(struct server *srv, struct proxy *p
        strm->flags |= SF_ASSIGNED;
        strm->target = &srv->obj_type;
        stream_add_srv_conn(strm, srv);
-       srv->served++;
+       HA_ATOMIC_ADD(&srv->served, 1);
        HA_ATOMIC_ADD(&srv->proxy->served, 1);
        if (px->lbprm.server_take_conn)
                px->lbprm.server_take_conn(srv);
index 4c70ac4ad46d84c944da610c3e6382791cf8b544..b6986a9805abb922275ffd38979fdd1dea47f0ac 100644 (file)
@@ -761,9 +761,11 @@ void srv_shutdown_streams(struct server *srv, int why)
 {
        struct stream *stream, *stream_bck;
 
+       SPIN_LOCK(SERVER_LOCK, &srv->lock);
        list_for_each_entry_safe(stream, stream_bck, &srv->actconns, by_srv)
                if (stream->srv_conn == srv)
                        stream_shutdown(stream, why);
+       SPIN_UNLOCK(SERVER_LOCK, &srv->lock);
 }
 
 /* Shutdown all connections of all backup servers of a proxy. The caller must
@@ -1029,7 +1031,6 @@ void srv_clr_admin_flag(struct server *s, enum srv_admin mode)
        /* Register changes to be applied asynchronously */
        if (LIST_ISEMPTY(&s->update_status))
                LIST_ADDQ(&updated_servers, &s->update_status);
-
        /* stop going down if the equivalent flag is still present (forced or inherited) */
        if (((mode & SRV_ADMF_MAINT) && (s->next_admin & SRV_ADMF_MAINT)) ||
            ((mode & SRV_ADMF_DRAIN) && (s->next_admin & SRV_ADMF_DRAIN)))
@@ -2021,6 +2022,7 @@ int parse_server(const char *file, int linenum, char **args, struct proxy *curpr
 
                        /* Copy default server settings to new server settings. */
                        srv_settings_cpy(newsrv, &curproxy->defsrv, 0);
+                       SPIN_INIT(&newsrv->lock);
                        cur_arg++;
                } else {
                        newsrv = &curproxy->defsrv;
index 66e9be45135e90183aaedb9bde298c5558933035..e026f36a55b11dbc62504f82622e59a448327e43 100644 (file)
@@ -2880,9 +2880,11 @@ static int stats_process_http_post(struct stream_interface *si)
                                                if (px->state != PR_STSTOPPED) {
                                                        struct stream *sess, *sess_bck;
 
+                                                       SPIN_LOCK(SERVER_LOCK, &sv->lock);
                                                        list_for_each_entry_safe(sess, sess_bck, &sv->actconns, by_srv)
                                                                if (sess->srv_conn == sv)
                                                                        stream_shutdown(sess, SF_ERR_KILLED);
+                                                       SPIN_UNLOCK(SERVER_LOCK, &sv->lock);
 
                                                        altered_servers++;
                                                        total_servers++;
index 2703f41e414781dfa1686c379ee354cbb0aefb7b..7bfe7864f1ecb47a27caf9926bf6e80575d7919e 100644 (file)
@@ -298,7 +298,7 @@ static void stream_free(struct stream *s)
        if (objt_server(s->target)) { /* there may be requests left pending in queue */
                if (s->flags & SF_CURR_SESS) {
                        s->flags &= ~SF_CURR_SESS;
-                       objt_server(s->target)->cur_sess--;
+                       HA_ATOMIC_SUB(&objt_server(s->target)->cur_sess, 1);
                }
                if (may_dequeue_tasks(objt_server(s->target), s->be))
                        process_srv_queue(objt_server(s->target));
@@ -474,7 +474,7 @@ void stream_process_counters(struct stream *s)
                HA_ATOMIC_ADD(&s->be->be_counters.bytes_in,    bytes);
 
                if (objt_server(s->target))
-                       objt_server(s->target)->counters.bytes_in += bytes;
+                       HA_ATOMIC_ADD(&objt_server(s->target)->counters.bytes_in, bytes);
 
                if (sess->listener && sess->listener->counters)
                        HA_ATOMIC_ADD(&sess->listener->counters->bytes_in, bytes);
@@ -510,7 +510,7 @@ void stream_process_counters(struct stream *s)
                HA_ATOMIC_ADD(&s->be->be_counters.bytes_out,    bytes);
 
                if (objt_server(s->target))
-                       objt_server(s->target)->counters.bytes_out += bytes;
+                       HA_ATOMIC_ADD(&objt_server(s->target)->counters.bytes_out, bytes);
 
                if (sess->listener && sess->listener->counters)
                        HA_ATOMIC_ADD(&sess->listener->counters->bytes_out, bytes);
@@ -630,7 +630,7 @@ static int sess_update_st_cer(struct stream *s)
 
                if (s->flags & SF_CURR_SESS) {
                        s->flags &= ~SF_CURR_SESS;
-                       objt_server(s->target)->cur_sess--;
+                       HA_ATOMIC_SUB(&objt_server(s->target)->cur_sess, 1);
                }
 
                if ((si->flags & SI_FL_ERR) &&
@@ -663,7 +663,7 @@ static int sess_update_st_cer(struct stream *s)
                }
 
                if (objt_server(s->target))
-                       objt_server(s->target)->counters.failed_conns++;
+                       HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_conns, 1);
                HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
                sess_change_server(s, NULL);
                if (may_dequeue_tasks(objt_server(s->target), s->be))
@@ -711,7 +711,7 @@ static int sess_update_st_cer(struct stream *s)
                si->state = SI_ST_REQ;
        } else {
                if (objt_server(s->target))
-                       objt_server(s->target)->counters.retries++;
+                       HA_ATOMIC_ADD(&objt_server(s->target)->counters.retries, 1);
                HA_ATOMIC_ADD(&s->be->be_counters.retries, 1);
                si->state = SI_ST_ASS;
        }
@@ -860,7 +860,7 @@ static void sess_update_stream_int(struct stream *s)
                        if (srv)
                                srv_set_sess_last(srv);
                        if (srv)
-                               srv->counters.failed_conns++;
+                               HA_ATOMIC_ADD(&srv->counters.failed_conns, 1);
                        HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
 
                        /* release other streams waiting for this server */
@@ -916,7 +916,7 @@ static void sess_update_stream_int(struct stream *s)
                        si->exp = TICK_ETERNITY;
                        s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
                        if (srv)
-                               srv->counters.failed_conns++;
+                               HA_ATOMIC_ADD(&srv->counters.failed_conns, 1);
                        HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1);
                        si_shutr(si);
                        si_shutw(si);
@@ -1703,7 +1703,7 @@ struct task *process_stream(struct task *t)
                                HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
                                HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
                                if (srv)
-                                       srv->counters.cli_aborts++;
+                                       HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
                                if (!(s->flags & SF_ERR_MASK))
                                        s->flags |= SF_ERR_CLICL;
                                if (!(s->flags & SF_FINST_MASK))
@@ -1719,12 +1719,12 @@ struct task *process_stream(struct task *t)
                        stream_int_report_error(si_b);
                        HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1);
                        if (srv)
-                               srv->counters.failed_resp++;
+                               HA_ATOMIC_ADD(&srv->counters.failed_resp, 1);
                        if (!(req->analysers) && !(res->analysers)) {
                                HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
                                HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
                                if (srv)
-                                       srv->counters.srv_aborts++;
+                                       HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
                                if (!(s->flags & SF_ERR_MASK))
                                        s->flags |= SF_ERR_SRVCL;
                                if (!(s->flags & SF_FINST_MASK))
@@ -1782,7 +1782,7 @@ struct task *process_stream(struct task *t)
                if (srv) {
                        if (s->flags & SF_CURR_SESS) {
                                s->flags &= ~SF_CURR_SESS;
-                               srv->cur_sess--;
+                               HA_ATOMIC_SUB(&srv->cur_sess, 1);
                        }
                        sess_change_server(s, NULL);
                        if (may_dequeue_tasks(srv, s->be))
@@ -1980,28 +1980,28 @@ struct task *process_stream(struct task *t)
                                HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
                                HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
                                if (srv)
-                                       srv->counters.cli_aborts++;
+                                       HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
                                s->flags |= SF_ERR_CLICL;
                        }
                        else if (req->flags & CF_READ_TIMEOUT) {
                                HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
                                HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
                                if (srv)
-                                       srv->counters.cli_aborts++;
+                                       HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
                                s->flags |= SF_ERR_CLITO;
                        }
                        else if (req->flags & CF_WRITE_ERROR) {
                                HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
                                HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
                                if (srv)
-                                       srv->counters.srv_aborts++;
+                                       HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
                                s->flags |= SF_ERR_SRVCL;
                        }
                        else {
                                HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
                                HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
                                if (srv)
-                                       srv->counters.srv_aborts++;
+                                       HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
                                s->flags |= SF_ERR_SRVTO;
                        }
                        sess_set_term_flags(s);
@@ -2013,28 +2013,28 @@ struct task *process_stream(struct task *t)
                                HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
                                HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
                                if (srv)
-                                       srv->counters.srv_aborts++;
+                                       HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
                                s->flags |= SF_ERR_SRVCL;
                        }
                        else if (res->flags & CF_READ_TIMEOUT) {
                                HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1);
                                HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1);
                                if (srv)
-                                       srv->counters.srv_aborts++;
+                                       HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1);
                                s->flags |= SF_ERR_SRVTO;
                        }
                        else if (res->flags & CF_WRITE_ERROR) {
                                HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
                                HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
                                if (srv)
-                                       srv->counters.cli_aborts++;
+                                       HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
                                s->flags |= SF_ERR_CLICL;
                        }
                        else {
                                HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1);
                                HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1);
                                if (srv)
-                                       srv->counters.cli_aborts++;
+                                       HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1);
                                s->flags |= SF_ERR_CLITO;
                        }
                        sess_set_term_flags(s);
@@ -2532,7 +2532,7 @@ void sess_change_server(struct stream *sess, struct server *newsrv)
                return;
 
        if (sess->srv_conn) {
-               sess->srv_conn->served--;
+               HA_ATOMIC_SUB(&sess->srv_conn->served, 1);
                HA_ATOMIC_SUB(&sess->srv_conn->proxy->served, 1);
                if (sess->srv_conn->proxy->lbprm.server_drop_conn)
                        sess->srv_conn->proxy->lbprm.server_drop_conn(sess->srv_conn);
@@ -2540,7 +2540,7 @@ void sess_change_server(struct stream *sess, struct server *newsrv)
        }
 
        if (newsrv) {
-               newsrv->served++;
+               HA_ATOMIC_ADD(&newsrv->served, 1);
                HA_ATOMIC_ADD(&newsrv->proxy->served, 1);
                if (newsrv->proxy->lbprm.server_take_conn)
                        newsrv->proxy->lbprm.server_take_conn(newsrv);
@@ -3263,9 +3263,11 @@ static int cli_parse_shutdown_sessions_server(char **args, struct appctx *appctx
                return 1;
 
        /* kill all the stream that are on this server */
+       SPIN_LOCK(SERVER_LOCK, &sv->lock);
        list_for_each_entry_safe(strm, strm_bck, &sv->actconns, by_srv)
                if (strm->srv_conn == sv)
                        stream_shutdown(strm, SF_ERR_KILLED);
+       SPIN_UNLOCK(SERVER_LOCK, &sv->lock);
        return 1;
 }