use PublicInbox::Search qw(xap_terms);
use PublicInbox::CodeSearch;
use PublicInbox::IPC;
+use PublicInbox::DS qw(awaitpid);
+use POSIX qw(:signal_h);
use Fcntl qw(LOCK_UN LOCK_EX);
my $X = \%PublicInbox::Search::X;
-our (%SRCH, %PIDS, $parent_pid);
+our (%SRCH, %WORKERS, $parent_pid, $alive, $nworker, $workerset);
our $stderr = \*STDERR;
# only short options for portability in C++ implementation
sub recv_loop {
local $SIG{__WARN__} = sub { print $stderr @_ };
my $rbuf;
- while (!defined($parent_pid) || getppid != $parent_pid) {
- my $req = bless {}, __PACKAGE__;
- my @fds = PublicInbox::IPC::recv_cmd(\*STDIN, $rbuf, 4096*33);
+ my $in = \*STDIN;
+ while (!defined($parent_pid) || getppid == $parent_pid) {
+ PublicInbox::DS::sig_setmask($workerset);
+ my @fds = $PublicInbox::IPC::recv_cmd->($in, $rbuf, 4096*33);
scalar(@fds) or exit(66); # EX_NOINPUT
- $fds[0] // die "recvmsg: $!";
+ if (!defined($fds[0])) {
+ next if $!{EINTR};
+ die "recvmsg: $!";
+ }
+ PublicInbox::DS::block_signals();
+ my $req = bless {}, __PACKAGE__;
my $i = 0;
for my $fd (@fds) {
open($req->{$i++}, '+<&=', $fd) and next;
}
}
+sub reap_worker { # awaitpid CB
+ my ($pid, $nr) = @_;
+ delete $WORKERS{$nr};
+ if (($? >> 8) == 66) { # EX_NOINPUT
+ $alive = undef;
+ PublicInbox::DS->SetLoopTimeout(1);
+ } elsif ($?) {
+ warn "worker[$nr] died \$?=$?\n";
+ }
+ PublicInbox::DS::requeue(\&start_workers) if $alive;
+}
+
sub start_worker ($) {
my ($nr) = @_;
- my $pid = fork // return warn("fork: $!");
+ my $pid = fork;
+ if (!defined($pid)) {
+ warn("fork: $!");
+ return undef;
+ };
if ($pid == 0) {
- undef %PIDS;
+ undef %WORKERS;
+ PublicInbox::DS::Reset();
+ $SIG{TERM} = sub { $parent_pid = -1 };
+ $SIG{TTIN} = $SIG{TTOU} = 'IGNORE';
+ $SIG{CHLD} = 'DEFAULT'; # Xapian may use this
recv_loop();
exit(0);
} else {
- $PIDS{$pid} = $nr;
+ $WORKERS{$nr} = $pid;
+ awaitpid($pid, \&reap_worker, $nr);
+ }
+}
+
+sub start_workers {
+ for my $nr (grep { !defined($WORKERS{$_}) } (0..($nworker - 1))) {
+ start_worker($nr) if $alive;
}
}
+sub do_sigttou {
+ if ($alive && $nworker > 1) {
+ --$nworker;
+ my @nr = grep { $_ >= $nworker } keys %WORKERS;
+ kill('TERM', @WORKERS{@nr});
+ }
+}
+
+sub xh_alive { $alive || scalar(keys %WORKERS) }
+
sub start (@) {
my (@argv) = @_;
- local (%SRCH, %PIDS, $parent_pid);
+ local (%SRCH, %WORKERS);
+ local $alive = 1;
PublicInbox::Search::load_xapian();
$GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or
die 'bad args';
- return recv_loop() if !$opt->{j};
- die '-j must be >= 0' if $opt->{j} < 0;
- start_worker($_) for (1..($opt->{j}));
-
- my $quit;
- until ($quit) {
- my $p = waitpid(-1, 0) or return;
- if (defined(my $nr = delete $PIDS{$p})) {
- $quit = 1 if ($? >> 8) == 66; # EX_NOINPUT
- start_worker($nr) if !$quit;
- } else {
- warn "W: unknown pid=$p reaped\n";
- }
+ local $workerset = POSIX::SigSet->new;
+ $workerset->fillset or die "fillset: $!";
+ for (@PublicInbox::DS::UNBLOCKABLE) {
+ $workerset->delset($_) or die "delset($_): $!";
}
+
+ local $nworker = $opt->{j};
+ return recv_loop() if $nworker == 0;
+ die '-j must be >= 0' if $nworker < 0;
+ for (POSIX::SIGTERM, POSIX::SIGCHLD) {
+ $workerset->delset($_) or die "delset($_): $!";
+ }
+ local $parent_pid = $$;
+ my $sig = {
+ TTIN => sub {
+ if ($alive) {
+ ++$nworker;
+ PublicInbox::DS::requeue(\&start_workers)
+ }
+ },
+ TTOU => \&do_sigttou,
+ CHLD => \&PublicInbox::DS::enqueue_reap,
+ };
+ PublicInbox::DS::block_signals();
+ start_workers();
+ @PublicInbox::DS::post_loop_do = \&xh_alive;
+ PublicInbox::DS::event_loop($sig);
}
1;
# define STDERR_ASSIGNABLE (0)
#endif
-static const int sock_fd = 0; // SOCK_SEQPACKET as stdin :P
-static pid_t parent_pid;
+// assert functions are used correctly (e.g. ensure hackers don't
+// cause EINVAL/EFAULT). Not for stuff that can fail due to HW
+// failures.
+# define CHECK(type, expect, expr) do { \
+ type ckvar______ = (expr); \
+ assert(ckvar______ == (expect) && "BUG" && __FILE__ && __LINE__); \
+} while (0)
+
+static const int sock_fd = STDIN_FILENO; // SOCK_SEQPACKET as stdin :P
+static volatile pid_t parent_pid; // may be set in worker sighandler (sigw)
+static sigset_t fullset, workerset;
+static bool alive = true;
#if STDERR_ASSIGNABLE
static FILE *orig_err = stderr;
#endif
static int orig_err_fd = -1;
static void *srch_tree; // tsearch + tdelete + twalk
static pid_t *worker_pids; // nr => pid
-static unsigned long nworker;
+static unsigned long nworker, nworker_hwm;
+static int pipefds[2];
// PublicInbox::Search and PublicInbox::CodeSearch generate these:
static void mail_nrp_init(void);
msg.msg_control = &cmsg.hdr;
msg.msg_controllen = CMSG_SPACE(RECV_FD_SPACE);
+ // allow SIGTERM to hit
+ CHECK(int, 0, sigprocmask(SIG_SETMASK, &workerset, NULL));
+
ssize_t r = recvmsg(sock_fd, &msg, 0);
- if (r < 0)
- err(EXIT_FAILURE, "recvmsg");
- if (r == 0)
+ if (r == 0) {
exit(EX_NOINPUT); /* grandparent went away */
+ } else if (r < 0) {
+ if (errno == EINTR)
+ return false; // retry recv_loop
+ err(EXIT_FAILURE, "recvmsg");
+ }
+
+ // success! no signals for the rest of the request/response cycle
+ CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, NULL));
+
*len = r;
if (r > 0 && cmsg.hdr.cmsg_level == SOL_SOCKET &&
cmsg.hdr.cmsg_type == SCM_RIGHTS) {
clearerr(stderr);
}
+static void sigw(int sig) // SIGTERM handler for worker
+{
+ parent_pid = -1; // break out of recv_loop
+}
+
static void recv_loop(void) // worker process loop
{
static char rbuf[4096 * 33]; // per-process
+ struct sigaction sa = {};
+ sa.sa_handler = sigw;
+
+ CHECK(int, 0, sigaction(SIGTERM, &sa, NULL));
+
while (!parent_pid || getppid() == parent_pid) {
size_t len = sizeof(rbuf);
struct req req = {};
worker_pids[nr] = pid;
}
-static int delete_pid(pid_t pid)
-{
- for (unsigned nr = 0; nr < nworker; nr++) {
- if (worker_pids[nr] == pid) {
- worker_pids[nr] = 0;
- return nr;
- }
- }
- warnx("W: unknown pid=%d reaped", (int)pid);
- return -1;
-}
-
static void start_worker(unsigned nr)
{
pid_t pid = fork();
insert_pid(pid, nr);
} else {
cleanup_pids();
+ xclose(pipefds[0]);
+ xclose(pipefds[1]);
+ if (signal(SIGCHLD, SIG_DFL) == SIG_ERR)
+ err(EXIT_FAILURE, "signal CHLD");
+ if (signal(SIGTTIN, SIG_IGN) == SIG_ERR)
+ err(EXIT_FAILURE, "signal TTIN");
+ if (signal(SIGTTOU, SIG_IGN) == SIG_ERR)
+ err(EXIT_FAILURE, "signal TTIN");
recv_loop();
exit(0);
}
}
+static void start_workers(void)
+{
+ sigset_t old;
+
+ CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, &old));
+ for (unsigned long nr = 0; nr < nworker; nr++) {
+ if (!worker_pids[nr])
+ start_worker(nr);
+ }
+ CHECK(int, 0, sigprocmask(SIG_SETMASK, &old, NULL));
+}
+
static void cleanup_all(void)
{
cleanup_pids();
#endif
}
+static void sigp(int sig) // parent signal handler
+{
+ static const char eagain[] = "signals coming in too fast";
+ static const char bad_sig[] = "BUG: bad sig\n";
+ static const char write_err[] = "BUG: sigp write: ";
+ char c = 0;
+
+ switch (sig) {
+ case SIGCHLD: c = '.'; break;
+ case SIGTTOU: c = '-'; break;
+ case SIGTTIN: c = '+'; break;
+ default:
+ write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1);
+ _exit(EXIT_FAILURE);
+ }
+ ssize_t w = write(pipefds[1], &c, 1);
+ if (w == sizeof(c)) return;
+ int e = 0;
+ if (w < 0) {
+ e = errno;
+ if (e == EAGAIN) {
+ write(STDERR_FILENO, eagain, sizeof(eagain) - 1);
+ return;
+ }
+ }
+ struct iovec iov[3];
+ iov[0].iov_base = (void *)write_err;
+ iov[0].iov_len = sizeof(write_err) - 1;
+ iov[1].iov_base = (void *)(e ? strerror(e) : "zero write");
+ iov[1].iov_len = strlen((const char *)iov[1].iov_base);
+ iov[2].iov_base = (void *)"\n";
+ iov[2].iov_len = 1;
+ (void)writev(STDERR_FILENO, iov, MY_ARRAY_SIZE(iov));
+ _exit(EXIT_FAILURE);
+}
+
+static void reaped_worker(pid_t pid, int st)
+{
+ unsigned long nr = 0;
+ for (; nr < nworker_hwm; nr++) {
+ if (worker_pids[nr] == pid) {
+ worker_pids[nr] = 0;
+ break;
+ }
+ }
+ if (nr >= nworker_hwm) {
+ warnx("W: unknown pid=%d reaped $?=%d", (int)pid, st);
+ return;
+ }
+ if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
+ alive = false;
+ else if (st)
+ warnx("worker[%lu] died $?=%d", nr, st);
+ if (alive)
+ start_workers();
+}
+
+static void do_sigchld(void)
+{
+ while (1) {
+ int st;
+ pid_t pid = waitpid(-1, &st, WNOHANG);
+ if (pid > 0) {
+ reaped_worker(pid, st);
+ } else if (pid == 0) {
+ return;
+ } else {
+ switch (errno) {
+ case ECHILD: return;
+ case EINTR: break; // can it happen w/ WNOHANG?
+ default: err(EXIT_FAILURE, "BUG: waitpid");
+ }
+ }
+ }
+}
+
+static void do_sigttin(void)
+{
+ if (!alive) return;
+ void *p = reallocarray(worker_pids, nworker + 1, sizeof(pid_t));
+ if (!p) {
+ warn("reallocarray");
+ } else {
+ worker_pids = (pid_t *)p;
+ worker_pids[nworker++] = 0;
+ if (nworker_hwm < nworker)
+ nworker_hwm = nworker;
+ start_workers();
+ }
+}
+
+static void do_sigttou(void)
+{
+ if (!alive || nworker <= 1) return;
+
+ // worker_pids array does not shrink
+ --nworker;
+ for (unsigned long nr = nworker; nr < nworker_hwm; nr++) {
+ pid_t pid = worker_pids[nr];
+ if (pid != 0 && kill(pid, SIGTERM))
+ warn("BUG?: kill(%d, SIGTERM)", (int)pid);
+ }
+}
+
+static size_t living_workers(void)
+{
+ size_t ret = 0;
+
+ for (unsigned long nr = 0; nr < nworker_hwm; nr++) {
+ if (worker_pids[nr])
+ ret++;
+ }
+ return ret;
+}
+
int main(int argc, char *argv[])
{
int c;
err(EXIT_FAILURE, "dup(2)");
}
- nworker = 0;
+ nworker = 1;
#ifdef _SC_NPROCESSORS_ONLN
long j = sysconf(_SC_NPROCESSORS_ONLN);
if (j > 0)
errx(EXIT_FAILURE, "BUG: `-%c'", c);
}
}
- if (nworker == 0) {
+ sigset_t pset; // parent-only
+ CHECK(int, 0, sigfillset(&pset));
+
+ // global sigsets:
+ CHECK(int, 0, sigfillset(&fullset));
+ CHECK(int, 0, sigfillset(&workerset));
+
+#define DELSET(sig) do { \
+ CHECK(int, 0, sigdelset(&fullset, sig)); \
+ CHECK(int, 0, sigdelset(&workerset, sig)); \
+ CHECK(int, 0, sigdelset(&pset, sig)); \
+} while (0)
+ DELSET(SIGABRT);
+ DELSET(SIGBUS);
+ DELSET(SIGFPE);
+ DELSET(SIGILL);
+ DELSET(SIGSEGV);
+ DELSET(SIGXCPU);
+ DELSET(SIGXFSZ);
+#undef DELSET
+
+ if (nworker == 0) { // no SIGTERM handling w/o workers
recv_loop();
- } else {
- parent_pid = getpid();
- worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
- if (!worker_pids)
- err(EXIT_FAILURE, "calloc");
- for (unsigned i = 0; i < nworker; i++)
- start_worker(i);
-
- int st;
- pid_t pid;
- bool quit = false;
- while ((pid = wait(&st)) > 0) {
- int nr = delete_pid(pid);
- if (nr < 0) continue;
- if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
- quit = true;
- if (!quit)
- start_worker(nr);
+ return 0;
+ }
+ CHECK(int, 0, sigdelset(&workerset, SIGTERM));
+ CHECK(int, 0, sigdelset(&workerset, SIGCHLD));
+ parent_pid = getpid();
+ nworker_hwm = nworker;
+ worker_pids = (pid_t *)calloc(nworker, sizeof(pid_t));
+ if (!worker_pids) err(EXIT_FAILURE, "calloc");
+
+ if (pipe(pipefds)) err(EXIT_FAILURE, "pipe");
+ int fl = fcntl(F_GETFL, pipefds[1]);
+ if (fl == -1) err(EXIT_FAILURE, "F_GETFL");
+ if (fcntl(F_SETFL, pipefds[1], fl | O_NONBLOCK))
+ err(EXIT_FAILURE, "F_SETFL");
+
+ CHECK(int, 0, sigdelset(&pset, SIGCHLD));
+ CHECK(int, 0, sigdelset(&pset, SIGTTIN));
+ CHECK(int, 0, sigdelset(&pset, SIGTTOU));
+
+ struct sigaction sa = {};
+ sa.sa_handler = sigp;
+
+ CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL));
+ CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL));
+ sa.sa_flags = SA_NOCLDSTOP;
+ CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL));
+
+ CHECK(int, 0, sigprocmask(SIG_SETMASK, &pset, NULL));
+
+ start_workers();
+
+ char sbuf[64];
+ while (alive || living_workers()) {
+ ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf));
+ if (n < 0) {
+ if (errno == EINTR) continue;
+ err(EXIT_FAILURE, "read");
+ } else if (n == 0) {
+ errx(EXIT_FAILURE, "read EOF");
+ }
+ do_sigchld();
+ for (ssize_t i = 0; i < n; i++) {
+ switch (sbuf[i]) {
+ case '.': break; // do_sigchld already called
+ case '-': do_sigttou(); break;
+ case '+': do_sigttin(); break;
+ default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
+ }
}
}
+
return 0;
}
my $stats = do { local $/; <$err_rd> };
is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported');
- if ($arg[-1] !~ /\('-j0'\)/) {
- kill('KILL', $cinfo{pid});
+ return $ar if $cinfo{pid} == $pid;
+
+ # test worker management:
+ kill('TERM', $cinfo{pid});
+ my $tries = 0;
+ do {
$r = $doreq->($s, qw(test_inspect -d), $ibx_idx[0]);
- %info = map {
- split(/=/, $_, 2)
- } split(/ /, do { local $/; <$r> });
- isnt($info{pid}, $cinfo{pid}, 'spawned new worker');
+ %info = map { split(/=/, $_, 2) }
+ split(/ /, do { local $/; <$r> });
+ } while ($info{pid} == $cinfo{pid} && ++$tries < 10);
+ isnt($info{pid}, $cinfo{pid}, 'spawned new worker');
+
+ my %pids;
+ $tries = 0;
+ my @ins = ($s, qw(test_inspect -d), $ibx_idx[0]);
+ kill('TTIN', $pid);
+ until (scalar(keys %pids) >= 2 || ++$tries > 10) {
+ tick;
+ my @r = map { $doreq->(@ins) } (0..5);
+ for my $fh (@r) {
+ my $buf = do { local $/; <$fh> } // die "read: $!";
+ $buf =~ /\bpid=(\d+)/ and $pids{$1} = undef;
+ }
+ }
+ is(scalar keys %pids, 2, 'have two pids');
+
+ kill('TTOU', $pid);
+ %pids = ();
+ my $delay = $tries * 0.11 * ($ENV{VALGRIND} ? 10 : 1);
+ $tries = 0;
+ diag 'waiting '.$delay.'s for SIGTTOU';
+ tick($delay);
+ until (scalar(keys %pids) == 1 || ++$tries > 100) {
+ %pids = ();
+ my @r = map { $doreq->(@ins) } (0..5);
+ for my $fh (@r) {
+ my $buf = do { local $/; <$fh> } // die "read: $!";
+ $buf =~ /\bpid=(\d+)/ and $pids{$1} = undef;
+ }
}
+ is(scalar keys %pids, 1, 'have one pid') or diag explain(\%pids);
+ is($info{pid}, (keys %pids)[0], 'kept oldest PID after TTOU');
+
$ar;
};
-my $ar;
my @NO_CXX = (1);
unless ($ENV{TEST_XH_CXX_ONLY}) {
- $ar = $test->(qw[-MPublicInbox::XapHelper -e
+ my $ar = $test->(qw[-MPublicInbox::XapHelper -e
PublicInbox::XapHelper::start('-j0')]);
$ar = $test->(qw[-MPublicInbox::XapHelper -e
PublicInbox::XapHelper::start('-j1')]);
require PublicInbox::XapHelperCxx;
PublicInbox::XapHelperCxx::check_build();
};
- skip "XapHelperCxx build: $@", 1 if $@;
+ skip "XapHelperCxx build: $@", 1 if $@ || $ENV{PI_NO_CXX};
@NO_CXX = $ENV{TEST_XH_CXX_ONLY} ? (0) : (0, 1);
- $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
+ my $ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
PublicInbox::XapHelperCxx::start('-j0')]);
$ar = $test->(qw[-MPublicInbox::XapHelperCxx -e
PublicInbox::XapHelperCxx::start('-j1')]);
close $fh;
}
+my $ar;
for my $n (@NO_CXX) {
local $ENV{PI_NO_CXX} = $n;
my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');