int session_wirebuf_process(struct session *session, const struct sockaddr *peer)
{
int ret = 0;
- if (session->wire_buf_start_idx == session->wire_buf_end_idx) {
+ if (session->wire_buf_start_idx == session->wire_buf_end_idx)
return ret;
- }
+
size_t wirebuf_data_size = session->wire_buf_end_idx - session->wire_buf_start_idx;
- uint32_t max_iterations = (wirebuf_data_size / (KNOT_WIRE_HEADER_SIZE + KNOT_WIRE_QUESTION_MIN_SIZE)) + 1;
+ uint32_t max_iterations = (wirebuf_data_size /
+ (KNOT_WIRE_HEADER_SIZE + KNOT_WIRE_QUESTION_MIN_SIZE)) + 1;
knot_pkt_t *query = NULL;
+
while (((query = session_produce_packet(session, &the_worker->pkt_pool)) != NULL) &&
(ret < max_iterations)) {
assert (!session_wirebuf_error(session));
int res = worker_submit(session, peer, query);
- if (res != kr_error(EILSEQ)) {
- /* Packet has been successfully parsed. */
+ /* Errors from worker_submit() are intetionally *not* handled in order to
+ * ensure the entire wire buffer is processed. */
+ if (res == kr_ok())
ret += 1;
- }
if (session_discard_packet(session, query) < 0) {
/* Packet data isn't stored in memory as expected.
- something went wrong, normally should not happen. */
+ * something went wrong, normally should not happen. */
break;
}
}
- if (session_wirebuf_error(session)) {
+
+ /* worker_submit() may cause the session to close (e.g. due to IO
+ * write error when the packet triggers an immediate answer). This is
+ * an error state, as well as any wirebuf error. */
+ if (session->sflags.closing || session_wirebuf_error(session))
ret = -1;
- }
+
return ret;
}
{
assert(task && task->leading == false);
if (task->finished) {
- return 0;
+ return kr_ok();
}
struct request_ctx *ctx = task->ctx;
struct session *source_session = ctx->source.session;
task->finished = true;
if (source_session == NULL) {
(void) qr_task_on_send(task, NULL, kr_error(EIO));
- return state == KR_STATE_DONE ? 0 : kr_error(EIO);
+ return state == KR_STATE_DONE ? kr_ok() : kr_error(EIO);
}
+ if (session_flags(source_session)->closing ||
+ ctx->source.addr.ip.sa_family == AF_UNSPEC)
+ return kr_error(EINVAL);
+
/* Reference task as the callback handler can close it */
qr_task_ref(task);
/* Send back answer */
- assert(!session_flags(source_session)->closing);
- assert(ctx->source.addr.ip.sa_family != AF_UNSPEC);
-
int ret;
const uv_handle_t *src_handle = session_get_handle(source_session);
if (src_handle->type != UV_UDP && src_handle->type != UV_TCP) {
qr_task_unref(task);
- return state == KR_STATE_DONE ? 0 : kr_error(EIO);
+ if (ret != kr_ok() || state != KR_STATE_DONE)
+ return kr_error(EIO);
+ return kr_ok();
}
static int udp_task_step(struct qr_task *task,