From: Christopher Faulet Date: Fri, 2 Jun 2017 13:33:24 +0000 (+0200) Subject: MEDIUM: threads/proxy: Add a lock per proxy and atomically update proxy vars X-Git-Tag: v1.8-rc1~147 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ff8abcd31d390c5bc3b72c8acf991144c91c28ef;p=thirdparty%2Fhaproxy.git MEDIUM: threads/proxy: Add a lock per proxy and atomically update proxy vars Now, each proxy contains a lock that must be used when necessary to protect it. Moreover, all proxy's counters are now updated using atomic operations. --- diff --git a/include/common/hathreads.h b/include/common/hathreads.h index 88049687a2..80a4b24431 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -147,6 +147,7 @@ enum lock_label { POOL_LOCK, LISTENER_LOCK, LISTENER_QUEUE_LOCK, + PROXY_LOCK, SIGNALS_LOCK, LOCK_LABELS }; @@ -232,7 +233,7 @@ static inline void show_lock_stats() { const char *labels[LOCK_LABELS] = {"THREAD_SYNC", "FDTAB", "FDCACHE", "FD", "POLL", "TASK_RQ", "TASK_WQ", "POOL", - "LISTENER", "LISTENER_QUEUE", "SIGNALS" }; + "LISTENER", "LISTENER_QUEUE", "PROXY", "SIGNALS" }; int lbl; for (lbl = 0; lbl < LOCK_LABELS; lbl++) { diff --git a/include/proto/proxy.h b/include/proto/proxy.h index bfa7e3068e..cb86159a9b 100644 --- a/include/proto/proxy.h +++ b/include/proto/proxy.h @@ -110,42 +110,38 @@ static inline void proxy_reset_timeouts(struct proxy *proxy) /* increase the number of cumulated connections received on the designated frontend */ static void inline proxy_inc_fe_conn_ctr(struct listener *l, struct proxy *fe) { - fe->fe_counters.cum_conn++; + HA_ATOMIC_ADD(&fe->fe_counters.cum_conn, 1); if (l->counters) HA_ATOMIC_ADD(&l->counters->cum_conn, 1); - - update_freq_ctr(&fe->fe_conn_per_sec, 1); - if (fe->fe_conn_per_sec.curr_ctr > fe->fe_counters.cps_max) - fe->fe_counters.cps_max = fe->fe_conn_per_sec.curr_ctr; + HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.cps_max, + update_freq_ctr(&fe->fe_conn_per_sec, 1)); } /* increase the number of cumulated connections accepted by the designated frontend */ static void inline proxy_inc_fe_sess_ctr(struct listener *l, struct proxy *fe) { - fe->fe_counters.cum_sess++; + + HA_ATOMIC_ADD(&fe->fe_counters.cum_sess, 1); if (l->counters) HA_ATOMIC_ADD(&l->counters->cum_sess, 1); - update_freq_ctr(&fe->fe_sess_per_sec, 1); - if (fe->fe_sess_per_sec.curr_ctr > fe->fe_counters.sps_max) - fe->fe_counters.sps_max = fe->fe_sess_per_sec.curr_ctr; + HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.sps_max, + update_freq_ctr(&fe->fe_sess_per_sec, 1)); } /* increase the number of cumulated connections on the designated backend */ static void inline proxy_inc_be_ctr(struct proxy *be) { - be->be_counters.cum_conn++; - update_freq_ctr(&be->be_sess_per_sec, 1); - if (be->be_sess_per_sec.curr_ctr > be->be_counters.sps_max) - be->be_counters.sps_max = be->be_sess_per_sec.curr_ctr; + HA_ATOMIC_ADD(&be->be_counters.cum_conn, 1); + HA_ATOMIC_UPDATE_MAX(&be->be_counters.sps_max, + update_freq_ctr(&be->be_sess_per_sec, 1)); } /* increase the number of cumulated requests on the designated frontend */ static void inline proxy_inc_fe_req_ctr(struct proxy *fe) { - fe->fe_counters.p.http.cum_req++; - update_freq_ctr(&fe->fe_req_per_sec, 1); - if (fe->fe_req_per_sec.curr_ctr > fe->fe_counters.p.http.rps_max) - fe->fe_counters.p.http.rps_max = fe->fe_req_per_sec.curr_ctr; + HA_ATOMIC_ADD(&fe->fe_counters.p.http.cum_req, 1); + HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.p.http.rps_max, + update_freq_ctr(&fe->fe_req_per_sec, 1)); } #endif /* _PROTO_PROXY_H */ diff --git a/include/types/proxy.h b/include/types/proxy.h index 1130f2fae3..3e0c3eeed4 100644 --- a/include/types/proxy.h +++ b/include/types/proxy.h @@ -32,6 +32,8 @@ #include #include #include +#include + #include #include @@ -439,6 +441,10 @@ struct proxy { * name is used */ struct list filter_configs; /* list of the filters that are declared on this proxy */ + +#ifdef USE_THREAD + HA_SPINLOCK_T lock; +#endif }; struct switching_rule { diff --git a/src/backend.c b/src/backend.c index d0810e2ac9..475efa36d8 100644 --- a/src/backend.c +++ b/src/backend.c @@ -701,7 +701,7 @@ int assign_server(struct stream *s) goto out; } else if (srv != prev_srv) { - s->be->be_counters.cum_lbconn++; + HA_ATOMIC_ADD(&s->be->be_counters.cum_lbconn, 1); srv->counters.cum_lbconn++; } s->target = &srv->obj_type; @@ -880,10 +880,10 @@ int assign_server_and_queue(struct stream *s) } s->flags |= SF_REDISP; prev_srv->counters.redispatches++; - s->be->be_counters.redispatches++; + HA_ATOMIC_ADD(&s->be->be_counters.redispatches, 1); } else { prev_srv->counters.retries++; - s->be->be_counters.retries++; + HA_ATOMIC_ADD(&s->be->be_counters.retries, 1); } } } @@ -1279,7 +1279,7 @@ int srv_redispatch_connect(struct stream *s) } srv->counters.failed_conns++; - s->be->be_counters.failed_conns++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1); return 1; case SRV_STATUS_NOSRV: @@ -1288,7 +1288,7 @@ int srv_redispatch_connect(struct stream *s) s->si[1].err_type = SI_ET_CONN_ERR; } - s->be->be_counters.failed_conns++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1); return 1; case SRV_STATUS_QUEUED: @@ -1309,7 +1309,7 @@ int srv_redispatch_connect(struct stream *s) srv_set_sess_last(srv); if (srv) srv->counters.failed_conns++; - s->be->be_counters.failed_conns++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1); /* release other streams waiting for this server */ if (may_dequeue_tasks(srv, s->be)) @@ -1328,7 +1328,7 @@ int srv_redispatch_connect(struct stream *s) void set_backend_down(struct proxy *be) { be->last_change = now.tv_sec; - be->down_trans++; + HA_ATOMIC_ADD(&be->down_trans, 1); if (!(global.mode & MODE_STARTING)) { Alert("%s '%s' has no server available!\n", proxy_type_str(be), be->id); diff --git a/src/flt_http_comp.c b/src/flt_http_comp.c index 0ed2528958..7a44f2de65 100644 --- a/src/flt_http_comp.c +++ b/src/flt_http_comp.c @@ -332,9 +332,9 @@ comp_http_end(struct stream *s, struct filter *filter, goto end; if (strm_fe(s)->mode == PR_MODE_HTTP) - strm_fe(s)->fe_counters.p.http.comp_rsp++; + HA_ATOMIC_ADD(&strm_fe(s)->fe_counters.p.http.comp_rsp, 1); if ((s->flags & SF_BE_ASSIGNED) && (s->be->mode == PR_MODE_HTTP)) - s->be->be_counters.p.http.comp_rsp++; + HA_ATOMIC_ADD(&s->be->be_counters.p.http.comp_rsp, 1); end: return 1; } @@ -754,11 +754,11 @@ http_compression_buffer_end(struct comp_state *st, struct stream *s, /* update input rate */ if (st->comp_ctx && st->comp_ctx->cur_lvl > 0) { update_freq_ctr(&global.comp_bps_in, st->consumed); - strm_fe(s)->fe_counters.comp_in += st->consumed; - s->be->be_counters.comp_in += st->consumed; + HA_ATOMIC_ADD(&strm_fe(s)->fe_counters.comp_in, st->consumed); + HA_ATOMIC_ADD(&s->be->be_counters.comp_in, st->consumed); } else { - strm_fe(s)->fe_counters.comp_byp += st->consumed; - s->be->be_counters.comp_byp += st->consumed; + HA_ATOMIC_ADD(&strm_fe(s)->fe_counters.comp_byp, st->consumed); + HA_ATOMIC_ADD(&s->be->be_counters.comp_byp, st->consumed); } /* copy the remaining data in the tmp buffer. */ @@ -780,8 +780,8 @@ http_compression_buffer_end(struct comp_state *st, struct stream *s, if (st->comp_ctx && st->comp_ctx->cur_lvl > 0) { update_freq_ctr(&global.comp_bps_out, to_forward); - strm_fe(s)->fe_counters.comp_out += to_forward; - s->be->be_counters.comp_out += to_forward; + HA_ATOMIC_ADD(&strm_fe(s)->fe_counters.comp_out, to_forward); + HA_ATOMIC_ADD(&s->be->be_counters.comp_out, to_forward); } return to_forward; diff --git a/src/haproxy.c b/src/haproxy.c index 81122c839e..f5eebbe851 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -2128,6 +2128,7 @@ void deinit(void) p0 = p; p = p->next; + SPIN_DESTROY(&p0->lock); free(p0); }/* end while(p) */ diff --git a/src/log.c b/src/log.c index aa184aef29..864560b5c4 100644 --- a/src/log.c +++ b/src/log.c @@ -2384,7 +2384,7 @@ void strm_log(struct stream *s) size = build_logline(s, logline, global.max_syslog_len, &sess->fe->logformat); if (size > 0) { - sess->fe->log_count++; + HA_ATOMIC_ADD(&sess->fe->log_count, 1); __send_log(sess->fe, level, logline, size + 1, logline_rfc5424, sd_size); s->logs.logwait = 0; } diff --git a/src/proto_http.c b/src/proto_http.c index 0849f5222f..cfc0e72856 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -1742,7 +1742,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit) req->analysers &= AN_REQ_FLT_END; stream_inc_http_req_ctr(s); proxy_inc_fe_req_ctr(sess->fe); - sess->fe->fe_counters.failed_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.failed_req, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); @@ -1775,7 +1775,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit) stream_inc_http_req_ctr(s); proxy_inc_fe_req_ctr(sess->fe); - sess->fe->fe_counters.failed_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.failed_req, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); @@ -1805,7 +1805,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit) stream_inc_http_err_ctr(s); stream_inc_http_req_ctr(s); proxy_inc_fe_req_ctr(sess->fe); - sess->fe->fe_counters.failed_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.failed_req, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); @@ -1916,7 +1916,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit) struct acl_cond *cond; s->flags |= SF_MONITOR; - sess->fe->fe_counters.intercepted_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.intercepted_req, 1); /* Check if we want to fail this monitor request or not */ list_for_each_entry(cond, &sess->fe->mon_fail_cond, list) { @@ -2175,7 +2175,7 @@ int http_wait_for_request(struct stream *s, struct channel *req, int an_bit) txn->status = 400; http_reply_and_close(s, txn->status, http_error_message(s)); - sess->fe->fe_counters.failed_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.failed_req, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); @@ -3467,7 +3467,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s if (unlikely(objt_applet(s->target) == &http_stats_applet)) { /* process the stats request now */ if (sess->fe == s->be) /* report it if the request was intercepted by the frontend */ - sess->fe->fe_counters.intercepted_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.intercepted_req, 1); if (!(s->flags & SF_ERR_MASK)) // this is not really an error but it is s->flags |= SF_ERR_LOCAL; // to mark that it comes from the proxy @@ -3541,9 +3541,9 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s if (!req->analyse_exp) req->analyse_exp = tick_add(now_ms, 0); stream_inc_http_err_ctr(s); - sess->fe->fe_counters.denied_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_req, 1); if (sess->fe != s->be) - s->be->be_counters.denied_req++; + HA_ATOMIC_ADD(&s->be->be_counters.denied_req, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1); goto done_without_exp; @@ -3560,9 +3560,9 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s s->logs.tv_request = now; http_reply_and_close(s, txn->status, http_error_message(s)); stream_inc_http_err_ctr(s); - sess->fe->fe_counters.denied_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_req, 1); if (sess->fe != s->be) - s->be->be_counters.denied_req++; + HA_ATOMIC_ADD(&s->be->be_counters.denied_req, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1); goto return_prx_cond; @@ -3581,7 +3581,7 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s txn->status = 400; http_reply_and_close(s, txn->status, http_error_message(s)); - sess->fe->fe_counters.failed_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.failed_req, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); @@ -3919,7 +3919,7 @@ int http_process_request(struct stream *s, struct channel *req, int an_bit) req->analysers &= AN_REQ_FLT_END; http_reply_and_close(s, txn->status, http_error_message(s)); - sess->fe->fe_counters.failed_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.failed_req, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); @@ -4125,7 +4125,7 @@ int http_wait_for_request_body(struct stream *s, struct channel *req, int an_bit return_err_msg: req->analysers &= AN_REQ_FLT_END; - sess->fe->fe_counters.failed_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.failed_req, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); return 0; @@ -4216,7 +4216,7 @@ void http_end_txn_clean_session(struct stream *s) } if (s->flags & SF_BE_ASSIGNED) { - be->beconn--; + HA_ATOMIC_SUB(&be->beconn, 1); if (unlikely(s->srv_conn)) sess_change_server(s, NULL); } @@ -4232,12 +4232,12 @@ void http_end_txn_clean_session(struct stream *s) n = 0; if (fe->mode == PR_MODE_HTTP) { - fe->fe_counters.p.http.rsp[n]++; + HA_ATOMIC_ADD(&fe->fe_counters.p.http.rsp[n], 1); } if ((s->flags & SF_BE_ASSIGNED) && (be->mode == PR_MODE_HTTP)) { - be->be_counters.p.http.rsp[n]++; - be->be_counters.p.http.cum_req++; + HA_ATOMIC_ADD(&be->be_counters.p.http.rsp[n], 1); + HA_ATOMIC_ADD(&be->be_counters.p.http.cum_req, 1); } } @@ -4603,7 +4603,7 @@ int http_sync_res_state(struct stream *s) else if (chn->flags & CF_SHUTW) { txn->rsp.err_state = txn->rsp.msg_state; txn->rsp.msg_state = HTTP_MSG_ERROR; - s->be->be_counters.cli_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); if (objt_server(s->target)) objt_server(s->target)->counters.cli_aborts++; } @@ -4878,8 +4878,8 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit) s->flags |= SF_FINST_D; } - sess->fe->fe_counters.cli_aborts++; - s->be->be_counters.cli_aborts++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); + HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); if (objt_server(s->target)) objt_server(s->target)->counters.cli_aborts++; @@ -4910,7 +4910,7 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit) return 0; return_bad_req: /* let's centralize all bad requests */ - sess->fe->fe_counters.failed_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.failed_req, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->failed_req, 1); @@ -4950,8 +4950,8 @@ int http_request_forward_body(struct stream *s, struct channel *req, int an_bit) req->analysers &= AN_REQ_FLT_END; s->res.analysers &= AN_RES_FLT_END; /* we're in data phase, we want to abort both directions */ - sess->fe->fe_counters.srv_aborts++; - s->be->be_counters.srv_aborts++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); + HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); if (objt_server(s->target)) objt_server(s->target)->counters.srv_aborts++; @@ -5077,7 +5077,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) if (msg->msg_state == HTTP_MSG_ERROR || msg->err_pos >= 0) http_capture_bad_message(&s->be->invalid_rep, s, msg, msg->err_state, sess->fe); - s->be->be_counters.failed_resp++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (objt_server(s->target)) { objt_server(s->target)->counters.failed_resp++; health_adjust(objt_server(s->target), HANA_STATUS_HTTP_HDRRSP); @@ -5112,7 +5112,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) else if (txn->flags & TX_NOT_FIRST) goto abort_keep_alive; - s->be->be_counters.failed_resp++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (objt_server(s->target)) { objt_server(s->target)->counters.failed_resp++; health_adjust(objt_server(s->target), HANA_STATUS_HTTP_READ_ERROR); @@ -5137,7 +5137,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) if (msg->err_pos >= 0) http_capture_bad_message(&s->be->invalid_rep, s, msg, msg->err_state, sess->fe); - s->be->be_counters.failed_resp++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (objt_server(s->target)) { objt_server(s->target)->counters.failed_resp++; health_adjust(objt_server(s->target), HANA_STATUS_HTTP_READ_TIMEOUT); @@ -5159,8 +5159,8 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) /* client abort with an abortonclose */ else if ((rep->flags & CF_SHUTR) && ((s->req.flags & (CF_SHUTR|CF_SHUTW)) == (CF_SHUTR|CF_SHUTW))) { - sess->fe->fe_counters.cli_aborts++; - s->be->be_counters.cli_aborts++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); + HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); if (objt_server(s->target)) objt_server(s->target)->counters.cli_aborts++; @@ -5187,7 +5187,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) else if (txn->flags & TX_NOT_FIRST) goto abort_keep_alive; - s->be->be_counters.failed_resp++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (objt_server(s->target)) { objt_server(s->target)->counters.failed_resp++; health_adjust(objt_server(s->target), HANA_STATUS_HTTP_BROKEN_PIPE); @@ -5214,7 +5214,7 @@ int http_wait_for_response(struct stream *s, struct channel *rep, int an_bit) else if (txn->flags & TX_NOT_FIRST) goto abort_keep_alive; - s->be->be_counters.failed_resp++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); rep->analysers &= AN_RES_FLT_END; channel_auto_close(rep); @@ -5676,7 +5676,7 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s objt_server(s->target)->counters.failed_resp++; health_adjust(objt_server(s->target), HANA_STATUS_HTTP_RSP); } - s->be->be_counters.failed_resp++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); return_srv_prx_502: rep->analysers &= AN_RES_FLT_END; txn->status = 502; @@ -5697,8 +5697,8 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s if (objt_server(s->target)) objt_server(s->target)->counters.failed_secu++; - s->be->be_counters.denied_resp++; - sess->fe->fe_counters.denied_resp++; + HA_ATOMIC_ADD(&s->be->be_counters.denied_resp, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_resp, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1); @@ -5847,8 +5847,8 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s if (objt_server(s->target)) objt_server(s->target)->counters.failed_secu++; - s->be->be_counters.denied_resp++; - sess->fe->fe_counters.denied_resp++; + HA_ATOMIC_ADD(&s->be->be_counters.denied_resp, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_resp, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1); @@ -6036,7 +6036,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit if (!buffer_pending(res->buf)) { if (!(s->flags & SF_ERR_MASK)) s->flags |= SF_ERR_SRVCL; - s->be->be_counters.srv_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); if (objt_server(s->target)) objt_server(s->target)->counters.srv_aborts++; goto return_bad_res_stats_ok; @@ -6074,7 +6074,7 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit return 0; return_bad_res: /* let's centralize all bad responses */ - s->be->be_counters.failed_resp++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (objt_server(s->target)) objt_server(s->target)->counters.failed_resp++; @@ -6102,8 +6102,8 @@ int http_response_forward_body(struct stream *s, struct channel *res, int an_bit res->analysers &= AN_RES_FLT_END; s->req.analysers &= AN_REQ_FLT_END; /* we're in data phase, we want to abort both directions */ - sess->fe->fe_counters.cli_aborts++; - s->be->be_counters.cli_aborts++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); + HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); if (objt_server(s->target)) objt_server(s->target)->counters.cli_aborts++; diff --git a/src/proto_tcp.c b/src/proto_tcp.c index c6e33f23cd..200f642cca 100644 --- a/src/proto_tcp.c +++ b/src/proto_tcp.c @@ -1368,14 +1368,14 @@ static enum act_return tcp_exec_action_silent_drop(struct act_rule *rule, struct channel_abort(&strm->res); strm->req.analysers = 0; strm->res.analysers = 0; - strm->be->be_counters.denied_req++; + HA_ATOMIC_ADD(&strm->be->be_counters.denied_req, 1); if (!(strm->flags & SF_ERR_MASK)) strm->flags |= SF_ERR_PRXCOND; if (!(strm->flags & SF_FINST_MASK)) strm->flags |= SF_FINST_R; } - sess->fe->fe_counters.denied_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_req, 1); if (sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1); diff --git a/src/proxy.c b/src/proxy.c index 71091d01df..9169ed9fe0 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -761,6 +761,8 @@ void init_new_proxy(struct proxy *p) /* initial uuid is unassigned (-1) */ p->uuid = -1; + + SPIN_INIT(&p->lock); } /* @@ -1253,9 +1255,8 @@ int stream_set_backend(struct stream *s, struct proxy *be) return 0; s->be = be; - be->beconn++; - if (be->beconn > be->be_counters.conn_max) - be->be_counters.conn_max = be->beconn; + HA_ATOMIC_UPDATE_MAX(&be->be_counters.conn_max, + HA_ATOMIC_ADD(&be->beconn, 1)); proxy_inc_be_ctr(be); /* assign new parameters to the stream from the new backend */ diff --git a/src/queue.c b/src/queue.c index 302667affe..ba2a9f7f58 100644 --- a/src/queue.c +++ b/src/queue.c @@ -123,7 +123,7 @@ static struct stream *pendconn_get_next_strm(struct server *srv, struct proxy *p strm->target = &srv->obj_type; stream_add_srv_conn(strm, srv); srv->served++; - srv->proxy->served++; + HA_ATOMIC_ADD(&srv->proxy->served, 1); if (px->lbprm.server_take_conn) px->lbprm.server_take_conn(srv); @@ -182,8 +182,7 @@ struct pendconn *pendconn_add(struct stream *strm) LIST_ADDQ(&strm->be->pendconns, &p->list); strm->be->nbpend++; strm->logs.prx_queue_size += strm->be->nbpend; - if (strm->be->nbpend > strm->be->be_counters.nbpend_max) - strm->be->be_counters.nbpend_max = strm->be->nbpend; + HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, strm->be->nbpend); } strm->be->totpend++; return p; diff --git a/src/session.c b/src/session.c index 3753a2ca69..3f4887895a 100644 --- a/src/session.c +++ b/src/session.c @@ -52,9 +52,8 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type memset(sess->stkctr, 0, sizeof(sess->stkctr)); vars_init(&sess->vars, SCOPE_SESS); sess->task = NULL; - fe->feconn++; - if (fe->feconn > fe->fe_counters.conn_max) - fe->fe_counters.conn_max = fe->feconn; + HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.conn_max, + HA_ATOMIC_ADD(&fe->feconn, 1)); if (li) proxy_inc_fe_conn_ctr(li, fe); HA_ATOMIC_ADD(&totalconn, 1); @@ -65,7 +64,7 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type void session_free(struct session *sess) { - sess->fe->feconn--; + HA_ATOMIC_SUB(&sess->fe->feconn, 1); session_store_counters(sess); vars_prune_per_sess(&sess->vars); pool_free2(pool2_session, sess); diff --git a/src/stream.c b/src/stream.c index 8cb9bfbc2f..2703f41e41 100644 --- a/src/stream.c +++ b/src/stream.c @@ -470,9 +470,8 @@ void stream_process_counters(struct stream *s) bytes = s->req.total - s->logs.bytes_in; s->logs.bytes_in = s->req.total; if (bytes) { - sess->fe->fe_counters.bytes_in += bytes; - - s->be->be_counters.bytes_in += bytes; + HA_ATOMIC_ADD(&sess->fe->fe_counters.bytes_in, bytes); + HA_ATOMIC_ADD(&s->be->be_counters.bytes_in, bytes); if (objt_server(s->target)) objt_server(s->target)->counters.bytes_in += bytes; @@ -507,9 +506,8 @@ void stream_process_counters(struct stream *s) bytes = s->res.total - s->logs.bytes_out; s->logs.bytes_out = s->res.total; if (bytes) { - sess->fe->fe_counters.bytes_out += bytes; - - s->be->be_counters.bytes_out += bytes; + HA_ATOMIC_ADD(&sess->fe->fe_counters.bytes_out, bytes); + HA_ATOMIC_ADD(&s->be->be_counters.bytes_out, bytes); if (objt_server(s->target)) objt_server(s->target)->counters.bytes_out += bytes; @@ -666,7 +664,7 @@ static int sess_update_st_cer(struct stream *s) if (objt_server(s->target)) objt_server(s->target)->counters.failed_conns++; - s->be->be_counters.failed_conns++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1); sess_change_server(s, NULL); if (may_dequeue_tasks(objt_server(s->target), s->be)) process_srv_queue(objt_server(s->target)); @@ -714,7 +712,7 @@ static int sess_update_st_cer(struct stream *s) } else { if (objt_server(s->target)) objt_server(s->target)->counters.retries++; - s->be->be_counters.retries++; + HA_ATOMIC_ADD(&s->be->be_counters.retries, 1); si->state = SI_ST_ASS; } @@ -863,7 +861,7 @@ static void sess_update_stream_int(struct stream *s) srv_set_sess_last(srv); if (srv) srv->counters.failed_conns++; - s->be->be_counters.failed_conns++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1); /* release other streams waiting for this server */ sess_change_server(s, NULL); @@ -919,7 +917,7 @@ static void sess_update_stream_int(struct stream *s) s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now); if (srv) srv->counters.failed_conns++; - s->be->be_counters.failed_conns++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_conns, 1); si_shutr(si); si_shutw(si); req->flags |= CF_WRITE_TIMEOUT; @@ -985,7 +983,7 @@ static void sess_set_term_flags(struct stream *s) if (!(s->flags & SF_FINST_MASK)) { if (s->si[1].state < SI_ST_REQ) { - strm_fe(s)->fe_counters.failed_req++; + HA_ATOMIC_ADD(&strm_fe(s)->fe_counters.failed_req, 1); if (strm_li(s) && strm_li(s)->counters) HA_ATOMIC_ADD(&strm_li(s)->counters->failed_req, 1); @@ -1136,7 +1134,7 @@ enum act_return process_use_service(struct act_rule *rule, struct proxy *px, appctx_wakeup(appctx); if (sess->fe == s->be) /* report it if the request was intercepted by the frontend */ - sess->fe->fe_counters.intercepted_req++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.intercepted_req, 1); /* The flag SF_ASSIGNED prevent from server assignment. */ s->flags |= SF_ASSIGNED; @@ -1702,8 +1700,8 @@ struct task *process_stream(struct task *t) si_shutw(si_f); stream_int_report_error(si_f); if (!(req->analysers) && !(res->analysers)) { - s->be->be_counters.cli_aborts++; - sess->fe->fe_counters.cli_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); if (srv) srv->counters.cli_aborts++; if (!(s->flags & SF_ERR_MASK)) @@ -1719,12 +1717,12 @@ struct task *process_stream(struct task *t) si_shutr(si_b); si_shutw(si_b); stream_int_report_error(si_b); - s->be->be_counters.failed_resp++; + HA_ATOMIC_ADD(&s->be->be_counters.failed_resp, 1); if (srv) srv->counters.failed_resp++; if (!(req->analysers) && !(res->analysers)) { - s->be->be_counters.srv_aborts++; - sess->fe->fe_counters.srv_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); if (srv) srv->counters.srv_aborts++; if (!(s->flags & SF_ERR_MASK)) @@ -1979,29 +1977,29 @@ struct task *process_stream(struct task *t) /* Report it if the client got an error or a read timeout expired */ req->analysers = 0; if (req->flags & CF_READ_ERROR) { - s->be->be_counters.cli_aborts++; - sess->fe->fe_counters.cli_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); if (srv) srv->counters.cli_aborts++; s->flags |= SF_ERR_CLICL; } else if (req->flags & CF_READ_TIMEOUT) { - s->be->be_counters.cli_aborts++; - sess->fe->fe_counters.cli_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); if (srv) srv->counters.cli_aborts++; s->flags |= SF_ERR_CLITO; } else if (req->flags & CF_WRITE_ERROR) { - s->be->be_counters.srv_aborts++; - sess->fe->fe_counters.srv_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); if (srv) srv->counters.srv_aborts++; s->flags |= SF_ERR_SRVCL; } else { - s->be->be_counters.srv_aborts++; - sess->fe->fe_counters.srv_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); if (srv) srv->counters.srv_aborts++; s->flags |= SF_ERR_SRVTO; @@ -2012,29 +2010,29 @@ struct task *process_stream(struct task *t) /* Report it if the server got an error or a read timeout expired */ res->analysers = 0; if (res->flags & CF_READ_ERROR) { - s->be->be_counters.srv_aborts++; - sess->fe->fe_counters.srv_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); if (srv) srv->counters.srv_aborts++; s->flags |= SF_ERR_SRVCL; } else if (res->flags & CF_READ_TIMEOUT) { - s->be->be_counters.srv_aborts++; - sess->fe->fe_counters.srv_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.srv_aborts, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.srv_aborts, 1); if (srv) srv->counters.srv_aborts++; s->flags |= SF_ERR_SRVTO; } else if (res->flags & CF_WRITE_ERROR) { - s->be->be_counters.cli_aborts++; - sess->fe->fe_counters.cli_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); if (srv) srv->counters.cli_aborts++; s->flags |= SF_ERR_CLICL; } else { - s->be->be_counters.cli_aborts++; - sess->fe->fe_counters.cli_aborts++; + HA_ATOMIC_ADD(&s->be->be_counters.cli_aborts, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.cli_aborts, 1); if (srv) srv->counters.cli_aborts++; s->flags |= SF_ERR_CLITO; @@ -2426,7 +2424,7 @@ struct task *process_stream(struct task *t) } if (s->flags & SF_BE_ASSIGNED) - s->be->beconn--; + HA_ATOMIC_SUB(&s->be->beconn, 1); if (sess->listener) listener_release(sess->listener); @@ -2451,12 +2449,12 @@ struct task *process_stream(struct task *t) n = 0; if (sess->fe->mode == PR_MODE_HTTP) { - sess->fe->fe_counters.p.http.rsp[n]++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.p.http.rsp[n], 1); } if ((s->flags & SF_BE_ASSIGNED) && (s->be->mode == PR_MODE_HTTP)) { - s->be->be_counters.p.http.rsp[n]++; - s->be->be_counters.p.http.cum_req++; + HA_ATOMIC_ADD(&s->be->be_counters.p.http.rsp[n], 1); + HA_ATOMIC_ADD(&s->be->be_counters.p.http.cum_req, 1); } } @@ -2513,10 +2511,12 @@ void stream_update_time_stats(struct stream *s) swrate_add(&srv->counters.d_time, TIME_STATS_SAMPLES, t_data); swrate_add(&srv->counters.t_time, TIME_STATS_SAMPLES, t_close); } + SPIN_LOCK(PROXY_LOCK, &s->be->lock); swrate_add(&s->be->be_counters.q_time, TIME_STATS_SAMPLES, t_queue); swrate_add(&s->be->be_counters.c_time, TIME_STATS_SAMPLES, t_connect); swrate_add(&s->be->be_counters.d_time, TIME_STATS_SAMPLES, t_data); swrate_add(&s->be->be_counters.t_time, TIME_STATS_SAMPLES, t_close); + SPIN_UNLOCK(PROXY_LOCK, &s->be->lock); } /* @@ -2533,7 +2533,7 @@ void sess_change_server(struct stream *sess, struct server *newsrv) if (sess->srv_conn) { sess->srv_conn->served--; - sess->srv_conn->proxy->served--; + HA_ATOMIC_SUB(&sess->srv_conn->proxy->served, 1); if (sess->srv_conn->proxy->lbprm.server_drop_conn) sess->srv_conn->proxy->lbprm.server_drop_conn(sess->srv_conn); stream_del_srv_conn(sess); @@ -2541,7 +2541,7 @@ void sess_change_server(struct stream *sess, struct server *newsrv) if (newsrv) { newsrv->served++; - newsrv->proxy->served++; + HA_ATOMIC_ADD(&newsrv->proxy->served, 1); if (newsrv->proxy->lbprm.server_take_conn) newsrv->proxy->lbprm.server_take_conn(newsrv); stream_add_srv_conn(sess, newsrv); diff --git a/src/tcp_rules.c b/src/tcp_rules.c index 68b2c6a846..c72afb01ac 100644 --- a/src/tcp_rules.c +++ b/src/tcp_rules.c @@ -166,8 +166,8 @@ resume_execution: channel_abort(&s->res); req->analysers = 0; - s->be->be_counters.denied_req++; - sess->fe->fe_counters.denied_req++; + HA_ATOMIC_ADD(&s->be->be_counters.denied_req, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_req, 1); if (sess->listener && sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->denied_req, 1); @@ -344,8 +344,8 @@ resume_execution: channel_abort(&s->req); rep->analysers = 0; - s->be->be_counters.denied_resp++; - sess->fe->fe_counters.denied_resp++; + HA_ATOMIC_ADD(&s->be->be_counters.denied_resp, 1); + HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_resp, 1); if (sess->listener && sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->denied_resp, 1); @@ -427,7 +427,7 @@ int tcp_exec_l4_rules(struct session *sess) break; } else if (rule->action == ACT_ACTION_DENY) { - sess->fe->fe_counters.denied_conn++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_conn, 1); if (sess->listener && sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->denied_conn, 1); @@ -514,7 +514,7 @@ int tcp_exec_l5_rules(struct session *sess) break; } else if (rule->action == ACT_ACTION_DENY) { - sess->fe->fe_counters.denied_sess++; + HA_ATOMIC_ADD(&sess->fe->fe_counters.denied_sess, 1); if (sess->listener && sess->listener->counters) HA_ATOMIC_ADD(&sess->listener->counters->denied_sess, 1);