From 8a3c04bb01b243c28515df77e8ca647ec54dec01 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 5 Feb 2026 03:47:17 +0000 Subject: [PATCH] lei: C++ xap_helper forks dynamically Rework xap_helper for lei to prefork before dynamically forking a new worker on demand for every request. We do this to ensure the C++ xap_helper is usable ASAP and enable us to reliably skip approxidate handling in Perl (and thus rely on our integration of the C approxidate from git(1) with Xapian's QueryParser API). Previously, we would v?fork+execve for every query, but it was open to TOCTOU errors for deciding whether or not to do approxidate processing in Perl or (C-flavored) C++. Using fexecve(2) was also considered, but that's not available from pure Perl and I'm still hesitant to make Inline::C or XS a hard requirement for us. Public-facing xap_helper usage remains unchanged, since it's expected public-facing daemons have dedicated resources preconfigured while local software such as lei can be more dynamic and flexible in its resource use. There is no change to the Perl + (XS|SWIG) XapHelper implementation, since it provides no benefit to lei users. --- lib/PublicInbox/LEI.pm | 10 +- lib/PublicInbox/LeiQuery.pm | 1 + lib/PublicInbox/LeiSucks.pm | 3 +- lib/PublicInbox/LeiUp.pm | 1 + lib/PublicInbox/LeiXSearch.pm | 2 - lib/PublicInbox/Search.pm | 40 ++++-- lib/PublicInbox/xap_helper.h | 240 +++++++++++++++++++++++++--------- t/xap_helper.t | 10 +- 8 files changed, 220 insertions(+), 87 deletions(-) diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 30bbf4a8b..b185c93f9 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1336,7 +1336,7 @@ sub can_stay_alive { # PublicInbox::DS::post_loop_do cb $n + scalar(keys(%PublicInbox::DS::AWAIT_PIDS)); } -sub clear_tmp_xh { # awaitpid cb, called in lei worker +sub clear_tmp_xh { # awaitpid cb, called in lei-daemon my ($pid) = @_; my ($e, $s, @m) = ($? >> 8, $? & 127); push @m, " status=$e" if $e && $e != 66; # EX_NOINPUT ok @@ -1345,12 +1345,15 @@ sub clear_tmp_xh { # awaitpid cb, called in lei worker undef $PublicInbox::Search::XHC; } -sub spawn_tmp_xh { # called in lei worker processes +sub spawn_tmp_xh { # called in top-level lei-daemon $PublicInbox::Search::XHC //= eval { my $xhc = PublicInbox::XapClient::start_helper(qw(-l -j0)); awaitpid($xhc->{io}->attached_pid, \&clear_tmp_xh) if $xhc; $xhc; - } || warn("E: $@ (will attempt to continue w/o Xapian helper)\n"); + } || do { + warn("E: $@ (will attempt to continue w/o Xapian helper)\n"); + undef; + } } # lei(1) calls this when it can't connect @@ -1411,6 +1414,7 @@ sub lazy_start { $exit_code //= eval("POSIX::SIG$_[0] + 128") if @_; $dir_idle->close if $dir_idle; # EPOLL_CTL_DEL $dir_idle = undef; # let RC take care of it + $PublicInbox::Search::XHC = undef; # ditto eval 'PublicInbox::LeiNoteEvent::flush_task()'; my $lis = $pil or exit($exit_code // 0); # closing eof_p triggers \&noop wakeup diff --git a/lib/PublicInbox/LeiQuery.pm b/lib/PublicInbox/LeiQuery.pm index 787cd3a51..402edab66 100644 --- a/lib/PublicInbox/LeiQuery.pm +++ b/lib/PublicInbox/LeiQuery.pm @@ -76,6 +76,7 @@ sub cfg_ext ($) { sub lxs_prepare { my ($self) = @_; + $self->spawn_tmp_xh; require PublicInbox::LeiXSearch; # prepare any number of LeiXSearch || LeiSearch || Inbox || URL my $lxs = $self->{lxs} = PublicInbox::LeiXSearch->new; diff --git a/lib/PublicInbox/LeiSucks.pm b/lib/PublicInbox/LeiSucks.pm index fcaf48d66..f595d46ef 100644 --- a/lib/PublicInbox/LeiSucks.pm +++ b/lib/PublicInbox/LeiSucks.pm @@ -55,8 +55,7 @@ sub lei_sucks { } else { push @out, "Xapian not available: $@\n"; } - require PublicInbox::XapClient; - my $xhc = PublicInbox::XapClient::start_helper(qw(-l -j0)); + my $xhc = $lei->spawn_tmp_xh; push(@out, $xhc ? "xap_helper: $xhc->{impl}\n" : "xap_helper not available\n"); push @out, "public-inbox blob OIDs of loaded features:\n"; diff --git a/lib/PublicInbox/LeiUp.pm b/lib/PublicInbox/LeiUp.pm index 3fe7841c5..c586d64d6 100644 --- a/lib/PublicInbox/LeiUp.pm +++ b/lib/PublicInbox/LeiUp.pm @@ -31,6 +31,7 @@ sub up1 ($$) { my $lse = $lei->{lse} // die 'BUG: {lse} missing'; my $rawstr = $lss->{-cfg}->{'lei.internal.rawstr'} // (scalar(@$q) == 1 && substr($q->[0], -1) eq "\n"); + $lei->spawn_tmp_xh; # for query_approxidate|query_argv_to_string if ($rawstr) { die < 1; $f: lei.q has multiple values (@$q) (out=$out) diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index c480ddf4f..b781c77f6 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -192,7 +192,6 @@ sub query_one_mset { # for --threads and l2m w/o sort ref($min) and return warn("$maxk=$min has multiple values\n"); ($min =~ /[^0-9]/) and return warn("$maxk=$min not numeric\n"); my $first_ids; - $lei->spawn_tmp_xh; # per-worker do { $mset = eval { sync_mset $srch, $mo }; return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl @@ -255,7 +254,6 @@ sub query_combined_mset { # non-parallel for non-"--threads" users attach_external($self, $loc); } my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); - $lei->spawn_tmp_xh; # per-worker do { $mset = eval { sync_mset $self, $mo }; return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index c37dcf3b9..89f21faec 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -407,24 +407,36 @@ sub date_parse_finalize { $1 eq '%s' ? $t : strftime($1, gmtime($t))/sge; } -# n.b. argv never has NUL, though we'll need to filter it out +# Try to make raw lei CLI args work more like the HTML element. +# That is, that phrases quoted to keep the CLI happy (e.g. sh/bash/zsh) +# don't need extra (nested) quoting to make Xapian see the quotes for a phrase. +# n.b. argv never has NULL, though we'll need to filter it out # if this $argv isn't from a command execution sub query_argv_to_string { my (undef, $git, $argv) = @_; - return "@$argv" if $XHC && $XHC->{impl} eq 'PublicInbox::XapHelperCxx'; - my $to_parse; - my $tmp = join(' ', map {; - if (s!\b(d|rt|dt):(\S+)\z!date_parse_prepare( + if ($XHC && $XHC->{impl} eq 'PublicInbox::XapHelperCxx') { + join ' ', map {; + if (/\s/) { # n.b. $1 may be `(' + s/(.*?)\b(\w+:)// ? qq{$1$2"$_"} : qq{"$_"}; + } else { + $_ + } + } @$argv; + } else { + my $to_parse; + my $tmp = join ' ', map {; + if (s!\b(d|rt|dt):(\S+)\z!date_parse_prepare( $to_parse //= [], $1, $2)!sge) { - $_; - } elsif (/\s/) { - s/(.*?)\b(\w+:)// ? qq{$1$2"$_"} : qq{"$_"}; - } else { - $_ - } - } @$argv); - date_parse_finalize($git, $to_parse, $tmp) if $to_parse; - $tmp + $_; + } elsif (/\s/) { # n.b. $1 may be `(' + s/(.*?)\b(\w+:)// ? qq{$1$2"$_"} : qq{"$_"}; + } else { + $_ + } + } @$argv; + date_parse_finalize($git, $to_parse, $tmp) if $to_parse; + $tmp + } } # this is for the WWW "q=" query parameter and "lei q --stdin" diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 3e108b957..3fe9d65b7 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -10,6 +10,12 @@ * instead of their equivalents in the C++ stdlib :P * Everything here is an unstable internal API of public-inbox and * NOT intended for ordinary users; only public-inbox hackers + * + * As of public-inbox 2.2+, lei forks a subprocess on every request + * to scale to local demand. Public-facing use is prefork-only, + * since we expect public-facing servers to have dedicated resources + * allocated for it and local user software to have more dynamic + * resource usage. */ #ifndef _ALL_SOURCE # define _ALL_SOURCE @@ -570,6 +576,7 @@ again: } else if (r < 0) { switch (errno) { case EINTR: goto again; + case EAGAIN: exit(0); // spurious wakeup for lei dynfork case EBADF: if (sock_fd < 0) exit(0); // fall-through default: err(EXIT_FAILURE, "recvmsg"); @@ -927,9 +934,30 @@ static void reopen_logs(void) } } -static void recv_loop(void) // worker process loop +static bool recv_once(void) { static char rbuf[4096 * 33]; // per-process + size_t len = sizeof(rbuf); + CLEANUP_REQ struct req req = {}; + + if (!recv_req(&req, rbuf, &len)) + return false; + if (req.fp[1]) + stderr_set(req.fp[1]); + + // MY_ARG_MAX limits the return value of SPLIT2ARGV + req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len); + dispatch(&req); + ERR_CLOSE(req.fp[0], 0); + if (req.fp[1]) { + stderr_restore(); + ERR_CLOSE(req.fp[1], 0); + } + return true; +} + +static void recv_loop(void) // worker process loop +{ struct sigaction sa = {}; sa.sa_handler = sigw; @@ -937,22 +965,8 @@ static void recv_loop(void) // worker process loop CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL)); while (sock_fd == 0) { - size_t len = sizeof(rbuf); - CLEANUP_REQ struct req req = {}; - - if (!recv_req(&req, rbuf, &len)) + if (!recv_once()) continue; - if (req.fp[1]) - stderr_set(req.fp[1]); - - // MY_ARG_MAX limits the return value of SPLIT2ARGV - req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len); - dispatch(&req); - ERR_CLOSE(req.fp[0], 0); - if (req.fp[1]) { - stderr_restore(); - ERR_CLOSE(req.fp[1], 0); - } if (worker_needs_reopen) { worker_needs_reopen = 0; reopen_logs(); @@ -966,6 +980,11 @@ static void insert_pid(pid_t pid, unsigned nr) worker_pids[nr] = pid; } +#define XSIGNAL(signame, handler) do { \ + if (signal(signame, handler) == SIG_ERR) \ + err(EXIT_FAILURE, "signal(" #signame ", " #handler ")"); \ +} while (0) + static void start_worker(unsigned nr) { pid_t pid = fork(); @@ -977,12 +996,9 @@ static void start_worker(unsigned nr) cleanup_pids(); xclose(pipefds[0]); xclose(pipefds[1]); - if (signal(SIGCHLD, SIG_DFL) == SIG_ERR) - err(EXIT_FAILURE, "signal CHLD"); - if (signal(SIGTTIN, SIG_IGN) == SIG_ERR) - err(EXIT_FAILURE, "signal TTIN"); - if (signal(SIGTTOU, SIG_IGN) == SIG_ERR) - err(EXIT_FAILURE, "signal TTIN"); + XSIGNAL(SIGCHLD, SIG_DFL); + XSIGNAL(SIGTTIN, SIG_IGN); + XSIGNAL(SIGTTOU, SIG_IGN); recv_loop(); exit(0); } @@ -1052,7 +1068,7 @@ static void sigp(int sig) // parent signal handler _exit(EXIT_FAILURE); } -static void reaped_worker(pid_t pid, int st) +static void reaped_prefork_worker(pid_t pid, int st) { unsigned long nr = 0; for (; nr < nworker_hwm; nr++) { @@ -1073,13 +1089,25 @@ static void reaped_worker(pid_t pid, int st) start_workers(); } +static void reaped_dynfork_worker(pid_t pid, int st) // lei-only +{ + if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT) + alive = false; + else if (st) + warnx("worker died $?=%d alive=%d", st, (int)alive); +} + static void do_sigchld(void) { while (1) { int st; pid_t pid = waitpid(-1, &st, WNOHANG); + if (pid > 0) { - reaped_worker(pid, st); + if (lei) + reaped_dynfork_worker(pid, st); + else + reaped_prefork_worker(pid, st); } else if (pid == 0) { return; } else { @@ -1135,25 +1163,135 @@ static size_t living_workers(void) return ret; } +static void prefork_loop(void) +{ + struct rlimit rl; + + if (getrlimit(RLIMIT_NOFILE, &rl)) + err(EXIT_FAILURE, "getrlimit"); + my_fd_max = rl.rlim_cur; + if (my_fd_max < 72) + warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max); + my_fd_max -= 64; + nworker_hwm = nworker; + worker_pids = (pid_t *)xcalloc(nworker, sizeof(pid_t)); + + start_workers(); + + while (alive || living_workers()) { + char sbuf[64]; + ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf)); + if (n < 0) { + if (errno == EINTR) continue; + err(EXIT_FAILURE, "read"); + } else if (n == 0) { + errx(EXIT_FAILURE, "read EOF"); + } + do_sigchld(); + for (ssize_t i = 0; i < n; i++) { + switch (sbuf[i]) { + case '.': break; // do_sigchld already called + case '-': do_sigttou(); break; + case '+': do_sigttin(); break; + case '#': parent_reopen_logs(); break; + default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]); + } + } + } +} + +static void xsetfl(int fd, int newflags) +{ + int fl = fcntl(fd, F_GETFL); + if (fl == -1) err(EXIT_FAILURE, "F_GETFL"); + if (fcntl(fd, F_SETFL, fl | newflags)) + err(EXIT_FAILURE, "F_SETFL"); +} + +static void dynfork_dispatch(void) // lei-only +{ + sigset_t old; + CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, &old)); + pid_t pid = fork(); + if (pid < 0) + err(EXIT_FAILURE, "fork"); + if (pid > 0) { + CHECK(int, 0, sigprocmask(SIG_SETMASK, &old, NULL)); + return; // don't even care about the PID + } + if (pid == 0) { + xclose(pipefds[0]); + xclose(pipefds[1]); + XSIGNAL(SIGCHLD, SIG_DFL); + XSIGNAL(SIGTTIN, SIG_IGN); + XSIGNAL(SIGTTOU, SIG_IGN); + XSIGNAL(SIGUSR1, SIG_DFL); + XSIGNAL(SIGTERM, SIG_DFL); + (void)recv_once(); + exit(0); + } +} + +static void dynfork_check_sigs(void) +{ + char sbuf[64]; + ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf)); + if (n < 0) { + if (errno == EAGAIN) return; // spurious wakeup + err(EXIT_FAILURE, "read"); + } else if (n == 0) { + errx(EXIT_FAILURE, "read EOF"); + } + do_sigchld(); + for (ssize_t i = 0; i < n; i++) { + switch (sbuf[i]) { + case '.': break; // do_sigchld already called + case '-': break; // ignore (TTOU|TTIN) + case '+': break; + case '#': reopen_logs(); break; + default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]); + } + } +} + +static void dynfork_loop(void) // for lei +{ + struct pollfd pfd[2]; + pfd[0].fd = sock_fd; + pfd[1].fd = pipefds[0]; + pfd[0].events = pfd[1].events = POLLIN; + + xsetfl(pipefds[0], O_NONBLOCK); + xsetfl(sock_fd, O_NONBLOCK); + my_fd_max = 128; // just keep srch_init happy + + while (alive) { + int rc = poll(pfd, MY_ARRAY_SIZE(pfd), -1); + + if (rc < 0) { + if (errno == EINTR) continue; + err(EXIT_FAILURE, "poll"); + } else { + if (pfd[0].revents & POLLIN) + dynfork_dispatch(); + if (pfd[1].revents & POLLIN) + dynfork_check_sigs(); + } + } +} + int main(int argc, char *argv[]) { int c; socklen_t slen = (socklen_t)sizeof(c); - stdout_path = getenv("STDOUT_PATH"); - stderr_path = getenv("STDERR_PATH"); - struct rlimit rl; if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen)) err(EXIT_FAILURE, "getsockopt"); if (c != SOCK_SEQPACKET) errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET"); - if (getrlimit(RLIMIT_NOFILE, &rl)) - err(EXIT_FAILURE, "getrlimit"); - my_fd_max = rl.rlim_cur; - if (my_fd_max < 72) - warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max); - my_fd_max -= 64; + stdout_path = getenv("STDOUT_PATH"); + stderr_path = getenv("STDERR_PATH"); thread_fp = new ThreadFieldProcessor(); xh_date_init(); @@ -1222,15 +1360,9 @@ int main(int argc, char *argv[]) } CHECK(int, 0, sigdelset(&workerset, SIGTERM)); CHECK(int, 0, sigdelset(&workerset, SIGCHLD)); - nworker_hwm = nworker; - worker_pids = (pid_t *)xcalloc(nworker, sizeof(pid_t)); if (pipe(pipefds)) err(EXIT_FAILURE, "pipe"); - int fl = fcntl(pipefds[1], F_GETFL); - if (fl == -1) err(EXIT_FAILURE, "F_GETFL"); - if (fcntl(pipefds[1], F_SETFL, fl | O_NONBLOCK)) - err(EXIT_FAILURE, "F_SETFL"); - + xsetfl(pipefds[1], O_NONBLOCK); CHECK(int, 0, sigdelset(&pset, SIGCHLD)); CHECK(int, 0, sigdelset(&pset, SIGTTIN)); CHECK(int, 0, sigdelset(&pset, SIGTTOU)); @@ -1246,29 +1378,9 @@ int main(int argc, char *argv[]) CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL)); CHECK(int, 0, sigprocmask(SIG_SETMASK, &pset, NULL)); - - start_workers(); - - char sbuf[64]; - while (alive || living_workers()) { - ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf)); - if (n < 0) { - if (errno == EINTR) continue; - err(EXIT_FAILURE, "read"); - } else if (n == 0) { - errx(EXIT_FAILURE, "read EOF"); - } - do_sigchld(); - for (ssize_t i = 0; i < n; i++) { - switch (sbuf[i]) { - case '.': break; // do_sigchld already called - case '-': do_sigttou(); break; - case '+': do_sigttin(); break; - case '#': parent_reopen_logs(); break; - default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]); - } - } - } - + if (lei) + dynfork_loop(); + else + prefork_loop(); return 0; } diff --git a/t/xap_helper.t b/t/xap_helper.t index 96fedbb9e..7455428dc 100644 --- a/t/xap_helper.t +++ b/t/xap_helper.t @@ -115,7 +115,12 @@ my $test = sub { $r = $doreq->($s, qw(test_inspect -d), $int[0], @ciol); my %cinfo = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> }); is($cinfo{has_threadid}, '0', 'has_threadid false for cindex'); - is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex'); + my $lei_mode = grep /\A-l\z/, @cmd; + if ($lei_mode) { + isnt $cinfo{pid}, $info{pid}, 'PID changed in lei mode'; + } else { + is $cinfo{pid}, $info{pid}, 'PID unchanged for cindex'; + } my @dump = (qw(dump_ibx -A XDFID), @ibx_shard_args, qw(13 z:0..)); $r = $doreq->($s, @dump); @@ -132,7 +137,7 @@ my $test = sub { is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported') or diag "res=$res"; - return wantarray ? ($ar, $s) : $ar if $cinfo{pid} == $pid; + return wantarray ? ($ar, $s) : $ar if $cinfo{pid} == $pid || $lei_mode; # test worker management: kill('TERM', $cinfo{pid}); @@ -201,6 +206,7 @@ SKIP: { @NO_CXX = $ENV{TEST_XH_CXX_ONLY} ? (0) : (0, 1); my $ar = $test->(@$cmd, '-j0'); $ar = $test->(@$cmd, '-j1'); + $ar = $test->(@$cmd, qw(-l)); }; require PublicInbox::CodeSearch; -- 2.47.3