static void udp_queue_send(int fd)
{
udp_queue_t *const q = state.udp_queues[fd];
- const uv_handle_t * const uv_h = NULL; // TODO: get a real value?
+ if (!q->len) return;
+ // whole queue shares `fd`, so the UV handle is the same as well
+ struct request_ctx *ctx = worker_task_get_request(q->items[0].task);
+ const uv_handle_t * const uv_h =
+ session_get_handle(worker_request_get_source_session(ctx));
+ assert(uv_h);
+
for (int i = 0; i < q->len;) { // send from `i` onwards
int len_done = sendmmsg(fd, q->msgvec + i, q->len - i, 0);
(void)likely(len_done == q->len - i);
task->finished = true;
if (source_session == NULL) {
- (void) qr_task_on_send(task, NULL, kr_error(EIO));
+ (void)qr_task_on_send(task, NULL/*src_handle*/, kr_error(EIO));
return state == KR_STATE_DONE ? kr_ok() : kr_error(EIO);
}
+ const uv_handle_t *src_handle = session_get_handle(source_session);
if (unlikely(ctx->req.answer == NULL)) { /* meant to be dropped */
- (void) qr_task_on_send(task, NULL, kr_ok());
+ (void)qr_task_on_send(task, src_handle, kr_ok());
return kr_ok();
}
/* Send back answer */
int ret;
- const uv_handle_t *src_handle = session_get_handle(source_session);
if (src_handle->type != UV_UDP && src_handle->type != UV_TCP
&& src_handle->type != UV_POLL) {
assert(false);
}
if (ret != kr_ok()) {
- (void) qr_task_on_send(task, NULL, kr_error(EIO));
+ (void)qr_task_on_send(task, src_handle, kr_error(EIO));
/* Since source session is erroneous detach all tasks. */
while (!session_tasklist_is_empty(source_session)) {
struct qr_task *t = session_tasklist_del_first(source_session, false);