static void fr_printq_list_insert(struct fast_reload_printq* printq,
struct daemon* daemon);
static void fr_printq_remove(struct fast_reload_printq* printq);
+static void fr_check_cmd_from_thread(struct fast_reload_thread* fr);
static int
remote_setup_ctx(struct daemon_remote* rc, struct config_file* cfg)
return "exited";
case fast_reload_notification_printout:
return "printout";
+ case fast_reload_notification_reload_stop:
+ return "reload_stop";
+ case fast_reload_notification_reload_ack:
+ return "reload_ack";
+ case fast_reload_notification_reload_start:
+ return "reload_start";
default:
break;
}
if(!(ct->oldcfg = (struct config_file*)calloc(1,
sizeof(*ct->oldcfg)))) {
fr_construct_clear(ct);
+ log_err("out of memory");
return 0;
}
return 1;
return 1;
}
+/** fast reload, poll for ack incoming. */
+static void
+fr_poll_for_ack(struct fast_reload_thread* fr)
+{
+ int loopexit = 0, bcount = 0;
+ uint32_t cmd;
+ ssize_t ret;
+
+ if(fr->need_to_quit)
+ return;
+ /* Is there data? */
+ if(!sock_poll_timeout(fr->commpair[1], -1, 1, 0, NULL)) {
+ log_err("fr_poll_for_ack: poll failed");
+ return;
+ }
+
+ /* Read the data */
+ while(1) {
+ if(++loopexit > IPC_LOOP_MAX) {
+ log_err("fr_poll_for_ack: recv loops %s",
+ sock_strerror(errno));
+ return;
+ }
+ ret = recv(fr->commpair[1], ((char*)&cmd)+bcount,
+ sizeof(cmd)-bcount, 0);
+ if(ret == -1) {
+ if(
+#ifndef USE_WINSOCK
+ errno == EINTR || errno == EAGAIN
+# ifdef EWOULDBLOCK
+ || errno == EWOULDBLOCK
+# endif
+#else
+ WSAGetLastError() == WSAEINTR ||
+ WSAGetLastError() == WSAEINPROGRESS ||
+ WSAGetLastError() == WSAEWOULDBLOCK
+#endif
+ )
+ continue; /* Try again. */
+ log_err("fr_poll_for_ack: recv: %s",
+ sock_strerror(errno));
+ return;
+ } else if(ret+(ssize_t)bcount != sizeof(cmd)) {
+ bcount += ret;
+ if((size_t)bcount < sizeof(cmd))
+ continue;
+ }
+ break;
+ }
+ if(cmd == fast_reload_notification_exit) {
+ fr->need_to_quit = 1;
+ verbose(VERB_ALGO, "fast reload wait for ack: "
+ "exit notification received");
+ return;
+ }
+ if(cmd != fast_reload_notification_reload_ack) {
+ verbose(VERB_ALGO, "fast reload wait for ack: "
+ "wrong notification %d", (int)cmd);
+ }
+}
+
/** fast reload thread, reload ipc communication to stop and start threads. */
static int
fr_reload_ipc(struct fast_reload_thread* fr, struct config_file* newcfg,
struct fast_reload_construct* ct)
{
+ int result = 1;
+ fr_send_notification(fr, fast_reload_notification_reload_stop);
+ fr_poll_for_ack(fr);
if(!fr_reload_config(fr, newcfg, ct)) {
- return 0;
+ result = 0;
}
- return 1;
+ fr_send_notification(fr, fast_reload_notification_reload_start);
+ return result;
}
/** fast reload thread, load config */
fr->threadnum = numworkers+2;
fr->commpair[0] = -1;
fr->commpair[1] = -1;
+ fr->commreload[0] = -1;
+ fr->commreload[1] = -1;
if(!create_socketpair(fr->commpair, worker->daemon->rand)) {
free(fr);
worker->daemon->fast_reload_thread = NULL;
worker->daemon->fast_reload_thread = NULL;
return 0;
}
+ if(!create_socketpair(fr->commreload, worker->daemon->rand)) {
+ sock_close(fr->commpair[0]);
+ sock_close(fr->commpair[1]);
+ sock_close(fr->commreload[0]);
+ sock_close(fr->commreload[1]);
+ free(fr->fr_output);
+ free(fr);
+ worker->daemon->fast_reload_thread = NULL;
+ return 0;
+ }
lock_basic_init(&fr->fr_output_lock);
lock_protect(&fr->fr_output_lock, fr->fr_output,
sizeof(*fr->fr_output));
ub_event_free(fast_reload_thread->service_event);
sock_close(fast_reload_thread->commpair[0]);
sock_close(fast_reload_thread->commpair[1]);
+ sock_close(fast_reload_thread->commreload[0]);
+ sock_close(fast_reload_thread->commreload[1]);
if(fast_reload_thread->printq) {
fr_main_perform_printout(fast_reload_thread);
/* If it is empty now, there is nothing to print on fd. */
free(fast_reload_thread);
}
+/**
+ * Fast reload thread, send a command to the thread. Blocking on timeout.
+ * It handles received input from the thread, if any is received.
+ */
+static void
+fr_send_cmd_to(struct fast_reload_thread* fr,
+ enum fast_reload_notification status, int check_cmds, int blocking)
+{
+ int outevent, loopexit = 0, bcount = 0;
+ uint32_t cmd;
+ ssize_t ret;
+ verbose(VERB_ALGO, "send notification to fast reload thread: %s",
+ fr_notification_to_string(status));
+ cmd = status;
+ while(1) {
+ if(++loopexit > IPC_LOOP_MAX) {
+ log_err("send notification to fast reload: could not send notification: loop");
+ return;
+ }
+ if(check_cmds)
+ fr_check_cmd_from_thread(fr);
+ /* wait for socket to become writable */
+ if(!sock_poll_timeout(fr->commpair[0],
+ (blocking?-1:IPC_NOTIFICATION_WAIT),
+ 0, 1, &outevent)) {
+ log_err("send notification to fast reload: poll failed");
+ return;
+ }
+ if(!outevent)
+ continue;
+ ret = send(fr->commpair[0], ((char*)&cmd)+bcount,
+ sizeof(cmd)-bcount, 0);
+ if(ret == -1) {
+ if(
+#ifndef USE_WINSOCK
+ errno == EINTR || errno == EAGAIN
+# ifdef EWOULDBLOCK
+ || errno == EWOULDBLOCK
+# endif
+#else
+ WSAGetLastError() == WSAEINTR ||
+ WSAGetLastError() == WSAEINPROGRESS ||
+ WSAGetLastError() == WSAEWOULDBLOCK
+#endif
+ )
+ continue; /* Try again. */
+ log_err("send notification to fast reload: send: %s",
+ sock_strerror(errno));
+ return;
+ } else if(ret+(ssize_t)bcount != sizeof(cmd)) {
+ bcount += ret;
+ if((size_t)bcount < sizeof(cmd))
+ continue;
+ }
+ break;
+ }
+}
+
/** Fast reload, the main thread handles that the fast reload thread has
* exited. */
static void
comm_point_listen_for_rw(fr->printq->client_cp, 0, 1);
}
+/** fast reload, receive ack from workers that they are waiting, run
+ * by the mainthr after sending them reload_stop. */
+static void
+fr_read_ack_from_workers(struct fast_reload_thread* fr)
+{
+ struct daemon* daemon = fr->worker->daemon;
+ /* Every worker sends one byte, wait for num-1 bytes. */
+ int count=0, total=daemon->num-1;
+ while(count < total) {
+ uint8_t r;
+ ssize_t ret;
+ ret = recv(fr->commreload[0], &r, 1, 0);
+ if(ret == -1) {
+ if(
+#ifndef USE_WINSOCK
+ errno == EINTR || errno == EAGAIN
+# ifdef EWOULDBLOCK
+ || errno == EWOULDBLOCK
+# endif
+#else
+ WSAGetLastError() == WSAEINTR ||
+ WSAGetLastError() == WSAEINPROGRESS ||
+ WSAGetLastError() == WSAEWOULDBLOCK
+#endif
+ )
+ continue; /* Try again */
+ log_err("worker reload ack: recv failed: %s",
+ sock_strerror(errno));
+ return;
+ }
+ count++;
+ verbose(VERB_ALGO, "worker reload ack from (uint8_t)%d",
+ (int)r);
+ }
+}
+
+/** fast reload, poll for reload_start in mainthr waiting on a notification
+ * from the fast reload thread. */
+static void
+fr_poll_for_reload_start(struct fast_reload_thread* fr)
+{
+ int loopexit = 0, bcount = 0;
+ uint32_t cmd;
+ ssize_t ret;
+
+ /* Is there data? */
+ if(!sock_poll_timeout(fr->commpair[0], -1, 1, 0, NULL)) {
+ log_err("fr_poll_for_reload_start: poll failed");
+ return;
+ }
+
+ /* Read the data */
+ while(1) {
+ if(++loopexit > IPC_LOOP_MAX) {
+ log_err("fr_poll_for_reload_start: recv loops %s",
+ sock_strerror(errno));
+ return;
+ }
+ ret = recv(fr->commpair[0], ((char*)&cmd)+bcount,
+ sizeof(cmd)-bcount, 0);
+ if(ret == -1) {
+ if(
+#ifndef USE_WINSOCK
+ errno == EINTR || errno == EAGAIN
+# ifdef EWOULDBLOCK
+ || errno == EWOULDBLOCK
+# endif
+#else
+ WSAGetLastError() == WSAEINTR ||
+ WSAGetLastError() == WSAEINPROGRESS ||
+ WSAGetLastError() == WSAEWOULDBLOCK
+#endif
+ )
+ continue; /* Try again. */
+ log_err("fr_poll_for_reload_start: recv: %s",
+ sock_strerror(errno));
+ return;
+ } else if(ret+(ssize_t)bcount != sizeof(cmd)) {
+ bcount += ret;
+ if((size_t)bcount < sizeof(cmd))
+ continue;
+ }
+ break;
+ }
+ if(cmd != fast_reload_notification_reload_start) {
+ verbose(VERB_ALGO, "fast reload wait for ack: "
+ "wrong notification %d", (int)cmd);
+ }
+}
+
+
+/** fast reload thread, handle reload_stop notification, send reload stop
+ * to other threads over IPC and collect their ack. When that is done,
+ * ack to the caller, the fast reload thread, and wait for it to send start. */
+static void
+fr_main_perform_reload_stop(struct fast_reload_thread* fr)
+{
+ struct daemon* daemon = fr->worker->daemon;
+ int i;
+
+ /* Send reload_stop to other threads. */
+ for(i=0; i<daemon->num; i++) {
+ if(i == fr->worker->thread_num)
+ continue; /* Do not send to ourselves. */
+ worker_send_cmd(daemon->workers[i], worker_cmd_reload_stop);
+ }
+
+ /* Wait for the other threads to ack. */
+ fr_read_ack_from_workers(fr);
+
+ /* Send ack to fast reload thread. */
+ fr_send_cmd_to(fr, fast_reload_notification_reload_ack, 0, 1);
+
+ /* Wait for reload_start from fast reload thread to resume. */
+ fr_poll_for_reload_start(fr);
+
+ /* Send reload_start to other threads */
+ for(i=0; i<daemon->num; i++) {
+ if(i == fr->worker->thread_num)
+ continue; /* Do not send to ourselves. */
+ worker_send_cmd(daemon->workers[i], worker_cmd_reload_start);
+ }
+ verbose(VERB_ALGO, "worker resume after reload");
+}
+
/** Fast reload, perform the command received from the fast reload thread */
static void
fr_main_perform_cmd(struct fast_reload_thread* fr,
status == fast_reload_notification_done_error ||
status == fast_reload_notification_exited) {
fr_main_perform_done(fr);
+ } else if(status == fast_reload_notification_reload_stop) {
+ fr_main_perform_reload_stop(fr);
} else {
log_err("main received unknown status from fast reload: %d %s",
(int)status, fr_notification_to_string(status));
fr_printq_delete(printq);
}
-/**
- * Fast reload thread, send a command to the thread. Blocking on timeout.
- * It handles received input from the thread, if any is received.
- */
-static void
-fr_send_cmd_to(struct fast_reload_thread* fr,
- enum fast_reload_notification status)
-{
- int outevent, loopexit = 0, bcount = 0;
- uint32_t cmd;
- ssize_t ret;
- verbose(VERB_ALGO, "send notification to fast reload thread: %s",
- fr_notification_to_string(status));
- cmd = status;
- while(1) {
- if(++loopexit > IPC_LOOP_MAX) {
- log_err("send notification to fast reload: could not send notification: loop");
- return;
- }
- fr_check_cmd_from_thread(fr);
- /* wait for socket to become writable */
- if(!sock_poll_timeout(fr->commpair[0], IPC_NOTIFICATION_WAIT,
- 0, 1, &outevent)) {
- log_err("send notification to fast reload: poll failed");
- return;
- }
- if(!outevent)
- continue;
- ret = send(fr->commpair[0], ((char*)&cmd)+bcount,
- sizeof(cmd)-bcount, 0);
- if(ret == -1) {
- if(
-#ifndef USE_WINSOCK
- errno == EINTR || errno == EAGAIN
-# ifdef EWOULDBLOCK
- || errno == EWOULDBLOCK
-# endif
-#else
- WSAGetLastError() == WSAEINTR ||
- WSAGetLastError() == WSAEINPROGRESS ||
- WSAGetLastError() == WSAEWOULDBLOCK
-#endif
- )
- continue; /* Try again. */
- log_err("send notification to fast reload: send: %s",
- sock_strerror(errno));
- return;
- } else if(ret+(ssize_t)bcount != sizeof(cmd)) {
- bcount += ret;
- if((size_t)bcount < sizeof(cmd))
- continue;
- }
- break;
- }
-}
-
/** fast reload thread, send stop command to the thread, from the main thread.
*/
static void
fr_send_stop(struct fast_reload_thread* fr)
{
- fr_send_cmd_to(fr, fast_reload_notification_exit);
+ fr_send_cmd_to(fr, fast_reload_notification_exit, 1, 0);
}
void