return fr_control_message_send(nr->control, rb, FR_CONTROL_ID_INJECT, &my_inject, sizeof(my_inject));
}
+static fr_event_update_t const pause_read[] = {
+ FR_EVENT_SUSPEND(fr_event_io_func_t, read),
+ { 0 }
+};
+
+static fr_event_update_t const resume_read[] = {
+ FR_EVENT_RESUME(fr_event_io_func_t, read),
+ { 0 }
+};
+
+static fr_event_update_t const pause_write[] = {
+ FR_EVENT_SUSPEND(fr_event_io_func_t, write),
+ { 0 }
+};
+
+static fr_event_update_t const resume_write[] = {
+ FR_EVENT_RESUME(fr_event_io_func_t, write),
+ { 0 }
+};
+
static void fr_network_suspend(fr_network_t *nr)
{
- static fr_event_update_t pause_read[] = {
- FR_EVENT_SUSPEND(fr_event_io_func_t, read),
- { 0 }
- };
fr_rb_iter_inorder_t iter;
fr_network_socket_t *s;
static void fr_network_unsuspend(fr_network_t *nr)
{
- static fr_event_update_t resume_read[] = {
- FR_EVENT_RESUME(fr_event_io_func_t, read),
- { 0 }
- };
fr_rb_iter_inorder_t iter;
fr_network_socket_t *s;
/*
* Remove this worker from the array
*/
+ DEBUG3("Worker acked our close request");
for (i = 0; i < nr->num_workers; i++) {
- DEBUG3("Worker acked our close request");
if (nr->workers[i] == w) {
if (i == (nr->num_workers - 1)) break;
* Close the hole...
*/
memmove(&nr->workers[i], &nr->workers[i + 1],
- ((nr->num_workers - i) - 1) * sizeof(nr->workers[0]));
+ (uint8_t *) &nr->workers[nr->num_workers] - (uint8_t *) &nr->workers[i + 1]);
break;
}
}
(void) talloc_get_type_abort(nr, fr_network_t);
+ if (!nr->num_workers) {
+ RATE_LIMIT_GLOBAL(ERROR, "Failed sending packet to worker - "
+ "No workers are available");
+ return -1;
+ }
+
retry:
if (nr->num_workers == 1) {
worker = nr->workers[0];
if (worker->blocked) {
RATE_LIMIT_GLOBAL(ERROR, "Failed sending packet to worker - "
"In single-threaded mode and worker is blocked");
- drop:
+ drop:
worker->stats.dropped++;
return -1;
}
s->dead = true;
+ /*
+ * This FD is no longer part of the event loop.
+ */
fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
-
for (i = 0; i < nr->max_workers; i++) {
if (!nr->workers[i]) continue;
fr_message_done(&cd->m);
nr->stats.dropped++;
s->stats.dropped++;
-
- } else {
- /*
- * One more packet sent to a worker.
- */
- s->outstanding++;
+ return -1;
}
+ /*
+ * One more packet sent to a worker.
+ */
+ s->outstanding++;
return 0;
}
fr_network_socket_t *s = ctx;
fr_network_t *nr = s->nr;
- fr_cond_assert(s->listen->fd == sockfd);
+ if (!fr_cond_assert(s->listen->fd == sockfd)) return;
DEBUG3("network vnode");
}
-static fr_event_update_t const pause_write[] = {
- FR_EVENT_SUSPEND(fr_event_io_func_t, write),
- { 0 }
-};
-
-static fr_event_update_t const resume_write[] = {
- FR_EVENT_RESUME(fr_event_io_func_t, write),
- { 0 }
-};
-
-
/** Write packets to the network.
*
* @param el the event list
fr_rb_delete(nr->sockets, s);
fr_rb_delete(nr->sockets_by_num, s);
- fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
+ if (!s->dead) fr_event_fd_delete(nr->el, s->listen->fd, s->filter);
if (s->listen->app_io->close) {
s->listen->app_io->close(s->listen);
fr_message_done(&cd->m);
}
- talloc_free(s->waiting);
+ /* s->waiting is already talloc parented from s */
talloc_free(s->listen);
return 0;
fr_assert(data_size == sizeof(worker));
+ if (nr->num_workers >= nr->max_workers) {
+ ERROR("Too many workers");
+ return;
+ }
+
memcpy(&worker, data, data_size);
(void) talloc_get_type_abort(worker, fr_worker_t);
fr_channel_requestor_uctx_add(w->channel, w);
fr_channel_set_recv_reply(w->channel, nr, fr_network_recv_reply);
- /*
- * FIXME: This creates a race in the network loop
- * exit condition, because it can theoretically
- * be signalled to exit before the workers have
- * ACKd channel creation.
- */
- nr->num_workers++;
-
/*
* Insert the worker into the array of workers.
*/
if (nr->workers[i]) continue;
nr->workers[i] = w;
+ nr->num_workers++;
return;
}
-
- /*
- * Run out of room to put workers!
- */
- fr_assert(0 == 1);
}
/** Handle a network control message callback for a packet sent to a socket
continue;
}
+ (void) fr_heap_insert(&s->waiting, cd);
+
/*
- * No pending message, let's try writing it.
- *
- * If there is a pending message, then we're
- * waiting for IO write to become ready.
+ * No pending message, write it. If there is a pending write, the message will be left
+ * in the waiting queue.
*/
if (!s->pending) {
fr_assert(!s->blocked);
- (void) fr_heap_insert(&s->waiting, cd);
fr_network_write(nr->el, s->listen->fd, 0, s);
}
}