int ret = -1;
char msg[128];
- struct worker_ctx *worker = wrk_luaget(L);
+ struct worker_ctx *worker = the_worker;
if (!worker) {
strncpy(msg, "internal error, empty worker pointer", sizeof(msg));
goto finish;
if (err) lua_error_p(L, "%s", kr_strerror(err));
}
-static inline struct worker_ctx *wrk_luaget(lua_State *L) {
- lua_getglobal(L, "__worker");
- struct worker_ctx *worker = lua_touserdata(L, -1);
- lua_pop(L, 1);
- return worker;
-}
-
static inline int execute_callback(lua_State *L, int argc)
{
int ret = engine_pcall(L, argc);
/** Set TCP pipelining size. */
static int net_pipeline(lua_State *L)
{
- struct worker_ctx *worker = wrk_luaget(L);
+ struct worker_ctx *worker = the_worker;
if (!worker) {
return 0;
}
static int net_outgoing(lua_State *L, int family)
{
- struct worker_ctx *worker = wrk_luaget(L);
union inaddr *addr;
if (family == AF_INET)
- addr = (union inaddr*)&worker->out_addr4;
+ addr = (union inaddr*)&the_worker->out_addr4;
else
- addr = (union inaddr*)&worker->out_addr6;
+ addr = (union inaddr*)&the_worker->out_addr6;
if (lua_gettop(L) == 0) { /* Return the current value. */
if (addr->ip.sa_family == AF_UNSPEC) {
/** resolve_pkt(pkt, options, init_cb) */
static int wrk_resolve_pkt(lua_State *L)
{
- struct worker_ctx *worker = wrk_luaget(L);
- if (!worker) {
+ if (!the_worker) {
return 0;
}
lua_error_p(L, "invalid options");
/* Create task and start with a first question */
- struct qr_task *task = worker_resolve_start(worker, pkt, *options);
+ struct qr_task *task = worker_resolve_start(the_worker, pkt, *options);
if (!task) {
lua_error_p(L, "couldn't create a resolution request");
}
/** resolve(qname, qtype, qclass, options, init_cb) */
static int wrk_resolve(lua_State *L)
{
- struct worker_ctx *worker = wrk_luaget(L);
+ struct worker_ctx *worker = the_worker;
if (!worker) {
return 0;
}
/** Return worker statistics. */
static int wrk_stats(lua_State *L)
{
- struct worker_ctx *worker = wrk_luaget(L);
+ struct worker_ctx *worker = the_worker;
if (!worker) {
return 0;
}
return;
}
- struct worker_ctx *worker = (struct worker_ctx *)master->loop->data;
+ struct worker_ctx *worker = the_worker;
uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
if (!client) {
return;
return;
}
- const struct engine *engine = worker->engine;
- const struct network *net = &engine->net;
+ const struct network *net = &worker->engine->net;
uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
uint64_t timeout = KR_CONN_RTT_MAX / 2;
int vars_ref;
knot_mm_t pool;
unsigned int uid;
- void *daemon_context;
};
enum kr_rank {KR_RANK_INITIAL, KR_RANK_OMIT, KR_RANK_TRY, KR_RANK_INDET = 4, KR_RANK_BOGUS, KR_RANK_MISMATCH, KR_RANK_MISSING, KR_RANK_INSECURE, KR_RANK_AUTH = 16, KR_RANK_SECURE = 32};
struct kr_cdb_stats {
goto finish;
}
- struct engine *engine = ((struct worker_ctx *)stream->loop->data)->engine;
- lua_State *L = engine->L;
+ lua_State *L = the_worker->engine->L;
int ret = engine_cmd(L, cmd, false);
const char *message = "";
if (lua_gettop(L) > 0) {
kr_log_error("[system] failed to initialize engine: %s\n", kr_strerror(ret));
return EXIT_FAILURE;
}
- /* Create worker */
- struct worker_ctx *worker = worker_create(&engine, &pool, fork_id, args.forks);
- if (!worker) {
- kr_log_error("[system] not enough memory\n");
+ /* Initialize the worker */
+ ret = worker_init(&engine, fork_id, args.forks);
+ if (ret != 0) {
+ kr_log_error("[system] failed to initialize worker: %s\n", kr_strerror(ret));
return EXIT_FAILURE;
}
uv_loop_t *loop = uv_default_loop();
- worker->loop = loop;
- loop->data = worker;
-
/* Catch some signals. */
uv_signal_t sigint, sigterm;
if (true) ret = uv_signal_init(loop, &sigint);
cleanup:/* Cleanup. */
engine_deinit(&engine);
- worker_reclaim(worker);
+ worker_deinit();
if (loop != NULL) {
uv_loop_close(loop);
}
return kr_error(EINVAL);
}
/* First find callback in the endpoint registry. */
- struct worker_ctx *worker = net->loop->data; // LATER: the_worker
- lua_State *L = worker->engine->L;
+ lua_State *L = the_worker->engine->L;
void **pp = trie_get_try(net->endpoint_kinds, ep->flags.kind,
strlen(ep->flags.kind));
if (!pp && net->missing_kind_is_error) {
/** Notify the registered function about endpoint about to be closed. */
static void endpoint_close_lua_cb(struct network *net, struct endpoint *ep)
{
- struct worker_ctx *worker = net->loop->data; // LATER: the_worker
- lua_State *L = worker->engine->L;
+ lua_State *L = the_worker->engine->L;
void **pp = trie_get_try(net->endpoint_kinds, ep->flags.kind,
strlen(ep->flags.kind));
if (!pp && net->missing_kind_is_error) {
{
if (net != NULL) {
network_close_force(net);
- struct worker_ctx *worker = net->loop->data; // LATER: the_worker
- trie_apply(net->endpoint_kinds, kind_unregister, worker->engine->L);
+ trie_apply(net->endpoint_kinds, kind_unregister, the_worker->engine->L);
trie_free(net->endpoint_kinds);
tls_credentials_free(net->tls_credentials);
const struct sockaddr *addr);
static void on_tcp_connect_timeout(uv_timer_t *timer);
-/** @internal Get singleton worker. */
-static inline struct worker_ctx *get_worker(void)
-{
- return uv_default_loop()->data;
-}
+struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */
+struct worker_ctx *the_worker = NULL;
/*! @internal Create a UDP/TCP handle for an outgoing AF_INET* connection.
* socktype is SOCK_* */
req->pool = pool;
req->vars_ref = LUA_NOREF;
req->uid = uid;
- req->daemon_context = worker;
/* Remember query source addr */
if (!addr || (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)) {
static void on_connect(uv_connect_t *req, int status)
{
- struct worker_ctx *worker = get_worker();
+ struct worker_ctx *worker = the_worker;
+ assert(worker);
uv_stream_t *handle = req->handle;
struct session *session = handle->data;
struct sockaddr *peer = session_get_peer(session);
struct session *session = timer->data;
uv_timer_stop(timer);
- struct worker_ctx *worker = get_worker();
+ struct worker_ctx *worker = the_worker;
+ assert(worker);
assert (session_tasklist_is_empty(session));
{
return task->finished;
}
-/** Reserve worker buffers */
+
+/** Reserve worker buffers. We assume worker's been zeroed. */
static int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
+ worker->tcp_connected = map_make(NULL);
+ worker->tcp_waiting = map_make(NULL);
+ worker->subreq_out = trie_create(NULL);
+
array_init(worker->pool_mp);
if (array_reserve(worker->pool_mp, ring_maxlen)) {
return kr_error(ENOMEM);
}
- memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
+
worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
worker->pkt_pool.alloc = (knot_mm_alloc_t) mp_alloc;
- worker->subreq_out = trie_create(NULL);
- worker->tcp_connected = map_make(NULL);
- worker->tcp_waiting = map_make(NULL);
- worker->tcp_pipeline_max = MAX_PIPELINED;
- memset(&worker->stats, 0, sizeof(worker->stats));
+
return kr_ok();
}
array_clear(*list);
}
-void worker_reclaim(struct worker_ctx *worker)
+void worker_deinit(void)
{
- reclaim_mp_freelist(&worker->pool_mp);
- mp_delete(worker->pkt_pool.ctx);
- worker->pkt_pool.ctx = NULL;
- trie_free(worker->subreq_out);
- worker->subreq_out = NULL;
- map_clear(&worker->tcp_connected);
- map_clear(&worker->tcp_waiting);
+ struct worker_ctx *worker = the_worker;
+ assert(worker);
if (worker->z_import != NULL) {
zi_free(worker->z_import);
worker->z_import = NULL;
}
+ map_clear(&worker->tcp_connected);
+ map_clear(&worker->tcp_waiting);
+ trie_free(worker->subreq_out);
+ worker->subreq_out = NULL;
+
+ reclaim_mp_freelist(&worker->pool_mp);
+ mp_delete(worker->pkt_pool.ctx);
+ worker->pkt_pool.ctx = NULL;
+
+ the_worker = NULL;
}
-struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
- int worker_id, int worker_count)
+int worker_init(struct engine *engine, int worker_id, int worker_count)
{
assert(engine && engine->L);
+ assert(the_worker == NULL);
kr_bindings_register(engine->L);
/* Create main worker. */
- struct worker_ctx *worker = mm_alloc(pool, sizeof(*worker));
- if (!worker) {
- return NULL;
- }
+ struct worker_ctx *worker = &the_worker_value;
memset(worker, 0, sizeof(*worker));
+ worker->engine = engine;
+
+ uv_loop_t *loop = uv_default_loop();
+ worker->loop = loop;
+
worker->id = worker_id;
worker->count = worker_count;
- worker->engine = engine;
- worker->next_request_uid = UINT16_MAX + 1;
- worker_reserve(worker, MP_FREELIST_SIZE);
+
+ /* Register table for worker per-request variables */
+ lua_newtable(engine->L);
+ lua_setfield(engine->L, -2, "vars");
+ lua_getfield(engine->L, -1, "vars");
+ worker->vars_table_ref = luaL_ref(engine->L, LUA_REGISTRYINDEX);
+ lua_pop(engine->L, 1);
+
+ worker->tcp_pipeline_max = MAX_PIPELINED;
worker->out_addr4.sin_family = AF_UNSPEC;
worker->out_addr6.sin6_family = AF_UNSPEC;
- /* Register worker in Lua thread */
- lua_pushlightuserdata(engine->L, worker);
- lua_setglobal(engine->L, "__worker");
+
+ int ret = worker_reserve(worker, MP_FREELIST_SIZE);
+ if (ret) return ret;
+ worker->next_request_uid = UINT16_MAX + 1;
+
+ /* Set some worker.* fields in Lua */
lua_getglobal(engine->L, "worker");
lua_pushnumber(engine->L, worker_id);
lua_setfield(engine->L, -2, "id");
lua_setfield(engine->L, -2, "pid");
lua_pushnumber(engine->L, worker_count);
lua_setfield(engine->L, -2, "count");
- /* Register table for worker per-request variables */
- lua_newtable(engine->L);
- lua_setfield(engine->L, -2, "vars");
- lua_getfield(engine->L, -1, "vars");
- worker->vars_table_ref = luaL_ref(engine->L, LUA_REGISTRYINDEX);
- lua_pop(engine->L, 1);
- return worker;
+
+ the_worker = worker;
+ loop->data = the_worker;
+ /* ^^^^ This shouldn't be used anymore, but it's hard to be 100% sure. */
+ return kr_ok();
}
#undef VERBOSE_MSG
/** Zone import context (opaque). */
struct zone_import_ctx;
-/** Create and initialize the worker. */
-struct worker_ctx *worker_create(struct engine *engine, knot_mm_t *pool,
- int worker_id, int worker_count);
+/** Pointer to the singleton worker. NULL if not initialized. */
+KR_EXPORT extern struct worker_ctx *the_worker;
+
+/** Create and initialize the worker.
+ * \return error code (ENOMEM) */
+int worker_init(struct engine *engine, int worker_id, int worker_count);
+
+/** Destroy the worker (free memory). */
+void worker_deinit(void);
/**
* Process an incoming packet (query from a client or answer from upstream).
/** @return struct kr_request associated with opaque task */
struct kr_request *worker_task_request(struct qr_task *task);
-/** Collect worker mempools */
-void worker_reclaim(struct worker_ctx *worker);
-
int worker_task_step(struct qr_task *task, const struct sockaddr *packet_source,
knot_pkt_t *packet);
int vars_ref; /**< Reference to per-request variable table. LUA_NOREF if not set. */
knot_mm_t pool;
unsigned int uid; /** for logging purposes only */
- void *daemon_context; /** pointer to worker from daemon. Can be used in modules. */
};
/** Initializer for an array of *_selected. */
if (!ka_want) {
return ctx->state;
}
- const struct worker_ctx *worker = (const struct worker_ctx *)req->daemon_context;
- assert(worker);
- const struct network *net = &worker->engine->net;
+ const struct network *net = &the_worker->engine->net;
uint64_t timeout = net->tcp.in_idle_timeout / 100;
if (timeout > UINT16_MAX) {
timeout = UINT16_MAX;