]> git.ipfire.org Git - thirdparty/unbound.git/commitdiff
- fast-reload, reload ipc to stop and start threads.
authorW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Tue, 9 Jan 2024 14:02:27 +0000 (15:02 +0100)
committerW.C.A. Wijngaards <wouter@nlnetlabs.nl>
Tue, 9 Jan 2024 14:02:27 +0000 (15:02 +0100)
daemon/remote.c
daemon/remote.h
daemon/worker.c
daemon/worker.h

index ead1067f2cb2fe235a09511ed148e3e8268222d1..3234d625d7e5d86fc7f84b72793d4994011db8aa 100644 (file)
@@ -122,6 +122,7 @@ static int fr_printq_empty(struct fast_reload_printq* printq);
 static void fr_printq_list_insert(struct fast_reload_printq* printq,
        struct daemon* daemon);
 static void fr_printq_remove(struct fast_reload_printq* printq);
+static void fr_check_cmd_from_thread(struct fast_reload_thread* fr);
 
 static int
 remote_setup_ctx(struct daemon_remote* rc, struct config_file* cfg)
@@ -3509,6 +3510,12 @@ fr_notification_to_string(enum fast_reload_notification status)
                return "exited";
        case fast_reload_notification_printout:
                return "printout";
+       case fast_reload_notification_reload_stop:
+               return "reload_stop";
+       case fast_reload_notification_reload_ack:
+               return "reload_ack";
+       case fast_reload_notification_reload_start:
+               return "reload_start";
        default:
                break;
        }
@@ -3792,6 +3799,7 @@ fr_construct_from_config(struct fast_reload_thread* fr,
        if(!(ct->oldcfg = (struct config_file*)calloc(1,
                sizeof(*ct->oldcfg)))) {
                fr_construct_clear(ct);
+               log_err("out of memory");
                return 0;
        }
        return 1;
@@ -3876,15 +3884,80 @@ fr_reload_config(struct fast_reload_thread* fr, struct config_file* newcfg,
        return 1;
 }
 
+/** fast reload, poll for ack incoming. */
+static void
+fr_poll_for_ack(struct fast_reload_thread* fr)
+{
+       int loopexit = 0, bcount = 0;
+       uint32_t cmd;
+       ssize_t ret;
+
+       if(fr->need_to_quit)
+               return;
+       /* Is there data? */
+       if(!sock_poll_timeout(fr->commpair[1], -1, 1, 0, NULL)) {
+               log_err("fr_poll_for_ack: poll failed");
+               return;
+       }
+
+       /* Read the data */
+       while(1) {
+               if(++loopexit > IPC_LOOP_MAX) {
+                       log_err("fr_poll_for_ack: recv loops %s",
+                               sock_strerror(errno));
+                       return;
+               }
+               ret = recv(fr->commpair[1], ((char*)&cmd)+bcount,
+                       sizeof(cmd)-bcount, 0);
+               if(ret == -1) {
+                       if(
+#ifndef USE_WINSOCK
+                               errno == EINTR || errno == EAGAIN
+#  ifdef EWOULDBLOCK
+                               || errno == EWOULDBLOCK
+#  endif
+#else
+                               WSAGetLastError() == WSAEINTR ||
+                               WSAGetLastError() == WSAEINPROGRESS ||
+                               WSAGetLastError() == WSAEWOULDBLOCK
+#endif
+                               )
+                               continue; /* Try again. */
+                       log_err("fr_poll_for_ack: recv: %s",
+                               sock_strerror(errno));
+                       return;
+               } else if(ret+(ssize_t)bcount != sizeof(cmd)) {
+                       bcount += ret;
+                       if((size_t)bcount < sizeof(cmd))
+                               continue;
+               }
+               break;
+       }
+       if(cmd == fast_reload_notification_exit) {
+               fr->need_to_quit = 1;
+               verbose(VERB_ALGO, "fast reload wait for ack: "
+                       "exit notification received");
+               return;
+       }
+       if(cmd != fast_reload_notification_reload_ack) {
+               verbose(VERB_ALGO, "fast reload wait for ack: "
+                       "wrong notification %d", (int)cmd);
+       }
+}
+
 /** fast reload thread, reload ipc communication to stop and start threads. */
 static int
 fr_reload_ipc(struct fast_reload_thread* fr, struct config_file* newcfg,
        struct fast_reload_construct* ct)
 {
+       int result = 1;
+       fr_send_notification(fr, fast_reload_notification_reload_stop);
+       fr_poll_for_ack(fr);
        if(!fr_reload_config(fr, newcfg, ct)) {
-               return 0;
+               result = 0;
        }
-       return 1;
+       fr_send_notification(fr, fast_reload_notification_reload_start);
+       return result;
 }
 
 /** fast reload thread, load config */
@@ -4291,6 +4364,8 @@ fast_reload_thread_setup(struct worker* worker)
        fr->threadnum = numworkers+2;
        fr->commpair[0] = -1;
        fr->commpair[1] = -1;
+       fr->commreload[0] = -1;
+       fr->commreload[1] = -1;
        if(!create_socketpair(fr->commpair, worker->daemon->rand)) {
                free(fr);
                worker->daemon->fast_reload_thread = NULL;
@@ -4306,6 +4381,16 @@ fast_reload_thread_setup(struct worker* worker)
                worker->daemon->fast_reload_thread = NULL;
                return 0;
        }
+       if(!create_socketpair(fr->commreload, worker->daemon->rand)) {
+               sock_close(fr->commpair[0]);
+               sock_close(fr->commpair[1]);
+               sock_close(fr->commreload[0]);
+               sock_close(fr->commreload[1]);
+               free(fr->fr_output);
+               free(fr);
+               worker->daemon->fast_reload_thread = NULL;
+               return 0;
+       }
        lock_basic_init(&fr->fr_output_lock);
        lock_protect(&fr->fr_output_lock, fr->fr_output,
                sizeof(*fr->fr_output));
@@ -4327,6 +4412,8 @@ fast_reload_thread_desetup(struct fast_reload_thread* fast_reload_thread)
                ub_event_free(fast_reload_thread->service_event);
        sock_close(fast_reload_thread->commpair[0]);
        sock_close(fast_reload_thread->commpair[1]);
+       sock_close(fast_reload_thread->commreload[0]);
+       sock_close(fast_reload_thread->commreload[1]);
        if(fast_reload_thread->printq) {
                fr_main_perform_printout(fast_reload_thread);
                /* If it is empty now, there is nothing to print on fd. */
@@ -4352,6 +4439,64 @@ fast_reload_thread_desetup(struct fast_reload_thread* fast_reload_thread)
        free(fast_reload_thread);
 }
 
+/**
+ * Fast reload thread, send a command to the thread. Blocking on timeout.
+ * It handles received input from the thread, if any is received.
+ */
+static void
+fr_send_cmd_to(struct fast_reload_thread* fr,
+       enum fast_reload_notification status, int check_cmds, int blocking)
+{
+       int outevent, loopexit = 0, bcount = 0;
+       uint32_t cmd;
+       ssize_t ret;
+       verbose(VERB_ALGO, "send notification to fast reload thread: %s",
+               fr_notification_to_string(status));
+       cmd = status;
+       while(1) {
+               if(++loopexit > IPC_LOOP_MAX) {
+                       log_err("send notification to fast reload: could not send notification: loop");
+                       return;
+               }
+               if(check_cmds)
+                       fr_check_cmd_from_thread(fr);
+               /* wait for socket to become writable */
+               if(!sock_poll_timeout(fr->commpair[0],
+                       (blocking?-1:IPC_NOTIFICATION_WAIT),
+                       0, 1, &outevent)) {
+                       log_err("send notification to fast reload: poll failed");
+                       return;
+               }
+               if(!outevent)
+                       continue;
+               ret = send(fr->commpair[0], ((char*)&cmd)+bcount,
+                       sizeof(cmd)-bcount, 0);
+               if(ret == -1) {
+                       if(
+#ifndef USE_WINSOCK
+                               errno == EINTR || errno == EAGAIN
+#  ifdef EWOULDBLOCK
+                               || errno == EWOULDBLOCK
+#  endif
+#else
+                               WSAGetLastError() == WSAEINTR ||
+                               WSAGetLastError() == WSAEINPROGRESS ||
+                               WSAGetLastError() == WSAEWOULDBLOCK
+#endif
+                               )
+                               continue; /* Try again. */
+                       log_err("send notification to fast reload: send: %s",
+                               sock_strerror(errno));
+                       return;
+               } else if(ret+(ssize_t)bcount != sizeof(cmd)) {
+                       bcount += ret;
+                       if((size_t)bcount < sizeof(cmd))
+                               continue;
+               }
+               break;
+       }
+}
+
 /** Fast reload, the main thread handles that the fast reload thread has
  * exited. */
 static void
@@ -4411,6 +4556,131 @@ fr_main_perform_printout(struct fast_reload_thread* fr)
                comm_point_listen_for_rw(fr->printq->client_cp, 0, 1);
 }
 
+/** fast reload, receive ack from workers that they are waiting, run
+ * by the mainthr after sending them reload_stop. */
+static void
+fr_read_ack_from_workers(struct fast_reload_thread* fr)
+{
+       struct daemon* daemon = fr->worker->daemon;
+       /* Every worker sends one byte, wait for num-1 bytes. */
+       int count=0, total=daemon->num-1;
+       while(count < total) {
+               uint8_t r;
+               ssize_t ret;
+               ret = recv(fr->commreload[0], &r, 1, 0);
+               if(ret == -1) {
+                       if(
+#ifndef USE_WINSOCK
+                                errno == EINTR || errno == EAGAIN
+#  ifdef EWOULDBLOCK
+                                || errno == EWOULDBLOCK
+#  endif
+#else
+                                WSAGetLastError() == WSAEINTR ||
+                                WSAGetLastError() == WSAEINPROGRESS ||
+                                WSAGetLastError() == WSAEWOULDBLOCK
+#endif
+                                )
+                               continue; /* Try again */
+                       log_err("worker reload ack: recv failed: %s",
+                               sock_strerror(errno));
+                       return;
+               }
+               count++;
+               verbose(VERB_ALGO, "worker reload ack from (uint8_t)%d",
+                       (int)r);
+       }
+}
+
+/** fast reload, poll for reload_start in mainthr waiting on a notification
+ * from the fast reload thread. */
+static void
+fr_poll_for_reload_start(struct fast_reload_thread* fr)
+{
+       int loopexit = 0, bcount = 0;
+       uint32_t cmd;
+       ssize_t ret;
+
+       /* Is there data? */
+       if(!sock_poll_timeout(fr->commpair[0], -1, 1, 0, NULL)) {
+               log_err("fr_poll_for_reload_start: poll failed");
+               return;
+       }
+
+       /* Read the data */
+       while(1) {
+               if(++loopexit > IPC_LOOP_MAX) {
+                       log_err("fr_poll_for_reload_start: recv loops %s",
+                               sock_strerror(errno));
+                       return;
+               }
+               ret = recv(fr->commpair[0], ((char*)&cmd)+bcount,
+                       sizeof(cmd)-bcount, 0);
+               if(ret == -1) {
+                       if(
+#ifndef USE_WINSOCK
+                               errno == EINTR || errno == EAGAIN
+#  ifdef EWOULDBLOCK
+                               || errno == EWOULDBLOCK
+#  endif
+#else
+                               WSAGetLastError() == WSAEINTR ||
+                               WSAGetLastError() == WSAEINPROGRESS ||
+                               WSAGetLastError() == WSAEWOULDBLOCK
+#endif
+                               )
+                               continue; /* Try again. */
+                       log_err("fr_poll_for_reload_start: recv: %s",
+                               sock_strerror(errno));
+                       return;
+               } else if(ret+(ssize_t)bcount != sizeof(cmd)) {
+                       bcount += ret;
+                       if((size_t)bcount < sizeof(cmd))
+                               continue;
+               }
+               break;
+       }
+       if(cmd != fast_reload_notification_reload_start) {
+               verbose(VERB_ALGO, "fast reload wait for ack: "
+                       "wrong notification %d", (int)cmd);
+       }
+}
+
+
+/** fast reload thread, handle reload_stop notification, send reload stop
+ * to other threads over IPC and collect their ack. When that is done,
+ * ack to the caller, the fast reload thread, and wait for it to send start. */
+static void
+fr_main_perform_reload_stop(struct fast_reload_thread* fr)
+{
+       struct daemon* daemon = fr->worker->daemon;
+       int i;
+
+       /* Send reload_stop to other threads. */
+       for(i=0; i<daemon->num; i++) {
+               if(i == fr->worker->thread_num)
+                       continue; /* Do not send to ourselves. */
+               worker_send_cmd(daemon->workers[i], worker_cmd_reload_stop);
+       }
+
+       /* Wait for the other threads to ack. */
+       fr_read_ack_from_workers(fr);
+
+       /* Send ack to fast reload thread. */
+       fr_send_cmd_to(fr, fast_reload_notification_reload_ack, 0, 1);
+
+       /* Wait for reload_start from fast reload thread to resume. */
+       fr_poll_for_reload_start(fr);
+
+       /* Send reload_start to other threads */
+       for(i=0; i<daemon->num; i++) {
+               if(i == fr->worker->thread_num)
+                       continue; /* Do not send to ourselves. */
+               worker_send_cmd(daemon->workers[i], worker_cmd_reload_start);
+       }
+       verbose(VERB_ALGO, "worker resume after reload");
+}
+
 /** Fast reload, perform the command received from the fast reload thread */
 static void
 fr_main_perform_cmd(struct fast_reload_thread* fr,
@@ -4424,6 +4694,8 @@ fr_main_perform_cmd(struct fast_reload_thread* fr,
                status == fast_reload_notification_done_error ||
                status == fast_reload_notification_exited) {
                fr_main_perform_done(fr);
+       } else if(status == fast_reload_notification_reload_stop) {
+               fr_main_perform_reload_stop(fr);
        } else {
                log_err("main received unknown status from fast reload: %d %s",
                        (int)status, fr_notification_to_string(status));
@@ -4819,68 +5091,12 @@ fr_printq_remove(struct fast_reload_printq* printq)
        fr_printq_delete(printq);
 }
 
-/**
- * Fast reload thread, send a command to the thread. Blocking on timeout.
- * It handles received input from the thread, if any is received.
- */
-static void
-fr_send_cmd_to(struct fast_reload_thread* fr,
-       enum fast_reload_notification status)
-{
-       int outevent, loopexit = 0, bcount = 0;
-       uint32_t cmd;
-       ssize_t ret;
-       verbose(VERB_ALGO, "send notification to fast reload thread: %s",
-               fr_notification_to_string(status));
-       cmd = status;
-       while(1) {
-               if(++loopexit > IPC_LOOP_MAX) {
-                       log_err("send notification to fast reload: could not send notification: loop");
-                       return;
-               }
-               fr_check_cmd_from_thread(fr);
-               /* wait for socket to become writable */
-               if(!sock_poll_timeout(fr->commpair[0], IPC_NOTIFICATION_WAIT,
-                       0, 1, &outevent)) {
-                       log_err("send notification to fast reload: poll failed");
-                       return;
-               }
-               if(!outevent)
-                       continue;
-               ret = send(fr->commpair[0], ((char*)&cmd)+bcount,
-                       sizeof(cmd)-bcount, 0);
-               if(ret == -1) {
-                       if(
-#ifndef USE_WINSOCK
-                               errno == EINTR || errno == EAGAIN
-#  ifdef EWOULDBLOCK
-                               || errno == EWOULDBLOCK
-#  endif
-#else
-                               WSAGetLastError() == WSAEINTR ||
-                               WSAGetLastError() == WSAEINPROGRESS ||
-                               WSAGetLastError() == WSAEWOULDBLOCK
-#endif
-                               )
-                               continue; /* Try again. */
-                       log_err("send notification to fast reload: send: %s",
-                               sock_strerror(errno));
-                       return;
-               } else if(ret+(ssize_t)bcount != sizeof(cmd)) {
-                       bcount += ret;
-                       if((size_t)bcount < sizeof(cmd))
-                               continue;
-               }
-               break;
-       }
-}
-
 /** fast reload thread, send stop command to the thread, from the main thread.
  */
 static void
 fr_send_stop(struct fast_reload_thread* fr)
 {
-       fr_send_cmd_to(fr, fast_reload_notification_exit);
+       fr_send_cmd_to(fr, fast_reload_notification_exit, 1, 0);
 }
 
 void
index 82441598ea5f5516e543150c0b0d873ed578c34e..3121f696a61094e9b66fec58c480ffd51b58fb5a 100644 (file)
@@ -137,7 +137,13 @@ enum fast_reload_notification {
        /** the fast reload thread has exited, after being told to exit */
        fast_reload_notification_exited = 4,
        /** the fast reload thread has information to print out */
-       fast_reload_notification_printout = 5
+       fast_reload_notification_printout = 5,
+       /** stop as part of the reload the thread and other threads */
+       fast_reload_notification_reload_stop = 6,
+       /** ack the stop as part of the reload */
+       fast_reload_notification_reload_ack = 7,
+       /** resume from stop as part of the reload */
+       fast_reload_notification_reload_start = 8
 };
 
 /**
@@ -206,6 +212,9 @@ struct fast_reload_thread {
         * to be printed. The remote control thread can pick them up with
         * the lock. */
        struct config_strlist_head* fr_output;
+
+       /** communication socket pair, to respond to the reload request */
+       int commreload[2];
 };
 
 /**
index c4ae79352b93e07c8e9140005e7941d5b4d08323..7ee0d8d5419089a275c63f15c7c08204d65f4578 100644 (file)
@@ -369,6 +369,62 @@ worker_check_request(sldns_buffer* pkt, struct worker* worker,
        return;
 }
 
+/** stop and wait to resume the worker */
+static void
+worker_stop_and_wait(struct worker* worker)
+{
+       uint8_t c = (uint8_t)worker->thread_num;
+       ssize_t ret;
+       uint8_t* buf = NULL;
+       uint32_t len = 0, cmd;
+       while(1) {
+               ret = send(worker->daemon->fast_reload_thread->commreload[1],
+                       &c, 1, 0);
+               if(ret == -1) {
+                       if(
+#ifndef USE_WINSOCK
+                               errno == EINTR || errno == EAGAIN
+#  ifdef EWOULDBLOCK
+                               || errno == EWOULDBLOCK
+#  endif
+#else
+                               WSAGetLastError() == WSAEINTR ||
+                               WSAGetLastError() == WSAEINPROGRESS ||
+                               WSAGetLastError() == WSAEWOULDBLOCK
+#endif
+                               )
+                               continue; /* Try again. */
+                       log_err("worker reload ack reply: send failed: %s",
+                               sock_strerror(errno));
+                       break;
+               }
+               break;
+       }
+       /* wait for reload */
+       if(!tube_read_msg(worker->cmd, &buf, &len, 0)) {
+               log_err("worker reload read reply failed");
+               return;
+       }
+       if(len != sizeof(uint32_t)) {
+               log_err("worker reload reply, bad control msg length %d",
+                       (int)len);
+               free(buf);
+               return;
+       }
+       cmd = sldns_read_uint32(buf);
+       free(buf);
+       if(cmd == worker_cmd_quit) {
+               /* quit anyway */
+               verbose(VERB_ALGO, "reload reply, control cmd quit");
+               comm_base_exit(worker->base);
+               return;
+       }
+       if(cmd != worker_cmd_reload_start) {
+               log_err("worker reload reply, wrong reply command");
+       }
+       verbose(VERB_ALGO, "worker resume after reload");
+}
+
 void
 worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), uint8_t* msg,
        size_t len, int error, void* arg)
@@ -404,6 +460,10 @@ worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), uint8_t* msg,
                verbose(VERB_ALGO, "got control cmd remote");
                daemon_remote_exec(worker);
                break;
+       case worker_cmd_reload_stop:
+               verbose(VERB_ALGO, "got control cmd reload_stop");
+               worker_stop_and_wait(worker);
+               break;
        default:
                log_err("bad command %d", (int)cmd);
                break;
index ab2fc728d274b32d76f0cf67b3b58cc3b9330fc9..84e851d06bbf96ef6a1ac2c7165d3e157f80c5a8 100644 (file)
@@ -72,7 +72,11 @@ enum worker_commands {
        /** obtain statistics without statsclear */
        worker_cmd_stats_noreset,
        /** execute remote control command */
-       worker_cmd_remote
+       worker_cmd_remote,
+       /** for fast-reload, perform stop */
+       worker_cmd_reload_stop,
+       /** for fast-reload, start again */
+       worker_cmd_reload_start
 };
 
 /**