]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
xap_helper: support SIGTTIN+SIGTTOU worker adjustments
authorEric Wong <e@80x24.org>
Mon, 4 Sep 2023 10:36:04 +0000 (10:36 +0000)
committerEric Wong <e@80x24.org>
Tue, 5 Sep 2023 03:01:40 +0000 (03:01 +0000)
Being able to tune worker process counts on-the-fly when
xap_helper gets used with -{netd,httpd,imapd} will be useful
for tuning new setups.

lib/PublicInbox/IPC.pm
lib/PublicInbox/XapHelper.pm
lib/PublicInbox/xap_helper.h
t/xap_helper.t

index 766c377f83479d9f80909cfdd41ebde0ddf52dcf..528b91333742ffb30ccd5f71827929d0a07b1135 100644 (file)
@@ -42,7 +42,7 @@ if ($enc && $dec) { # should be custom ops
        *ipc_thaw = \&Storable::thaw;
 }
 
-my $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
+our $recv_cmd = PublicInbox::Spawn->can('recv_cmd4');
 our $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
        require PublicInbox::CmdIPC4;
        $recv_cmd //= PublicInbox::CmdIPC4->can('recv_cmd4');
index 36266e656c9235806ab39b1f74bb59000783e69e..0e79e5e21371c297cea629d40b468b9dbc6e8979 100644 (file)
@@ -10,9 +10,11 @@ $GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::CodeSearch;
 use PublicInbox::IPC;
+use PublicInbox::DS qw(awaitpid);
+use POSIX qw(:signal_h);
 use Fcntl qw(LOCK_UN LOCK_EX);
 my $X = \%PublicInbox::Search::X;
-our (%SRCH, %PIDS, $parent_pid);
+our (%SRCH, %WORKERS, $parent_pid, $alive, $nworker, $workerset);
 our $stderr = \*STDERR;
 
 # only short options for portability in C++ implementation
@@ -171,11 +173,17 @@ sub dispatch {
 sub recv_loop {
        local $SIG{__WARN__} = sub { print $stderr @_ };
        my $rbuf;
-       while (!defined($parent_pid) || getppid != $parent_pid) {
-               my $req = bless {}, __PACKAGE__;
-               my @fds = PublicInbox::IPC::recv_cmd(\*STDIN, $rbuf, 4096*33);
+       my $in = \*STDIN;
+       while (!defined($parent_pid) || getppid == $parent_pid) {
+               PublicInbox::DS::sig_setmask($workerset);
+               my @fds = $PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33);
                scalar(@fds) or exit(66); # EX_NOINPUT
-               $fds[0] // die "recvmsg: $!";
+               if (!defined($fds[0])) {
+                       next if $!{EINTR};
+                       die "recvmsg: $!";
+               }
+               PublicInbox::DS::block_signals();
+               my $req = bless {}, __PACKAGE__;
                my $i = 0;
                for my $fd (@fds) {
                        open($req->{$i++}, '+<&=', $fd) and next;
@@ -195,38 +203,89 @@ sub recv_loop {
        }
 }
 
+sub reap_worker { # awaitpid CB
+       my ($pid, $nr) = @_;
+       delete $WORKERS{$nr};
+       if (($? >> 8) == 66) { # EX_NOINPUT
+               $alive = undef;
+               PublicInbox::DS->SetLoopTimeout(1);
+       } elsif ($?) {
+               warn "worker[$nr] died \$?=$?\n";
+       }
+       PublicInbox::DS::requeue(\&start_workers) if $alive;
+}
+
 sub start_worker ($) {
        my ($nr) = @_;
-       my $pid = fork // return warn("fork: $!");
+       my $pid = fork;
+       if (!defined($pid)) {
+               warn("fork: $!");
+               return undef;
+       };
        if ($pid == 0) {
-               undef %PIDS;
+               undef %WORKERS;
+               PublicInbox::DS::Reset();
+               $SIG{TERM} = sub { $parent_pid = -1 };
+               $SIG{TTIN} = $SIG{TTOU} = 'IGNORE';
+               $SIG{CHLD} = 'DEFAULT'; # Xapian may use this
                recv_loop();
                exit(0);
        } else {
-               $PIDS{$pid} = $nr;
+               $WORKERS{$nr} = $pid;
+               awaitpid($pid, \&reap_worker, $nr);
+       }
+}
+
+sub start_workers {
+       for my $nr (grep { !defined($WORKERS{$_}) } (0..($nworker - 1))) {
+               start_worker($nr) if $alive;
        }
 }
 
+sub do_sigttou {
+       if ($alive && $nworker > 1) {
+               --$nworker;
+               my @nr = grep { $_ >= $nworker } keys %WORKERS;
+               kill('TERM', @WORKERS{@nr});
+       }
+}
+
+sub xh_alive { $alive || scalar(keys %WORKERS) }
+
 sub start (@) {
        my (@argv) = @_;
-       local (%SRCH, %PIDS, $parent_pid);
+       local (%SRCH, %WORKERS);
+       local $alive = 1;
        PublicInbox::Search::load_xapian();
        $GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or
                die 'bad args';
-       return recv_loop() if !$opt->{j};
-       die '-j must be >= 0' if $opt->{j} < 0;
-       start_worker($_) for (1..($opt->{j}));
-
-       my $quit;
-       until ($quit) {
-               my $p = waitpid(-1, 0) or return;
-               if (defined(my $nr = delete $PIDS{$p})) {
-                       $quit = 1 if ($? >> 8) == 66; # EX_NOINPUT
-                       start_worker($nr) if !$quit;
-               } else {
-                       warn "W: unknown pid=$p reaped\n";
-               }
+       local $workerset = POSIX::SigSet->new;
+       $workerset->fillset or die "fillset: $!";
+       for (@PublicInbox::DS::UNBLOCKABLE) {
+               $workerset->delset($_) or die "delset($_): $!";
        }
+
+       local $nworker = $opt->{j};
+       return recv_loop() if $nworker == 0;
+       die '-j must be >= 0' if $nworker < 0;
+       for (POSIX::SIGTERM, POSIX::SIGCHLD) {
+               $workerset->delset($_) or die "delset($_): $!";
+       }
+       local $parent_pid = $$;
+       my $sig = {
+               TTIN => sub {
+                       if ($alive) {
+                               ++$nworker;
+                               PublicInbox::DS::requeue(\&start_workers)
+                       }
+               },
+               TTOU => \&do_sigttou,
+               CHLD => \&PublicInbox::DS::enqueue_reap,
+       };
+       PublicInbox::DS::block_signals();
+       start_workers();
+       @PublicInbox::DS::post_loop_do = \&xh_alive;
+       PublicInbox::DS::event_loop($sig);
 }
 
 1;
index 871a381c7abaed76fb307e280d70e70a6a78b7d4..493a24f46a2a596d8a8f773a86101207d829fb50 100644 (file)
 #      define STDERR_ASSIGNABLE (0)
 #endif
 
-static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P
-static pid_t parent_pid;
+// assert functions are used correctly (e.g. ensure hackers don't
+// cause EINVAL/EFAULT).  Not for stuff that can fail due to HW
+// failures.
+# define CHECK(type, expect, expr) do { \
+       type ckvar______ = (expr); \
+       assert(ckvar______ == (expect) && "BUG" && __FILE__ && __LINE__); \
+} while (0)
+
+static const int sock_fd = STDIN_FILENO; // SOCK_SEQPACKET as stdin :P
+static volatile pid_t parent_pid; // may be set in worker sighandler (sigw)
+static sigset_t fullset, workerset;
+static bool alive = true;
 #if STDERR_ASSIGNABLE
 static FILE *orig_err = stderr;
 #endif
 static int orig_err_fd = -1;
 static void *srch_tree; // tsearch + tdelete + twalk
 static pid_t *worker_pids; // nr => pid
-static unsigned long nworker;
+static unsigned long nworker, nworker_hwm;
+static int pipefds[2];
 
 // PublicInbox::Search and PublicInbox::CodeSearch generate these:
 static void mail_nrp_init(void);
@@ -598,11 +609,21 @@ static bool recv_req(struct req *req, char *rbuf, size_t *len)
        msg.msg_control = &cmsg.hdr;
        msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE);
 
+       // allow SIGTERM to hit
+       CHECK(int, 0, sigprocmask(SIG_SETMASK, &workerset, NULL));
+
        ssize_t r = recvmsg(sock_fd, &msg, 0);
-       if (r < 0)
-               err(EXIT_FAILURE, "recvmsg");
-       if (r == 0)
+       if (r == 0) {
                exit(EX_NOINPUT); /* grandparent went away */
+       } else if (r < 0) {
+               if (errno == EINTR)
+                       return false; // retry recv_loop
+               err(EXIT_FAILURE, "recvmsg");
+       }
+
+       // success! no signals for the rest of the request/response cycle
+       CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, NULL));
+
        *len = r;
        if (r > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
                        cmsg.hdr.cmsg_type == SCM_RIGHTS) {
@@ -875,9 +896,19 @@ static void stderr_restore(FILE *tmp_err)
        clearerr(stderr);
 }
 
+static void sigw(int sig) // SIGTERM handler for worker
+{
+       parent_pid = -1; // break out of recv_loop
+}
+
 static void recv_loop(void) // worker process loop
 {
        static char rbuf[4096 * 33]; // per-process
+       struct sigaction sa = {};
+       sa.sa_handler = sigw;
+
+       CHECK(int, 0, sigaction(SIGTERM, &sa, NULL));
+
        while (!parent_pid || getppid() == parent_pid) {
                size_t len = sizeof(rbuf);
                struct req req = {};
@@ -904,18 +935,6 @@ static void insert_pid(pid_t pid, unsigned nr)
        worker_pids[nr] = pid;
 }
 
-static int delete_pid(pid_t pid)
-{
-       for (unsigned nr = 0; nr < nworker; nr++) {
-               if (worker_pids[nr] == pid) {
-                       worker_pids[nr] = 0;
-                       return nr;
-               }
-       }
-       warnx("W: unknown pid=%d reaped", (int)pid);
-       return -1;
-}
-
 static void start_worker(unsigned nr)
 {
        pid_t pid = fork();
@@ -925,11 +944,31 @@ static void start_worker(unsigned nr)
                insert_pid(pid, nr);
        } else {
                cleanup_pids();
+               xclose(pipefds[0]);
+               xclose(pipefds[1]);
+               if (signal(SIGCHLD, SIG_DFL) == SIG_ERR)
+                       err(EXIT_FAILURE, "signal CHLD");
+               if (signal(SIGTTIN, SIG_IGN) == SIG_ERR)
+                       err(EXIT_FAILURE, "signal TTIN");
+               if (signal(SIGTTOU, SIG_IGN) == SIG_ERR)
+                       err(EXIT_FAILURE, "signal TTIN");
                recv_loop();
                exit(0);
        }
 }
 
+static void start_workers(void)
+{
+       sigset_t old;
+
+       CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, &old));
+       for (unsigned long nr = 0; nr < nworker; nr++) {
+               if (!worker_pids[nr])
+                       start_worker(nr);
+       }
+       CHECK(int, 0, sigprocmask(SIG_SETMASK, &old, NULL));
+}
+
 static void cleanup_all(void)
 {
        cleanup_pids();
@@ -939,6 +978,121 @@ static void cleanup_all(void)
 #endif
 }
 
+static void sigp(int sig) // parent signal handler
+{
+       static const char eagain[] = "signals coming in too fast";
+       static const char bad_sig[] = "BUG: bad sig\n";
+       static const char write_err[] = "BUG: sigp write: ";
+       char c = 0;
+
+       switch (sig) {
+       case SIGCHLD: c = '.'; break;
+       case SIGTTOU: c = '-'; break;
+       case SIGTTIN: c = '+'; break;
+       default:
+               write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1);
+               _exit(EXIT_FAILURE);
+       }
+       ssize_t w = write(pipefds[1], &c, 1);
+       if (w == sizeof(c)) return;
+       int e = 0;
+       if (w < 0) {
+               e = errno;
+               if (e == EAGAIN) {
+                       write(STDERR_FILENO, eagain, sizeof(eagain) - 1);
+                       return;
+               }
+       }
+       struct iovec iov[3];
+       iov[0].iov_base = (void *)write_err;
+       iov[0].iov_len = sizeof(write_err) - 1;
+       iov[1].iov_base = (void *)(e ? strerror(e) : "zero write");
+       iov[1].iov_len = strlen((const char *)iov[1].iov_base);
+       iov[2].iov_base = (void *)"\n";
+       iov[2].iov_len = 1;
+       (void)writev(STDERR_FILENO, iov, MY_ARRAY_SIZE(iov));
+       _exit(EXIT_FAILURE);
+}
+
+static void reaped_worker(pid_t pid, int st)
+{
+       unsigned long nr = 0;
+       for (; nr < nworker_hwm; nr++) {
+               if (worker_pids[nr] == pid) {
+                       worker_pids[nr] = 0;
+                       break;
+               }
+       }
+       if (nr >= nworker_hwm) {
+               warnx("W: unknown pid=%d reaped $?=%d", (int)pid, st);
+               return;
+       }
+       if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
+               alive = false;
+       else if (st)
+               warnx("worker[%lu] died $?=%d", nr, st);
+       if (alive)
+               start_workers();
+}
+
+static void do_sigchld(void)
+{
+       while (1) {
+               int st;
+               pid_t pid = waitpid(-1, &st, WNOHANG);
+               if (pid > 0) {
+                       reaped_worker(pid, st);
+               } else if (pid == 0) {
+                       return;
+               } else {
+                       switch (errno) {
+                       case ECHILD: return;
+                       case EINTR: break; // can it happen w/ WNOHANG?
+                       default: err(EXIT_FAILURE, "BUG: waitpid");
+                       }
+               }
+       }
+}
+
+static void do_sigttin(void)
+{
+       if (!alive) return;
+       void *p = reallocarray(worker_pids, nworker + 1, sizeof(pid_t));
+       if (!p) {
+               warn("reallocarray");
+       } else {
+               worker_pids = (pid_t *)p;
+               worker_pids[nworker++] = 0;
+               if (nworker_hwm < nworker)
+                       nworker_hwm = nworker;
+               start_workers();
+       }
+}
+
+static void do_sigttou(void)
+{
+       if (!alive || nworker <= 1) return;
+
+       // worker_pids array does not shrink
+       --nworker;
+       for (unsigned long nr = nworker; nr < nworker_hwm; nr++) {
+               pid_t pid = worker_pids[nr];
+               if (pid != 0 && kill(pid, SIGTERM))
+                       warn("BUG?: kill(%d, SIGTERM)", (int)pid);
+       }
+}
+
+static size_t living_workers(void)
+{
+       size_t ret = 0;
+
+       for (unsigned long nr = 0; nr < nworker_hwm; nr++) {
+               if (worker_pids[nr])
+                       ret++;
+       }
+       return ret;
+}
+
 int main(int argc, char *argv[])
 {
        int c;
@@ -953,7 +1107,7 @@ int main(int argc, char *argv[])
                        err(EXIT_FAILURE, "dup(2)");
        }
 
-       nworker = 0;
+       nworker = 1;
 #ifdef _SC_NPROCESSORS_ONLN
        long j = sysconf(_SC_NPROCESSORS_ONLN);
        if (j > 0)
@@ -981,27 +1135,79 @@ int main(int argc, char *argv[])
                        errx(EXIT_FAILURE, "BUG: `-%c'", c);
                }
        }
-       if (nworker == 0) {
+       sigset_t pset; // parent-only
+       CHECK(int, 0, sigfillset(&pset));
+
+       // global sigsets:
+       CHECK(int, 0, sigfillset(&fullset));
+       CHECK(int, 0, sigfillset(&workerset));
+
+#define DELSET(sig) do { \
+       CHECK(int, 0, sigdelset(&fullset, sig)); \
+       CHECK(int, 0, sigdelset(&workerset, sig)); \
+       CHECK(int, 0, sigdelset(&pset, sig)); \
+} while (0)
+       DELSET(SIGABRT);
+       DELSET(SIGBUS);
+       DELSET(SIGFPE);
+       DELSET(SIGILL);
+       DELSET(SIGSEGV);
+       DELSET(SIGXCPU);
+       DELSET(SIGXFSZ);
+#undef DELSET
+
+       if (nworker == 0) { // no SIGTERM handling w/o workers
                recv_loop();
-       } else {
-               parent_pid = getpid();
-               worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
-               if (!worker_pids)
-                       err(EXIT_FAILURE, "calloc");
-               for (unsigned i = 0; i < nworker; i++)
-                       start_worker(i);
-
-               int st;
-               pid_t pid;
-               bool quit = false;
-               while ((pid = wait(&st)) > 0) {
-                       int nr = delete_pid(pid);
-                       if (nr < 0) continue;
-                       if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
-                               quit = true;
-                       if (!quit)
-                               start_worker(nr);
+               return 0;
+       }
+       CHECK(int, 0, sigdelset(&workerset, SIGTERM));
+       CHECK(int, 0, sigdelset(&workerset, SIGCHLD));
+       parent_pid = getpid();
+       nworker_hwm = nworker;
+       worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
+       if (!worker_pids) err(EXIT_FAILURE, "calloc");
+
+       if (pipe(pipefds)) err(EXIT_FAILURE, "pipe");
+       int fl = fcntl(F_GETFL, pipefds[1]);
+       if (fl == -1) err(EXIT_FAILURE, "F_GETFL");
+       if (fcntl(F_SETFL, pipefds[1], fl | O_NONBLOCK))
+               err(EXIT_FAILURE, "F_SETFL");
+
+       CHECK(int, 0, sigdelset(&pset, SIGCHLD));
+       CHECK(int, 0, sigdelset(&pset, SIGTTIN));
+       CHECK(int, 0, sigdelset(&pset, SIGTTOU));
+
+       struct sigaction sa = {};
+       sa.sa_handler = sigp;
+
+       CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL));
+       CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL));
+       sa.sa_flags = SA_NOCLDSTOP;
+       CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL));
+
+       CHECK(int, 0, sigprocmask(SIG_SETMASK, &pset, NULL));
+
+       start_workers();
+
+       char sbuf[64];
+       while (alive || living_workers()) {
+               ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf));
+               if (n < 0) {
+                       if (errno == EINTR) continue;
+                       err(EXIT_FAILURE, "read");
+               } else if (n == 0) {
+                       errx(EXIT_FAILURE, "read EOF");
+               }
+               do_sigchld();
+               for (ssize_t i = 0; i < n; i++) {
+                       switch (sbuf[i]) {
+                       case '.': break; // do_sigchld already called
+                       case '-': do_sigttou(); break;
+                       case '+': do_sigttin(); break;
+                       default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
+                       }
                }
        }
+
        return 0;
 }
index b68f2773d73c74ec96be127ae903bcc40ef0f604..f4b3581f3143d4769877aca7c4765b07a4c50fd4 100644 (file)
@@ -95,21 +95,55 @@ my $test = sub {
        my $stats = do { local $/; <$err_rd> };
        is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported');
 
-       if ($arg[-1] !~ /\('-j0'\)/) {
-               kill('KILL', $cinfo{pid});
+       return $ar if $cinfo{pid} == $pid;
+
+       # test worker management:
+       kill('TERM', $cinfo{pid});
+       my $tries = 0;
+       do {
                $r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
-               %info = map {
-                       split(/=/, $_, 2)
-               } split(/ /, do { local $/; <$r> });
-               isnt($info{pid}, $cinfo{pid}, 'spawned new worker');
+               %info = map { split(/=/, $_, 2) }
+                       split(/ /, do { local $/; <$r> });
+       } while ($info{pid} == $cinfo{pid} && ++$tries < 10);
+       isnt($info{pid}, $cinfo{pid}, 'spawned new worker');
+
+       my %pids;
+       $tries = 0;
+       my @ins = ($s, qw(test_inspect -d), $ibx_idx[0]);
+       kill('TTIN', $pid);
+       until (scalar(keys %pids) >= 2 || ++$tries > 10) {
+               tick;
+               my @r = map { $doreq->(@ins) } (0..5);
+               for my $fh (@r) {
+                       my $buf = do { local $/; <$fh> } // die "read: $!";
+                       $buf =~ /\bpid=(\d+)/ and $pids{$1} = undef;
+               }
+       }
+       is(scalar keys %pids, 2, 'have two pids');
+
+       kill('TTOU', $pid);
+       %pids = ();
+       my $delay = $tries * 0.11 * ($ENV{VALGRIND} ? 10 : 1);
+       $tries = 0;
+       diag 'waiting '.$delay.'s for SIGTTOU';
+       tick($delay);
+       until (scalar(keys %pids) == 1 || ++$tries > 100) {
+               %pids = ();
+               my @r = map { $doreq->(@ins) } (0..5);
+               for my $fh (@r) {
+                       my $buf = do { local $/; <$fh> } // die "read: $!";
+                       $buf =~ /\bpid=(\d+)/ and $pids{$1} = undef;
+               }
        }
+       is(scalar keys %pids, 1, 'have one pid') or diag explain(\%pids);
+       is($info{pid}, (keys %pids)[0], 'kept oldest PID after TTOU');
+
        $ar;
 };
-my $ar;
 
 my @NO_CXX = (1);
 unless ($ENV{TEST_XH_CXX_ONLY}) {
-       $ar = $test->(qw[-MPublicInbox::XapHelper -e
+       my $ar = $test->(qw[-MPublicInbox::XapHelper -e
                        PublicInbox::XapHelper::start('-j0')]);
        $ar = $test->(qw[-MPublicInbox::XapHelper -e
                        PublicInbox::XapHelper::start('-j1')]);
@@ -119,10 +153,10 @@ SKIP: {
                require PublicInbox::XapHelperCxx;
                PublicInbox::XapHelperCxx::check_build();
        };
-       skip "XapHelperCxx build: $@", 1 if $@;
+       skip "XapHelperCxx build: $@", 1 if $@ || $ENV{PI_NO_CXX};
 
        @NO_CXX = $ENV{TEST_XH_CXX_ONLY} ? (0) : (0, 1);
-       $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
+       my $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
                        PublicInbox::XapHelperCxx::start('-j0')]);
        $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
                        PublicInbox::XapHelperCxx::start('-j1')]);
@@ -142,6 +176,7 @@ my @id2root;
        close $fh;
 }
 
+my $ar;
 for my $n (@NO_CXX) {
        local $ENV{PI_NO_CXX} = $n;
        my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');