]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: threads/listeners: Make listeners thread-safe
authorChristopher Faulet <cfaulet@haproxy.com>
Tue, 30 May 2017 13:36:50 +0000 (15:36 +0200)
committerWilly Tarreau <w@1wt.eu>
Tue, 31 Oct 2017 12:58:30 +0000 (13:58 +0100)
First, we use atomic operations to update jobs/totalconn/actconn variables,
listener's nbconn variable and listener's counters. Then we add a lock on
listeners to protect access to their information. And finally, listener queues
(global and per proxy) are also protected by a lock. Here, because access to
these queues are unusal, we use the same lock for all queues instead of a global
one for the global queue and a lock per proxy for others.

12 files changed:
include/common/buffer.h
include/common/hathreads.h
include/proto/proxy.h
include/types/listener.h
src/listener.c
src/peers.c
src/proto_http.c
src/proto_tcp.c
src/session.c
src/ssl_sock.c
src/stream.c
src/tcp_rules.c

index c34e3ac8c74605694a0539dfe5a02d2a85b037e4..17931cf20931f163aef4bd7772e5f5caab2a2b0f 100644 (file)
@@ -651,6 +651,7 @@ static inline void b_reset(struct buffer *buf)
        buf->o = 0;
        buf->i = 0;
        buf->p = buf->data;
+
 }
 
 /* Allocates a buffer and replaces *buf with this buffer. If no memory is
index 53b4e3d7f9307ef96f96cfd934f6ef601ed25caa..88049687a263d99d81118a6362cc630e6e59506a 100644 (file)
@@ -145,6 +145,8 @@ enum lock_label {
        TASK_RQ_LOCK,
        TASK_WQ_LOCK,
        POOL_LOCK,
+       LISTENER_LOCK,
+       LISTENER_QUEUE_LOCK,
        SIGNALS_LOCK,
        LOCK_LABELS
 };
@@ -230,7 +232,7 @@ static inline void show_lock_stats()
 {
        const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL",
                                           "TASK_RQ", "TASK_WQ", "POOL",
-                                          "SIGNALS" };
+                                          "LISTENER", "LISTENER_QUEUE", "SIGNALS" };
        int lbl;
 
        for (lbl = 0; lbl < LOCK_LABELS; lbl++) {
index e5d20c4f5d1eeacc125ef0491b87692c8d5cfefb..bfa7e3068e48a8cbedaae6fbabab13ff1c4bc1fb 100644 (file)
@@ -112,7 +112,7 @@ static void inline proxy_inc_fe_conn_ctr(struct listener *l, struct proxy *fe)
 {
        fe->fe_counters.cum_conn++;
        if (l->counters)
-               l->counters->cum_conn++;
+               HA_ATOMIC_ADD(&l->counters->cum_conn, 1);
 
        update_freq_ctr(&fe->fe_conn_per_sec, 1);
        if (fe->fe_conn_per_sec.curr_ctr > fe->fe_counters.cps_max)
@@ -124,7 +124,7 @@ static void inline proxy_inc_fe_sess_ctr(struct listener *l, struct proxy *fe)
 {
        fe->fe_counters.cum_sess++;
        if (l->counters)
-               l->counters->cum_sess++;
+               HA_ATOMIC_ADD(&l->counters->cum_sess, 1);
        update_freq_ctr(&fe->fe_sess_per_sec, 1);
        if (fe->fe_sess_per_sec.curr_ctr > fe->fe_counters.sps_max)
                fe->fe_counters.sps_max = fe->fe_sess_per_sec.curr_ctr;
index bcebea8107094cde577a18c07233ebd014439bf6..ac39758ca4e627a7a7082a4fd9b95891a7844d64 100644 (file)
@@ -32,6 +32,8 @@
 
 #include <common/config.h>
 #include <common/mini-clist.h>
+#include <common/hathreads.h>
+
 #include <types/obj_type.h>
 #include <eb32tree.h>
 
@@ -198,6 +200,10 @@ struct listener {
        int tcp_ut;                     /* for TCP, user timeout */
        char *interface;                /* interface name or NULL */
 
+#ifdef USE_THREAD
+       HA_SPINLOCK_T lock;
+#endif
+
        const struct netns_entry *netns; /* network namespace of the listener*/
 
        struct list by_fe;              /* chaining in frontend's list of listeners */
index a7e2b0d06619de530138324c6676017fc6c335ad..9646bbaaf8ccf367e8c1dbbebda297478d276cb1 100644 (file)
 #include <proto/stream.h>
 #include <proto/task.h>
 
+#ifdef USE_THREAD
+ /* listner_queue lock (same for global and per proxy queues) */
+static HA_SPINLOCK_T lq_lock;
+#endif
+
 /* List head of all known bind keywords */
 static struct bind_kw_list bind_keywords = {
        .list = LIST_HEAD_INIT(bind_keywords.list)
@@ -53,6 +58,7 @@ struct xfer_sock_list *xfer_sock_list = NULL;
  */
 static void enable_listener(struct listener *listener)
 {
+       SPIN_LOCK(LISTENER_LOCK, &listener->lock);
        if (listener->state == LI_LISTEN) {
                if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) &&
                    listener->bind_conf->bind_proc &&
@@ -75,6 +81,7 @@ static void enable_listener(struct listener *listener)
                        listener->state = LI_FULL;
                }
        }
+       SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 }
 
 /* This function removes the specified listener's file descriptor from the
@@ -83,13 +90,19 @@ static void enable_listener(struct listener *listener)
  */
 static void disable_listener(struct listener *listener)
 {
+       SPIN_LOCK(LISTENER_LOCK, &listener->lock);
        if (listener->state < LI_READY)
-               return;
+               goto end;
        if (listener->state == LI_READY)
                fd_stop_recv(listener->fd);
-       if (listener->state == LI_LIMITED)
+       if (listener->state == LI_LIMITED) {
+               SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
                LIST_DEL(&listener->wait_queue);
+               SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+       }
        listener->state = LI_LISTEN;
+  end:
+       SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 }
 
 /* This function tries to temporarily disable a listener, depending on the OS
@@ -101,8 +114,12 @@ static void disable_listener(struct listener *listener)
  */
 int pause_listener(struct listener *l)
 {
+       int ret = 1;
+
+       SPIN_LOCK(LISTENER_LOCK, &l->lock);
+
        if (l->state <= LI_ZOMBIE)
-               return 1;
+               goto end;
 
        if (l->proto->pause) {
                /* Returns < 0 in case of failure, 0 if the listener
@@ -110,18 +127,25 @@ int pause_listener(struct listener *l)
                 */
                int ret = l->proto->pause(l);
 
-               if (ret < 0)
-                       return 0;
+               if (ret < 0) {
+                       ret = 0;
+                       goto end;
+               }
                else if (ret == 0)
-                       return 1;
+                       goto end;
        }
 
-       if (l->state == LI_LIMITED)
+       if (l->state == LI_LIMITED) {
+               SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
                LIST_DEL(&l->wait_queue);
+               SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+       }
 
        fd_stop_recv(l->fd);
        l->state = LI_PAUSED;
-       return 1;
+  end:
+       SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
+       return ret;
 }
 
 /* This function tries to resume a temporarily disabled listener. Paused, full,
@@ -134,12 +158,16 @@ int pause_listener(struct listener *l)
  * stopped it. If the resume fails, 0 is returned and an error might be
  * displayed.
  */
-int resume_listener(struct listener *l)
+static int __resume_listener(struct listener *l)
 {
+       int ret = 1;
+
+       SPIN_LOCK(LISTENER_LOCK, &l->lock);
+
        if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) &&
            l->bind_conf->bind_proc &&
            !(l->bind_conf->bind_proc & (1UL << (relative_pid - 1))))
-               return 1;
+               goto end;
 
        if (l->state == LI_ASSIGNED) {
                char msg[100];
@@ -151,42 +179,66 @@ int resume_listener(struct listener *l)
                else if (err & ERR_WARN)
                        Warning("Resuming listener: %s\n", msg);
 
-               if (err & (ERR_FATAL | ERR_ABORT))
-                       return 0;
+               if (err & (ERR_FATAL | ERR_ABORT)) {
+                       ret = 0;
+                       goto end;
+               }
        }
 
-       if (l->state < LI_PAUSED || l->state == LI_ZOMBIE)
-               return 0;
+       if (l->state < LI_PAUSED || l->state == LI_ZOMBIE) {
+               ret = 0;
+               goto end;
+       }
 
        if (l->proto->sock_prot == IPPROTO_TCP &&
            l->state == LI_PAUSED &&
-           listen(l->fd, l->backlog ? l->backlog : l->maxconn) != 0)
-               return 0;
+           listen(l->fd, l->backlog ? l->backlog : l->maxconn) != 0) {
+               ret = 0;
+               goto end;
+       }
 
        if (l->state == LI_READY)
-               return 1;
+               goto end;
 
        if (l->state == LI_LIMITED)
                LIST_DEL(&l->wait_queue);
 
        if (l->nbconn >= l->maxconn) {
                l->state = LI_FULL;
-               return 1;
+               goto end;
        }
 
        fd_want_recv(l->fd);
        l->state = LI_READY;
-       return 1;
+  end:
+       SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
+       return ret;
+}
+
+int resume_listener(struct listener *l)
+{
+       int ret;
+
+       SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+       ret = __resume_listener(l);
+       SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+       return ret;
 }
 
 /* Marks a ready listener as full so that the stream code tries to re-enable
  * it upon next close() using resume_listener().
+ *
+ * Note: this function is only called from listener_accept so <l> is already
+ *       locked.
  */
 static void listener_full(struct listener *l)
 {
        if (l->state >= LI_READY) {
-               if (l->state == LI_LIMITED)
+               if (l->state == LI_LIMITED) {
+                       SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
                        LIST_DEL(&l->wait_queue);
+                       SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+               }
 
                fd_stop_recv(l->fd);
                l->state = LI_FULL;
@@ -195,11 +247,16 @@ static void listener_full(struct listener *l)
 
 /* Marks a ready listener as limited so that we only try to re-enable it when
  * resources are free again. It will be queued into the specified queue.
+ *
+ * Note: this function is only called from listener_accept so <l> is already
+ *       locked.
  */
 static void limit_listener(struct listener *l, struct list *list)
 {
        if (l->state == LI_READY) {
+               SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
                LIST_ADDQ(list, &l->wait_queue);
+               SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
                fd_stop_recv(l->fd);
                l->state = LI_LIMITED;
        }
@@ -239,22 +296,28 @@ void dequeue_all_listeners(struct list *list)
 {
        struct listener *listener, *l_back;
 
+       SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
        list_for_each_entry_safe(listener, l_back, list, wait_queue) {
                /* This cannot fail because the listeners are by definition in
                 * the LI_LIMITED state. The function also removes the entry
                 * from the queue.
                 */
-               resume_listener(listener);
+               __resume_listener(listener);
        }
+       SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 }
 
 static int do_unbind_listener(struct listener *listener, int do_close)
 {
+       SPIN_LOCK(LISTENER_LOCK, &listener->lock);
        if (listener->state == LI_READY)
                fd_stop_recv(listener->fd);
 
-       if (listener->state == LI_LIMITED)
+       if (listener->state == LI_LIMITED) {
+               SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
                LIST_DEL(&listener->wait_queue);
+               SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
+       }
 
        if (listener->state >= LI_PAUSED) {
                if (do_close) {
@@ -265,6 +328,7 @@ static int do_unbind_listener(struct listener *listener, int do_close)
                        fd_remove(listener->fd);
                listener->state = LI_ASSIGNED;
        }
+       SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
        return ERR_NONE;
 }
 
@@ -335,8 +399,9 @@ int create_listeners(struct bind_conf *bc, const struct sockaddr_storage *ss,
 
                proto->add(l, port);
 
-               jobs++;
-               listeners++;
+               SPIN_INIT(&l->lock);
+               HA_ATOMIC_ADD(&jobs, 1);
+               HA_ATOMIC_ADD(&listeners, 1);
        }
        return 1;
 }
@@ -351,11 +416,14 @@ void delete_listener(struct listener *listener)
 {
        if (listener->state != LI_ASSIGNED)
                return;
+
+       SPIN_LOCK(LISTENER_LOCK, &listener->lock);
        listener->state = LI_INIT;
        LIST_DEL(&listener->proto_list);
        listener->proto->nb_listeners--;
-       listeners--;
-       jobs--;
+       HA_ATOMIC_SUB(&jobs, 1);
+       HA_ATOMIC_SUB(&listeners, 1);
+       SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 }
 
 /* This function is called on a read event from a listening socket, corresponding
@@ -374,9 +442,12 @@ void listener_accept(int fd)
        static int accept4_broken;
 #endif
 
+       if (SPIN_TRYLOCK(LISTENER_LOCK, &l->lock))
+               return;
+
        if (unlikely(l->nbconn >= l->maxconn)) {
                listener_full(l);
-               return;
+               goto end;
        }
 
        if (!(l->options & LI_O_UNLIMITED) && global.sps_lim) {
@@ -425,7 +496,7 @@ void listener_accept(int fd)
                        /* frontend accept rate limit was reached */
                        limit_listener(l, &p->listener_queue);
                        task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0)));
-                       return;
+                       goto end;
                }
 
                if (max_accept > max)
@@ -440,16 +511,17 @@ void listener_accept(int fd)
        while (max_accept--) {
                struct sockaddr_storage addr;
                socklen_t laddr = sizeof(addr);
+               unsigned int count;
 
                if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
                        limit_listener(l, &global_listener_queue);
                        task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
-                       return;
+                       goto end;
                }
 
                if (unlikely(p && p->feconn >= p->maxconn)) {
                        limit_listener(l, &p->listener_queue);
-                       return;
+                       goto end;
                }
 
 #ifdef USE_ACCEPT4
@@ -477,7 +549,7 @@ void listener_accept(int fd)
                                        goto transient_error;
                                }
                                fd_cant_recv(fd);
-                               return;   /* nothing more to accept */
+                               goto end; /* nothing more to accept */
                        case EINVAL:
                                /* might be trying to accept on a shut fd (eg: soft stop) */
                                goto transient_error;
@@ -516,23 +588,19 @@ void listener_accept(int fd)
                        close(cfd);
                        limit_listener(l, &global_listener_queue);
                        task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
-                       return;
+                       goto end;
                }
 
                /* increase the per-process number of cumulated connections */
                if (!(l->options & LI_O_UNLIMITED)) {
-                       update_freq_ctr(&global.conn_per_sec, 1);
-                       if (global.conn_per_sec.curr_ctr > global.cps_max)
-                               global.cps_max = global.conn_per_sec.curr_ctr;
-                       actconn++;
+                       count = update_freq_ctr(&global.conn_per_sec, 1);
+                       HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
+                       HA_ATOMIC_ADD(&actconn, 1);
                }
 
-               l->nbconn++;
-
-               if (l->counters) {
-                       if (l->nbconn > l->counters->conn_max)
-                               l->counters->conn_max = l->nbconn;
-               }
+               count = HA_ATOMIC_ADD(&l->nbconn, 1);
+               if (l->counters)
+                       HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, count);
 
                ret = l->accept(l, cfd, &addr);
                if (unlikely(ret <= 0)) {
@@ -542,8 +610,8 @@ void listener_accept(int fd)
                         * listener (ret < 0).
                         */
                        if (!(l->options & LI_O_UNLIMITED))
-                               actconn--;
-                       l->nbconn--;
+                               HA_ATOMIC_SUB(&actconn, 1);
+                       HA_ATOMIC_SUB(&l->nbconn, 1);
                        if (ret == 0) /* successful termination */
                                continue;
 
@@ -552,21 +620,18 @@ void listener_accept(int fd)
 
                if (l->nbconn >= l->maxconn) {
                        listener_full(l);
-                       return;
+                       goto end;
                }
 
                /* increase the per-process number of cumulated connections */
                if (!(l->options & LI_O_UNLIMITED)) {
-                       update_freq_ctr(&global.sess_per_sec, 1);
-                       if (global.sess_per_sec.curr_ctr > global.sps_max)
-                               global.sps_max = global.sess_per_sec.curr_ctr;
+                       count = update_freq_ctr(&global.sess_per_sec, 1);
+                       HA_ATOMIC_UPDATE_MAX(&global.sps_max, count);
                }
 #ifdef USE_OPENSSL
                if (!(l->options & LI_O_UNLIMITED) && l->bind_conf && l->bind_conf->is_ssl) {
-
-                       update_freq_ctr(&global.ssl_per_sec, 1);
-                       if (global.ssl_per_sec.curr_ctr > global.ssl_max)
-                               global.ssl_max = global.ssl_per_sec.curr_ctr;
+                       count = update_freq_ctr(&global.ssl_per_sec, 1);
+                       HA_ATOMIC_UPDATE_MAX(&global.ssl_max, count);
                }
 #endif
 
@@ -575,7 +640,7 @@ void listener_accept(int fd)
        /* we've exhausted max_accept, so there is no need to poll again */
  stop:
        fd_done_recv(fd);
-       return;
+       goto end;
 
  transient_error:
        /* pause the listener and try again in 100 ms */
@@ -584,7 +649,8 @@ void listener_accept(int fd)
  wait_expire:
        limit_listener(l, &global_listener_queue);
        task_schedule(global_listener_queue_task, tick_first(expire, global_listener_queue_task->expire));
-       return;
+ end:
+       SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
 }
 
 /* Notify the listener that a connection initiated from it was released. This
@@ -596,8 +662,8 @@ void listener_release(struct listener *l)
        struct proxy *fe = l->bind_conf->frontend;
 
        if (!(l->options & LI_O_UNLIMITED))
-               actconn--;
-       l->nbconn--;
+               HA_ATOMIC_SUB(&actconn, 1);
+       HA_ATOMIC_SUB(&l->nbconn, 1);
        if (l->state == LI_FULL)
                resume_listener(l);
 
@@ -946,6 +1012,7 @@ static void __listener_init(void)
        sample_register_fetches(&smp_kws);
        acl_register_keywords(&acl_kws);
        bind_register_keywords(&bind_kws);
+       SPIN_INIT(&lq_lock);
 }
 
 /*
index b98ec61a8607dab60e8153b47d735b94cb80621f..579e096d71ebea62bc911c77ca34c1493b97551e 100644 (file)
@@ -24,6 +24,7 @@
 #include <common/config.h>
 #include <common/time.h>
 #include <common/standard.h>
+#include <common/hathreads.h>
 
 #include <types/global.h>
 #include <types/listener.h>
@@ -1974,7 +1975,7 @@ static struct task *process_peer_sync(struct task * task)
                        /* We've just recieved the signal */
                        if (!(peers->flags & PEERS_F_DONOTSTOP)) {
                                /* add DO NOT STOP flag if not present */
-                               jobs++;
+                               HA_ATOMIC_ADD(&jobs, 1);
                                peers->flags |= PEERS_F_DONOTSTOP;
                                ps = peers->local;
                                for (st = ps->tables; st ; st = st->next)
@@ -1994,7 +1995,7 @@ static struct task *process_peer_sync(struct task * task)
                if (ps->flags & PEER_F_TEACH_COMPLETE) {
                        if (peers->flags & PEERS_F_DONOTSTOP) {
                                /* resync of new process was complete, current process can die now */
-                               jobs--;
+                               HA_ATOMIC_ADD(&jobs, 1);
                                peers->flags &= ~PEERS_F_DONOTSTOP;
                                for (st = ps->tables; st ; st = st->next)
                                        st->table->syncing--;
@@ -2018,7 +2019,7 @@ static struct task *process_peer_sync(struct task * task)
                                /* Other error cases */
                                if (peers->flags & PEERS_F_DONOTSTOP) {
                                        /* unable to resync new process, current process can die now */
-                                       jobs--;
+                                       HA_ATOMIC_SUB(&jobs, 1);
                                        peers->flags &= ~PEERS_F_DONOTSTOP;
                                        for (st = ps->tables; st ; st = st->next)
                                                st->table->syncing--;
index efbbc84bb46eef78689490f48c71101296f2d8a9..0849f5222ff52349f5a912ba063553182404fd3b 100644 (file)
@@ -1744,7 +1744,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
                        proxy_inc_fe_req_ctr(sess->fe);
                        sess->fe->fe_counters.failed_req++;
                        if (sess->listener->counters)
-                               sess->listener->counters->failed_req++;
+                               HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
                        if (!(s->flags & SF_FINST_MASK))
                                s->flags |= SF_FINST_R;
@@ -1777,7 +1777,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
                        proxy_inc_fe_req_ctr(sess->fe);
                        sess->fe->fe_counters.failed_req++;
                        if (sess->listener->counters)
-                               sess->listener->counters->failed_req++;
+                               HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
                        if (!(s->flags & SF_FINST_MASK))
                                s->flags |= SF_FINST_R;
@@ -1807,7 +1807,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
                        proxy_inc_fe_req_ctr(sess->fe);
                        sess->fe->fe_counters.failed_req++;
                        if (sess->listener->counters)
-                               sess->listener->counters->failed_req++;
+                               HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
                        if (!(s->flags & SF_FINST_MASK))
                                s->flags |= SF_FINST_R;
@@ -2177,7 +2177,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit)
 
        sess->fe->fe_counters.failed_req++;
        if (sess->listener->counters)
-               sess->listener->counters->failed_req++;
+               HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
  return_prx_cond:
        if (!(s->flags & SF_ERR_MASK))
@@ -3545,7 +3545,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
        if (sess->fe != s->be)
                s->be->be_counters.denied_req++;
        if (sess->listener->counters)
-               sess->listener->counters->denied_req++;
+               HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
        goto done_without_exp;
 
  deny: /* this request was blocked (denied) */
@@ -3564,7 +3564,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
        if (sess->fe != s->be)
                s->be->be_counters.denied_req++;
        if (sess->listener->counters)
-               sess->listener->counters->denied_req++;
+               HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
        goto return_prx_cond;
 
  return_bad_req:
@@ -3583,7 +3583,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s
 
        sess->fe->fe_counters.failed_req++;
        if (sess->listener->counters)
-               sess->listener->counters->failed_req++;
+               HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
  return_prx_cond:
        if (!(s->flags & SF_ERR_MASK))
@@ -3921,7 +3921,7 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit)
 
        sess->fe->fe_counters.failed_req++;
        if (sess->listener->counters)
-               sess->listener->counters->failed_req++;
+               HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
        if (!(s->flags & SF_ERR_MASK))
                s->flags |= SF_ERR_PRXCOND;
@@ -4127,7 +4127,7 @@ int http_wait_for_request_body(struct stream *s, struct channel *req, int an_bit
        req->analysers &= AN_REQ_FLT_END;
        sess->fe->fe_counters.failed_req++;
        if (sess->listener->counters)
-               sess->listener->counters->failed_req++;
+               HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
        return 0;
 }
 
@@ -4912,7 +4912,7 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit)
  return_bad_req: /* let's centralize all bad requests */
        sess->fe->fe_counters.failed_req++;
        if (sess->listener->counters)
-               sess->listener->counters->failed_req++;
+               HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1);
 
  return_bad_req_stats_ok:
        txn->req.err_state = txn->req.msg_state;
@@ -5700,7 +5700,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
                        s->be->be_counters.denied_resp++;
                        sess->fe->fe_counters.denied_resp++;
                        if (sess->listener->counters)
-                               sess->listener->counters->denied_resp++;
+                               HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);
 
                        goto return_srv_prx_502;
                }
@@ -5850,7 +5850,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s
                s->be->be_counters.denied_resp++;
                sess->fe->fe_counters.denied_resp++;
                if (sess->listener->counters)
-                       sess->listener->counters->denied_resp++;
+                       HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);
 
                Alert("Blocking cacheable cookie in response from instance %s, server %s.\n",
                      s->be->id, objt_server(s->target) ? objt_server(s->target)->id : "<dispatch>");
index 1a26bcc47b74e80f18794c169b34a695de8a9cbd..c6e33f23cdf009cda3070b354bc1e2b1d24505db 100644 (file)
@@ -1377,7 +1377,7 @@ static enum act_return tcp_exec_action_silent_drop(struct act_rule *rule, struct
 
        sess->fe->fe_counters.denied_req++;
        if (sess->listener->counters)
-               sess->listener->counters->denied_req++;
+               HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
 
        return ACT_RET_STOP;
 }
index 54a879bcf77daf8a6edda8a3d8c38997c89c437a..3753a2ca6915643e499b9517a38ffc5eb237a2ef 100644 (file)
@@ -57,8 +57,8 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type
                        fe->fe_counters.conn_max = fe->feconn;
                if (li)
                        proxy_inc_fe_conn_ctr(li, fe);
-               totalconn++;
-               jobs++;
+               HA_ATOMIC_ADD(&totalconn, 1);
+               HA_ATOMIC_ADD(&jobs, 1);
        }
        return sess;
 }
@@ -69,7 +69,7 @@ void session_free(struct session *sess)
        session_store_counters(sess);
        vars_prune_per_sess(&sess->vars);
        pool_free2(pool2_session, sess);
-       jobs--;
+       HA_ATOMIC_SUB(&jobs, 1);
 }
 
 /* perform minimal intializations, report 0 in case of error, 1 if OK. */
index ca9647d365213c0b42c8d3face1f21ecf4f4f15a..508e9bd446fc10fa9ea4071387171403cf52683b 100644 (file)
@@ -427,7 +427,7 @@ static void ssl_async_fd_free(int fd)
        /* Now we can safely call SSL_free, no more pending job in engines */
        SSL_free(ssl);
        sslconns--;
-       jobs--;
+       HA_ATOMIC_SUB(&jobs, 1);
 }
 /*
  * function used to manage a returned SSL_ERROR_WANT_ASYNC
@@ -5487,7 +5487,7 @@ static void ssl_sock_close(struct connection *conn) {
                                        fd_cant_recv(afd);
                                }
                                conn->xprt_ctx = NULL;
-                               jobs++;
+                               HA_ATOMIC_ADD(&jobs, 1);
                                return;
                        }
                        /* Else we can remove the fds from the fdtab
index 522441fd029483c8bd9b5c222696d8148445a839..8cb9bfbc2f9f0e936195e31773b9e36ac4fb4e2c 100644 (file)
@@ -19,6 +19,7 @@
 #include <common/buffer.h>
 #include <common/debug.h>
 #include <common/memory.h>
+#include <common/hathreads.h>
 
 #include <types/applet.h>
 #include <types/capture.h>
@@ -477,7 +478,7 @@ void stream_process_counters(struct stream *s)
                        objt_server(s->target)->counters.bytes_in += bytes;
 
                if (sess->listener && sess->listener->counters)
-                       sess->listener->counters->bytes_in += bytes;
+                       HA_ATOMIC_ADD(&sess->listener->counters->bytes_in, bytes);
 
                for (i = 0; i < MAX_SESS_STKCTR; i++) {
                        struct stkctr *stkctr = &s->stkctr[i];
@@ -514,7 +515,7 @@ void stream_process_counters(struct stream *s)
                        objt_server(s->target)->counters.bytes_out += bytes;
 
                if (sess->listener && sess->listener->counters)
-                       sess->listener->counters->bytes_out += bytes;
+                       HA_ATOMIC_ADD(&sess->listener->counters->bytes_out, bytes);
 
                for (i = 0; i < MAX_SESS_STKCTR; i++) {
                        struct stkctr *stkctr = &s->stkctr[i];
@@ -986,7 +987,7 @@ static void sess_set_term_flags(struct stream *s)
 
                        strm_fe(s)->fe_counters.failed_req++;
                        if (strm_li(s) && strm_li(s)->counters)
-                               strm_li(s)->counters->failed_req++;
+                               HA_ATOMIC_ADD(&strm_li(s)->counters->failed_req, 1);
 
                        s->flags |= SF_FINST_R;
                }
index bdf97c8a340b3f8320f6f1319341a277eb6730e3..68b2c6a84696ffafc28fceb0a6133cea24934111 100644 (file)
@@ -169,7 +169,7 @@ resume_execution:
                                s->be->be_counters.denied_req++;
                                sess->fe->fe_counters.denied_req++;
                                if (sess->listener && sess->listener->counters)
-                                       sess->listener->counters->denied_req++;
+                                       HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1);
 
                                if (!(s->flags & SF_ERR_MASK))
                                        s->flags |= SF_ERR_PRXCOND;
@@ -347,7 +347,7 @@ resume_execution:
                                s->be->be_counters.denied_resp++;
                                sess->fe->fe_counters.denied_resp++;
                                if (sess->listener && sess->listener->counters)
-                                       sess->listener->counters->denied_resp++;
+                                       HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1);
 
                                if (!(s->flags & SF_ERR_MASK))
                                        s->flags |= SF_ERR_PRXCOND;
@@ -429,7 +429,7 @@ int tcp_exec_l4_rules(struct session *sess)
                        else if (rule->action == ACT_ACTION_DENY) {
                                sess->fe->fe_counters.denied_conn++;
                                if (sess->listener && sess->listener->counters)
-                                       sess->listener->counters->denied_conn++;
+                                       HA_ATOMIC_ADD(&sess->listener->counters->denied_conn, 1);
 
                                result = 0;
                                break;
@@ -516,7 +516,7 @@ int tcp_exec_l5_rules(struct session *sess)
                        else if (rule->action == ACT_ACTION_DENY) {
                                sess->fe->fe_counters.denied_sess++;
                                if (sess->listener && sess->listener->counters)
-                                       sess->listener->counters->denied_sess++;
+                                       HA_ATOMIC_ADD(&sess->listener->counters->denied_sess, 1);
 
                                result = 0;
                                break;