return protolayer_break(ctx, kr_error(EINVAL));
}
+ int status = kr_ok();
struct session2 *session = ctx->manager->session;
struct pl_dns_stream_sess_data *stream_sess = sess_data;
struct wire_buf *wb = ctx->payload.wire_buf;
+ const uint32_t max_iters = (wire_buf_data_length(wb) /
+ (KNOT_WIRE_HEADER_SIZE + KNOT_WIRE_QUESTION_MIN_SIZE)) + 1;
+ int iters = 0;
knot_pkt_t *pkt;
- while ((pkt = produce_stream_packet(wb))) {
+ while ((pkt = produce_stream_packet(wb)) && iters < max_iters) {
if (stream_sess->single && stream_sess->produced) {
if (kr_log_is_debug(WORKER, NULL)) {
kr_log_debug(WORKER, "Unexpected extra data from %s\n",
kr_straddr(ctx->comm->src_addr));
}
- mp_flush(the_worker->pkt_pool.ctx);
worker_end_tcp(session);
- return protolayer_break(ctx, KNOT_EMALF);
+ status = KNOT_EMALF;
+ goto exit;
}
stream_sess->produced = true;
- if (!pkt)
- return protolayer_break(ctx, KNOT_EMALF);
+ if (!pkt) {
+ status = KNOT_EMALF;
+ goto exit;
+ }
int ret = worker_submit(session, ctx->comm, NULL, NULL, pkt);
wire_buf_movestart(wb);
- mp_flush(the_worker->pkt_pool.ctx);
- if (ret) {
- worker_end_tcp(session);
- return protolayer_break(ctx, ret);
+ if (ret == kr_ok()) {
+ iters += 1;
}
}
+
+
+
+ /* worker_submit() may cause the session to close (e.g. due to IO
+ * write error when the packet triggers an immediate answer). This is
+ * an error state, as well as any wirebuf error. */
+ if (session->closing)
+ status = kr_error(EIO);
+
+exit:
wire_buf_movestart(wb);
- return protolayer_break(ctx, kr_ok());
+ mp_flush(the_worker->pkt_pool.ctx);
+ return protolayer_break(ctx, status);
}
struct sized_iovs {