From beeabf531468bc2f40845a83e76726130dc9f98e Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Fri, 1 Oct 2021 18:23:30 +0200 Subject: [PATCH] MINOR: task: provide 3 task_new_* wrappers to simplify the API We'll need to improve the API to pass other arguments in the future, so let's start to adapt better to the current use cases. task_new() is used: - 18 times as task_new(tid_bit) - 18 times as task_new(MAX_THREADS_MASK) - 2 times with a single bit (in a loop) - 1 in the debug code that uses a mask This patch provides 3 new functions to achieve this: - task_new_here() to create a task on the calling thread - task_new_anywhere() to create a task to be run anywhere - task_new_on() to create a task to run on a specific thread The change is trivial and will allow us to later concentrate the required adaptations to these 3 functions only. It's still possible to call task_new() if needed but a comment was added to encourage the use of the new ones instead. The debug code was not changed and still uses it. --- include/haproxy/applet.h | 2 +- include/haproxy/task.h | 32 ++++++++++++++++++++++++++++++-- src/cfgparse.c | 6 +++--- src/check.c | 9 +++++---- src/connection.c | 2 +- src/dns.c | 8 ++++---- src/flt_spoe.c | 2 +- src/hlua.c | 10 +++++----- src/listener.c | 2 +- src/mailers.c | 2 +- src/mux_fcgi.c | 4 ++-- src/mux_h1.c | 4 ++-- src/mux_h2.c | 4 ++-- src/mux_quic.c | 4 ++-- src/peers.c | 2 +- src/proxy.c | 4 ++-- src/resolvers.c | 4 ++-- src/server.c | 2 +- src/session.c | 2 +- src/sink.c | 2 +- src/stick_table.c | 2 +- src/stream.c | 2 +- src/xprt_quic.c | 2 +- 23 files changed, 71 insertions(+), 42 deletions(-) diff --git a/include/haproxy/applet.h b/include/haproxy/applet.h index 717e017c5e..97b9c347b4 100644 --- a/include/haproxy/applet.h +++ b/include/haproxy/applet.h @@ -68,7 +68,7 @@ static inline struct appctx *appctx_new(struct applet *applet) appctx->obj_type = OBJ_TYPE_APPCTX; appctx->applet = applet; appctx_init(appctx); - appctx->t = task_new(tid_bit); + appctx->t = task_new_here(); if (unlikely(appctx->t == NULL)) { pool_free(pool_head_appctx, appctx); return NULL; diff --git a/include/haproxy/task.h b/include/haproxy/task.h index aa9e3b2e34..7b9b4e625b 100644 --- a/include/haproxy/task.h +++ b/include/haproxy/task.h @@ -462,8 +462,9 @@ static inline struct tasklet *tasklet_new(void) /* * Allocate and initialise a new task. The new task is returned, or NULL in - * case of lack of memory. The task count is incremented. Tasks should only - * be allocated this way, and must be freed using task_free(). + * case of lack of memory. The task count is incremented. This API might change + * in the near future, so prefer one of the task_new_*() wrappers below which + * are usually more suitable. Tasks must be freed using task_free(). */ static inline struct task *task_new(unsigned long thread_mask) { @@ -475,6 +476,33 @@ static inline struct task *task_new(unsigned long thread_mask) return t; } +/* Allocate and initialize a new task, to run on global thread . The new + * task is returned, or NULL in case of lack of memory. It's up to the caller + * to pass a valid thread number (in tid space, 0 to nbthread-1). The task + * count is incremented. + */ +static inline struct task *task_new_on(uint thr) +{ + return task_new(1UL << thr); +} + +/* Allocate and initialize a new task, to run on the calling thread. The new + * task is returned, or NULL in case of lack of memory. The task count is + * incremented. + */ +static inline struct task *task_new_here() +{ + return task_new(tid_bit); +} + +/* Allocate and initialize a new task, to run on any thread. The new task is + * returned, or NULL in case of lack of memory. The task count is incremented. + */ +static inline struct task *task_new_anywhere() +{ + return task_new(MAX_THREADS_MASK); +} + /* * Free a task. Its context must have been freed since it will be lost. The * task count is decremented. It it is the current task, this one is reset. diff --git a/src/cfgparse.c b/src/cfgparse.c index 250e4ed58f..2ef62afbe1 100644 --- a/src/cfgparse.c +++ b/src/cfgparse.c @@ -3680,7 +3680,7 @@ out_uri_auth_compat: } } - idle_conn_task = task_new(MAX_THREADS_MASK); + idle_conn_task = task_new_anywhere(); if (!idle_conn_task) { ha_alert("parsing : failed to allocate global idle connection task.\n"); cfgerr++; @@ -3690,7 +3690,7 @@ out_uri_auth_compat: idle_conn_task->context = NULL; for (i = 0; i < global.nbthread; i++) { - idle_conns[i].cleanup_task = task_new(1UL << i); + idle_conns[i].cleanup_task = task_new_on(i); if (!idle_conns[i].cleanup_task) { ha_alert("parsing : failed to allocate idle connection tasks for thread '%d'.\n", i); cfgerr++; @@ -3769,7 +3769,7 @@ out_uri_auth_compat: } /* create the task associated with the proxy */ - curproxy->task = task_new(MAX_THREADS_MASK); + curproxy->task = task_new_anywhere(); if (curproxy->task) { curproxy->task->context = curproxy; curproxy->task->process = manage_proxy; diff --git a/src/check.c b/src/check.c index aedbed1838..9ac66a5459 100644 --- a/src/check.c +++ b/src/check.c @@ -1388,13 +1388,14 @@ int start_check_task(struct check *check, int mininter, int nbcheck, int srvpos) { struct task *t; - unsigned long thread_mask = MAX_THREADS_MASK; + /* task for the check. Process-based checks exclusively run on thread 1. */ if (check->type == PR_O2_EXT_CHK) - thread_mask = 1; + t = task_new_on(1); + else + t = task_new_anywhere(); - /* task for the check */ - if ((t = task_new(thread_mask)) == NULL) { + if (!t) { ha_alert("Starting [%s:%s] check: out of memory.\n", check->server->proxy->id, check->server->id); return 0; diff --git a/src/connection.c b/src/connection.c index bf8c60e4ff..a4a8a8b137 100644 --- a/src/connection.c +++ b/src/connection.c @@ -1686,7 +1686,7 @@ static struct task *mux_stopping_process(struct task *t, void *ctx, unsigned int static int allocate_mux_cleanup(void) { /* allocates the thread bound mux_stopping_data task */ - mux_stopping_data[tid].task = task_new(tid_bit); + mux_stopping_data[tid].task = task_new_here(); if (!mux_stopping_data[tid].task) { ha_alert("Failed to allocate the task for connection cleanup on thread %d.\n", tid); return 0; diff --git a/src/dns.c b/src/dns.c index bc3310a9ee..fa6f2b9073 100644 --- a/src/dns.c +++ b/src/dns.c @@ -1027,7 +1027,7 @@ struct dns_session *dns_session_new(struct dns_stream_server *dss) /* never fail because it is the first watcher attached to the ring */ DISGUISE(ring_attach(&ds->ring)); - if ((ds->task_exp = task_new(tid_bit)) == NULL) + if ((ds->task_exp = task_new_here()) == NULL) goto error; ds->task_exp->process = dns_process_query_exp; @@ -1223,7 +1223,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv) goto out; } /* Create the task associated to the resolver target handling conns */ - if ((dss->task_req = task_new(MAX_THREADS_MASK)) == NULL) { + if ((dss->task_req = task_new_anywhere()) == NULL) { ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id); goto out; } @@ -1240,7 +1240,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv) } /* Create the task associated to the resolver target handling conns */ - if ((dss->task_rsp = task_new(MAX_THREADS_MASK)) == NULL) { + if ((dss->task_rsp = task_new_anywhere()) == NULL) { ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id); goto out; } @@ -1250,7 +1250,7 @@ int dns_stream_init(struct dns_nameserver *ns, struct server *srv) dss->task_rsp->context = ns; /* Create the task associated to the resolver target handling conns */ - if ((dss->task_idle = task_new(MAX_THREADS_MASK)) == NULL) { + if ((dss->task_idle = task_new_anywhere()) == NULL) { ha_alert("memory allocation error initializing the ring for dns tcp server '%s'.\n", srv->id); goto out; } diff --git a/src/flt_spoe.c b/src/flt_spoe.c index 70aa8697e7..3262fd0cf5 100644 --- a/src/flt_spoe.c +++ b/src/flt_spoe.c @@ -1998,7 +1998,7 @@ spoe_create_appctx(struct spoe_config *conf) goto out_free_appctx; appctx->st0 = SPOE_APPCTX_ST_CONNECT; - if ((SPOE_APPCTX(appctx)->task = task_new(tid_bit)) == NULL) + if ((SPOE_APPCTX(appctx)->task = task_new_here()) == NULL) goto out_free_spoe_appctx; SPOE_APPCTX(appctx)->owner = appctx; diff --git a/src/hlua.c b/src/hlua.c index df463492b4..baf503b970 100644 --- a/src/hlua.c +++ b/src/hlua.c @@ -8251,9 +8251,9 @@ static int hlua_register_task(lua_State *L) * otherwise, inherit the current thread identifier */ if (state_id == 0) - task = task_new(MAX_THREADS_MASK); + task = task_new_anywhere(); else - task = task_new(tid_bit); + task = task_new_here(); if (!task) goto alloc_error; @@ -8941,7 +8941,7 @@ static int hlua_applet_tcp_init(struct appctx *ctx, struct proxy *px, struct str ctx->ctx.hlua_apptcp.flags = 0; /* Create task used by signal to wakeup applets. */ - task = task_new(tid_bit); + task = task_new_here(); if (!task) { SEND_ERR(px, "Lua applet tcp '%s': out of memory.\n", ctx->rule->arg.hlua_rule->fcn->name); @@ -9134,7 +9134,7 @@ static int hlua_applet_http_init(struct appctx *ctx, struct proxy *px, struct st ctx->ctx.hlua_apphttp.flags |= APPLET_HTTP11; /* Create task used by signal to wakeup applets. */ - task = task_new(tid_bit); + task = task_new_here(); if (!task) { SEND_ERR(px, "Lua applet http '%s': out of memory.\n", ctx->rule->arg.hlua_rule->fcn->name); @@ -9753,7 +9753,7 @@ static int hlua_cli_parse_fct(char **args, char *payload, struct appctx *appctx, * We use the same wakeup function than the Lua applet_tcp and * applet_http. It is absolutely compatible. */ - appctx->ctx.hlua_cli.task = task_new(tid_bit); + appctx->ctx.hlua_cli.task = task_new_here(); if (!appctx->ctx.hlua_cli.task) { SEND_ERR(NULL, "Lua cli '%s': out of memory.\n", fcn->name); goto error; diff --git a/src/listener.c b/src/listener.c index 9f408a622e..bfe3216048 100644 --- a/src/listener.c +++ b/src/listener.c @@ -1134,7 +1134,7 @@ void listener_release(struct listener *l) /* Initializes the listener queues. Returns 0 on success, otherwise ERR_* flags */ static int listener_queue_init() { - global_listener_queue_task = task_new(MAX_THREADS_MASK); + global_listener_queue_task = task_new_anywhere(); if (!global_listener_queue_task) { ha_alert("Out of memory when initializing global listener queue\n"); return ERR_FATAL|ERR_ABORT; diff --git a/src/mailers.c b/src/mailers.c index 3df02f0f55..3d01d7532e 100644 --- a/src/mailers.c +++ b/src/mailers.c @@ -133,7 +133,7 @@ int init_email_alert(struct mailers *mls, struct proxy *p, char **err) check->addr = mailer->addr; check->port = get_host_port(&mailer->addr); - if ((t = task_new(MAX_THREADS_MASK)) == NULL) { + if ((t = task_new_anywhere()) == NULL) { memprintf(err, "out of memory while allocating mailer alerts task"); goto error; } diff --git a/src/mux_fcgi.c b/src/mux_fcgi.c index 3f127c78e5..5fb1c5e1b6 100644 --- a/src/mux_fcgi.c +++ b/src/mux_fcgi.c @@ -734,7 +734,7 @@ static int fcgi_init(struct connection *conn, struct proxy *px, struct session * fconn->app = app; fconn->task = NULL; if (tick_isset(fconn->timeout)) { - t = task_new(tid_bit); + t = task_new_here(); if (!t) { TRACE_ERROR("fconn task allocation failure", FCGI_EV_FCONN_NEW|FCGI_EV_FCONN_END|FCGI_EV_FCONN_ERR); goto fail; @@ -4247,7 +4247,7 @@ static int fcgi_takeover(struct connection *conn, int orig_tid) __ha_barrier_store(); task_kill(task); - fcgi->task = task_new(tid_bit); + fcgi->task = task_new_here(); if (!fcgi->task) { fcgi_release(fcgi); return -1; diff --git a/src/mux_h1.c b/src/mux_h1.c index dcfa3eef02..5dfd26cc88 100644 --- a/src/mux_h1.c +++ b/src/mux_h1.c @@ -808,7 +808,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session &h1c->conn->stopping_list); } if (tick_isset(h1c->timeout)) { - t = task_new(tid_bit); + t = task_new_here(); if (!t) { TRACE_ERROR("H1C task allocation failure", H1_EV_H1C_NEW|H1_EV_H1C_END|H1_EV_H1C_ERR); goto fail; @@ -3738,7 +3738,7 @@ static int h1_takeover(struct connection *conn, int orig_tid) __ha_barrier_store(); task_kill(task); - h1c->task = task_new(tid_bit); + h1c->task = task_new_here(); if (!h1c->task) { h1_release(h1c); return -1; diff --git a/src/mux_h2.c b/src/mux_h2.c index dfe0b37954..ffdafc8761 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -945,7 +945,7 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s h2c->proxy = prx; h2c->task = NULL; if (tick_isset(h2c->timeout)) { - t = task_new(tid_bit); + t = task_new_here(); if (!t) goto fail; @@ -6636,7 +6636,7 @@ static int h2_takeover(struct connection *conn, int orig_tid) __ha_barrier_store(); task_kill(task); - h2c->task = task_new(tid_bit); + h2c->task = task_new_here(); if (!h2c->task) { h2_release(h2c); return -1; diff --git a/src/mux_quic.c b/src/mux_quic.c index 320f612e8a..c4673ee86f 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -602,7 +602,7 @@ static int qc_init(struct connection *conn, struct proxy *prx, qcc->proxy = prx; qcc->task = NULL; if (tick_isset(qcc->timeout)) { - t = task_new(tid_bit); + t = task_new_here(); if (!t) goto fail; @@ -2107,7 +2107,7 @@ static int qc_takeover(struct connection *conn, int orig_tid) __ha_barrier_store(); task_kill(task); - qcc->task = task_new(tid_bit); + qcc->task = task_new_here(); if (!qcc->task) { qc_release(qcc); return -1; diff --git a/src/peers.c b/src/peers.c index 8af7475518..ab4d4122fc 100644 --- a/src/peers.c +++ b/src/peers.c @@ -3503,7 +3503,7 @@ int peers_init_sync(struct peers *peers) peers->peers_fe->maxconn += 3; } - peers->sync_task = task_new(MAX_THREADS_MASK); + peers->sync_task = task_new_anywhere(); if (!peers->sync_task) return 0; diff --git a/src/proxy.c b/src/proxy.c index ad5120acf1..db876e60d5 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -2039,7 +2039,7 @@ static void do_soft_stop_now() /* schedule a hard-stop after a delay if needed */ if (tick_isset(global.hard_stop_after)) { - task = task_new(MAX_THREADS_MASK); + task = task_new_anywhere(); if (task) { task->process = hard_stop; task_schedule(task, tick_add(now_ms, global.hard_stop_after)); @@ -2077,7 +2077,7 @@ void soft_stop(void) stopping = 1; if (tick_isset(global.grace_delay)) { - task = task_new(MAX_THREADS_MASK); + task = task_new_anywhere(); if (task) { ha_notice("Scheduling a soft-stop in %u ms.\n", global.grace_delay); send_log(NULL, LOG_WARNING, "Scheduling a soft-stop in %u ms.\n", global.grace_delay); diff --git a/src/resolvers.c b/src/resolvers.c index 3b9a246e6d..fe7b6a81ed 100644 --- a/src/resolvers.c +++ b/src/resolvers.c @@ -2412,7 +2412,7 @@ static int resolvers_finalize_config(void) } /* Create the task associated to the resolvers section */ - if ((t = task_new(MAX_THREADS_MASK)) == NULL) { + if ((t = task_new_anywhere()) == NULL) { ha_alert("resolvers '%s' : out of memory.\n", resolvers->id); err_code |= (ERR_ALERT|ERR_ABORT); goto err; @@ -2453,7 +2453,7 @@ static int resolvers_finalize_config(void) } } - srv->srvrq_check = task_new(MAX_THREADS_MASK); + srv->srvrq_check = task_new_anywhere(); if (!srv->srvrq_check) { ha_alert("%s '%s' : unable to create SRVRQ task for server '%s'.\n", proxy_type_str(px), px->id, srv->id); diff --git a/src/server.c b/src/server.c index 213d10632e..b44b164693 100644 --- a/src/server.c +++ b/src/server.c @@ -4480,7 +4480,7 @@ static int init_srv_slowstart(struct server *srv) struct task *t; if (srv->slowstart) { - if ((t = task_new(MAX_THREADS_MASK)) == NULL) { + if ((t = task_new_anywhere()) == NULL) { ha_alert("Cannot activate slowstart for server %s/%s: out of memory.\n", srv->proxy->id, srv->id); return ERR_ALERT | ERR_FATAL; } diff --git a/src/session.c b/src/session.c index e3601cb457..d913d567df 100644 --- a/src/session.c +++ b/src/session.c @@ -248,7 +248,7 @@ int session_accept_fd(struct connection *cli_conn) * conn -- owner ---> task <-----+ */ if (cli_conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) { - if (unlikely((sess->task = task_new(tid_bit)) == NULL)) + if (unlikely((sess->task = task_new_here()) == NULL)) goto out_free_sess; sess->task->context = sess; diff --git a/src/sink.c b/src/sink.c index b869d2eafb..d694e58182 100644 --- a/src/sink.c +++ b/src/sink.c @@ -731,7 +731,7 @@ static struct task *process_sink_forward(struct task * task, void *context, unsi */ int sink_init_forward(struct sink *sink) { - sink->forward_task = task_new(MAX_THREADS_MASK); + sink->forward_task = task_new_anywhere(); if (!sink->forward_task) return 0; diff --git a/src/stick_table.c b/src/stick_table.c index 6f07080f64..f5d76322c4 100644 --- a/src/stick_table.c +++ b/src/stick_table.c @@ -648,7 +648,7 @@ int stktable_init(struct stktable *t) t->exp_next = TICK_ETERNITY; if ( t->expire ) { - t->exp_task = task_new(MAX_THREADS_MASK); + t->exp_task = task_new_anywhere(); if (!t->exp_task) return 0; t->exp_task->process = process_table_expire; diff --git a/src/stream.c b/src/stream.c index 89e85d88de..e4d5ac9649 100644 --- a/src/stream.c +++ b/src/stream.c @@ -429,7 +429,7 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin, struct bu s->pcli_flags = 0; s->unique_id = IST_NULL; - if ((t = task_new(tid_bit)) == NULL) + if ((t = task_new_here()) == NULL) goto out_fail_alloc; s->task = t; diff --git a/src/xprt_quic.c b/src/xprt_quic.c index d1bfea8c1a..32b8fced2d 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -3046,7 +3046,7 @@ static struct quic_conn *qc_new_conn(unsigned int version, int ipv4, */ static int quic_conn_init_timer(struct quic_conn *qc) { - qc->timer_task = task_new(MAX_THREADS_MASK); + qc->timer_task = task_new_anywhere(); if (!qc->timer_task) return 0; -- 2.39.5