From: W.C.A. Wijngaards Date: Tue, 9 Jan 2024 14:02:27 +0000 (+0100) Subject: - fast-reload, reload ipc to stop and start threads. X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=eee7d884579bd1669cff847676211219ee15ae3a;p=thirdparty%2Funbound.git - fast-reload, reload ipc to stop and start threads. --- diff --git a/daemon/remote.c b/daemon/remote.c index ead1067f2..3234d625d 100644 --- a/daemon/remote.c +++ b/daemon/remote.c @@ -122,6 +122,7 @@ static int fr_printq_empty(struct fast_reload_printq* printq); static void fr_printq_list_insert(struct fast_reload_printq* printq, struct daemon* daemon); static void fr_printq_remove(struct fast_reload_printq* printq); +static void fr_check_cmd_from_thread(struct fast_reload_thread* fr); static int remote_setup_ctx(struct daemon_remote* rc, struct config_file* cfg) @@ -3509,6 +3510,12 @@ fr_notification_to_string(enum fast_reload_notification status) return "exited"; case fast_reload_notification_printout: return "printout"; + case fast_reload_notification_reload_stop: + return "reload_stop"; + case fast_reload_notification_reload_ack: + return "reload_ack"; + case fast_reload_notification_reload_start: + return "reload_start"; default: break; } @@ -3792,6 +3799,7 @@ fr_construct_from_config(struct fast_reload_thread* fr, if(!(ct->oldcfg = (struct config_file*)calloc(1, sizeof(*ct->oldcfg)))) { fr_construct_clear(ct); + log_err("out of memory"); return 0; } return 1; @@ -3876,15 +3884,80 @@ fr_reload_config(struct fast_reload_thread* fr, struct config_file* newcfg, return 1; } +/** fast reload, poll for ack incoming. */ +static void +fr_poll_for_ack(struct fast_reload_thread* fr) +{ + int loopexit = 0, bcount = 0; + uint32_t cmd; + ssize_t ret; + + if(fr->need_to_quit) + return; + /* Is there data? 
*/ + if(!sock_poll_timeout(fr->commpair[1], -1, 1, 0, NULL)) { + log_err("fr_poll_for_ack: poll failed"); + return; + } + + /* Read the data */ + while(1) { + if(++loopexit > IPC_LOOP_MAX) { + log_err("fr_poll_for_ack: recv loops %s", + sock_strerror(errno)); + return; + } + ret = recv(fr->commpair[1], ((char*)&cmd)+bcount, + sizeof(cmd)-bcount, 0); + if(ret == -1) { + if( +#ifndef USE_WINSOCK + errno == EINTR || errno == EAGAIN +# ifdef EWOULDBLOCK + || errno == EWOULDBLOCK +# endif +#else + WSAGetLastError() == WSAEINTR || + WSAGetLastError() == WSAEINPROGRESS || + WSAGetLastError() == WSAEWOULDBLOCK +#endif + ) + continue; /* Try again. */ + log_err("fr_poll_for_ack: recv: %s", + sock_strerror(errno)); + return; + } else if(ret+(ssize_t)bcount != sizeof(cmd)) { + bcount += ret; + if((size_t)bcount < sizeof(cmd)) + continue; + } + break; + } + if(cmd == fast_reload_notification_exit) { + fr->need_to_quit = 1; + verbose(VERB_ALGO, "fast reload wait for ack: " + "exit notification received"); + return; + } + if(cmd != fast_reload_notification_reload_ack) { + verbose(VERB_ALGO, "fast reload wait for ack: " + "wrong notification %d", (int)cmd); + } +} + /** fast reload thread, reload ipc communication to stop and start threads. 
*/ static int fr_reload_ipc(struct fast_reload_thread* fr, struct config_file* newcfg, struct fast_reload_construct* ct) { + int result = 1; + fr_send_notification(fr, fast_reload_notification_reload_stop); + fr_poll_for_ack(fr); if(!fr_reload_config(fr, newcfg, ct)) { - return 0; + result = 0; } - return 1; + fr_send_notification(fr, fast_reload_notification_reload_start); + return result; } /** fast reload thread, load config */ @@ -4291,6 +4364,8 @@ fast_reload_thread_setup(struct worker* worker) fr->threadnum = numworkers+2; fr->commpair[0] = -1; fr->commpair[1] = -1; + fr->commreload[0] = -1; + fr->commreload[1] = -1; if(!create_socketpair(fr->commpair, worker->daemon->rand)) { free(fr); worker->daemon->fast_reload_thread = NULL; @@ -4306,6 +4381,16 @@ fast_reload_thread_setup(struct worker* worker) worker->daemon->fast_reload_thread = NULL; return 0; } + if(!create_socketpair(fr->commreload, worker->daemon->rand)) { + sock_close(fr->commpair[0]); + sock_close(fr->commpair[1]); + sock_close(fr->commreload[0]); + sock_close(fr->commreload[1]); + free(fr->fr_output); + free(fr); + worker->daemon->fast_reload_thread = NULL; + return 0; + } lock_basic_init(&fr->fr_output_lock); lock_protect(&fr->fr_output_lock, fr->fr_output, sizeof(*fr->fr_output)); @@ -4327,6 +4412,8 @@ fast_reload_thread_desetup(struct fast_reload_thread* fast_reload_thread) ub_event_free(fast_reload_thread->service_event); sock_close(fast_reload_thread->commpair[0]); sock_close(fast_reload_thread->commpair[1]); + sock_close(fast_reload_thread->commreload[0]); + sock_close(fast_reload_thread->commreload[1]); if(fast_reload_thread->printq) { fr_main_perform_printout(fast_reload_thread); /* If it is empty now, there is nothing to print on fd. */ @@ -4352,6 +4439,64 @@ fast_reload_thread_desetup(struct fast_reload_thread* fast_reload_thread) free(fast_reload_thread); } +/** + * Fast reload thread, send a command to the thread. Blocking on timeout. 
+ * It handles received input from the thread, if any is received. + */ +static void +fr_send_cmd_to(struct fast_reload_thread* fr, + enum fast_reload_notification status, int check_cmds, int blocking) +{ + int outevent, loopexit = 0, bcount = 0; + uint32_t cmd; + ssize_t ret; + verbose(VERB_ALGO, "send notification to fast reload thread: %s", + fr_notification_to_string(status)); + cmd = status; + while(1) { + if(++loopexit > IPC_LOOP_MAX) { + log_err("send notification to fast reload: could not send notification: loop"); + return; + } + if(check_cmds) + fr_check_cmd_from_thread(fr); + /* wait for socket to become writable */ + if(!sock_poll_timeout(fr->commpair[0], + (blocking?-1:IPC_NOTIFICATION_WAIT), + 0, 1, &outevent)) { + log_err("send notification to fast reload: poll failed"); + return; + } + if(!outevent) + continue; + ret = send(fr->commpair[0], ((char*)&cmd)+bcount, + sizeof(cmd)-bcount, 0); + if(ret == -1) { + if( +#ifndef USE_WINSOCK + errno == EINTR || errno == EAGAIN +# ifdef EWOULDBLOCK + || errno == EWOULDBLOCK +# endif +#else + WSAGetLastError() == WSAEINTR || + WSAGetLastError() == WSAEINPROGRESS || + WSAGetLastError() == WSAEWOULDBLOCK +#endif + ) + continue; /* Try again. */ + log_err("send notification to fast reload: send: %s", + sock_strerror(errno)); + return; + } else if(ret+(ssize_t)bcount != sizeof(cmd)) { + bcount += ret; + if((size_t)bcount < sizeof(cmd)) + continue; + } + break; + } +} + /** Fast reload, the main thread handles that the fast reload thread has * exited. */ static void @@ -4411,6 +4556,131 @@ fr_main_perform_printout(struct fast_reload_thread* fr) comm_point_listen_for_rw(fr->printq->client_cp, 0, 1); } +/** fast reload, receive ack from workers that they are waiting, run + * by the mainthr after sending them reload_stop. */ +static void +fr_read_ack_from_workers(struct fast_reload_thread* fr) +{ + struct daemon* daemon = fr->worker->daemon; + /* Every worker sends one byte, wait for num-1 bytes. 
*/ + int count=0, total=daemon->num-1; + while(count < total) { + uint8_t r; + ssize_t ret; + ret = recv(fr->commreload[0], &r, 1, 0); + if(ret == -1) { + if( +#ifndef USE_WINSOCK + errno == EINTR || errno == EAGAIN +# ifdef EWOULDBLOCK + || errno == EWOULDBLOCK +# endif +#else + WSAGetLastError() == WSAEINTR || + WSAGetLastError() == WSAEINPROGRESS || + WSAGetLastError() == WSAEWOULDBLOCK +#endif + ) + continue; /* Try again */ + log_err("worker reload ack: recv failed: %s", + sock_strerror(errno)); + return; + } + count++; + verbose(VERB_ALGO, "worker reload ack from (uint8_t)%d", + (int)r); + } +} + +/** fast reload, poll for reload_start in mainthr waiting on a notification + * from the fast reload thread. */ +static void +fr_poll_for_reload_start(struct fast_reload_thread* fr) +{ + int loopexit = 0, bcount = 0; + uint32_t cmd; + ssize_t ret; + + /* Is there data? */ + if(!sock_poll_timeout(fr->commpair[0], -1, 1, 0, NULL)) { + log_err("fr_poll_for_reload_start: poll failed"); + return; + } + + /* Read the data */ + while(1) { + if(++loopexit > IPC_LOOP_MAX) { + log_err("fr_poll_for_reload_start: recv loops %s", + sock_strerror(errno)); + return; + } + ret = recv(fr->commpair[0], ((char*)&cmd)+bcount, + sizeof(cmd)-bcount, 0); + if(ret == -1) { + if( +#ifndef USE_WINSOCK + errno == EINTR || errno == EAGAIN +# ifdef EWOULDBLOCK + || errno == EWOULDBLOCK +# endif +#else + WSAGetLastError() == WSAEINTR || + WSAGetLastError() == WSAEINPROGRESS || + WSAGetLastError() == WSAEWOULDBLOCK +#endif + ) + continue; /* Try again. 
*/ + log_err("fr_poll_for_reload_start: recv: %s", + sock_strerror(errno)); + return; + } else if(ret+(ssize_t)bcount != sizeof(cmd)) { + bcount += ret; + if((size_t)bcount < sizeof(cmd)) + continue; + } + break; + } + if(cmd != fast_reload_notification_reload_start) { + verbose(VERB_ALGO, "fast reload wait for ack: " + "wrong notification %d", (int)cmd); + } +} + + +/** fast reload thread, handle reload_stop notification, send reload stop + * to other threads over IPC and collect their ack. When that is done, + * ack to the caller, the fast reload thread, and wait for it to send start. */ +static void +fr_main_perform_reload_stop(struct fast_reload_thread* fr) +{ + struct daemon* daemon = fr->worker->daemon; + int i; + + /* Send reload_stop to other threads. */ + for(i=0; i<daemon->num; i++) { + if(i == fr->worker->thread_num) + continue; /* Do not send to ourselves. */ + worker_send_cmd(daemon->workers[i], worker_cmd_reload_stop); + } + + /* Wait for the other threads to ack. */ + fr_read_ack_from_workers(fr); + + /* Send ack to fast reload thread. */ + fr_send_cmd_to(fr, fast_reload_notification_reload_ack, 0, 1); + + /* Wait for reload_start from fast reload thread to resume. */ + fr_poll_for_reload_start(fr); + + /* Send reload_start to other threads */ + for(i=0; i<daemon->num; i++) { + if(i == fr->worker->thread_num) + continue; /* Do not send to ourselves. 
*/ + worker_send_cmd(daemon->workers[i], worker_cmd_reload_start); + } + verbose(VERB_ALGO, "worker resume after reload"); +} + /** Fast reload, perform the command received from the fast reload thread */ static void fr_main_perform_cmd(struct fast_reload_thread* fr, @@ -4424,6 +4694,8 @@ fr_main_perform_cmd(struct fast_reload_thread* fr, status == fast_reload_notification_done_error || status == fast_reload_notification_exited) { fr_main_perform_done(fr); + } else if(status == fast_reload_notification_reload_stop) { + fr_main_perform_reload_stop(fr); } else { log_err("main received unknown status from fast reload: %d %s", (int)status, fr_notification_to_string(status)); @@ -4819,68 +5091,12 @@ fr_printq_remove(struct fast_reload_printq* printq) fr_printq_delete(printq); } -/** - * Fast reload thread, send a command to the thread. Blocking on timeout. - * It handles received input from the thread, if any is received. - */ -static void -fr_send_cmd_to(struct fast_reload_thread* fr, - enum fast_reload_notification status) -{ - int outevent, loopexit = 0, bcount = 0; - uint32_t cmd; - ssize_t ret; - verbose(VERB_ALGO, "send notification to fast reload thread: %s", - fr_notification_to_string(status)); - cmd = status; - while(1) { - if(++loopexit > IPC_LOOP_MAX) { - log_err("send notification to fast reload: could not send notification: loop"); - return; - } - fr_check_cmd_from_thread(fr); - /* wait for socket to become writable */ - if(!sock_poll_timeout(fr->commpair[0], IPC_NOTIFICATION_WAIT, - 0, 1, &outevent)) { - log_err("send notification to fast reload: poll failed"); - return; - } - if(!outevent) - continue; - ret = send(fr->commpair[0], ((char*)&cmd)+bcount, - sizeof(cmd)-bcount, 0); - if(ret == -1) { - if( -#ifndef USE_WINSOCK - errno == EINTR || errno == EAGAIN -# ifdef EWOULDBLOCK - || errno == EWOULDBLOCK -# endif -#else - WSAGetLastError() == WSAEINTR || - WSAGetLastError() == WSAEINPROGRESS || - WSAGetLastError() == WSAEWOULDBLOCK -#endif - ) - continue; 
/* Try again. */ - log_err("send notification to fast reload: send: %s", - sock_strerror(errno)); - return; - } else if(ret+(ssize_t)bcount != sizeof(cmd)) { - bcount += ret; - if((size_t)bcount < sizeof(cmd)) - continue; - } - break; - } -} - /** fast reload thread, send stop command to the thread, from the main thread. */ static void fr_send_stop(struct fast_reload_thread* fr) { - fr_send_cmd_to(fr, fast_reload_notification_exit); + fr_send_cmd_to(fr, fast_reload_notification_exit, 1, 0); } void diff --git a/daemon/remote.h b/daemon/remote.h index 82441598e..3121f696a 100644 --- a/daemon/remote.h +++ b/daemon/remote.h @@ -137,7 +137,13 @@ enum fast_reload_notification { /** the fast reload thread has exited, after being told to exit */ fast_reload_notification_exited = 4, /** the fast reload thread has information to print out */ - fast_reload_notification_printout = 5 + fast_reload_notification_printout = 5, + /** stop as part of the reload the thread and other threads */ + fast_reload_notification_reload_stop = 6, + /** ack the stop as part of the reload */ + fast_reload_notification_reload_ack = 7, + /** resume from stop as part of the reload */ + fast_reload_notification_reload_start = 8 }; /** @@ -206,6 +212,9 @@ struct fast_reload_thread { * to be printed. The remote control thread can pick them up with * the lock. 
*/ struct config_strlist_head* fr_output; + + /** communication socket pair, to respond to the reload request */ + int commreload[2]; }; /** diff --git a/daemon/worker.c b/daemon/worker.c index c4ae79352..7ee0d8d54 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -369,6 +369,62 @@ worker_check_request(sldns_buffer* pkt, struct worker* worker, return; } +/** stop and wait to resume the worker */ +static void +worker_stop_and_wait(struct worker* worker) +{ + uint8_t c = (uint8_t)worker->thread_num; + ssize_t ret; + uint8_t* buf = NULL; + uint32_t len = 0, cmd; + while(1) { + ret = send(worker->daemon->fast_reload_thread->commreload[1], + &c, 1, 0); + if(ret == -1) { + if( +#ifndef USE_WINSOCK + errno == EINTR || errno == EAGAIN +# ifdef EWOULDBLOCK + || errno == EWOULDBLOCK +# endif +#else + WSAGetLastError() == WSAEINTR || + WSAGetLastError() == WSAEINPROGRESS || + WSAGetLastError() == WSAEWOULDBLOCK +#endif + ) + continue; /* Try again. */ + log_err("worker reload ack reply: send failed: %s", + sock_strerror(errno)); + break; + } + break; + } + /* wait for reload */ + if(!tube_read_msg(worker->cmd, &buf, &len, 0)) { + log_err("worker reload read reply failed"); + return; + } + if(len != sizeof(uint32_t)) { + log_err("worker reload reply, bad control msg length %d", + (int)len); + free(buf); + return; + } + cmd = sldns_read_uint32(buf); + free(buf); + if(cmd == worker_cmd_quit) { + /* quit anyway */ + verbose(VERB_ALGO, "reload reply, control cmd quit"); + comm_base_exit(worker->base); + return; + } + if(cmd != worker_cmd_reload_start) { + log_err("worker reload reply, wrong reply command"); + } + verbose(VERB_ALGO, "worker resume after reload"); +} + void worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), uint8_t* msg, size_t len, int error, void* arg) @@ -404,6 +460,10 @@ worker_handle_control_cmd(struct tube* ATTR_UNUSED(tube), uint8_t* msg, verbose(VERB_ALGO, "got control cmd remote"); daemon_remote_exec(worker); break; + case worker_cmd_reload_stop: 
+ verbose(VERB_ALGO, "got control cmd reload_stop"); + worker_stop_and_wait(worker); + break; default: log_err("bad command %d", (int)cmd); break; diff --git a/daemon/worker.h b/daemon/worker.h index ab2fc728d..84e851d06 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -72,7 +72,11 @@ enum worker_commands { /** obtain statistics without statsclear */ worker_cmd_stats_noreset, /** execute remote control command */ - worker_cmd_remote + worker_cmd_remote, + /** for fast-reload, perform stop */ + worker_cmd_reload_stop, + /** for fast-reload, start again */ + worker_cmd_reload_start }; /**