From: Vladimír Čunát Date: Wed, 27 Feb 2019 16:13:39 +0000 (+0100) Subject: worker: convert to a proper singleton X-Git-Tag: v4.1.0~21^2~17 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a550a1506509d6228d21726cd8be6840f816f4f7;p=thirdparty%2Fknot-resolver.git worker: convert to a proper singleton On many places we've been assuming that there's only a single worker, but we still often didn't utilize the property well. To get the pointer we used various ways, all even untyped: - __worker global variable in lua - uv_default_loop()->data - kr_request::daemon_context Now we instead simply define a global typed pointer the_worker. Nitpick: also worker_{init,deinit}() are reordered to correspond to the order of the fields, etc. --- diff --git a/daemon/bindings/cache.c b/daemon/bindings/cache.c index 0e0c04dda..f20d5d9b0 100644 --- a/daemon/bindings/cache.c +++ b/daemon/bindings/cache.c @@ -390,7 +390,7 @@ static int cache_zone_import(lua_State *L) int ret = -1; char msg[128]; - struct worker_ctx *worker = wrk_luaget(L); + struct worker_ctx *worker = the_worker; if (!worker) { strncpy(msg, "internal error, empty worker pointer", sizeof(msg)); goto finish; diff --git a/daemon/bindings/impl.h b/daemon/bindings/impl.h index c4d06c84b..1bd0d4126 100644 --- a/daemon/bindings/impl.h +++ b/daemon/bindings/impl.h @@ -84,13 +84,6 @@ static inline void lua_error_maybe(lua_State *L, int err) if (err) lua_error_p(L, "%s", kr_strerror(err)); } -static inline struct worker_ctx *wrk_luaget(lua_State *L) { - lua_getglobal(L, "__worker"); - struct worker_ctx *worker = lua_touserdata(L, -1); - lua_pop(L, 1); - return worker; -} - static inline int execute_callback(lua_State *L, int argc) { int ret = engine_pcall(L, argc); diff --git a/daemon/bindings/net.c b/daemon/bindings/net.c index d4b2c2ea2..0d7e7d3ab 100644 --- a/daemon/bindings/net.c +++ b/daemon/bindings/net.c @@ -319,7 +319,7 @@ static int net_bufsize(lua_State *L) /** Set TCP pipelining size. */ static int net_pipeline(lua_State *L) { - struct worker_ctx *worker = wrk_luaget(L); + struct worker_ctx *worker = the_worker; if (!worker) { return 0; } @@ -824,12 +824,11 @@ static int net_tls_sticket_secret_file(lua_State *L) static int net_outgoing(lua_State *L, int family) { - struct worker_ctx *worker = wrk_luaget(L); union inaddr *addr; if (family == AF_INET) - addr = (union inaddr*)&worker->out_addr4; + addr = (union inaddr*)&the_worker->out_addr4; else - addr = (union inaddr*)&worker->out_addr6; + addr = (union inaddr*)&the_worker->out_addr6; if (lua_gettop(L) == 0) { /* Return the current value. */ if (addr->ip.sa_family == AF_UNSPEC) { diff --git a/daemon/bindings/worker.c b/daemon/bindings/worker.c index ed859c7de..34035d2dc 100644 --- a/daemon/bindings/worker.c +++ b/daemon/bindings/worker.c @@ -21,8 +21,7 @@ /** resolve_pkt(pkt, options, init_cb) */ static int wrk_resolve_pkt(lua_State *L) { - struct worker_ctx *worker = wrk_luaget(L); - if (!worker) { + if (!the_worker) { return 0; } @@ -36,7 +35,7 @@ static int wrk_resolve_pkt(lua_State *L) lua_error_p(L, "invalid options"); /* Create task and start with a first question */ - struct qr_task *task = worker_resolve_start(worker, pkt, *options); + struct qr_task *task = worker_resolve_start(the_worker, pkt, *options); if (!task) { lua_error_p(L, "couldn't create a resolution request"); } @@ -57,7 +56,7 @@ static int wrk_resolve_pkt(lua_State *L) /** resolve(qname, qtype, qclass, options, init_cb) */ static int wrk_resolve(lua_State *L) { - struct worker_ctx *worker = wrk_luaget(L); + struct worker_ctx *worker = the_worker; if (!worker) { return 0; } @@ -122,7 +121,7 @@ static inline double getseconds(uv_timeval_t *tv) /** Return worker statistics. */ static int wrk_stats(lua_State *L) { - struct worker_ctx *worker = wrk_luaget(L); + struct worker_ctx *worker = the_worker; if (!worker) { return 0; } diff --git a/daemon/io.c b/daemon/io.c index 6a7ae9589..7c406bea4 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -281,7 +281,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls) return; } - struct worker_ctx *worker = (struct worker_ctx *)master->loop->data; + struct worker_ctx *worker = the_worker; uv_tcp_t *client = malloc(sizeof(uv_tcp_t)); if (!client) { return; @@ -323,8 +323,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls) return; } - const struct engine *engine = worker->engine; - const struct network *net = &engine->net; + const struct network *net = &worker->engine->net; uint64_t idle_in_timeout = net->tcp.in_idle_timeout; uint64_t timeout = KR_CONN_RTT_MAX / 2; diff --git a/daemon/lua/kres-gen.lua b/daemon/lua/kres-gen.lua index bac7fbff5..c63c97163 100644 --- a/daemon/lua/kres-gen.lua +++ b/daemon/lua/kres-gen.lua @@ -198,7 +198,6 @@ struct kr_request { int vars_ref; knot_mm_t pool; unsigned int uid; - void *daemon_context; }; enum kr_rank {KR_RANK_INITIAL, KR_RANK_OMIT, KR_RANK_TRY, KR_RANK_INDET = 4, KR_RANK_BOGUS, KR_RANK_MISMATCH, KR_RANK_MISSING, KR_RANK_INSECURE, KR_RANK_AUTH = 16, KR_RANK_SECURE = 32}; struct kr_cdb_stats { diff --git a/daemon/main.c b/daemon/main.c index 54b8d83c1..1a93c45d7 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -122,8 +122,7 @@ static void tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t goto finish; } - struct engine *engine = ((struct worker_ctx *)stream->loop->data)->engine; - lua_State *L = engine->L; + lua_State *L = the_worker->engine->L; int ret = engine_cmd(L, cmd, false); const char *message = ""; if (lua_gettop(L) > 0) { @@ -764,17 +763,14 @@ int main(int argc, char **argv) kr_log_error("[system] failed to initialize engine: %s\n", kr_strerror(ret)); return EXIT_FAILURE; } - /* Create worker */ - struct worker_ctx *worker = worker_create(&engine, &pool, fork_id, args.forks); - if (!worker) { - kr_log_error("[system] not enough memory\n"); + /* Initialize the worker */ + ret = worker_init(&engine, fork_id, args.forks); + if (ret != 0) { + kr_log_error("[system] failed to initialize worker: %s\n", kr_strerror(ret)); return EXIT_FAILURE; } uv_loop_t *loop = uv_default_loop(); - worker->loop = loop; - loop->data = worker; - /* Catch some signals. */ uv_signal_t sigint, sigterm; if (true) ret = uv_signal_init(loop, &sigint); @@ -840,7 +836,7 @@ int main(int argc, char **argv) cleanup:/* Cleanup. */ engine_deinit(&engine); - worker_reclaim(worker); + worker_deinit(); if (loop != NULL) { uv_loop_close(loop); } diff --git a/daemon/network.c b/daemon/network.c index 509fc40ac..62e1c4dee 100644 --- a/daemon/network.c +++ b/daemon/network.c @@ -51,8 +51,7 @@ static int endpoint_open_lua_cb(struct network *net, struct endpoint *ep, return kr_error(EINVAL); } /* First find callback in the endpoint registry. */ - struct worker_ctx *worker = net->loop->data; // LATER: the_worker - lua_State *L = worker->engine->L; + lua_State *L = the_worker->engine->L; void **pp = trie_get_try(net->endpoint_kinds, ep->flags.kind, strlen(ep->flags.kind)); if (!pp && net->missing_kind_is_error) { @@ -115,8 +114,7 @@ int network_engage_endpoints(struct network *net) /** Notify the registered function about endpoint about to be closed. */ static void endpoint_close_lua_cb(struct network *net, struct endpoint *ep) { - struct worker_ctx *worker = net->loop->data; // LATER: the_worker - lua_State *L = worker->engine->L; + lua_State *L = the_worker->engine->L; void **pp = trie_get_try(net->endpoint_kinds, ep->flags.kind, strlen(ep->flags.kind)); if (!pp && net->missing_kind_is_error) { @@ -215,8 +213,7 @@ void network_deinit(struct network *net) { if (net != NULL) { network_close_force(net); - struct worker_ctx *worker = net->loop->data; // LATER: the_worker - trie_apply(net->endpoint_kinds, kind_unregister, worker->engine->L); + trie_apply(net->endpoint_kinds, kind_unregister, the_worker->engine->L); trie_free(net->endpoint_kinds); tls_credentials_free(net->tls_credentials); diff --git a/daemon/worker.c b/daemon/worker.c index 5858912c8..fac20fcad 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -122,11 +122,8 @@ static struct session* worker_find_tcp_waiting(struct worker_ctx *worker, const struct sockaddr *addr); static void on_tcp_connect_timeout(uv_timer_t *timer); -/** @internal Get singleton worker. */ -static inline struct worker_ctx *get_worker(void) -{ - return uv_default_loop()->data; -} +struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */ +struct worker_ctx *the_worker = NULL; /*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection. * socktype is SOCK_* */ @@ -294,7 +291,6 @@ static struct request_ctx *request_create(struct worker_ctx *worker, req->pool = pool; req->vars_ref = LUA_NOREF; req->uid = uid; - req->daemon_context = worker; /* Remember query source addr */ if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) { @@ -829,7 +825,8 @@ static int send_waiting(struct session *session) static void on_connect(uv_connect_t *req, int status) { - struct worker_ctx *worker = get_worker(); + struct worker_ctx *worker = the_worker; + assert(worker); uv_stream_t *handle = req->handle; struct session *session = handle->data; struct sockaddr *peer = session_get_peer(session); @@ -954,7 +951,8 @@ static void on_tcp_connect_timeout(uv_timer_t *timer) struct session *session = timer->data; uv_timer_stop(timer); - struct worker_ctx *worker = get_worker(); + struct worker_ctx *worker = the_worker; + assert(worker); assert (session_tasklist_is_empty(session)); @@ -1939,21 +1937,22 @@ bool worker_task_finished(struct qr_task *task) { return task->finished; } -/** Reserve worker buffers */ + +/** Reserve worker buffers. We assume worker's been zeroed. */ static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen) { + worker->tcp_connected = map_make(NULL); + worker->tcp_waiting = map_make(NULL); + worker->subreq_out = trie_create(NULL); + array_init(worker->pool_mp); if (array_reserve(worker->pool_mp, ring_maxlen)) { return kr_error(ENOMEM); } - memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool)); + worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t)); worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc; - worker->subreq_out = trie_create(NULL); - worker->tcp_connected = map_make(NULL); - worker->tcp_waiting = map_make(NULL); - worker->tcp_pipeline_max = MAX_PIPELINED; - memset(&worker->stats, 0, sizeof(worker->stats)); + return kr_ok(); } @@ -1967,43 +1966,59 @@ static inline void reclaim_mp_freelist(mp_freelist_t *list) array_clear(*list); } -void worker_reclaim(struct worker_ctx *worker) +void worker_deinit(void) { - reclaim_mp_freelist(&worker->pool_mp); - mp_delete(worker->pkt_pool.ctx); - worker->pkt_pool.ctx = NULL; - trie_free(worker->subreq_out); - worker->subreq_out = NULL; - map_clear(&worker->tcp_connected); - map_clear(&worker->tcp_waiting); + struct worker_ctx *worker = the_worker; + assert(worker); if (worker->z_import != NULL) { zi_free(worker->z_import); worker->z_import = NULL; } + map_clear(&worker->tcp_connected); + map_clear(&worker->tcp_waiting); + trie_free(worker->subreq_out); + worker->subreq_out = NULL; + + reclaim_mp_freelist(&worker->pool_mp); + mp_delete(worker->pkt_pool.ctx); + worker->pkt_pool.ctx = NULL; + + the_worker = NULL; } -struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool, - int worker_id, int worker_count) +int worker_init(struct engine *engine, int worker_id, int worker_count) { assert(engine && engine->L); + assert(the_worker == NULL); kr_bindings_register(engine->L); /* Create main worker. */ - struct worker_ctx *worker = mm_alloc(pool, sizeof(*worker)); - if (!worker) { - return NULL; - } + struct worker_ctx *worker = &the_worker_value; memset(worker, 0, sizeof(*worker)); + worker->engine = engine; + + uv_loop_t *loop = uv_default_loop(); + worker->loop = loop; + worker->id = worker_id; worker->count = worker_count; - worker->engine = engine; - worker->next_request_uid = UINT16_MAX + 1; - worker_reserve(worker, MP_FREELIST_SIZE); + + /* Register table for worker per-request variables */ + lua_newtable(engine->L); + lua_setfield(engine->L, -2, "vars"); + lua_getfield(engine->L, -1, "vars"); + worker->vars_table_ref = luaL_ref(engine->L, LUA_REGISTRYINDEX); + lua_pop(engine->L, 1); + + worker->tcp_pipeline_max = MAX_PIPELINED; worker->out_addr4.sin_family = AF_UNSPEC; worker->out_addr6.sin6_family = AF_UNSPEC; - /* Register worker in Lua thread */ - lua_pushlightuserdata(engine->L, worker); - lua_setglobal(engine->L, "__worker"); + + int ret = worker_reserve(worker, MP_FREELIST_SIZE); + if (ret) return ret; + worker->next_request_uid = UINT16_MAX + 1; + + /* Set some worker.* fields in Lua */ lua_getglobal(engine->L, "worker"); lua_pushnumber(engine->L, worker_id); lua_setfield(engine->L, -2, "id"); @@ -2011,13 +2026,11 @@ struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool, lua_setfield(engine->L, -2, "pid"); lua_pushnumber(engine->L, worker_count); lua_setfield(engine->L, -2, "count"); - /* Register table for worker per-request variables */ - lua_newtable(engine->L); - lua_setfield(engine->L, -2, "vars"); - lua_getfield(engine->L, -1, "vars"); - worker->vars_table_ref = luaL_ref(engine->L, LUA_REGISTRYINDEX); - lua_pop(engine->L, 1); - return worker; + + the_worker = worker; + loop->data = the_worker; + /* ^^^^ This shouldn't be used anymore, but it's hard to be 100% sure. */ + return kr_ok(); } #undef VERBOSE_MSG diff --git a/daemon/worker.h b/daemon/worker.h index 2882d4904..c1336efeb 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -30,9 +30,15 @@ struct session; /** Zone import context (opaque). */ struct zone_import_ctx; -/** Create and initialize the worker. */ -struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool, - int worker_id, int worker_count); +/** Pointer to the singleton worker. NULL if not initialized. */ +KR_EXPORT extern struct worker_ctx *the_worker; + +/** Create and initialize the worker. + * \return error code (ENOMEM) */ +int worker_init(struct engine *engine, int worker_id, int worker_count); + +/** Destroy the worker (free memory). */ +void worker_deinit(void); /** * Process an incoming packet (query from a client or answer from upstream). @@ -67,9 +73,6 @@ int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query); /** @return struct kr_request associated with opaque task */ struct kr_request *worker_task_request(struct qr_task *task); -/** Collect worker mempools */ -void worker_reclaim(struct worker_ctx *worker); - int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet); diff --git a/lib/resolve.h b/lib/resolve.h index ae695b913..657aef6fb 100644 --- a/lib/resolve.h +++ b/lib/resolve.h @@ -233,7 +233,6 @@ struct kr_request { int vars_ref; /**< Reference to per-request variable table. LUA_NOREF if not set. */ knot_mm_t pool; unsigned int uid; /** for logging purposes only */ - void *daemon_context; /** pointer to worker from daemon. Can be used in modules. */ }; /** Initializer for an array of *_selected. */ diff --git a/modules/edns_keepalive/edns_keepalive.c b/modules/edns_keepalive/edns_keepalive.c index 1a598638a..12370b824 100644 --- a/modules/edns_keepalive/edns_keepalive.c +++ b/modules/edns_keepalive/edns_keepalive.c @@ -41,9 +41,7 @@ static int edns_keepalive_finalize(kr_layer_t *ctx) if (!ka_want) { return ctx->state; } - const struct worker_ctx *worker = (const struct worker_ctx *)req->daemon_context; - assert(worker); - const struct network *net = &worker->engine->net; + const struct network *net = &the_worker->engine->net; uint64_t timeout = net->tcp.in_idle_timeout / 100; if (timeout > UINT16_MAX) { timeout = UINT16_MAX;