From 8d8aa0d681c001891839588c0d51fa3cc9f652c7 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Tue, 30 May 2017 15:36:50 +0200 Subject: [PATCH] MEDIUM: threads/listeners: Make listeners thread-safe First, we use atomic operations to update jobs/totalconn/actconn variables, listener's nbconn variable and listener's counters. Then we add a lock on listeners to protect access to their information. And finally, listener queues (global and per proxy) are also protected by a lock. Here, because access to these queues are unusal, we use the same lock for all queues instead of a global one for the global queue and a lock per proxy for others. --- include/common/buffer.h | 1 + include/common/hathreads.h | 4 +- include/proto/proxy.h | 4 +- include/types/listener.h | 6 ++ src/listener.c | 179 +++++++++++++++++++++++++------------ src/peers.c | 7 +- src/proto_http.c | 24 ++--- src/proto_tcp.c | 2 +- src/session.c | 6 +- src/ssl_sock.c | 4 +- src/stream.c | 7 +- src/tcp_rules.c | 8 +- 12 files changed, 165 insertions(+), 87 deletions(-) diff --git a/include/common/buffer.h b/include/common/buffer.h index c34e3ac8c7..17931cf209 100644 --- a/include/common/buffer.h +++ b/include/common/buffer.h @@ -651,6 +651,7 @@ static inline void b_reset(struct buffer *buf) buf->o = 0; buf->i = 0; buf->p = buf->data; + } /* Allocates a buffer and replaces *buf with this buffer. If no memory is diff --git a/include/common/hathreads.h b/include/common/hathreads.h index 53b4e3d7f9..88049687a2 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -145,6 +145,8 @@ enum lock_label { TASK_RQ_LOCK, TASK_WQ_LOCK, POOL_LOCK, + LISTENER_LOCK, + LISTENER_QUEUE_LOCK, SIGNALS_LOCK, LOCK_LABELS }; @@ -230,7 +232,7 @@ static inline void show_lock_stats() { const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL", "TASK_RQ", "TASK_WQ", "POOL", - "SIGNALS" }; + "LISTENER", "LISTENER_QUEUE", "SIGNALS" }; int lbl; for (lbl = 0; lbl < LOCK_LABELS; lbl++) { diff --git a/include/proto/proxy.h b/include/proto/proxy.h index e5d20c4f5d..bfa7e3068e 100644 --- a/include/proto/proxy.h +++ b/include/proto/proxy.h @@ -112,7 +112,7 @@ static void inline proxy_inc_fe_conn_ctr(struct listener *l, struct proxy *fe) { fe->fe_counters.cum_conn++; if (l->counters) - l->counters->cum_conn++; + HA_ATOMIC_ADD(&l->counters->cum_conn, 1); update_freq_ctr(&fe->fe_conn_per_sec, 1); if (fe->fe_conn_per_sec.curr_ctr > fe->fe_counters.cps_max) @@ -124,7 +124,7 @@ static void inline proxy_inc_fe_sess_ctr(struct listener *l, struct proxy *fe) { fe->fe_counters.cum_sess++; if (l->counters) - l->counters->cum_sess++; + HA_ATOMIC_ADD(&l->counters->cum_sess, 1); update_freq_ctr(&fe->fe_sess_per_sec, 1); if (fe->fe_sess_per_sec.curr_ctr > fe->fe_counters.sps_max) fe->fe_counters.sps_max = fe->fe_sess_per_sec.curr_ctr; diff --git a/include/types/listener.h b/include/types/listener.h index bcebea8107..ac39758ca4 100644 --- a/include/types/listener.h +++ b/include/types/listener.h @@ -32,6 +32,8 @@ #include #include +#include + #include #include @@ -198,6 +200,10 @@ struct listener { int tcp_ut; /* for TCP, user timeout */ char *interface; /* interface name or NULL */ +#ifdef USE_THREAD + HA_SPINLOCK_T lock; +#endif + const struct netns_entry *netns; /* network namespace of the listener*/ struct list by_fe; /* chaining in frontend's list of listeners */ diff --git a/src/listener.c b/src/listener.c index a7e2b0d066..9646bbaaf8 100644 --- a/src/listener.c +++ b/src/listener.c @@ -38,6 +38,11 @@ #include #include +#ifdef USE_THREAD + /* listner_queue lock (same for global and per proxy queues) */ +static HA_SPINLOCK_T lq_lock; +#endif + /* List head of all known bind keywords */ static struct bind_kw_list bind_keywords = { .list = LIST_HEAD_INIT(bind_keywords.list) @@ -53,6 +58,7 @@ struct xfer_sock_list *xfer_sock_list = NULL; */ static void enable_listener(struct listener *listener) { + SPIN_LOCK(LISTENER_LOCK, &listener->lock); if (listener->state == LI_LISTEN) { if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) && listener->bind_conf->bind_proc && @@ -75,6 +81,7 @@ static void enable_listener(struct listener *listener) listener->state = LI_FULL; } } + SPIN_UNLOCK(LISTENER_LOCK, &listener->lock); } /* This function removes the specified listener's file descriptor from the @@ -83,13 +90,19 @@ static void enable_listener(struct listener *listener) */ static void disable_listener(struct listener *listener) { + SPIN_LOCK(LISTENER_LOCK, &listener->lock); if (listener->state < LI_READY) - return; + goto end; if (listener->state == LI_READY) fd_stop_recv(listener->fd); - if (listener->state == LI_LIMITED) + if (listener->state == LI_LIMITED) { + SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock); LIST_DEL(&listener->wait_queue); + SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock); + } listener->state = LI_LISTEN; + end: + SPIN_UNLOCK(LISTENER_LOCK, &listener->lock); } /* This function tries to temporarily disable a listener, depending on the OS @@ -101,8 +114,12 @@ static void disable_listener(struct listener *listener) */ int pause_listener(struct listener *l) { + int ret = 1; + + SPIN_LOCK(LISTENER_LOCK, &l->lock); + if (l->state <= LI_ZOMBIE) - return 1; + goto end; if (l->proto->pause) { /* Returns < 0 in case of failure, 0 if the listener @@ -110,18 +127,25 @@ int pause_listener(struct listener *l) */ int ret = l->proto->pause(l); - if (ret < 0) - return 0; + if (ret < 0) { + ret = 0; + goto end; + } else if (ret == 0) - return 1; + goto end; } - if (l->state == LI_LIMITED) + if (l->state == LI_LIMITED) { + SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock); LIST_DEL(&l->wait_queue); + SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock); + } fd_stop_recv(l->fd); l->state = LI_PAUSED; - return 1; + end: + SPIN_UNLOCK(LISTENER_LOCK, &l->lock); + return ret; } /* This function tries to resume a temporarily disabled listener. Paused, full, @@ -134,12 +158,16 @@ int pause_listener(struct listener *l) * stopped it. If the resume fails, 0 is returned and an error might be * displayed. */ -int resume_listener(struct listener *l) +static int __resume_listener(struct listener *l) { + int ret = 1; + + SPIN_LOCK(LISTENER_LOCK, &l->lock); + if ((global.mode & (MODE_DAEMON | MODE_MWORKER)) && l->bind_conf->bind_proc && !(l->bind_conf->bind_proc & (1UL << (relative_pid - 1)))) - return 1; + goto end; if (l->state == LI_ASSIGNED) { char msg[100]; @@ -151,42 +179,66 @@ int resume_listener(struct listener *l) else if (err & ERR_WARN) Warning("Resuming listener: %s\n", msg); - if (err & (ERR_FATAL | ERR_ABORT)) - return 0; + if (err & (ERR_FATAL | ERR_ABORT)) { + ret = 0; + goto end; + } } - if (l->state < LI_PAUSED || l->state == LI_ZOMBIE) - return 0; + if (l->state < LI_PAUSED || l->state == LI_ZOMBIE) { + ret = 0; + goto end; + } if (l->proto->sock_prot == IPPROTO_TCP && l->state == LI_PAUSED && - listen(l->fd, l->backlog ? l->backlog : l->maxconn) != 0) - return 0; + listen(l->fd, l->backlog ? l->backlog : l->maxconn) != 0) { + ret = 0; + goto end; + } if (l->state == LI_READY) - return 1; + goto end; if (l->state == LI_LIMITED) LIST_DEL(&l->wait_queue); if (l->nbconn >= l->maxconn) { l->state = LI_FULL; - return 1; + goto end; } fd_want_recv(l->fd); l->state = LI_READY; - return 1; + end: + SPIN_UNLOCK(LISTENER_LOCK, &l->lock); + return ret; +} + +int resume_listener(struct listener *l) +{ + int ret; + + SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock); + ret = __resume_listener(l); + SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock); + return ret; } /* Marks a ready listener as full so that the stream code tries to re-enable * it upon next close() using resume_listener(). + * + * Note: this function is only called from listener_accept so is already + * locked. */ static void listener_full(struct listener *l) { if (l->state >= LI_READY) { - if (l->state == LI_LIMITED) + if (l->state == LI_LIMITED) { + SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock); LIST_DEL(&l->wait_queue); + SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock); + } fd_stop_recv(l->fd); l->state = LI_FULL; @@ -195,11 +247,16 @@ static void listener_full(struct listener *l) /* Marks a ready listener as limited so that we only try to re-enable it when * resources are free again. It will be queued into the specified queue. + * + * Note: this function is only called from listener_accept so is already + * locked. */ static void limit_listener(struct listener *l, struct list *list) { if (l->state == LI_READY) { + SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock); LIST_ADDQ(list, &l->wait_queue); + SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock); fd_stop_recv(l->fd); l->state = LI_LIMITED; } @@ -239,22 +296,28 @@ void dequeue_all_listeners(struct list *list) { struct listener *listener, *l_back; + SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock); list_for_each_entry_safe(listener, l_back, list, wait_queue) { /* This cannot fail because the listeners are by definition in * the LI_LIMITED state. The function also removes the entry * from the queue. */ - resume_listener(listener); + __resume_listener(listener); } + SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock); } static int do_unbind_listener(struct listener *listener, int do_close) { + SPIN_LOCK(LISTENER_LOCK, &listener->lock); if (listener->state == LI_READY) fd_stop_recv(listener->fd); - if (listener->state == LI_LIMITED) + if (listener->state == LI_LIMITED) { + SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock); LIST_DEL(&listener->wait_queue); + SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock); + } if (listener->state >= LI_PAUSED) { if (do_close) { @@ -265,6 +328,7 @@ static int do_unbind_listener(struct listener *listener, int do_close) fd_remove(listener->fd); listener->state = LI_ASSIGNED; } + SPIN_UNLOCK(LISTENER_LOCK, &listener->lock); return ERR_NONE; } @@ -335,8 +399,9 @@ int create_listeners(struct bind_conf *bc, const struct sockaddr_storage *ss, proto->add(l, port); - jobs++; - listeners++; + SPIN_INIT(&l->lock); + HA_ATOMIC_ADD(&jobs, 1); + HA_ATOMIC_ADD(&listeners, 1); } return 1; } @@ -351,11 +416,14 @@ void delete_listener(struct listener *listener) { if (listener->state != LI_ASSIGNED) return; + + SPIN_LOCK(LISTENER_LOCK, &listener->lock); listener->state = LI_INIT; LIST_DEL(&listener->proto_list); listener->proto->nb_listeners--; - listeners--; - jobs--; + HA_ATOMIC_SUB(&jobs, 1); + HA_ATOMIC_SUB(&listeners, 1); + SPIN_UNLOCK(LISTENER_LOCK, &listener->lock); } /* This function is called on a read event from a listening socket, corresponding @@ -374,9 +442,12 @@ void listener_accept(int fd) static int accept4_broken; #endif + if (SPIN_TRYLOCK(LISTENER_LOCK, &l->lock)) + return; + if (unlikely(l->nbconn >= l->maxconn)) { listener_full(l); - return; + goto end; } if (!(l->options & LI_O_UNLIMITED) && global.sps_lim) { @@ -425,7 +496,7 @@ void listener_accept(int fd) /* frontend accept rate limit was reached */ limit_listener(l, &p->listener_queue); task_schedule(p->task, tick_add(now_ms, next_event_delay(&p->fe_sess_per_sec, p->fe_sps_lim, 0))); - return; + goto end; } if (max_accept > max) @@ -440,16 +511,17 @@ void listener_accept(int fd) while (max_accept--) { struct sockaddr_storage addr; socklen_t laddr = sizeof(addr); + unsigned int count; if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) { limit_listener(l, &global_listener_queue); task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ - return; + goto end; } if (unlikely(p && p->feconn >= p->maxconn)) { limit_listener(l, &p->listener_queue); - return; + goto end; } #ifdef USE_ACCEPT4 @@ -477,7 +549,7 @@ void listener_accept(int fd) goto transient_error; } fd_cant_recv(fd); - return; /* nothing more to accept */ + goto end; /* nothing more to accept */ case EINVAL: /* might be trying to accept on a shut fd (eg: soft stop) */ goto transient_error; @@ -516,23 +588,19 @@ void listener_accept(int fd) close(cfd); limit_listener(l, &global_listener_queue); task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */ - return; + goto end; } /* increase the per-process number of cumulated connections */ if (!(l->options & LI_O_UNLIMITED)) { - update_freq_ctr(&global.conn_per_sec, 1); - if (global.conn_per_sec.curr_ctr > global.cps_max) - global.cps_max = global.conn_per_sec.curr_ctr; - actconn++; + count = update_freq_ctr(&global.conn_per_sec, 1); + HA_ATOMIC_UPDATE_MAX(&global.cps_max, count); + HA_ATOMIC_ADD(&actconn, 1); } - l->nbconn++; - - if (l->counters) { - if (l->nbconn > l->counters->conn_max) - l->counters->conn_max = l->nbconn; - } + count = HA_ATOMIC_ADD(&l->nbconn, 1); + if (l->counters) + HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, count); ret = l->accept(l, cfd, &addr); if (unlikely(ret <= 0)) { @@ -542,8 +610,8 @@ void listener_accept(int fd) * listener (ret < 0). */ if (!(l->options & LI_O_UNLIMITED)) - actconn--; - l->nbconn--; + HA_ATOMIC_SUB(&actconn, 1); + HA_ATOMIC_SUB(&l->nbconn, 1); if (ret == 0) /* successful termination */ continue; @@ -552,21 +620,18 @@ void listener_accept(int fd) if (l->nbconn >= l->maxconn) { listener_full(l); - return; + goto end; } /* increase the per-process number of cumulated connections */ if (!(l->options & LI_O_UNLIMITED)) { - update_freq_ctr(&global.sess_per_sec, 1); - if (global.sess_per_sec.curr_ctr > global.sps_max) - global.sps_max = global.sess_per_sec.curr_ctr; + count = update_freq_ctr(&global.sess_per_sec, 1); + HA_ATOMIC_UPDATE_MAX(&global.sps_max, count); } #ifdef USE_OPENSSL if (!(l->options & LI_O_UNLIMITED) && l->bind_conf && l->bind_conf->is_ssl) { - - update_freq_ctr(&global.ssl_per_sec, 1); - if (global.ssl_per_sec.curr_ctr > global.ssl_max) - global.ssl_max = global.ssl_per_sec.curr_ctr; + count = update_freq_ctr(&global.ssl_per_sec, 1); + HA_ATOMIC_UPDATE_MAX(&global.ssl_max, count); } #endif @@ -575,7 +640,7 @@ void listener_accept(int fd) /* we've exhausted max_accept, so there is no need to poll again */ stop: fd_done_recv(fd); - return; + goto end; transient_error: /* pause the listener and try again in 100 ms */ @@ -584,7 +649,8 @@ void listener_accept(int fd) wait_expire: limit_listener(l, &global_listener_queue); task_schedule(global_listener_queue_task, tick_first(expire, global_listener_queue_task->expire)); - return; + end: + SPIN_UNLOCK(LISTENER_LOCK, &l->lock); } /* Notify the listener that a connection initiated from it was released. This @@ -596,8 +662,8 @@ void listener_release(struct listener *l) struct proxy *fe = l->bind_conf->frontend; if (!(l->options & LI_O_UNLIMITED)) - actconn--; - l->nbconn--; + HA_ATOMIC_SUB(&actconn, 1); + HA_ATOMIC_SUB(&l->nbconn, 1); if (l->state == LI_FULL) resume_listener(l); @@ -946,6 +1012,7 @@ static void __listener_init(void) sample_register_fetches(&smp_kws); acl_register_keywords(&acl_kws); bind_register_keywords(&bind_kws); + SPIN_INIT(&lq_lock); } /* diff --git a/src/peers.c b/src/peers.c index b98ec61a86..579e096d71 100644 --- a/src/peers.c +++ b/src/peers.c @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -1974,7 +1975,7 @@ static struct task *process_peer_sync(struct task * task) /* We've just recieved the signal */ if (!(peers->flags & PEERS_F_DONOTSTOP)) { /* add DO NOT STOP flag if not present */ - jobs++; + HA_ATOMIC_ADD(&jobs, 1); peers->flags |= PEERS_F_DONOTSTOP; ps = peers->local; for (st = ps->tables; st ; st = st->next) @@ -1994,7 +1995,7 @@ static struct task *process_peer_sync(struct task * task) if (ps->flags & PEER_F_TEACH_COMPLETE) { if (peers->flags & PEERS_F_DONOTSTOP) { /* resync of new process was complete, current process can die now */ - jobs--; + HA_ATOMIC_ADD(&jobs, 1); peers->flags &= ~PEERS_F_DONOTSTOP; for (st = ps->tables; st ; st = st->next) st->table->syncing--; @@ -2018,7 +2019,7 @@ static struct task *process_peer_sync(struct task * task) /* Other error cases */ if (peers->flags & PEERS_F_DONOTSTOP) { /* unable to resync new process, current process can die now */ - jobs--; + HA_ATOMIC_SUB(&jobs, 1); peers->flags &= ~PEERS_F_DONOTSTOP; for (st = ps->tables; st ; st = st->next) st->table->syncing--; diff --git a/src/proto_http.c b/src/proto_http.c index efbbc84bb4..0849f5222f 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -1744,7 +1744,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit) proxy_inc_fe_req_ctr(sess->fe); sess->fe->fe_counters.failed_req++; if (sess->listener->counters) - sess->listener->counters->failed_req++; + HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); if (!(s->flags & SF_FINST_MASK)) s->flags |= SF_FINST_R; @@ -1777,7 +1777,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit) proxy_inc_fe_req_ctr(sess->fe); sess->fe->fe_counters.failed_req++; if (sess->listener->counters) - sess->listener->counters->failed_req++; + HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); if (!(s->flags & SF_FINST_MASK)) s->flags |= SF_FINST_R; @@ -1807,7 +1807,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit) proxy_inc_fe_req_ctr(sess->fe); sess->fe->fe_counters.failed_req++; if (sess->listener->counters) - sess->listener->counters->failed_req++; + HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); if (!(s->flags & SF_FINST_MASK)) s->flags |= SF_FINST_R; @@ -2177,7 +2177,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit) sess->fe->fe_counters.failed_req++; if (sess->listener->counters) - sess->listener->counters->failed_req++; + HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); return_prx_cond: if (!(s->flags & SF_ERR_MASK)) @@ -3545,7 +3545,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s if (sess->fe != s->be) s->be->be_counters.denied_req++; if (sess->listener->counters) - sess->listener->counters->denied_req++; + HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1); goto done_without_exp; deny: /* this request was blocked (denied) */ @@ -3564,7 +3564,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s if (sess->fe != s->be) s->be->be_counters.denied_req++; if (sess->listener->counters) - sess->listener->counters->denied_req++; + HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1); goto return_prx_cond; return_bad_req: @@ -3583,7 +3583,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s sess->fe->fe_counters.failed_req++; if (sess->listener->counters) - sess->listener->counters->failed_req++; + HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); return_prx_cond: if (!(s->flags & SF_ERR_MASK)) @@ -3921,7 +3921,7 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit) sess->fe->fe_counters.failed_req++; if (sess->listener->counters) - sess->listener->counters->failed_req++; + HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); if (!(s->flags & SF_ERR_MASK)) s->flags |= SF_ERR_PRXCOND; @@ -4127,7 +4127,7 @@ int http_wait_for_request_body(struct stream *s, struct channel *req, int an_bit req->analysers &= AN_REQ_FLT_END; sess->fe->fe_counters.failed_req++; if (sess->listener->counters) - sess->listener->counters->failed_req++; + HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); return 0; } @@ -4912,7 +4912,7 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit) return_bad_req: /* let's centralize all bad requests */ sess->fe->fe_counters.failed_req++; if (sess->listener->counters) - sess->listener->counters->failed_req++; + HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); return_bad_req_stats_ok: txn->req.err_state = txn->req.msg_state; @@ -5700,7 +5700,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s s->be->be_counters.denied_resp++; sess->fe->fe_counters.denied_resp++; if (sess->listener->counters) - sess->listener->counters->denied_resp++; + HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1); goto return_srv_prx_502; } @@ -5850,7 +5850,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s s->be->be_counters.denied_resp++; sess->fe->fe_counters.denied_resp++; if (sess->listener->counters) - sess->listener->counters->denied_resp++; + HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1); Alert("Blocking cacheable cookie in response from instance %s, server %s.\n", s->be->id, objt_server(s->target) ? objt_server(s->target)->id : ""); diff --git a/src/proto_tcp.c b/src/proto_tcp.c index 1a26bcc47b..c6e33f23cd 100644 --- a/src/proto_tcp.c +++ b/src/proto_tcp.c @@ -1377,7 +1377,7 @@ static enum act_return tcp_exec_action_silent_drop(struct act_rule *rule, struct sess->fe->fe_counters.denied_req++; if (sess->listener->counters) - sess->listener->counters->denied_req++; + HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1); return ACT_RET_STOP; } diff --git a/src/session.c b/src/session.c index 54a879bcf7..3753a2ca69 100644 --- a/src/session.c +++ b/src/session.c @@ -57,8 +57,8 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type fe->fe_counters.conn_max = fe->feconn; if (li) proxy_inc_fe_conn_ctr(li, fe); - totalconn++; - jobs++; + HA_ATOMIC_ADD(&totalconn, 1); + HA_ATOMIC_ADD(&jobs, 1); } return sess; } @@ -69,7 +69,7 @@ void session_free(struct session *sess) session_store_counters(sess); vars_prune_per_sess(&sess->vars); pool_free2(pool2_session, sess); - jobs--; + HA_ATOMIC_SUB(&jobs, 1); } /* perform minimal intializations, report 0 in case of error, 1 if OK. */ diff --git a/src/ssl_sock.c b/src/ssl_sock.c index ca9647d365..508e9bd446 100644 --- a/src/ssl_sock.c +++ b/src/ssl_sock.c @@ -427,7 +427,7 @@ static void ssl_async_fd_free(int fd) /* Now we can safely call SSL_free, no more pending job in engines */ SSL_free(ssl); sslconns--; - jobs--; + HA_ATOMIC_SUB(&jobs, 1); } /* * function used to manage a returned SSL_ERROR_WANT_ASYNC @@ -5487,7 +5487,7 @@ static void ssl_sock_close(struct connection *conn) { fd_cant_recv(afd); } conn->xprt_ctx = NULL; - jobs++; + HA_ATOMIC_ADD(&jobs, 1); return; } /* Else we can remove the fds from the fdtab diff --git a/src/stream.c b/src/stream.c index 522441fd02..8cb9bfbc2f 100644 --- a/src/stream.c +++ b/src/stream.c @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -477,7 +478,7 @@ void stream_process_counters(struct stream *s) objt_server(s->target)->counters.bytes_in += bytes; if (sess->listener && sess->listener->counters) - sess->listener->counters->bytes_in += bytes; + HA_ATOMIC_ADD(&sess->listener->counters->bytes_in, bytes); for (i = 0; i < MAX_SESS_STKCTR; i++) { struct stkctr *stkctr = &s->stkctr[i]; @@ -514,7 +515,7 @@ void stream_process_counters(struct stream *s) objt_server(s->target)->counters.bytes_out += bytes; if (sess->listener && sess->listener->counters) - sess->listener->counters->bytes_out += bytes; + HA_ATOMIC_ADD(&sess->listener->counters->bytes_out, bytes); for (i = 0; i < MAX_SESS_STKCTR; i++) { struct stkctr *stkctr = &s->stkctr[i]; @@ -986,7 +987,7 @@ static void sess_set_term_flags(struct stream *s) strm_fe(s)->fe_counters.failed_req++; if (strm_li(s) && strm_li(s)->counters) - strm_li(s)->counters->failed_req++; + HA_ATOMIC_ADD(&strm_li(s)->counters->failed_req, 1); s->flags |= SF_FINST_R; } diff --git a/src/tcp_rules.c b/src/tcp_rules.c index bdf97c8a34..68b2c6a846 100644 --- a/src/tcp_rules.c +++ b/src/tcp_rules.c @@ -169,7 +169,7 @@ resume_execution: s->be->be_counters.denied_req++; sess->fe->fe_counters.denied_req++; if (sess->listener && sess->listener->counters) - sess->listener->counters->denied_req++; + HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1); if (!(s->flags & SF_ERR_MASK)) s->flags |= SF_ERR_PRXCOND; @@ -347,7 +347,7 @@ resume_execution: s->be->be_counters.denied_resp++; sess->fe->fe_counters.denied_resp++; if (sess->listener && sess->listener->counters) - sess->listener->counters->denied_resp++; + HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1); if (!(s->flags & SF_ERR_MASK)) s->flags |= SF_ERR_PRXCOND; @@ -429,7 +429,7 @@ int tcp_exec_l4_rules(struct session *sess) else if (rule->action == ACT_ACTION_DENY) { sess->fe->fe_counters.denied_conn++; if (sess->listener && sess->listener->counters) - sess->listener->counters->denied_conn++; + HA_ATOMIC_ADD(&sess->listener->counters->denied_conn, 1); result = 0; break; @@ -516,7 +516,7 @@ int tcp_exec_l5_rules(struct session *sess) else if (rule->action == ACT_ACTION_DENY) { sess->fe->fe_counters.denied_sess++; if (sess->listener && sess->listener->counters) - sess->listener->counters->denied_sess++; + HA_ATOMIC_ADD(&sess->listener->counters->denied_sess, 1); result = 0; break; -- 2.39.5