XX(TIMEOUT) /**< Signal that the session has timed out. */\
XX(CONNECT) /**< Signal that a connection has been established. */\
XX(CONNECT_FAIL) /**< Signal that a connection could not have been established. */\
- XX(DISCONNECT) /**< Signal that a connection has ended. */
+ XX(DISCONNECT) /**< Signal that a connection has ended. */\
+ XX(STATS_SEND_ERR) /**< Failed task send - update stats. */\
+ XX(STATS_QRY_OUT) /**< Outgoing query submission - update stats. */
/** Event type, to be interpreted by a layer. */
enum protolayer_event_type {
* Only needs to be valid for session initialization. */
};
-/** Allocates and initializes a new manager. */
-struct protolayer_manager *protolayer_manager_new(
- struct session2 *s,
- enum protolayer_grp grp,
- struct protolayer_data_param *layer_param,
- size_t layer_param_count);
-
-/** Deinitializes all layer data in the manager and deallocates it. */
-void protolayer_manager_free(struct protolayer_manager *m);
-
-
/** Global data for a specific layered protocol. This is to be initialized in
* the `protolayer_globals` global array (below) during the start of the
* resolver. It contains pointers to the specific protocol's functions. */
* to close. */
bool closing : 1;
- /** If true, a connection is established. Only applicable to sessions
- * using connection-based protocols.
- *
- * TODO: move to `worker`? */
- bool connected : 1;
-
- /** If true, session is being rate-limited.
- *
- * TODO: move to `worker`? */
- bool throttled : 1;
-
/** If true, encryption takes place in this session. Layers may use
* this to determine whether padding should be applied. A layer that
* provides security shall set this to `true` during session
* initialization. */
bool secure : 1;
+
+ /** If true, the session contains a stream-based protocol layer.
+ * Set during protocol layer initialization by the stream-based layer. */
+ bool stream : 1;
+
+ /** If true, a connection is established. Only applicable to sessions
+ * using connection-based protocols. One of the stream-based protocol
+ * layers is going to be the writer for this flag. */
+ bool connected : 1;
+
+ /** If true, session is being rate-limited. One of the protocol layers
+ * is going to be the writer for this flag. */
+ bool throttled : 1;
};
/** Allocates and initializes a new session with the specified protocol layer
const struct sockaddr *dst_addr = comm->dst_addr;
const struct proxy_result *proxy = comm->proxy;
- req->qsource.comm_flags.tcp = session2_get_handle(session)->type == UV_TCP;
+ req->qsource.comm_flags.tcp = session->stream;
req->qsource.comm_flags.tls = session->secure;
// req->qsource.comm_flags.http = session->has_http; /* TODO */
req->qsource.comm_flags.http = false;
/*@ Register new qr_task within session. */
static int qr_task_register(struct qr_task *task, struct session2 *session)
{
- if (kr_fails_assert(!session->outgoing && session2_get_handle(session)->type == UV_TCP))
+ if (kr_fails_assert(!session->outgoing && session->stream))
return kr_error(EINVAL);
session2_tasklist_add(session, task);
}
/* This is called when we send subrequest / answer */
-int qr_task_on_send(struct qr_task *task, const uv_handle_t *handle, int status)
+int qr_task_on_send(struct qr_task *task, struct session2 *s, int status)
{
if (task->finished) {
kr_require(task->leading == false);
qr_task_complete(task);
}
- if (!handle || kr_fails_assert(handle->data))
+ if (!s)
return status;
- struct session2* s = handle->data;
- if (handle->type == UV_UDP && s->outgoing) {
+ if (!s->stream && s->outgoing) {
// This should ensure that we are only dealing with our question to upstream
if (kr_fails_assert(!knot_wire_get_qr(task->pktbuf->wire)))
return status;
}
}
- if (handle->type == UV_TCP) {
+ if (s->stream) {
if (status != 0) { // session probably not usable anymore; typically: ECONNRESET
const struct kr_request *req = &task->ctx->req;
if (kr_log_is_debug(WORKER, req)) {
const void *target, void *baton)
{
struct qr_task *task = baton;
- uv_handle_t *handle = session2_get_handle(session);
- qr_task_on_send(task, handle, status);
+ qr_task_on_send(task, session, status);
qr_task_unref(task);
wire_buf_reset(&session->wire_buf);
}
int ret = 0;
// struct request_ctx *ctx = task->ctx; /* TODO: used with doh below */
- uv_handle_t *handle = session2_get_handle(session);
- if (kr_fails_assert(handle && handle->data == session))
- return qr_task_on_send(task, NULL, kr_error(EINVAL));
- const bool is_stream = handle->type == UV_TCP;
- kr_require(is_stream || handle->type == UV_UDP);
-
if (addr == NULL)
addr = session2_get_peer(session);
if (pkt == NULL)
pkt = worker_task_get_pktbuf(task);
- if (session->outgoing && handle->type == UV_TCP) {
+ if (session->outgoing && session->stream) {
size_t try_limit = session2_tasklist_get_len(session) + 1;
uint16_t msg_id = knot_wire_get_id(pkt->wire);
size_t try_count = 0;
ret = kr_error(UV_EMFILE);
}
- /* TODO: doh */
-// if (session_flags(session)->has_http)
-// the_worker->stats.err_http += 1;
-// else
- if (session->secure)
- the_worker->stats.err_tls += 1;
- else if (handle->type == UV_UDP)
- the_worker->stats.err_udp += 1;
- else
- the_worker->stats.err_tcp += 1;
+ session2_event(session, PROTOLAYER_EVENT_STATS_SEND_ERR, NULL);
}
/* Update outgoing query statistics */
if (session->outgoing && addr) {
- if (session->secure)
- the_worker->stats.tls += 1;
- else if (handle->type == UV_UDP)
- the_worker->stats.udp += 1;
- else
- the_worker->stats.tcp += 1;
+ session2_event(session, PROTOLAYER_EVENT_STATS_QRY_OUT, NULL);
if (addr->sa_family == AF_INET6)
the_worker->stats.ipv6 += 1;
session2_timer_start(session, MAX_TCP_INACTIVITY, MAX_TCP_INACTIVITY);
}
-static uv_handle_t *transmit(struct qr_task *task)
+static int transmit(struct qr_task *task)
{
- uv_handle_t *ret = NULL;
+ if (!task)
+ return kr_error(EINVAL);
- if (task) {
- struct kr_transport* transport = task->transport;
+ struct kr_transport* transport = task->transport;
+ struct sockaddr_in6 *choice = (struct sockaddr_in6 *)&transport->address;
- struct sockaddr_in6 *choice = (struct sockaddr_in6 *)&transport->address;
+ if (!choice)
+ return kr_error(EINVAL);
+ if (task->pending_count >= MAX_PENDING)
+ return kr_error(EBUSY);
+ /* Checkout answer before sending it */
+ struct request_ctx *ctx = task->ctx;
+ int ret = kr_resolve_checkout(&ctx->req, NULL, transport, task->pktbuf);
+ if (ret)
+ return ret;
- if (!choice) {
- return ret;
- }
- if (task->pending_count >= MAX_PENDING) {
- return ret;
- }
- /* Checkout answer before sending it */
- struct request_ctx *ctx = task->ctx;
- if (kr_resolve_checkout(&ctx->req, NULL, transport, task->pktbuf) != 0) {
- return ret;
- }
- ret = ioreq_spawn(SOCK_DGRAM, choice->sin6_family,
- PROTOLAYER_GRP_DOUDP, NULL, 0);
- if (!ret) {
- return ret;
- }
- struct sockaddr *addr = (struct sockaddr *)choice;
- struct session2 *session = ret->data;
- struct sockaddr *peer = session2_get_peer(session);
- kr_assert(peer->sa_family == AF_UNSPEC && session->outgoing);
- kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
- memcpy(peer, addr, kr_sockaddr_len(addr));
- if (qr_task_send(task, session, (struct sockaddr *)choice,
- task->pktbuf) != 0) {
- session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
- ret = NULL;
- } else {
- task->pending[task->pending_count] = session;
- task->pending_count += 1;
- session2_start_read(session); /* Start reading answer */
- }
+ uv_handle_t *handle = ioreq_spawn(SOCK_DGRAM, choice->sin6_family,
+ PROTOLAYER_GRP_DOUDP, NULL, 0);
+ if (!handle)
+ return kr_error(EINVAL);
+
+ struct sockaddr *addr = (struct sockaddr *)choice;
+ struct session2 *session = handle->data;
+ struct sockaddr *peer = session2_get_peer(session);
+ kr_assert(peer->sa_family == AF_UNSPEC && session->outgoing);
+ kr_require(addr->sa_family == AF_INET || addr->sa_family == AF_INET6);
+ memcpy(peer, addr, kr_sockaddr_len(addr));
+
+ ret = qr_task_send(task, session, (struct sockaddr *)choice, task->pktbuf);
+ if (ret) {
+ session2_event(session, PROTOLAYER_EVENT_CLOSE, NULL);
+ return ret;
}
- return ret;
+
+ task->pending[task->pending_count] = session;
+ task->pending_count += 1;
+ session2_start_read(session); /* Start reading answer */
+ return kr_ok();
}
struct request_ctx *ctx = task->ctx;
xdp_handle_data_t *xhd = src_handle->data;
if (kr_fails_assert(xhd && xhd->socket && xhd->session == ctx->source.session))
- return qr_task_on_send(task, src_handle, kr_error(EINVAL));
+ return qr_task_on_send(task, NULL, kr_error(EINVAL));
knot_xdp_msg_t msg;
#if KNOT_VERSION_HEX >= 0x030100
uv_idle_start(&xhd->tx_waker, xdp_tx_waker);
kr_log_debug(XDP, "pushed a packet, ret = %d\n", ret);
- return qr_task_on_send(task, src_handle, ret);
+ return qr_task_on_send(task, xhd->session, ret);
#else
kr_assert(!EINVAL);
return kr_error(EINVAL);
if (subreq_enqueue(task)) {
return kr_ok(); /* Will be notified when outgoing query finishes. */
}
+
/* Start transmitting */
- uv_handle_t *handle = transmit(task);
- if (handle == NULL) {
+ int err = transmit(task);
+ if (err) {
subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KR_STATE_FAIL);
}
}
}
-static int parse_packet(knot_pkt_t *query)
-{
- if (!query){
- return kr_error(EINVAL);
- }
-
- /* Parse query packet. */
- int ret = knot_pkt_parse(query, 0);
- if (ret == KNOT_ETRAIL) {
- /* Extra data after message end. */
- ret = kr_error(EMSGSIZE);
- } else if (ret != KNOT_EOK) {
- /* Malformed query. */
- ret = kr_error(EPROTO);
- } else {
- ret = kr_ok();
- }
-
- return ret;
-}
-
int worker_submit(struct session2 *session, struct comm_info *comm,
const uint8_t *eth_from, const uint8_t *eth_to, knot_pkt_t *pkt)
{
if (!session || !pkt)
return kr_error(EINVAL);
- uv_handle_t *handle = session2_get_handle(session);
- if (!handle || !handle->loop->data)
- return kr_error(EINVAL);
-
const bool is_query = (knot_wire_get_qr(pkt->wire) == 0);
const bool is_outgoing = session->outgoing;
return kr_error(ENOMEM);
}
- if (handle->type == UV_TCP && qr_task_register(task, session)) {
+ if (session->stream && qr_task_register(task, session)) {
return kr_error(ENOMEM);
}
} else { /* response from upstream */
/* Note receive time for RTT calculation */
task->recv_time = kr_now();
}
- if (kr_fails_assert(!uv_is_closing(session2_get_handle(session))))
+ if (kr_fails_assert(!session->closing))
return kr_error(EINVAL);
/* Packet was successfully parsed.
} sent;
};
-static void pl_dns_stream_sess_init_common(struct pl_dns_stream_sess_data *stream,
+static void pl_dns_stream_sess_init_common(struct session2 *session,
+ struct pl_dns_stream_sess_data *stream,
bool single)
{
+ session->stream = true;
*stream = (struct pl_dns_stream_sess_data){
.single = single
};
void *param)
{
struct pl_dns_stream_sess_data *stream = protolayer_sess_data(layer);
- pl_dns_stream_sess_init_common(stream, false);
+ pl_dns_stream_sess_init_common(manager->session, stream, false);
return kr_ok();
}
void *param)
{
struct pl_dns_stream_sess_data *stream = protolayer_sess_data(layer);
- pl_dns_stream_sess_init_common(stream, true);
+ pl_dns_stream_sess_init_common(manager->session, stream, true);
return kr_ok();
}
static int pl_dns_stream_iter_init(struct protolayer_manager *manager,
- struct protolayer_data *layer)
+ struct protolayer_data *layer)
{
struct pl_dns_stream_iter_data *stream = protolayer_iter_data(layer);
*stream = (struct pl_dns_stream_iter_data){0};