struct fr_network_s {
char const *name; //!< Network ID for logging.
+ pthread_t thread_id; //!< for self
+
bool started; //!< Set to true when the first worker is added.
bool suspended; //!< whether or not we're suspended.
return rb;
}
+static inline bool is_network_thread(fr_network_t const *nr)
+{
+ return (pthread_equal(pthread_self(), nr->thread_id) != 0);
+}
+
+static int fr_network_listen_add_self(fr_network_t *nr, fr_listen_t *listen);
+
/** Add a fr_listen_t to a network
*
* @param nr the network
{
fr_ring_buffer_t *rb;
+ /*
+ * Skip a bunch of work if we're already in the network thread.
+ */
+ if (is_network_thread(nr)) {
+ return fr_network_listen_add_self(nr, li);
+ }
+
rb = fr_network_rb_init();
if (!rb) return -1;
{
fr_network_socket_t *s;
+ fr_assert(is_network_thread(nr));
+
s = fr_rb_find(nr->sockets, &(fr_network_socket_t){ .listen = li });
if (!s) return -1;
return 0;
}
+
/** Handle a network control message callback for a new listener
*
* @param[in] ctx the network
static void fr_network_listen_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
{
fr_network_t *nr = ctx;
+ fr_listen_t *listen;
+
+ fr_assert(data_size == sizeof(listen));
+
+ if (data_size != sizeof(listen)) return;
+
+ memcpy(&listen, data, sizeof(listen));
+
+ (void) fr_network_listen_add_self(nr, listen);
+}
+
+static int fr_network_listen_add_self(fr_network_t *nr, fr_listen_t *listen)
+{
fr_network_socket_t *s;
fr_app_io_t const *app_io;
size_t size;
int num_messages;
- fr_assert(data_size == sizeof(s->listen));
-
- if (data_size != sizeof(s->listen)) return;
-
s = talloc_zero(nr, fr_network_socket_t);
fr_assert(s != NULL);
s->nr = nr;
- memcpy(&s->listen, data, sizeof(s->listen));
+ s->listen = listen;
s->number = nr->num_sockets++;
MEM(s->waiting = fr_heap_alloc(s, waiting_cmp, fr_channel_data_t, channel.heap_id, 0));
if (!s->ms) {
PERROR("Failed creating message buffers for network IO");
talloc_free(s);
- return;
+ return -1;
}
app_io = s->listen->app_io;
s) < 0) {
PERROR("Failed adding new socket to network event loop");
talloc_free(s);
- return;
+ return -1;
}
/*
s->listen->name, cf_section_name2(s->listen->server_cs));
DEBUG3("Using new socket %s with FD %d", s->listen->name, s->listen->fd);
+
+ return 0;
}
/** Handle a network control message callback for a new "watch directory"
talloc_set_destructor(nr, _fr_network_free);
nr->name = talloc_strdup(nr, name);
+
+ nr->thread_id = pthread_self();
nr->el = el;
nr->log = logger;
nr->lvl = lvl;
+
nr->max_workers = MAX_WORKERS;
nr->num_workers = 0;
nr->signal_pipe[0] = -1;