*/
static void fr_network_socket_dead(fr_network_t *nr, fr_network_socket_t *s)
{
+ int i;
+
if (s->dead) return;
s->dead = true;
fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
+
+ for (i = 0; i < nr->max_workers; i++) {
+ if (!nr->workers[i]) continue;
+
+ (void) fr_worker_listen_cancel(nr->workers[i]->worker, s->listen);
+ }
+
/*
* If there are no outstanding packets, then we can free
* it now.
fr_network_t *nr = s->nr;
fr_channel_data_t *cd;
+ fr_assert(s->outstanding == 0);
+
fr_rb_delete(nr->sockets, s);
fr_rb_delete(nr->sockets_by_num, s);
#define CACHE_LINE_SIZE 64
static alignas(CACHE_LINE_SIZE) atomic_uint64_t request_number = 0;
+static _Thread_local fr_ring_buffer_t *fr_worker_rb;
+
/**
* A worker which takes packets from a master, and processes them.
*/
fr_channel_t **channel; //!< list of channels
};
+/*
+ * Explicitly cleanup the memory allocated to the ring buffer,
+ * just in case valgrind complains about it.
+ */
+static int _fr_worker_rb_free(void *arg)
+{
+ return talloc_free(arg);
+}
+
+/** Initialise thread local storage
+ *
+ * @return fr_ring_buffer_t for messages
+ */
+static inline fr_ring_buffer_t *fr_worker_rb_init(void)
+{
+ fr_ring_buffer_t *rb;
+
+ rb = fr_worker_rb;
+ if (rb) return rb;
+
+ rb = fr_ring_buffer_create(NULL, FR_CONTROL_MAX_MESSAGES * FR_CONTROL_MAX_SIZE);
+ if (!rb) {
+ fr_perror("Failed allocating memory for worker ring buffer");
+ return NULL;
+ }
+
+ fr_atexit_thread_local(fr_worker_rb, _fr_worker_rb_free, rb);
+
+ return rb;
+}
+
+static inline bool is_worker_thread(fr_worker_t const *worker)
+{
+ return (pthread_equal(pthread_self(), worker->thread_id) != 0);
+}
+
static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd, fr_time_t now);
static void worker_send_reply(fr_worker_t *worker, request_t *request, bool do_not_respond, fr_time_t now);
static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t when, void *uctx);
}
}
+static int fr_worker_listen_cancel_self(UNUSED fr_worker_t *worker, UNUSED fr_listen_t const *li)
+{
+
+ return 0;
+}
+
+
+/** A socket is going away, so clean up any requests which use this socket.
+ *
+ * @param[in] ctx the worker
+ * @param[in] data the message
+ * @param[in] data_size size of the data
+ * @param[in] now the current time
+ */
+static void worker_listen_cancel_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
+{
+ fr_listen_t const *li;
+ fr_worker_t *worker = ctx;
+
+ fr_assert(data_size == sizeof(li));
+
+ memcpy(&li, data, sizeof(li));
+
+ (void) fr_worker_listen_cancel_self(worker, li);
+}
/** Send a NAK to the network thread
*
goto fail;
}
+ if (fr_control_callback_add(worker->control, FR_CONTROL_ID_LISTEN_DEAD, worker, worker_listen_cancel_callback) < 0) {
+ fr_strerror_const_push("Failed adding callback for listeners");
+ goto fail;
+ }
+
worker->runnable = fr_heap_talloc_alloc(worker, worker_runnable_cmp, request_t, runnable_id, 0);
if (!worker->runnable) {
fr_strerror_const("Failed creating runnable heap");
return ch;
}
+int fr_worker_listen_cancel(fr_worker_t *worker, fr_listen_t const *li)
+{
+ fr_ring_buffer_t *rb;
+
+ /*
+ * Skip a bunch of work if we're already in the worker thread.
+ */
+ if (is_worker_thread(worker)) {
+ return fr_worker_listen_cancel_self(worker, li);
+ }
+
+ rb = fr_worker_rb_init();
+ if (!rb) return -1;
+
+ return fr_control_message_send(worker->control, rb, FR_CONTROL_ID_LISTEN, &li, sizeof(li));
+}
+
#ifdef WITH_VERIFY_PTR
/** Verify the worker data structures.
*