]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
worker: convert to a proper singleton
authorVladimír Čunát <vladimir.cunat@nic.cz>
Wed, 27 Feb 2019 16:13:39 +0000 (17:13 +0100)
committerVladimír Čunát <vladimir.cunat@nic.cz>
Thu, 13 Jun 2019 13:02:41 +0000 (15:02 +0200)
On many places we've been assuming that there's only a single worker,
but we still often didn't utilize the property well.  To get the pointer
we used various ways, all even untyped:
 - __worker global variable in lua
 - uv_default_loop()->data
 - kr_request::daemon_context

Now we instead simply define a global typed pointer the_worker.

Nitpick: also worker_{init,deinit}() are reordered to correspond
to the order of the fields, etc.

12 files changed:
daemon/bindings/cache.c
daemon/bindings/impl.h
daemon/bindings/net.c
daemon/bindings/worker.c
daemon/io.c
daemon/lua/kres-gen.lua
daemon/main.c
daemon/network.c
daemon/worker.c
daemon/worker.h
lib/resolve.h
modules/edns_keepalive/edns_keepalive.c

index 0e0c04dda46a1a42dd3d317df7bfdcd9fb2afbe3..f20d5d9b0c873bfbcfad2dcdb3edd4d5c704c40f 100644 (file)
@@ -390,7 +390,7 @@ static int cache_zone_import(lua_State *L)
        int ret = -1;
        char msg[128];
 
-       struct worker_ctx *worker = wrk_luaget(L);
+       struct worker_ctx *worker = the_worker;
        if (!worker) {
                strncpy(msg, "internal error, empty worker pointer", sizeof(msg));
                goto finish;
index c4d06c84b4a246fcf2dca72d8972ea25fa9ea375..1bd0d4126c8f0fadbd6458c9965cbb2b106e4240 100644 (file)
@@ -84,13 +84,6 @@ static inline void lua_error_maybe(lua_State *L, int err)
        if (err) lua_error_p(L, "%s", kr_strerror(err));
 }
 
-static inline struct worker_ctx *wrk_luaget(lua_State *L) {
-       lua_getglobal(L, "__worker");
-       struct worker_ctx *worker = lua_touserdata(L, -1);
-       lua_pop(L, 1);
-       return worker;
-}
-
 static inline int execute_callback(lua_State *L, int argc)
 {
        int ret = engine_pcall(L, argc);
index d4b2c2ea2ae531cd883a57e5639663a5ae2b56bf..0d7e7d3ab761c1234f5e39b731c7263b157856ec 100644 (file)
@@ -319,7 +319,7 @@ static int net_bufsize(lua_State *L)
 /** Set TCP pipelining size. */
 static int net_pipeline(lua_State *L)
 {
-       struct worker_ctx *worker = wrk_luaget(L);
+       struct worker_ctx *worker = the_worker;
        if (!worker) {
                return 0;
        }
@@ -824,12 +824,11 @@ static int net_tls_sticket_secret_file(lua_State *L)
 
 static int net_outgoing(lua_State *L, int family)
 {
-       struct worker_ctx *worker = wrk_luaget(L);
        union inaddr *addr;
        if (family == AF_INET)
-               addr = (union inaddr*)&worker->out_addr4;
+               addr = (union inaddr*)&the_worker->out_addr4;
        else
-               addr = (union inaddr*)&worker->out_addr6;
+               addr = (union inaddr*)&the_worker->out_addr6;
 
        if (lua_gettop(L) == 0) { /* Return the current value. */
                if (addr->ip.sa_family == AF_UNSPEC) {
index ed859c7de2a04f61ac715f6591d784bb6bc12e21..34035d2dcdb52787fe0124b66b7c1e88209e99d3 100644 (file)
@@ -21,8 +21,7 @@
 /** resolve_pkt(pkt, options, init_cb) */
 static int wrk_resolve_pkt(lua_State *L)
 {
-       struct worker_ctx *worker = wrk_luaget(L);
-       if (!worker) {
+       if (!the_worker) {
                return 0;
        }
 
@@ -36,7 +35,7 @@ static int wrk_resolve_pkt(lua_State *L)
                lua_error_p(L, "invalid options");
 
        /* Create task and start with a first question */
-       struct qr_task *task = worker_resolve_start(worker, pkt, *options);
+       struct qr_task *task = worker_resolve_start(the_worker, pkt, *options);
        if (!task) {
                lua_error_p(L, "couldn't create a resolution request");
        }
@@ -57,7 +56,7 @@ static int wrk_resolve_pkt(lua_State *L)
 /** resolve(qname, qtype, qclass, options, init_cb) */
 static int wrk_resolve(lua_State *L)
 {
-       struct worker_ctx *worker = wrk_luaget(L);
+       struct worker_ctx *worker = the_worker;
        if (!worker) {
                return 0;
        }
@@ -122,7 +121,7 @@ static inline double getseconds(uv_timeval_t *tv)
 /** Return worker statistics. */
 static int wrk_stats(lua_State *L)
 {
-       struct worker_ctx *worker = wrk_luaget(L);
+       struct worker_ctx *worker = the_worker;
        if (!worker) {
                return 0;
        }
index 6a7ae9589e13822ac3639dd8988bb1f915c3f079..7c406bea429ad4b48f07c367574b41c5044265c2 100644 (file)
@@ -281,7 +281,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
                return;
        }
 
-       struct worker_ctx *worker = (struct worker_ctx *)master->loop->data;
+       struct worker_ctx *worker = the_worker;
        uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
        if (!client) {
                return;
@@ -323,8 +323,7 @@ static void _tcp_accept(uv_stream_t *master, int status, bool tls)
                return;
        }
 
-       const struct engine *engine = worker->engine;
-       const struct network *net = &engine->net;
+       const struct network *net = &worker->engine->net;
        uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
 
        uint64_t timeout = KR_CONN_RTT_MAX / 2;
index bac7fbff559ae0db74dc6cf9ffe1bafec64c9892..c63c97163c52b1669139969841b9e498d68c464f 100644 (file)
@@ -198,7 +198,6 @@ struct kr_request {
        int vars_ref;
        knot_mm_t pool;
        unsigned int uid;
-       void *daemon_context;
 };
 enum kr_rank {KR_RANK_INITIAL, KR_RANK_OMIT, KR_RANK_TRY, KR_RANK_INDET = 4, KR_RANK_BOGUS, KR_RANK_MISMATCH, KR_RANK_MISSING, KR_RANK_INSECURE, KR_RANK_AUTH = 16, KR_RANK_SECURE = 32};
 struct kr_cdb_stats {
index 54b8d83c1b99bcd25270e98be2a13e9f75782f63..1a93c45d7380692c72c06be6b1254fa35b088ff8 100644 (file)
@@ -122,8 +122,7 @@ static void tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t
                        goto finish;
                }
 
-               struct engine *engine = ((struct worker_ctx *)stream->loop->data)->engine;
-               lua_State *L = engine->L;
+               lua_State *L = the_worker->engine->L;
                int ret = engine_cmd(L, cmd, false);
                const char *message = "";
                if (lua_gettop(L) > 0) {
@@ -764,17 +763,14 @@ int main(int argc, char **argv)
                kr_log_error("[system] failed to initialize engine: %s\n", kr_strerror(ret));
                return EXIT_FAILURE;
        }
-       /* Create worker */
-       struct worker_ctx *worker = worker_create(&engine, &pool, fork_id, args.forks);
-       if (!worker) {
-               kr_log_error("[system] not enough memory\n");
+       /* Initialize the worker */
+       ret = worker_init(&engine, fork_id, args.forks);
+       if (ret != 0) {
+               kr_log_error("[system] failed to initialize worker: %s\n", kr_strerror(ret));
                return EXIT_FAILURE;
        }
 
        uv_loop_t *loop = uv_default_loop();
-       worker->loop = loop;
-       loop->data = worker;
-
        /* Catch some signals. */
        uv_signal_t sigint, sigterm;
        if (true) ret = uv_signal_init(loop, &sigint);
@@ -840,7 +836,7 @@ int main(int argc, char **argv)
 
 cleanup:/* Cleanup. */
        engine_deinit(&engine);
-       worker_reclaim(worker);
+       worker_deinit();
        if (loop != NULL) {
                uv_loop_close(loop);
        }
index 509fc40acab35a7f448f16e5a125af8e74ad119d..62e1c4deeebd71ad7c1b052ceb10ec4618f954d8 100644 (file)
@@ -51,8 +51,7 @@ static int endpoint_open_lua_cb(struct network *net, struct endpoint *ep,
                return kr_error(EINVAL);
        }
        /* First find callback in the endpoint registry. */
-       struct worker_ctx *worker = net->loop->data; // LATER: the_worker
-       lua_State *L = worker->engine->L;
+       lua_State *L = the_worker->engine->L;
        void **pp = trie_get_try(net->endpoint_kinds, ep->flags.kind,
                                strlen(ep->flags.kind));
        if (!pp && net->missing_kind_is_error) {
@@ -115,8 +114,7 @@ int network_engage_endpoints(struct network *net)
 /** Notify the registered function about endpoint about to be closed. */
 static void endpoint_close_lua_cb(struct network *net, struct endpoint *ep)
 {
-       struct worker_ctx *worker = net->loop->data; // LATER: the_worker
-       lua_State *L = worker->engine->L;
+       lua_State *L = the_worker->engine->L;
        void **pp = trie_get_try(net->endpoint_kinds, ep->flags.kind,
                                strlen(ep->flags.kind));
        if (!pp && net->missing_kind_is_error) {
@@ -215,8 +213,7 @@ void network_deinit(struct network *net)
 {
        if (net != NULL) {
                network_close_force(net);
-               struct worker_ctx *worker = net->loop->data; // LATER: the_worker
-               trie_apply(net->endpoint_kinds, kind_unregister, worker->engine->L);
+               trie_apply(net->endpoint_kinds, kind_unregister, the_worker->engine->L);
                trie_free(net->endpoint_kinds);
 
                tls_credentials_free(net->tls_credentials);
index 5858912c8db8986f35511d6fd2a7b691860b5613..fac20fcad07be815af8a00acae49aa165f549ff7 100644 (file)
@@ -122,11 +122,8 @@ static struct session* worker_find_tcp_waiting(struct worker_ctx *worker,
                                               const struct sockaddr *addr);
 static void on_tcp_connect_timeout(uv_timer_t *timer);
 
-/** @internal Get singleton worker. */
-static inline struct worker_ctx *get_worker(void)
-{
-       return uv_default_loop()->data;
-}
+struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */
+struct worker_ctx *the_worker = NULL;
 
 /*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
  *  socktype is SOCK_* */
@@ -294,7 +291,6 @@ static struct request_ctx *request_create(struct worker_ctx *worker,
        req->pool = pool;
        req->vars_ref = LUA_NOREF;
        req->uid = uid;
-       req->daemon_context = worker;
 
        /* Remember query source addr */
        if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) {
@@ -829,7 +825,8 @@ static int send_waiting(struct session *session)
 
 static void on_connect(uv_connect_t *req, int status)
 {
-       struct worker_ctx *worker = get_worker();
+       struct worker_ctx *worker = the_worker;
+       assert(worker);
        uv_stream_t *handle = req->handle;
        struct session *session = handle->data;
        struct sockaddr *peer = session_get_peer(session);
@@ -954,7 +951,8 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
        struct session *session = timer->data;
 
        uv_timer_stop(timer);
-       struct worker_ctx *worker = get_worker();
+       struct worker_ctx *worker = the_worker;
+       assert(worker);
 
        assert (session_tasklist_is_empty(session));
 
@@ -1939,21 +1937,22 @@ bool worker_task_finished(struct qr_task *task)
 {
        return task->finished;
 }
-/** Reserve worker buffers */
+
+/** Reserve worker buffers.  We assume worker's been zeroed. */
 static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
 {
+       worker->tcp_connected = map_make(NULL);
+       worker->tcp_waiting = map_make(NULL);
+       worker->subreq_out = trie_create(NULL);
+
        array_init(worker->pool_mp);
        if (array_reserve(worker->pool_mp, ring_maxlen)) {
                return kr_error(ENOMEM);
        }
-       memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
+
        worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
        worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
-       worker->subreq_out = trie_create(NULL);
-       worker->tcp_connected = map_make(NULL);
-       worker->tcp_waiting = map_make(NULL);
-       worker->tcp_pipeline_max = MAX_PIPELINED;
-       memset(&worker->stats, 0, sizeof(worker->stats));
+
        return kr_ok();
 }
 
@@ -1967,43 +1966,59 @@ static inline void reclaim_mp_freelist(mp_freelist_t *list)
        array_clear(*list);
 }
 
-void worker_reclaim(struct worker_ctx *worker)
+void worker_deinit(void)
 {
-       reclaim_mp_freelist(&worker->pool_mp);
-       mp_delete(worker->pkt_pool.ctx);
-       worker->pkt_pool.ctx = NULL;
-       trie_free(worker->subreq_out);
-       worker->subreq_out = NULL;
-       map_clear(&worker->tcp_connected);
-       map_clear(&worker->tcp_waiting);
+       struct worker_ctx *worker = the_worker;
+       assert(worker);
        if (worker->z_import != NULL) {
                zi_free(worker->z_import);
                worker->z_import = NULL;
        }
+       map_clear(&worker->tcp_connected);
+       map_clear(&worker->tcp_waiting);
+       trie_free(worker->subreq_out);
+       worker->subreq_out = NULL;
+
+       reclaim_mp_freelist(&worker->pool_mp);
+       mp_delete(worker->pkt_pool.ctx);
+       worker->pkt_pool.ctx = NULL;
+
+       the_worker = NULL;
 }
 
-struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
-               int worker_id, int worker_count)
+int worker_init(struct engine *engine, int worker_id, int worker_count)
 {
        assert(engine && engine->L);
+       assert(the_worker == NULL);
        kr_bindings_register(engine->L);
 
        /* Create main worker. */
-       struct worker_ctx *worker = mm_alloc(pool, sizeof(*worker));
-       if (!worker) {
-               return NULL;
-       }
+       struct worker_ctx *worker = &the_worker_value;
        memset(worker, 0, sizeof(*worker));
+       worker->engine = engine;
+
+       uv_loop_t *loop = uv_default_loop();
+       worker->loop = loop;
+
        worker->id = worker_id;
        worker->count = worker_count;
-       worker->engine = engine;
-       worker->next_request_uid = UINT16_MAX + 1;
-       worker_reserve(worker, MP_FREELIST_SIZE);
+
+       /* Register table for worker per-request variables */
+       lua_newtable(engine->L);
+       lua_setfield(engine->L, -2, "vars");
+       lua_getfield(engine->L, -1, "vars");
+       worker->vars_table_ref = luaL_ref(engine->L, LUA_REGISTRYINDEX);
+       lua_pop(engine->L, 1);
+
+       worker->tcp_pipeline_max = MAX_PIPELINED;
        worker->out_addr4.sin_family = AF_UNSPEC;
        worker->out_addr6.sin6_family = AF_UNSPEC;
-       /* Register worker in Lua thread */
-       lua_pushlightuserdata(engine->L, worker);
-       lua_setglobal(engine->L, "__worker");
+
+       int ret = worker_reserve(worker, MP_FREELIST_SIZE);
+       if (ret) return ret;
+       worker->next_request_uid = UINT16_MAX + 1;
+
+       /* Set some worker.* fields in Lua */
        lua_getglobal(engine->L, "worker");
        lua_pushnumber(engine->L, worker_id);
        lua_setfield(engine->L, -2, "id");
@@ -2011,13 +2026,11 @@ struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
        lua_setfield(engine->L, -2, "pid");
        lua_pushnumber(engine->L, worker_count);
        lua_setfield(engine->L, -2, "count");
-       /* Register table for worker per-request variables */
-       lua_newtable(engine->L);
-       lua_setfield(engine->L, -2, "vars");
-       lua_getfield(engine->L, -1, "vars");
-       worker->vars_table_ref = luaL_ref(engine->L, LUA_REGISTRYINDEX);
-       lua_pop(engine->L, 1);
-       return worker;
+
+       the_worker = worker;
+       loop->data = the_worker;
+       /* ^^^^ This shouldn't be used anymore, but it's hard to be 100% sure. */
+       return kr_ok();
 }
 
 #undef VERBOSE_MSG
index 2882d4904d355babd5825d4db2fc2a2de4748845..c1336efebe309ff681601aa1e4ab49ef89cb09f6 100644 (file)
@@ -30,9 +30,15 @@ struct session;
 /** Zone import context (opaque). */
 struct zone_import_ctx;
 
-/** Create and initialize the worker. */
-struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
-               int worker_id, int worker_count);
+/** Pointer to the singleton worker.  NULL if not initialized. */
+KR_EXPORT extern struct worker_ctx *the_worker;
+
+/** Create and initialize the worker.
+ * \return error code (ENOMEM) */
+int worker_init(struct engine *engine, int worker_id, int worker_count);
+
+/** Destroy the worker (free memory). */
+void worker_deinit(void);
 
 /**
  * Process an incoming packet (query from a client or answer from upstream).
@@ -67,9 +73,6 @@ int worker_resolve_exec(struct qr_task *task, knot_pkt_t *query);
 /** @return struct kr_request associated with opaque task */
 struct kr_request *worker_task_request(struct qr_task *task);
 
-/** Collect worker mempools */
-void worker_reclaim(struct worker_ctx *worker);
-
 int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
                     knot_pkt_t *packet);
 
index ae695b9136dc6207159cded2b8ceed6c66159b25..657aef6fb6df4c5e382e6744739340c56bbe3f15 100644 (file)
@@ -233,7 +233,6 @@ struct kr_request {
        int vars_ref; /**< Reference to per-request variable table. LUA_NOREF if not set. */
        knot_mm_t pool;
        unsigned int uid; /** for logging purposes only */
-       void *daemon_context; /** pointer to worker from daemon. Can be used in modules. */
 };
 
 /** Initializer for an array of *_selected. */
index 1a598638aa7e04712b8b856b2018b4510b22c725..12370b824b65c0f9932c33175576ea5d6231d5a8 100644 (file)
@@ -41,9 +41,7 @@ static int edns_keepalive_finalize(kr_layer_t *ctx)
        if (!ka_want) {
                return ctx->state;
        }
-       const struct worker_ctx *worker = (const struct worker_ctx *)req->daemon_context;
-       assert(worker);
-       const struct network *net = &worker->engine->net;
+       const struct network *net = &the_worker->engine->net;
        uint64_t timeout = net->tcp.in_idle_timeout / 100;
        if (timeout > UINT16_MAX) {
                timeout = UINT16_MAX;