From: Christopher Faulet Date: Thu, 8 Jun 2017 12:04:45 +0000 (+0200) Subject: MEDIUM: threads/server: Add a lock per server and atomically update server vars X-Git-Tag: v1.8-rc1~145 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=29f77e846bd9cad8b9984dc5304e6edcaa4f435d;p=thirdparty%2Fhaproxy.git MEDIUM: threads/server: Add a lock per server and atomically update server vars The server's lock is use, among other things, to lock acces to the active connection list of a server. --- diff --git a/include/common/hathreads.h b/include/common/hathreads.h index 80a4b24431..5003d51e37 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -148,6 +148,7 @@ enum lock_label { LISTENER_LOCK, LISTENER_QUEUE_LOCK, PROXY_LOCK, + SERVER_LOCK, SIGNALS_LOCK, LOCK_LABELS }; @@ -233,7 +234,8 @@ static inline void show_lock_stats() { const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL", "TASK_RQ", "TASK_WQ", "POOL", - "LISTENER", "LISTENER_QUEUE", "PROXY", "SIGNALS" }; + "LISTENER", "LISTENER_QUEUE", "PROXY", "SERVER", + "SIGNALS" }; int lbl; for (lbl = 0; lbl < LOCK_LABELS; lbl++) { diff --git a/include/proto/server.h b/include/proto/server.h index b101b3e2a1..ff4ec77caa 100644 --- a/include/proto/server.h +++ b/include/proto/server.h @@ -64,10 +64,9 @@ struct server *snr_check_ip_callback(struct server *srv, void *ip, unsigned char /* increase the number of cumulated connections on the designated server */ static void inline srv_inc_sess_ctr(struct server *s) { - s->counters.cum_sess++; - update_freq_ctr(&s->sess_per_sec, 1); - if (s->sess_per_sec.curr_ctr > s->counters.sps_max) - s->counters.sps_max = s->sess_per_sec.curr_ctr; + HA_ATOMIC_ADD(&s->counters.cum_sess, 1); + HA_ATOMIC_UPDATE_MAX(&s->counters.sps_max, + update_freq_ctr(&s->sess_per_sec, 1)); } /* set the time of last session on the designated server */ diff --git a/include/proto/stream.h b/include/proto/stream.h index 00f452c163..aae7d345dc 100644 --- a/include/proto/stream.h +++ b/include/proto/stream.h @@ -262,17 +262,23 @@ static void inline stream_inc_http_err_ctr(struct stream *s) static void inline stream_add_srv_conn(struct stream *sess, struct server *srv) { + SPIN_LOCK(SERVER_LOCK, &srv->lock); sess->srv_conn = srv; LIST_ADD(&srv->actconns, &sess->by_srv); + SPIN_UNLOCK(SERVER_LOCK, &srv->lock); } static void inline stream_del_srv_conn(struct stream *sess) { - if (!sess->srv_conn) + struct server *srv = sess->srv_conn; + + if (!srv) return; + SPIN_LOCK(SERVER_LOCK, &srv->lock); sess->srv_conn = NULL; LIST_DEL(&sess->by_srv); + SPIN_UNLOCK(SERVER_LOCK, &srv->lock); } static void inline stream_init_srv_conn(struct stream *sess) diff --git a/include/types/server.h b/include/types/server.h index ecf04d7c9e..31fb76fb7e 100644 --- a/include/types/server.h +++ b/include/types/server.h @@ -32,6 +32,8 @@ #include #include +#include + #include #include @@ -283,6 +285,10 @@ struct server { struct sample_expr *sni; /* sample expression for SNI */ } ssl_ctx; #endif + +#ifdef USE_THREAD + HA_SPINLOCK_T lock; +#endif struct { const char *file; /* file where the section appears */ int line; /* line where the section appears */ diff --git a/src/backend.c b/src/backend.c index 5e51f39eaf..d17635ff5f 100644 --- a/src/backend.c +++ b/src/backend.c @@ -702,7 +702,7 @@ int assign_server(struct stream *s) } else if (srv != prev_srv) { HA_ATOMIC_ADD(&s->be->be_counters.cum_lbconn, 1); - srv->counters.cum_lbconn++; + HA_ATOMIC_ADD(&srv->counters.cum_lbconn, 1); } s->target = &srv->obj_type; } @@ -879,10 +879,10 @@ int assign_server_and_queue(struct stream *s) s->txn->flags |= TX_CK_DOWN; } s->flags |= SF_REDISP; - prev_srv->counters.redispatches++; + HA_ATOMIC_ADD(&prev_srv->counters.redispatches, 1); HA_ATOMIC_ADD(&s->be->be_counters.redispatches, 1); } else { - prev_srv->counters.retries++; + HA_ATOMIC_ADD(&prev_srv->counters.retries, 1); HA_ATOMIC_ADD(&s->be->be_counters.retries, 1); } } @@ -1197,10 +1197,11 @@ int connect_server(struct stream *s) s->si[1].exp = tick_add_ifset(now_ms, s->be->timeout.connect); if (srv) { + int count; + s->flags |= SF_CURR_SESS; - srv->cur_sess++; - if (srv->cur_sess > srv->counters.cur_sess_max) - srv->counters.cur_sess_max = srv->cur_sess; + count = HA_ATOMIC_ADD(&srv->cur_sess, 1); + HA_ATOMIC_UPDATE_MAX(&srv->counters.cur_sess_max, count); if (s->be->lbprm.server_take_conn) s->be->lbprm.server_take_conn(srv); @@ -1277,7 +1278,7 @@ int srv_redispatch_connect(struct stream *s) s->si[1].err_type = SI_ET_QUEUE_ERR; } - srv->counters.failed_conns++; + HA_ATOMIC_ADD(&srv->counters.failed_conns, 1); HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1); return 1; @@ -1307,7 +1308,7 @@ int srv_redispatch_connect(struct stream *s) if (srv) srv_set_sess_last(srv); if (srv) - srv->counters.failed_conns++; + HA_ATOMIC_ADD(&srv->counters.failed_conns, 1); HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1); /* release other streams waiting for this server */ diff --git a/src/checks.c b/src/checks.c index 3d60237724..ed99bb56ac 100644 --- a/src/checks.c +++ b/src/checks.c @@ -242,7 +242,7 @@ static void set_server_check_status(struct check *check, short status, const cha if ((!(check->state & CHK_ST_AGENT) || (check->status >= HCHK_STATUS_L57DATA)) && (check->health >= check->rise)) { - s->counters.failed_checks++; + HA_ATOMIC_ADD(&s->counters.failed_checks, 1); report = 1; check->health--; if (check->health < check->rise) @@ -410,7 +410,7 @@ void __health_adjust(struct server *s, short status) return; } - s->consecutive_errors++; + HA_ATOMIC_ADD(&s->consecutive_errors, 1); if (s->consecutive_errors < s->consecutive_errors_limit) return; @@ -449,7 +449,7 @@ void __health_adjust(struct server *s, short status) } s->consecutive_errors = 0; - s->counters.failed_hana++; + HA_ATOMIC_ADD(&s->counters.failed_hana, 1); if (s->check.fastinter) { expire = tick_add(now_ms, MS_TO_TICKS(s->check.fastinter)); diff --git a/src/haproxy.c b/src/haproxy.c index f5eebbe851..f905473ee9 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -2080,6 +2080,7 @@ void deinit(void) if (xprt_get(XPRT_SSL) && xprt_get(XPRT_SSL)->destroy_srv) xprt_get(XPRT_SSL)->destroy_srv(s); } + SPIN_DESTROY(&s->lock); free(s); s = s_next; }/* end while(s) */ diff --git a/src/proto_http.c b/src/proto_http.c index cf1cc7af2b..f60f8ed1c7 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -4277,7 +4277,7 @@ void http_end_txn_clean_session(struct stream *s) if (objt_server(s->target)) { if (s->flags & SF_CURR_SESS) { s->flags &= ~SF_CURR_SESS; - objt_server(s->target)->cur_sess--; + HA_ATOMIC_SUB(&objt_server(s->target)->cur_sess, 1); } if (may_dequeue_tasks(objt_server(s->target), be)) process_srv_queue(objt_server(s->target)); @@ -4605,7 +4605,7 @@ int http_sync_res_state(struct stream *s) txn->rsp.msg_state = HTTP_MSG_ERROR; HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); if (objt_server(s->target)) - objt_server(s->target)->counters.cli_aborts++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.cli_aborts, 1); } goto wait_other_side; } @@ -4881,7 +4881,7 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit) HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); if (objt_server(s->target)) - objt_server(s->target)->counters.cli_aborts++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.cli_aborts, 1); goto return_bad_req_stats_ok; } @@ -4953,7 +4953,7 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit) HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); if (objt_server(s->target)) - objt_server(s->target)->counters.srv_aborts++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.srv_aborts, 1); if (!(s->flags & SF_ERR_MASK)) s->flags |= SF_ERR_SRVCL; @@ -5079,7 +5079,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (objt_server(s->target)) { - objt_server(s->target)->counters.failed_resp++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1); health_adjust(objt_server(s->target), HANA_STATUS_HTTP_HDRRSP); } abort_response: @@ -5114,7 +5114,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (objt_server(s->target)) { - objt_server(s->target)->counters.failed_resp++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1); health_adjust(objt_server(s->target), HANA_STATUS_HTTP_READ_ERROR); } @@ -5139,7 +5139,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (objt_server(s->target)) { - objt_server(s->target)->counters.failed_resp++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1); health_adjust(objt_server(s->target), HANA_STATUS_HTTP_READ_TIMEOUT); } @@ -5162,7 +5162,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); if (objt_server(s->target)) - objt_server(s->target)->counters.cli_aborts++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.cli_aborts, 1); rep->analysers &= AN_RES_FLT_END; channel_auto_close(rep); @@ -5189,7 +5189,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (objt_server(s->target)) { - objt_server(s->target)->counters.failed_resp++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1); health_adjust(objt_server(s->target), HANA_STATUS_HTTP_BROKEN_PIPE); } @@ -5255,7 +5255,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) stream_inc_http_err_ctr(s); if (objt_server(s->target)) - objt_server(s->target)->counters.p.http.rsp[n]++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.p.http.rsp[n], 1); /* RFC7230#2.6 has enforced the format of the HTTP version string to be * exactly one digit "." one digit. This check may be disabled using @@ -5673,7 +5673,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s if (apply_filters_to_response(s, rep, rule_set) < 0) { return_bad_resp: if (objt_server(s->target)) { - objt_server(s->target)->counters.failed_resp++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1); health_adjust(objt_server(s->target), HANA_STATUS_HTTP_RSP); } HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); @@ -5695,7 +5695,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s /* has the response been denied ? */ if (txn->flags & TX_SVDENY) { if (objt_server(s->target)) - objt_server(s->target)->counters.failed_secu++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_secu, 1); HA_ATOMIC_ADD(&s->be->be_counters.denied_resp, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_resp, 1); @@ -5845,7 +5845,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s * the 'checkcache' option, and send an alert. */ if (objt_server(s->target)) - objt_server(s->target)->counters.failed_secu++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_secu, 1); HA_ATOMIC_ADD(&s->be->be_counters.denied_resp, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_resp, 1); @@ -6038,7 +6038,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit s->flags |= SF_ERR_SRVCL; HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); if (objt_server(s->target)) - objt_server(s->target)->counters.srv_aborts++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.srv_aborts, 1); goto return_bad_res_stats_ok; } } @@ -6076,7 +6076,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit return_bad_res: /* let's centralize all bad responses */ HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (objt_server(s->target)) - objt_server(s->target)->counters.failed_resp++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_resp, 1); return_bad_res_stats_ok: txn->rsp.err_state = txn->rsp.msg_state; @@ -6105,7 +6105,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); if (objt_server(s->target)) - objt_server(s->target)->counters.cli_aborts++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.cli_aborts, 1); if (!(s->flags & SF_ERR_MASK)) s->flags |= SF_ERR_CLICL; diff --git a/src/queue.c b/src/queue.c index ba2a9f7f58..95b8edafb1 100644 --- a/src/queue.c +++ b/src/queue.c @@ -122,7 +122,7 @@ static struct stream *pendconn_get_next_strm(struct server *srv, struct proxy *p strm->flags |= SF_ASSIGNED; strm->target = &srv->obj_type; stream_add_srv_conn(strm, srv); - srv->served++; + HA_ATOMIC_ADD(&srv->served, 1); HA_ATOMIC_ADD(&srv->proxy->served, 1); if (px->lbprm.server_take_conn) px->lbprm.server_take_conn(srv); diff --git a/src/server.c b/src/server.c index 4c70ac4ad4..b6986a9805 100644 --- a/src/server.c +++ b/src/server.c @@ -761,9 +761,11 @@ void srv_shutdown_streams(struct server *srv, int why) { struct stream *stream, *stream_bck; + SPIN_LOCK(SERVER_LOCK, &srv->lock); list_for_each_entry_safe(stream, stream_bck, &srv->actconns, by_srv) if (stream->srv_conn == srv) stream_shutdown(stream, why); + SPIN_UNLOCK(SERVER_LOCK, &srv->lock); } /* Shutdown all connections of all backup servers of a proxy. The caller must @@ -1029,7 +1031,6 @@ void srv_clr_admin_flag(struct server *s, enum srv_admin mode) /* Register changes to be applied asynchronously */ if (LIST_ISEMPTY(&s->update_status)) LIST_ADDQ(&updated_servers, &s->update_status); - /* stop going down if the equivalent flag is still present (forced or inherited) */ if (((mode & SRV_ADMF_MAINT) && (s->next_admin & SRV_ADMF_MAINT)) || ((mode & SRV_ADMF_DRAIN) && (s->next_admin & SRV_ADMF_DRAIN))) @@ -2021,6 +2022,7 @@ int parse_server(const char *file, int linenum, char **args, struct proxy *curpr /* Copy default server settings to new server settings. */ srv_settings_cpy(newsrv, &curproxy->defsrv, 0); + SPIN_INIT(&newsrv->lock); cur_arg++; } else { newsrv = &curproxy->defsrv; diff --git a/src/stats.c b/src/stats.c index 66e9be4513..e026f36a55 100644 --- a/src/stats.c +++ b/src/stats.c @@ -2880,9 +2880,11 @@ static int stats_process_http_post(struct stream_interface *si) if (px->state != PR_STSTOPPED) { struct stream *sess, *sess_bck; + SPIN_LOCK(SERVER_LOCK, &sv->lock); list_for_each_entry_safe(sess, sess_bck, &sv->actconns, by_srv) if (sess->srv_conn == sv) stream_shutdown(sess, SF_ERR_KILLED); + SPIN_UNLOCK(SERVER_LOCK, &sv->lock); altered_servers++; total_servers++; diff --git a/src/stream.c b/src/stream.c index 2703f41e41..7bfe7864f1 100644 --- a/src/stream.c +++ b/src/stream.c @@ -298,7 +298,7 @@ static void stream_free(struct stream *s) if (objt_server(s->target)) { /* there may be requests left pending in queue */ if (s->flags & SF_CURR_SESS) { s->flags &= ~SF_CURR_SESS; - objt_server(s->target)->cur_sess--; + HA_ATOMIC_SUB(&objt_server(s->target)->cur_sess, 1); } if (may_dequeue_tasks(objt_server(s->target), s->be)) process_srv_queue(objt_server(s->target)); @@ -474,7 +474,7 @@ void stream_process_counters(struct stream *s) HA_ATOMIC_ADD(&s->be->be_counters.bytes_in, bytes); if (objt_server(s->target)) - objt_server(s->target)->counters.bytes_in += bytes; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.bytes_in, bytes); if (sess->listener && sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->bytes_in, bytes); @@ -510,7 +510,7 @@ void stream_process_counters(struct stream *s) HA_ATOMIC_ADD(&s->be->be_counters.bytes_out, bytes); if (objt_server(s->target)) - objt_server(s->target)->counters.bytes_out += bytes; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.bytes_out, bytes); if (sess->listener && sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->bytes_out, bytes); @@ -630,7 +630,7 @@ static int sess_update_st_cer(struct stream *s) if (s->flags & SF_CURR_SESS) { s->flags &= ~SF_CURR_SESS; - objt_server(s->target)->cur_sess--; + HA_ATOMIC_SUB(&objt_server(s->target)->cur_sess, 1); } if ((si->flags & SI_FL_ERR) && @@ -663,7 +663,7 @@ static int sess_update_st_cer(struct stream *s) } if (objt_server(s->target)) - objt_server(s->target)->counters.failed_conns++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.failed_conns, 1); HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1); sess_change_server(s, NULL); if (may_dequeue_tasks(objt_server(s->target), s->be)) @@ -711,7 +711,7 @@ static int sess_update_st_cer(struct stream *s) si->state = SI_ST_REQ; } else { if (objt_server(s->target)) - objt_server(s->target)->counters.retries++; + HA_ATOMIC_ADD(&objt_server(s->target)->counters.retries, 1); HA_ATOMIC_ADD(&s->be->be_counters.retries, 1); si->state = SI_ST_ASS; } @@ -860,7 +860,7 @@ static void sess_update_stream_int(struct stream *s) if (srv) srv_set_sess_last(srv); if (srv) - srv->counters.failed_conns++; + HA_ATOMIC_ADD(&srv->counters.failed_conns, 1); HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1); /* release other streams waiting for this server */ @@ -916,7 +916,7 @@ static void sess_update_stream_int(struct stream *s) si->exp = TICK_ETERNITY; s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now); if (srv) - srv->counters.failed_conns++; + HA_ATOMIC_ADD(&srv->counters.failed_conns, 1); HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1); si_shutr(si); si_shutw(si); @@ -1703,7 +1703,7 @@ struct task *process_stream(struct task *t) HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); if (srv) - srv->counters.cli_aborts++; + HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1); if (!(s->flags & SF_ERR_MASK)) s->flags |= SF_ERR_CLICL; if (!(s->flags & SF_FINST_MASK)) @@ -1719,12 +1719,12 @@ struct task *process_stream(struct task *t) stream_int_report_error(si_b); HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (srv) - srv->counters.failed_resp++; + HA_ATOMIC_ADD(&srv->counters.failed_resp, 1); if (!(req->analysers) && !(res->analysers)) { HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); if (srv) - srv->counters.srv_aborts++; + HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1); if (!(s->flags & SF_ERR_MASK)) s->flags |= SF_ERR_SRVCL; if (!(s->flags & SF_FINST_MASK)) @@ -1782,7 +1782,7 @@ struct task *process_stream(struct task *t) if (srv) { if (s->flags & SF_CURR_SESS) { s->flags &= ~SF_CURR_SESS; - srv->cur_sess--; + HA_ATOMIC_SUB(&srv->cur_sess, 1); } sess_change_server(s, NULL); if (may_dequeue_tasks(srv, s->be)) @@ -1980,28 +1980,28 @@ struct task *process_stream(struct task *t) HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); if (srv) - srv->counters.cli_aborts++; + HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1); s->flags |= SF_ERR_CLICL; } else if (req->flags & CF_READ_TIMEOUT) { HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); if (srv) - srv->counters.cli_aborts++; + HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1); s->flags |= SF_ERR_CLITO; } else if (req->flags & CF_WRITE_ERROR) { HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); if (srv) - srv->counters.srv_aborts++; + HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1); s->flags |= SF_ERR_SRVCL; } else { HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); if (srv) - srv->counters.srv_aborts++; + HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1); s->flags |= SF_ERR_SRVTO; } sess_set_term_flags(s); @@ -2013,28 +2013,28 @@ struct task *process_stream(struct task *t) HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); if (srv) - srv->counters.srv_aborts++; + HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1); s->flags |= SF_ERR_SRVCL; } else if (res->flags & CF_READ_TIMEOUT) { HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); if (srv) - srv->counters.srv_aborts++; + HA_ATOMIC_ADD(&srv->counters.srv_aborts, 1); s->flags |= SF_ERR_SRVTO; } else if (res->flags & CF_WRITE_ERROR) { HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); if (srv) - srv->counters.cli_aborts++; + HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1); s->flags |= SF_ERR_CLICL; } else { HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); if (srv) - srv->counters.cli_aborts++; + HA_ATOMIC_ADD(&srv->counters.cli_aborts, 1); s->flags |= SF_ERR_CLITO; } sess_set_term_flags(s); @@ -2532,7 +2532,7 @@ void sess_change_server(struct stream *sess, struct server *newsrv) return; if (sess->srv_conn) { - sess->srv_conn->served--; + HA_ATOMIC_SUB(&sess->srv_conn->served, 1); HA_ATOMIC_SUB(&sess->srv_conn->proxy->served, 1); if (sess->srv_conn->proxy->lbprm.server_drop_conn) sess->srv_conn->proxy->lbprm.server_drop_conn(sess->srv_conn); @@ -2540,7 +2540,7 @@ void sess_change_server(struct stream *sess, struct server *newsrv) } if (newsrv) { - newsrv->served++; + HA_ATOMIC_ADD(&newsrv->served, 1); HA_ATOMIC_ADD(&newsrv->proxy->served, 1); if (newsrv->proxy->lbprm.server_take_conn) newsrv->proxy->lbprm.server_take_conn(newsrv); @@ -3263,9 +3263,11 @@ static int cli_parse_shutdown_sessions_server(char **args, struct appctx *appctx return 1; /* kill all the stream that are on this server */ + SPIN_LOCK(SERVER_LOCK, &sv->lock); list_for_each_entry_safe(strm, strm_bck, &sv->actconns, by_srv) if (strm->srv_conn == sv) stream_shutdown(strm, SF_ERR_KILLED); + SPIN_UNLOCK(SERVER_LOCK, &sv->lock); return 1; }