static _Thread_local fr_ring_buffer_t *fr_worker_rb;
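+/** Track a channel, together with the requests which are outstanding on it
+ */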
+typedef struct {
+ fr_channel_t *ch;
+
+ /*
+ * To save time, we don't care about num_elements here, which means we don't
+ * need to cache or look up the fr_worker_listen_t when we free a request.
+ */
+ fr_dlist_head_t dlist;
+} fr_worker_channel_t;
+
/**
* A worker which takes packets from a master, and processes them.
*/
fr_event_timer_t const *ev_cleanup; //!< timer for max_request_time
- fr_channel_t **channel; //!< list of channels
+ fr_worker_channel_t *channel; //!< list of channels
};
typedef struct {
worker_request_bootstrap(worker, cd, fr_time());
}
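+/** Cancel any requests which are still outstanding on a channel
+ *
+ * Pop each entry off the channel's request list and signal it with
+ * FR_SIGNAL_CANCEL, so that nothing is left on the list when the channel
+ * is closed.
+ */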
+static void worker_requests_cancel(fr_worker_channel_t *ch)
+{
+ request_t *request;
+
+ while ((request = fr_dlist_pop_head(&ch->dlist)) != NULL) {
+ unlang_interpret_signal(request, FR_SIGNAL_CANCEL);
+ }
+}
+
static void worker_exit(fr_worker_t *worker)
{
worker->exiting = true;
ok = false;
for (i = 0; i < worker->config.max_channels; i++) {
- fr_assert(worker->channel[i] != ch);
+ fr_assert(worker->channel[i].ch != ch);
- if (worker->channel[i] != NULL) continue;
+ if (worker->channel[i].ch != NULL) continue;
- worker->channel[i] = ch;
+
+ worker->channel[i].ch = ch;
+ fr_dlist_init(&worker->channel[i].dlist, fr_async_t, entry);
DEBUG3("Received channel %p into array entry %d", ch, i);
ms = fr_message_set_create(worker, worker->config.message_set_size,
* of channels.
*/
for (i = 0; i < worker->config.max_channels; i++) {
- if (!worker->channel[i]) continue;
+ if (!worker->channel[i].ch) continue;
- if (worker->channel[i] != ch) continue;
+
+ if (worker->channel[i].ch != ch) continue;
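+
+ /*
+ * Cancel any requests which are still outstanding on this channel
+ * before we clean up the channel entry.
+ */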
+ worker_requests_cancel(&worker->channel[i]);
ms = fr_channel_responder_uctx_get(ch);
fr_message_set_gc(ms);
talloc_free(ms);
- worker->channel[i] = NULL;
+ worker->channel[i].ch = NULL;
+
+ fr_assert(!fr_dlist_head(&worker->channel[i].dlist)); /* we can't look at num_elements */
fr_assert(worker->num_channels > 0);
+
worker->num_channels--;
ok = true;
break;
request->async = talloc_zero(request, fr_async_t);
request->async->recv_time = now;
request->async->el = worker->el;
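+ /*
+ * Initialise the dlist entry so that later fr_dlist_entry_in_list()
+ * checks behave correctly even if the request is never linked to a
+ * channel.
+ */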
+ fr_dlist_entry_init(&request->async->entry);
}
static inline CC_HINT(always_inline)
* automatically freed when our talloc context is freed.
*/
for (i = 0; i < worker->config.max_channels; i++) {
- if (!worker->channel[i]) continue;
+ if (!worker->channel[i].ch) continue;
- fr_channel_responder_ack_close(worker->channel[i]);
+ fr_channel_responder_ack_close(worker->channel[i].ch);
}
talloc_free(worker);
*/
worker_request_time_tracking_end(worker, request, now);
+ /*
+ * Remove it from the list of requests associated with this channel.
+ */
+ if (fr_dlist_entry_in_list(&request->async->entry)) {
+ fr_dlist_entry_unlink(&request->async->entry);
+ }
+
/*
* These conditions are true when the server is
* exiting and we're stopping all the requests.
fr_assert(!fr_heap_entry_inserted(request->runnable_id));
fr_assert(!fr_minmax_heap_entry_inserted(request->time_order_id));
+ fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
}
/** Detached request (i.e. one generated by the interpreter with no parent) is now complete
*/
(void)fr_minmax_heap_extract(worker->time_order, request);
+ fr_assert(!fr_dlist_entry_in_list(&request->async->entry));
+
/*
* Detached requests have to be freed by us
* as nothing else can free them.
CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20));
CHECK_CONFIG_TIME_DELTA(max_request_time, fr_time_delta_from_sec(5), fr_time_delta_from_sec(120));
- worker->channel = talloc_zero_array(worker, fr_channel_t *, worker->config.max_channels);
+ worker->channel = talloc_zero_array(worker, fr_worker_channel_t, worker->config.max_channels);
if (!worker->channel) {
talloc_free(worker);
goto nomem;
(void) talloc_get_type_abort(worker->dedup, fr_rb_tree_t);
for (i = 0; i < worker->config.max_channels; i++) {
- if (!worker->channel[i]) continue;
+ if (!worker->channel[i].ch) continue;
- (void) talloc_get_type_abort(worker->channel[i], fr_channel_t);
+ (void) talloc_get_type_abort(worker->channel[i].ch, fr_channel_t);
}
}
#endif