peer_str, uv_strerror(status));
}
worker_end_tcp(s);
- session2_event(s, PROTOLAYER_EVENT_FORCE_CLOSE, NULL);
return status;
}
return status;
}
-static void qr_task_wrap_finished(int status, struct session2 *session, const void *target, void *baton)
+static void qr_task_wrap_finished(int status, struct session2 *session,
+ const void *target, void *baton)
{
struct qr_task *task = baton;
- qr_task_on_send(task, NULL, status);
+ qr_task_on_send(task, session2_get_handle(session), status);
qr_task_unref(task);
wire_buf_reset(&session->wire_buf);
}
* If no, most likely this is timed out connection
* which was removed from waiting list by
* on_tcp_connect_timeout() callback. */
- struct session2 *s = worker_find_tcp_waiting(peer);
- if (!s || s != session) {
+ struct session2 *found_session = worker_find_tcp_waiting(peer);
+ if (!found_session || found_session != session) {
/* session isn't on the waiting list.
* it's timed out session. */
if (log_debug) {
return;
}
- s = worker_find_tcp_connected(peer);
- if (s) {
+ found_session = worker_find_tcp_connected(peer);
+ if (found_session) {
/* session already in the connected list.
* Something went wrong, it can be due to races when kresd has tried
* to reconnect to upstream after unsuccessful attempt. */
}
/* TODO */
- session->connected = true;
+ session2_event(session, PROTOLAYER_EVENT_CONNECT, NULL);
session2_start_read(session);
int ret = kr_ok();
/* Close pending I/O requests */
subreq_finalize(task, packet_source, packet);
- if ((kr_now() - worker_task_creation_time(task)) >= KR_RESOLVE_TIME_LIMIT) {
+ if ((kr_now() - task->creation_time) >= KR_RESOLVE_TIME_LIMIT) {
struct kr_request *req = worker_task_request(task);
if (!kr_fails_assert(req))
kr_query_inform_timeout(req, req->current_query);
worker_del_tcp_waiting(peer);
worker_del_tcp_connected(peer);
- session->connected = false;
+ session2_event(session, PROTOLAYER_EVENT_DISCONNECT, NULL);
while (!session2_waitinglist_is_empty(session)) {
struct qr_task *task = session2_waitinglist_pop(session, false);
worker_task_unref(task);
}
+ session2_event(session, PROTOLAYER_EVENT_DISCONNECT, NULL);
+ session2_event(session, PROTOLAYER_EVENT_FORCE_CLOSE, NULL);
return kr_ok();
// session_flags(session)->connected = false;
peer_str ? peer_str : "");
}
- qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT);
+ if (qry)
+ qry->server_selection.error(qry, task->transport, KR_SELECTION_TCP_CONNECT_TIMEOUT);
the_worker->stats.timeout += session2_waitinglist_get_len(session);
session2_waitinglist_retry(session, true);
struct protolayer_data *layer)
{
struct session2 *session = manager->session;
- if (session->closing || event != PROTOLAYER_EVENT_TIMEOUT)
+ if (session->closing)
+ return true;
+
+ if (event == PROTOLAYER_EVENT_TIMEOUT) {
+ if (session->connected)
+ return pl_dns_stream_resolution_timeout(manager->session);
+ else
+ return pl_dns_stream_connection_timeout(manager->session);
+ } else if (event == PROTOLAYER_EVENT_CONNECT) {
+ session->connected = true;
+ return true;
+ } else if (event == PROTOLAYER_EVENT_DISCONNECT) {
+ session->connected = false;
return true;
+ }
- if (session->connected)
- return pl_dns_stream_resolution_timeout(manager->session);
- else
- return pl_dns_stream_connection_timeout(manager->session);
+ return true;
+}
+
+static knot_pkt_t *produce_stream_packet(struct wire_buf *wb)
+{
+ uint16_t pkt_len = knot_wire_read_u16(wire_buf_data(wb));
+ if (wire_buf_data_length(wb) < pkt_len + sizeof(uint16_t))
+ return NULL;
+
+ wire_buf_trim(wb, sizeof(uint16_t));
+ knot_pkt_t *pkt = produce_packet(wire_buf_data(wb), pkt_len);
+ wire_buf_trim(wb, pkt_len);
+ return pkt;
}
static enum protolayer_cb_result pl_dns_stream_unwrap(
return protolayer_break(ctx, kr_error(EINVAL));
}
+ struct session2 *session = ctx->manager->session;
struct pl_dns_stream_sess_data *stream = protolayer_sess_data(layer);
-
- if (stream->single && stream->produced) {
- if (kr_log_is_debug(WORKER, NULL)) {
- kr_log_debug(WORKER, "Unexpected extra data from %s\n",
- kr_straddr(ctx->comm.src_addr));
- }
- return protolayer_break(ctx, KNOT_EMALF);
- }
-
struct wire_buf *wb = ctx->payload.wire_buf;
- if (wire_buf_data_length(wb) < sizeof(uint16_t))
- return protolayer_break(ctx, KNOT_EMALF);
- uint16_t pkt_len = knot_wire_read_u16(wire_buf_data(wb));
- if (wire_buf_data_length(wb) < pkt_len + sizeof(uint16_t))
- return protolayer_wait(ctx);
+ knot_pkt_t *pkt;
+ while ((pkt = produce_stream_packet(wb))) {
+ if (stream->single && stream->produced) {
+ if (kr_log_is_debug(WORKER, NULL)) {
+ kr_log_debug(WORKER, "Unexpected extra data from %s\n",
+ kr_straddr(ctx->comm.src_addr));
+ }
+ worker_end_tcp(session);
+ return protolayer_break(ctx, KNOT_EMALF);
+ }
- wire_buf_trim(wb, sizeof(uint16_t));
- knot_pkt_t *pkt = produce_packet(wire_buf_data(wb), pkt_len);
- wire_buf_trim(wb, pkt_len);
- stream->produced = true;
- if (!pkt)
- return protolayer_break(ctx, KNOT_EMALF);
+ stream->produced = true;
+ if (!pkt)
+ return protolayer_break(ctx, KNOT_EMALF);
- int ret = worker_submit(ctx->manager->session, &ctx->comm, NULL, NULL, pkt);
+ int ret = worker_submit(session, &ctx->comm, NULL, NULL, pkt);
+ if (ret) {
+ worker_end_tcp(session);
+ return protolayer_break(ctx, ret);
+ }
+ }
wire_buf_movestart(wb);
- return protolayer_break(ctx, ret);
+ return protolayer_break(ctx, kr_ok());
}
struct sized_iovs {