#include "daemon/bindings/impl.h"
-#include "daemon/worker.h"
-
#include <unistd.h>
#include <uv.h>
static void event_free(uv_timer_t *timer)
{
- struct worker_ctx *worker = timer->loop->data;
- lua_State *L = worker->engine->L;
+ lua_State *L = the_worker->engine->L;
int ref = (intptr_t) timer->data;
luaL_unref(L, LUA_REGISTRYINDEX, ref);
free(timer);
static void event_callback(uv_timer_t *timer)
{
- struct worker_ctx *worker = timer->loop->data;
- lua_State *L = worker->engine->L;
+ lua_State *L = the_worker->engine->L;
/* Retrieve callback and execute */
lua_rawgeti(L, LUA_REGISTRYINDEX, (intptr_t) timer->data);
static void event_fdcallback(uv_poll_t* handle, int status, int events)
{
- struct worker_ctx *worker = handle->loop->data;
- lua_State *L = worker->engine->L;
+ lua_State *L = the_worker->engine->L;
/* Retrieve callback and execute */
lua_rawgeti(L, LUA_REGISTRYINDEX, (intptr_t) handle->data);
void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
const struct sockaddr *addr, unsigned flags)
{
- uv_loop_t *loop = handle->loop;
- struct worker_ctx *worker = loop->data;
struct session *s = handle->data;
if (session_flags(s)->closing) {
return;
assert(consumed == nread); (void)consumed;
session_wirebuf_process(s, addr);
session_wirebuf_discard(s);
- mp_flush(worker->pkt_pool.ctx);
+ mp_flush(the_worker->pkt_pool.ctx);
}
static int family_to_freebind_option(sa_family_t sa_family, int *level, int *name)
assert(!session_flags(s)->closing);
- struct worker_ctx *worker = timer->loop->data;
-
if (!session_tasklist_is_empty(s)) {
int finalized = session_tasklist_finalize_expired(s);
- worker->stats.timeout += finalized;
+ the_worker->stats.timeout += finalized;
/* session_tasklist_finalize_expired() may call worker_task_finalize().
* If session is a source session and there were IO errors,
* worker_task_finalize() can filnalize all tasks and close session. */
struct qr_task *t = session_waitinglist_pop(s, false);
worker_task_finalize(t, KR_STATE_FAIL);
worker_task_unref(t);
- worker->stats.timeout += 1;
+ the_worker->stats.timeout += 1;
if (session_flags(s)->closing) {
return;
}
}
- const struct engine *engine = worker->engine;
- const struct network *net = &engine->net;
+ const struct network *net = &the_worker->engine->net;
uint64_t idle_in_timeout = net->tcp.in_idle_timeout;
uint64_t last_activity = session_last_activity(s);
uint64_t idle_time = kr_now() - last_activity;
kr_log_verbose("[io] => closing connection to '%s'\n",
peer_str ? peer_str : "");
if (session_flags(s)->outgoing) {
- worker_del_tcp_waiting(worker, peer);
- worker_del_tcp_connected(worker, peer);
+ worker_del_tcp_waiting(the_worker, peer);
+ worker_del_tcp_connected(the_worker, peer);
}
session_close(s);
}
worker_end_tcp(s);
}
session_wirebuf_compress(s);
- struct worker_ctx *worker = handle->loop->data;
- mp_flush(worker->pkt_pool.ctx);
+ mp_flush(the_worker->pkt_pool.ctx);
}
static void _tcp_accept(uv_stream_t *master, int status, bool tls)
* We still need to keep in mind to only touch the buffer
* in this callback... */
assert(handle->loop->data);
- struct worker_ctx *worker = handle->loop->data;
- session->wire_buf = worker->wire_buf;
- session->wire_buf_size = sizeof(worker->wire_buf);
+ session->wire_buf = the_worker->wire_buf;
+ session->wire_buf_size = sizeof(the_worker->wire_buf);
}
uv_timer_init(handle->loop, &session->timeout);
if (session->wire_buf_start_idx == session->wire_buf_end_idx) {
return ret;
}
- struct worker_ctx *worker = session_get_handle(session)->loop->data;
size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
uint32_t max_iterations = (wirebuf_data_size / (KNOT_WIRE_HEADER_SIZE + KNOT_WIRE_QUESTION_MIN_SIZE)) + 1;
knot_pkt_t *query = NULL;
- while (((query = session_produce_packet(session, &worker->pkt_pool)) != NULL) &&
+ while (((query = session_produce_packet(session, &the_worker->pkt_pool)) != NULL) &&
(ret < max_iterations)) {
assert (!session_wirebuf_error(session));
int res = worker_submit(session, peer, query);
static int session_tls_hs_cb(struct session *session, int status)
{
assert(session_flags(session)->outgoing);
- uv_handle_t *handle = session_get_handle(session);
- uv_loop_t *loop = handle->loop;
- struct worker_ctx *worker = loop->data;
struct sockaddr *peer = session_get_peer(session);
- int deletion_res = worker_del_tcp_waiting(worker, peer);
+ int deletion_res = worker_del_tcp_waiting(the_worker, peer);
int ret = kr_ok();
if (status) {
struct kr_qflags *options = &task->ctx->req.options;
unsigned score = options->FORWARD || options->STUB ? KR_NS_FWD_DEAD : KR_NS_DEAD;
kr_nsrep_update_rtt(NULL, peer, score,
- worker->engine->resolver.cache_rtt,
+ the_worker->engine->resolver.cache_rtt,
KR_NS_UPDATE_NORESET);
}
#ifndef NDEBUG
assert(deletion_res != 0);
const char *key = tcpsess_key(peer);
assert(key);
- assert(map_contains(&worker->tcp_connected, key) != 0);
+ assert(map_contains(&the_worker->tcp_connected, key) != 0);
}
#endif
return ret;
}
}
- struct session *s = worker_find_tcp_connected(worker, peer);
+ struct session *s = worker_find_tcp_connected(the_worker, peer);
ret = kr_ok();
if (deletion_res == kr_ok()) {
/* peer was in the waiting list, add to the connected list. */
* peer already is in the connected list. */
ret = kr_error(EINVAL);
} else {
- ret = worker_add_tcp_connected(worker, peer, session);
+ ret = worker_add_tcp_connected(the_worker, peer, session);
}
} else {
/* peer wasn't in the waiting list.
/* Something went wrong.
* Either addition to the list of connected sessions
* or write to upstream failed. */
- worker_del_tcp_connected(worker, peer);
+ worker_del_tcp_connected(the_worker, peer);
session_waitinglist_finalize(session, KR_STATE_FAIL);
assert(session_tasklist_is_empty(session));
session_close(session);
return kr_error(EINVAL);
}
- struct worker_ctx *worker = handle->loop->data;
-
/* Parse packet */
int ret = parse_packet(query);
if (!query ||
(ret != kr_ok() && ret != kr_error(EMSGSIZE)) ||
(is_query == is_outgoing)) {
- if (query && !is_outgoing) worker->stats.dropped += 1;
+ if (query && !is_outgoing) the_worker->stats.dropped += 1;
return kr_error(EILSEQ);
}
struct qr_task *task = NULL;
const struct sockaddr *addr = NULL;
if (!is_outgoing) { /* request from a client */
- struct request_ctx *ctx = request_create(worker, session, peer,
+ struct request_ctx *ctx = request_create(the_worker, session, peer,
knot_wire_get_id(query->wire));
if (!ctx) {
return kr_error(ENOMEM);
session_timer_stop(session);
- uv_handle_t *handle = session_get_handle(session);
- struct worker_ctx *worker = handle->loop->data;
struct sockaddr *peer = session_get_peer(session);
- worker_del_tcp_waiting(worker, peer);
- worker_del_tcp_connected(worker, peer);
+ worker_del_tcp_waiting(the_worker, peer);
+ worker_del_tcp_connected(the_worker, peer);
session_flags(session)->connected = false;
struct tls_client_ctx_t *tls_client_ctx = session_tls_get_client_ctx(session);
the_worker = worker;
loop->data = the_worker;
- /* ^^^^ This shouldn't be used anymore, but it's hard to be 100% sure. */
+ /* ^^^^ Now this shouldn't be used anymore, but it's hard to be 100% sure. */
return kr_ok();
}