return;
}
+ // allow deferring EOF for incoming connections to send answer even if half-closed
+ if (!s->outgoing && (nread == UV_EOF)) {
+ if (kr_log_is_debug(IO, NULL)) {
+ struct sockaddr *peer = session2_get_peer(s);
+ char *peer_str = kr_straddr(peer);
+ kr_log_debug(IO, "=> connection to '%s' half-closed by peer (EOF)\n",
+ peer_str ? peer_str : "");
+ }
+ session2_event(s, PROTOLAYER_EVENT_EOF, NULL);
+ return;
+ }
+
if (nread < 0 || !buf->base) {
if (kr_log_is_debug(IO, NULL)) {
struct sockaddr *peer = session2_get_peer(s);
return false;
}
+static inline ssize_t session2_get_protocol(
+ struct session2 *s, enum protolayer_type protocol)
+{
+ const struct protolayer_grp *grp = &protolayer_grps[s->proto];
+ for (ssize_t i = 0; i < grp->num_layers; i++) {
+ enum protolayer_type found = grp->layers[i];
+ if (protocol == found)
+ return i;
+ }
+
+ return -1;
+}
/** Gets layer-specific session data for the layer with the specified index
* from the manager. */
return protolayer_sess_data_get(ctx->session, ctx->layer_ix);
}
+void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol) {
+ ssize_t layer_ix = session2_get_protocol(s, protocol);
+ if (layer_ix < 0)
+ return NULL;
+
+ return protolayer_sess_data_get(s, layer_ix);
+}
+
/** Gets layer-specific iteration data for the layer with the specified index
* from the context. */
static inline struct protolayer_data *protolayer_iter_data_get(
return protolayer_iter_data_get(ctx, ctx->layer_ix);
}
-static inline ssize_t session2_get_protocol(
- struct session2 *s, enum protolayer_type protocol)
-{
- const struct protolayer_grp *grp = &protolayer_grps[s->proto];
- for (ssize_t i = 0; i < grp->num_layers; i++) {
- enum protolayer_type found = grp->layers[i];
- if (protocol == found)
- return i;
- }
-
- return -1;
-}
-
static inline bool protolayer_iter_ctx_is_last(struct protolayer_iter_ctx *ctx)
{
unsigned int last_ix = (ctx->direction == PROTOLAYER_UNWRAP)
if (s->closing)
return kr_ok();
+ if (event == PROTOLAYER_EVENT_EOF) {
+ // no layer wanted to retain TCP half-closed state
+ session2_force_close(s);
+ return kr_ok();
+ }
+
bool is_close_event = (event == PROTOLAYER_EVENT_CLOSE ||
event == PROTOLAYER_EVENT_FORCE_CLOSE);
if (is_close_event) {
XX(MALFORMED) \
/** Signal that a connection has ended. */\
XX(DISCONNECT) \
+ /** Signal EOF from peer (e.g. half-closed TCP connection). */\
+ XX(EOF) \
/** Failed task send - update stats. */\
XX(STATS_SEND_ERR) \
/** Outgoing query submission - update stats. */\
* queue iterators, as it does not need to iterate through the whole queue. */
bool protolayer_queue_has_payload(const protolayer_iter_ctx_queue_t *queue);
+/** Gets layer-specific session data for the specified protocol layer.
+ * Returns NULL if the layer is not present in the session. */
+void *protolayer_sess_data_get_proto(struct session2 *s, enum protolayer_type protocol);
+
/** Gets layer-specific session data for the last processed layer.
* To be used after returning from its callback for async continuation but before calling protolayer_continue. */
void *protolayer_sess_data_get_current(struct protolayer_iter_ctx *ctx);
return PROTOLAYER_EVENT_PROPAGATE;
}
+ if (event == PROTOLAYER_EVENT_EOF) {
+ // TCP half-closed state not allowed
+ session2_force_close(s);
+ return PROTOLAYER_EVENT_CONSUME;
+ }
+
if (tls->client_side) {
if (event == PROTOLAYER_EVENT_CONNECT)
return pl_tls_client_connect_start(tls, s);
qr_task_free((task)); \
} while (0)
+struct pl_dns_stream_sess_data {
+ struct protolayer_data h;
+ bool single : 1; /**< True: Stream only allows a single packet */
+ bool produced : 1; /**< True: At least one packet has been produced */
+ bool connected : 1; /**< True: The stream is connected */
+ bool half_closed : 1; /**< True: EOF was received, the stream is half-closed */
+};
+
/* Forward decls */
static void qr_task_free(struct qr_task *task);
static int qr_task_step(struct qr_task *task,
static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt);
-
struct worker_ctx the_worker_value; /**< Static allocation is suitable for the singleton. */
struct worker_ctx *the_worker = NULL;
session2_close(source_session);
}
+ if (source_session->stream && !source_session->closing) {
+ struct pl_dns_stream_sess_data *stream =
+ protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_MULTI_STREAM);
+ if (!stream)
+ stream = protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_UNSIZED_STREAM);
+ if (!stream)
+ stream = protolayer_sess_data_get_proto(source_session, PROTOLAYER_TYPE_DNS_SINGLE_STREAM);
+ if (stream && stream->half_closed) {
+ session2_force_close(source_session);
+ }
+ }
+
qr_task_unref(task);
if (ret != kr_ok() || state != KR_STATE_DONE)
}
}
-struct pl_dns_stream_sess_data {
- struct protolayer_data h;
- bool single : 1; /**< True: Stream only allows a single packet */
- bool produced : 1; /**< True: At least one packet has been produced */
- bool connected : 1; /**< True: The stream is connected */
-};
-
static int pl_dns_stream_sess_init(struct session2 *session,
void *sess_data, void *param)
{
return PROTOLAYER_EVENT_PROPAGATE;
}
+static enum protolayer_event_cb_result pl_dns_stream_eof(
+ struct session2 *session, struct pl_dns_stream_sess_data *stream)
+{
+ if (!session2_is_empty(session)) {
+ stream->half_closed = true;
+ return PROTOLAYER_EVENT_CONSUME;
+ }
+ return PROTOLAYER_EVENT_PROPAGATE;
+}
+
static enum protolayer_event_cb_result pl_dns_stream_event_unwrap(
enum protolayer_event_type event, void **baton,
struct session2 *session, void *sess_data)
case PROTOLAYER_EVENT_FORCE_CLOSE:
return pl_dns_stream_disconnected(session, stream);
+ case PROTOLAYER_EVENT_EOF:
+ return pl_dns_stream_eof(session, stream);
+
default:
return PROTOLAYER_EVENT_PROPAGATE;
}