fr_minmax_heap_t *time_order; //!< time ordered heap of requests
fr_rb_tree_t *dedup; //!< de-dup tree
+ fr_rb_tree_t *listeners; //!< so we can cancel requests when a listener goes away
+
fr_io_stats_t stats; //!< input / output stats
fr_time_elapsed_t cpu_time; //!< histogram of total CPU time per request
fr_time_elapsed_t wall_clock; //!< histogram of wall clock time per request
fr_channel_t **channel; //!< list of channels
};
+typedef struct {
+ fr_listen_t const *listener; //!< incoming packets
+
+ fr_rb_node_t node; //!< in tree of listeners
+
+ /*
+ * To save time, we don't care about num_elements here. Which means that we don't
+ * need to cache or lookup the fr_worker_listen_t when we free a request.
+ */
+ fr_dlist_head_t dlist; //!< of requests associated with this listener.
+} fr_worker_listen_t;
+
+
+static int8_t worker_listener_cmp(void const *one, void const *two)
+{
+ fr_worker_listen_t const *a = one, *b = two;
+
+ return CMP(a->listener, b->listener);
+}
+
+
/*
* Explicitly cleanup the memory allocated to the ring buffer,
* just in case valgrind complains about it.
}
}
-static int fr_worker_listen_cancel_self(UNUSED fr_worker_t *worker, UNUSED fr_listen_t const *li)
+static int fr_worker_listen_cancel_self(fr_worker_t *worker, fr_listen_t const *li)
{
+ fr_worker_listen_t *wl;
+ request_t *request;
+
+ wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = li });
+ if (!wl) return -1;
+
+ while ((request = fr_dlist_pop_head(&wl->dlist)) != NULL) {
+ RDEBUG("Canceling request due to socket being closed");
+ unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
+ }
+
+ (void) fr_rb_delete(worker->listeners, wl);
+ talloc_free(wl);
return 0;
}
fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
fr_assert(!fr_heap_entry_inserted(request->runnable_id));
+ fr_dlist_entry_unlink(&request->listen_entry);
+
#ifndef NDEBUG
request->async->el = NULL;
request->async->process = NULL;
}
worker_request_time_tracking_start(worker, request, now);
+
+ {
+ fr_worker_listen_t *wl;
+
+ wl = fr_rb_find(worker->listeners, &(fr_worker_listen_t) { .listener = listen });
+ if (!wl) {
+ MEM(wl = talloc_zero(worker, fr_worker_listen_t));
+ fr_dlist_init(&wl->dlist, request_t, listen_entry);
+ wl->listener = listen;
+
+ (void) fr_rb_insert(worker->listeners, wl);
+ }
+
+ fr_dlist_insert_tail(&wl->dlist, request);
+ }
}
/**
goto fail;
}
+ worker->listeners = fr_rb_inline_talloc_alloc(worker, fr_worker_listen_t, node, worker_listener_cmp, NULL);
+ if (!worker->listeners) {
+ fr_strerror_const("Failed creating listener tree");
+ goto fail;
+ }
+
worker->intp = unlang_interpret_init(worker, el,
&(unlang_request_func_t){
.init_internal = _worker_request_internal_init,