From 0846638f7fe4f271287ab2b982925e5114e6cfde Mon Sep 17 00:00:00 2001 From: Aurelien DARRAGON Date: Thu, 30 Jan 2025 13:26:42 +0100 Subject: [PATCH] MEDIUM: stream: interrupt costly rulesets after too many evaluations It is not rare to see configurations with a large number of "tcp-request content" or "http-request" rules for instance. A large number of rules combined with cpu-demanding actions (e.g.: actions that work on content) may create thread contention as all the rules from a given ruleset are evaluated under the same polling loop if the evaluation is not interrupted Thus, in this patch we add extra logic around "tcp-request content", "tcp-response content", "http-request" and "http-response" rulesets, so that when a certain number of rules are evaluated under the single polling loop, we force the evaluating function to yield. As such, the rule which was about to be evaluated is saved, and the function starts evaluating rules from the save pointer when it returns (in the next polling loop). We use task_wakeup(task, TASK_WOKEN_MSG) to explicitly wake the task so that no time is wasted and the processing is resumed ASAP. TASK_WOKEN_MSG is mandatory here because process_stream() expects TASK_WOKEN_MSG for explicit analyzers re-evaluation. rules_bcount stream's attribute was added to count how manu rules were evaluated since last interruption (yield). Also, SF_RULE_FYIELD flag was added to know that the s->current_rule was assigned due to forced yield and not regular yield. By default haproxy will enforce a yield every 50 rules, this behavior can be configured using the "tune.max-rules-at-once" global keyword. There is a limitation though: for now, if the ACT_OPT_FINAL flag is set on act_opts, we consider it is not safe to yield (as it is already the case for automatic yield). In this case instead of yielding an taking the risk of not being called back, we skip the yield and hope it will not create contention. This is something we should ideally try to improve in order to yield in all conditions. --- doc/configuration.txt | 23 ++++++++++ include/haproxy/defaults.h | 9 ++++ include/haproxy/global-t.h | 1 + include/haproxy/http_ana-t.h | 1 + include/haproxy/stream-t.h | 6 ++- src/cfgparse-global.c | 12 +++++ src/haproxy.c | 3 ++ src/http_ana.c | 87 +++++++++++++++++++++++++++++------- src/stream.c | 2 + src/tcp_rules.c | 42 +++++++++++++++-- 10 files changed, 165 insertions(+), 21 deletions(-) diff --git a/doc/configuration.txt b/doc/configuration.txt index 2d6ba4b147..7c208c47d1 100644 --- a/doc/configuration.txt +++ b/doc/configuration.txt @@ -1677,6 +1677,7 @@ The following keywords are supported in the "global" section : - tune.maxaccept - tune.maxpollevents - tune.maxrewrite + - tune.max-rules-at-once - tune.memory.hot-size - tune.pattern.cache-size - tune.peers.max-updates-at-once @@ -4083,6 +4084,28 @@ tune.maxrewrite larger than that. This means you don't have to worry about it when changing bufsize. +tune.max-rules-at-once + Sets the maximum number of rules that can be evaluated at once in ruleset + evaluating functions, provided that they support yielding. Indeed, it is + not rare to see configurations with a large number of "tcp-request content" + or "http-request" rules for instance. A large number of rules combined with + cpu-demanding actions (e.g.: actions that work on content) may create thread + contention as all the rules from a given ruleset are evaluated under the same + polling loop if the evaluation is not interrupted. This option ensures that no + more than number of rules may be excecuted under the same polling + loop for content-oriented rulesets (those that already support yielding due + to content inspection). What it does is that it forces the evaluating function + to yield, so that it comes back on the next polling loop to continues the + evaluation. + + Affected rulesets are: + - "tcp-request content" + - "tcp-response content" + - "http-request" + - "http-response" + + The default value is 50. + tune.memory.hot-size Sets the per-thread amount of memory that will be kept hot in the local cache and will never be recoverable by other threads. Access to this memory is very diff --git a/include/haproxy/defaults.h b/include/haproxy/defaults.h index f70bf43096..dbb39f4c3b 100644 --- a/include/haproxy/defaults.h +++ b/include/haproxy/defaults.h @@ -194,6 +194,15 @@ #define MAX_POLL_EVENTS 200 #endif +// the max number of rules evaluated in one call to rule handling function. +// If the function is able to yield, a forced yield will be enforced when +// reaching this value, else the evaluation will continue. Lowering this +// value may help to fight against thread contention with cpu-intensive +// rulesets +#ifndef MAX_RULES_AT_ONCE +#define MAX_RULES_AT_ONCE 50 +#endif + /* eternity when exprimed in timeval */ #ifndef TV_ETERNITY #define TV_ETERNITY (~0UL) diff --git a/include/haproxy/global-t.h b/include/haproxy/global-t.h index 6d4a3986c9..508ef846ad 100644 --- a/include/haproxy/global-t.h +++ b/include/haproxy/global-t.h @@ -161,6 +161,7 @@ struct global { unsigned char cluster_secret[16]; /* 128 bits of an SHA1 digest of a secret defined as ASCII string */ struct { int maxpollevents; /* max number of poll events at once */ + int max_rules_at_once; /* max number of rules excecuted in a single evaluation loop */ int maxaccept; /* max number of consecutive accept() */ int options; /* various tuning options */ int runqueue_depth;/* max number of tasks to run at once */ diff --git a/include/haproxy/http_ana-t.h b/include/haproxy/http_ana-t.h index f4f13d0701..10b8273d14 100644 --- a/include/haproxy/http_ana-t.h +++ b/include/haproxy/http_ana-t.h @@ -190,6 +190,7 @@ enum { enum rule_result { HTTP_RULE_RES_CONT = 0, /* nothing special, continue rules evaluation */ HTTP_RULE_RES_YIELD, /* call me later because some data is missing. */ + HTTP_RULE_RES_FYIELD, /* forced yield, not because of missing data */ HTTP_RULE_RES_STOP, /* stopped processing on an accept */ HTTP_RULE_RES_DENY, /* deny (or tarpit if TX_CLTARPIT) */ HTTP_RULE_RES_ABRT, /* abort request, msg already sent (eg: auth) */ diff --git a/include/haproxy/stream-t.h b/include/haproxy/stream-t.h index 335fed8c94..1f72dc4ed3 100644 --- a/include/haproxy/stream-t.h +++ b/include/haproxy/stream-t.h @@ -89,6 +89,7 @@ #define SF_SRC_ADDR 0x00800000 /* get the source ip/port with getsockname */ #define SF_BC_MARK 0x01000000 /* need to set specific mark on backend/srv conn upon connect */ #define SF_BC_TOS 0x02000000 /* need to set specific tos on backend/srv conn upon connect */ +#define SF_RULE_FYIELD 0x04000000 /* s->current_rule set because of forced yield */ /* This function is used to report flags in debugging tools. Please reflect * below any single-bit flag addition above in the same order via the @@ -103,7 +104,8 @@ static forceinline char *strm_show_flags(char *buf, size_t len, const char *deli _(0); /* flags & enums */ _(SF_IGNORE_PRST, _(SF_SRV_REUSED, _(SF_SRV_REUSED_ANTICIPATED, - _(SF_WEBSOCKET, _(SF_SRC_ADDR, _(SF_BC_MARK, _(SF_BC_TOS))))))); + _(SF_WEBSOCKET, _(SF_SRC_ADDR, _(SF_BC_MARK, _(SF_BC_TOS, + _(SF_RULE_FYIELD)))))))); _e(SF_FINST_MASK, SF_FINST_R, _e(SF_FINST_MASK, SF_FINST_C, _e(SF_FINST_MASK, SF_FINST_H, _e(SF_FINST_MASK, SF_FINST_D, @@ -259,6 +261,8 @@ struct stream { unsigned int conn_exp; /* wake up time for connect, queue, turn-around, ... */ unsigned int conn_err_type; /* first error detected, one of STRM_ET_* */ + uint32_t rules_bcount; /* number of rules evaluated since last yield */ + struct stream *parent; /* Pointer to the parent stream, if any. NULL most of time */ struct list list; /* position in the thread's streams list */ diff --git a/src/cfgparse-global.c b/src/cfgparse-global.c index f050b5b492..4064b2b9c5 100644 --- a/src/cfgparse-global.c +++ b/src/cfgparse-global.c @@ -1106,6 +1106,17 @@ static int cfg_parse_global_tune_opts(char **args, int section_type, return 0; } + else if (strcmp(args[0], "tune.max-rules-at-once") == 0) { + if (*(args[1]) == 0) { + memprintf(err, "'%s' expects a positive numeric value", args[0]); + return -1; + } + global.tune.max_rules_at_once = atoi(args[1]); + if (global.tune.max_rules_at_once < 0) { + memprintf(err, "'%s' expects a positive numeric value", args[0]); + return -1; + } + } else if (strcmp(args[0], "tune.maxaccept") == 0) { long max; @@ -1687,6 +1698,7 @@ static struct cfg_kw_list cfg_kws = {ILH, { { CFG_GLOBAL, "expose-experimental-directives", cfg_parse_global_non_std_directives }, { CFG_GLOBAL, "tune.runqueue-depth", cfg_parse_global_tune_opts }, { CFG_GLOBAL, "tune.maxpollevents", cfg_parse_global_tune_opts }, + { CFG_GLOBAL, "tune.max-rules-at-once", cfg_parse_global_tune_opts }, { CFG_GLOBAL, "tune.maxaccept", cfg_parse_global_tune_opts }, { CFG_GLOBAL, "tune.recv_enough", cfg_parse_global_tune_opts }, { CFG_GLOBAL, "tune.bufsize", cfg_parse_global_tune_opts }, diff --git a/src/haproxy.c b/src/haproxy.c index 22a17adc7c..3f5b9db097 100644 --- a/src/haproxy.c +++ b/src/haproxy.c @@ -2157,6 +2157,9 @@ static void step_init_2(int argc, char** argv) if (global.tune.maxpollevents <= 0) global.tune.maxpollevents = MAX_POLL_EVENTS; + if (global.tune.max_rules_at_once <= 0) + global.tune.max_rules_at_once = MAX_RULES_AT_ONCE; + if (global.tune.runqueue_depth <= 0) { /* tests on various thread counts from 1 to 64 have shown an * optimal queue depth following roughly 1/sqrt(threads). diff --git a/src/http_ana.c b/src/http_ana.c index 33cfbb0ce7..09a53b0f3a 100644 --- a/src/http_ana.c +++ b/src/http_ana.c @@ -404,6 +404,9 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s case HTTP_RULE_RES_YIELD: /* some data miss, call the function later. */ goto return_prx_yield; + case HTTP_RULE_RES_FYIELD: /* we must try again after context-switch */ + goto return_prx_fyield; + case HTTP_RULE_RES_CONT: case HTTP_RULE_RES_STOP: /* nothing to do */ break; @@ -635,6 +638,12 @@ int http_process_req_common(struct stream *s, struct channel *req, int an_bit, s DBG_TRACE_DEVEL("waiting for more data", STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn); return 0; + + return_prx_fyield: + channel_dont_connect(req); + DBG_TRACE_DEVEL("forced yield", + STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn); + return 0; } /* This function performs all the processing enabled for the current request. @@ -1773,6 +1782,9 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s case HTTP_RULE_RES_YIELD: /* some data miss, call the function later. */ goto return_prx_yield; + case HTTP_RULE_RES_FYIELD: /* we must try again after context-switch */ + goto return_prx_fyield; + case HTTP_RULE_RES_CONT: case HTTP_RULE_RES_STOP: /* nothing to do */ break; @@ -2034,6 +2046,13 @@ int http_process_res_common(struct stream *s, struct channel *rep, int an_bit, s DBG_TRACE_DEVEL("waiting for more data", STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn); return 0; + + return_prx_fyield: + channel_dont_close(rep); + DBG_TRACE_DEVEL("forced yield", + STRM_EV_STRM_ANA|STRM_EV_HTTP_ANA, s, txn); + return 0; + } /* This function is an analyser which forwards response body (including chunk @@ -2733,16 +2752,27 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis enum rule_result rule_ret = HTTP_RULE_RES_CONT; int act_opts = 0; + if ((s->scf->flags & SC_FL_ERROR) || + ((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && + (px->options & PR_O_ABRT_CLOSE))) + act_opts |= ACT_OPT_FINAL; + /* If "the current_rule_list" match the executed rule list, we are in * resume condition. If a resume is needed it is always in the action * and never in the ACL or converters. In this case, we initialise the * current rule, and go to the action execution point. */ if (s->current_rule) { + int forced = s->flags & SF_RULE_FYIELD; + rule = s->current_rule; s->current_rule = NULL; - if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules)) + s->flags &= ~SF_RULE_FYIELD; + if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules)) { + if (forced) + goto resume_rule; goto resume_execution; + } } s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules); @@ -2751,6 +2781,18 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis txn->req.flags &= ~HTTP_MSGF_SOFT_RW; list_for_each_entry(rule, s->current_rule_list, list) { + resume_rule: + /* check if budget is exceeded and we need to continue on the next + * polling loop, unless we know that we cannot yield + */ + if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) { + s->current_rule = rule; + s->flags |= SF_RULE_FYIELD; + rule_ret = HTTP_RULE_RES_FYIELD; + task_wakeup(s->task, TASK_WOKEN_MSG); + goto end; + } + /* check optional condition */ if (!acl_match_cond(rule->cond, px, sess, s, SMP_OPT_DIR_REQ|SMP_OPT_FINAL)) continue; @@ -2762,11 +2804,6 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis /* Always call the action function if defined */ if (rule->action_ptr) { - if ((s->scf->flags & SC_FL_ERROR) || - ((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && - (px->options & PR_O_ABRT_CLOSE))) - act_opts |= ACT_OPT_FINAL; - if (!(s->scf->flags & SC_FL_ERROR) & !(s->req.flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { s->waiting_entity.type = STRM_ENTITY_NONE; s->waiting_entity.ptr = NULL; @@ -2876,7 +2913,7 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis end: /* if the ruleset evaluation is finished reset the strict mode */ - if (rule_ret != HTTP_RULE_RES_YIELD) + if (rule_ret != HTTP_RULE_RES_YIELD && rule_ret != HTTP_RULE_RES_FYIELD) txn->req.flags &= ~HTTP_MSGF_SOFT_RW; /* we reached the end of the rules, nothing to report */ @@ -2884,8 +2921,8 @@ static enum rule_result http_req_get_intercept_rule(struct proxy *px, struct lis } /* Executes the http-response rules for stream and proxy . It - * returns one of 5 possible statuses: HTTP_RULE_RES_CONT, HTTP_RULE_RES_STOP, - * HTTP_RULE_RES_DONE, HTTP_RULE_RES_YIELD, or HTTP_RULE_RES_BADREQ. If *CONT + * returns one of 6 possible statuses: HTTP_RULE_RES_CONT, HTTP_RULE_RES_STOP, + * HTTP_RULE_RES_DONE, HTTP_RULE_RES_(F)YIELD, or HTTP_RULE_RES_BADREQ. If *CONT * is returned, the process can continue the evaluation of next rule list. If * *STOP or *DONE is returned, the process must stop the evaluation. If *BADREQ * is returned, it means the operation could not be processed and a server error @@ -2903,16 +2940,27 @@ static enum rule_result http_res_get_intercept_rule(struct proxy *px, struct lis if (final) act_opts |= ACT_OPT_FINAL; + if ((s->scf->flags & SC_FL_ERROR) || + ((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && + (px->options & PR_O_ABRT_CLOSE))) + act_opts |= ACT_OPT_FINAL; + /* If "the current_rule_list" match the executed rule list, we are in * resume condition. If a resume is needed it is always in the action * and never in the ACL or converters. In this case, we initialise the * current rule, and go to the action execution point. */ if (s->current_rule) { + int forced = s->flags & SF_RULE_FYIELD; + rule = s->current_rule; s->current_rule = NULL; - if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules)) + s->flags &= ~SF_RULE_FYIELD; + if (s->current_rule_list == rules || (def_rules && s->current_rule_list == def_rules)) { + if (forced) + goto resume_rule; goto resume_execution; + } } s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules); @@ -2922,6 +2970,18 @@ static enum rule_result http_res_get_intercept_rule(struct proxy *px, struct lis txn->rsp.flags &= ~HTTP_MSGF_SOFT_RW; list_for_each_entry(rule, s->current_rule_list, list) { + resume_rule: + /* check if budget is exceeded and we need to continue on the next + * polling loop, unless we know that we cannot yield + */ + if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) { + s->current_rule = rule; + s->flags |= SF_RULE_FYIELD; + rule_ret = HTTP_RULE_RES_YIELD; + task_wakeup(s->task, TASK_WOKEN_MSG); + goto end; + } + /* check optional condition */ if (!acl_match_cond(rule->cond, px, sess, s, SMP_OPT_DIR_RES|SMP_OPT_FINAL)) continue; @@ -2933,11 +2993,6 @@ resume_execution: /* Always call the action function if defined */ if (rule->action_ptr) { - if ((s->scf->flags & SC_FL_ERROR) || - ((s->scf->flags & (SC_FL_EOS|SC_FL_ABRT_DONE)) && - (px->options & PR_O_ABRT_CLOSE))) - act_opts |= ACT_OPT_FINAL; - if (!(s->scb->flags & SC_FL_ERROR) & !(s->res.flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { s->waiting_entity.type = STRM_ENTITY_NONE; s->waiting_entity.ptr = NULL; @@ -3037,7 +3092,7 @@ resume_execution: end: /* if the ruleset evaluation is finished reset the strict mode */ - if (rule_ret != HTTP_RULE_RES_YIELD) + if (rule_ret != HTTP_RULE_RES_YIELD && rule_ret != HTTP_RULE_RES_FYIELD) txn->rsp.flags &= ~HTTP_MSGF_SOFT_RW; /* we reached the end of the rules, nothing to report */ diff --git a/src/stream.c b/src/stream.c index d0a4a96d46..f673c4f4fb 100644 --- a/src/stream.c +++ b/src/stream.c @@ -2018,6 +2018,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) */ ana_list = ana_back = req->analysers; + s->rules_bcount = 0; while (ana_list && max_loops--) { /* Warning! ensure that analysers are always placed in ascending order! */ ANALYZE (s, req, flt_start_analyze, ana_list, ana_back, AN_REQ_FLT_START_FE); @@ -2100,6 +2101,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) */ ana_list = ana_back = res->analysers; + s->rules_bcount = 0; while (ana_list && max_loops--) { /* Warning! ensure that analysers are always placed in ascending order! */ ANALYZE (s, res, flt_start_analyze, ana_list, ana_back, AN_RES_FLT_START_FE); diff --git a/src/tcp_rules.c b/src/tcp_rules.c index d4c40258bf..c81f271faa 100644 --- a/src/tcp_rules.c +++ b/src/tcp_rules.c @@ -135,22 +135,39 @@ int tcp_inspect_request(struct stream *s, struct channel *req, int an_bit) * current rule, and go to the action execution point. */ if (s->current_rule) { + int forced = s->flags & SF_RULE_FYIELD; + rule = s->current_rule; s->current_rule = NULL; + s->flags &= ~SF_RULE_FYIELD; if (!(req->flags & SC_FL_ERROR) && !(req->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { s->waiting_entity.type = STRM_ENTITY_NONE; s->waiting_entity.ptr = NULL; } - if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) + if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) { + if (forced) + goto resume_rule; goto resume_execution; + } } s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules); restart: list_for_each_entry(rule, s->current_rule_list, list) { - enum acl_test_res ret = ACL_TEST_PASS; + resume_rule: + /* check if budget is exceeded and we need to continue on the next + * polling loop, unless we know that we cannot yield + */ + if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) { + s->current_rule = rule; + s->flags |= SF_RULE_FYIELD; + task_wakeup(s->task, TASK_WOKEN_MSG); + goto missing_data; + } if (rule->cond) { + enum acl_test_res ret = ACL_TEST_PASS; + ret = acl_exec_cond(rule->cond, s->be, sess, s, SMP_OPT_DIR_REQ | partial); if (ret == ACL_TEST_MISS) goto missing_data; @@ -331,22 +348,39 @@ int tcp_inspect_response(struct stream *s, struct channel *rep, int an_bit) * current rule, and go to the action execution point. */ if (s->current_rule) { + int forced = s->flags & SF_RULE_FYIELD; + rule = s->current_rule; s->current_rule = NULL; + s->flags &= ~SF_RULE_FYIELD; if (!(rep->flags & SC_FL_ERROR) && !(rep->flags & (CF_READ_TIMEOUT|CF_WRITE_TIMEOUT))) { s->waiting_entity.type = STRM_ENTITY_NONE; s->waiting_entity.ptr = NULL; } - if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) + if ((def_rules && s->current_rule_list == def_rules) || s->current_rule_list == rules) { + if (forced) + goto resume_rule; goto resume_execution; + } } s->current_rule_list = ((!def_rules || s->current_rule_list == def_rules) ? rules : def_rules); restart: list_for_each_entry(rule, s->current_rule_list, list) { - enum acl_test_res ret = ACL_TEST_PASS; + resume_rule: + /* check if budget is exceeded and we need to continue on the next + * polling loop, unless we know that we cannot yield + */ + if (s->rules_bcount++ >= global.tune.max_rules_at_once && !(act_opts & ACT_OPT_FINAL)) { + s->current_rule = rule; + s->flags |= SF_RULE_FYIELD; + task_wakeup(s->task, TASK_WOKEN_MSG); + goto missing_data; + } if (rule->cond) { + enum acl_test_res ret = ACL_TEST_PASS; + ret = acl_exec_cond(rule->cond, s->be, sess, s, SMP_OPT_DIR_RES | partial); if (ret == ACL_TEST_MISS) goto missing_data; -- 2.47.3