]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
lei: C++ xap_helper forks dynamically master
authorEric Wong <e@80x24.org>
Thu, 5 Feb 2026 03:47:17 +0000 (03:47 +0000)
committerEric Wong <e@80x24.org>
Fri, 6 Feb 2026 22:17:25 +0000 (22:17 +0000)
Rework xap_helper for lei to prefork before dynamically forking
a new worker on demand for every request.  We do this to ensure
the C++ xap_helper is usable ASAP and enable us to reliably skip
approxidate handling in Perl (and thus rely on our integration
of the C approxidate from git(1) with Xapian's QueryParser API).

Previously, we would v?fork+execve for every query, but it was
open to TOCTOU errors for deciding whether or not to do
approxidate processing in Perl or (C-flavored) C++.  Using
fexecve(2) was also considered, but that's not available from
pure Perl and I'm still hesitant to make Inline::C or XS a hard
requirement for us.

Public-facing xap_helper usage remains unchanged, since it's
expected public-facing daemons have dedicated resources
preconfigured while local software such as lei can be more
dynamic and flexible in its resource use.

There is no change to the Perl + (XS|SWIG) XapHelper
implementation, since it provides no benefit to lei users.

lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiQuery.pm
lib/PublicInbox/LeiSucks.pm
lib/PublicInbox/LeiUp.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/Search.pm
lib/PublicInbox/xap_helper.h
t/xap_helper.t

index 30bbf4a8b9e5325b2e09efc107d479fb6e7cb638..b185c93f99d0b2b3b59f2e89350c6f761f551dc1 100644 (file)
@@ -1336,7 +1336,7 @@ sub can_stay_alive { # PublicInbox::DS::post_loop_do cb
        $n + scalar(keys(%PublicInbox::DS::AWAIT_PIDS));
 }
 
-sub clear_tmp_xh { # awaitpid cb, called in lei worker
+sub clear_tmp_xh { # awaitpid cb, called in lei-daemon
        my ($pid) = @_;
        my ($e, $s, @m) = ($? >> 8, $? & 127);
        push @m, " status=$e" if $e && $e != 66; # EX_NOINPUT ok
@@ -1345,12 +1345,15 @@ sub clear_tmp_xh { # awaitpid cb, called in lei worker
        undef $PublicInbox::Search::XHC;
 }
 
-sub spawn_tmp_xh { # called in lei worker processes
+sub spawn_tmp_xh { # called in top-level lei-daemon
        $PublicInbox::Search::XHC //= eval {
                my $xhc = PublicInbox::XapClient::start_helper(qw(-l -j0));
                awaitpid($xhc->{io}->attached_pid, \&clear_tmp_xh) if $xhc;
                $xhc;
-       } || warn("E: $@ (will attempt to continue w/o Xapian helper)\n");
+       } || do {
+               warn("E: $@ (will attempt to continue w/o Xapian helper)\n");
+               undef;
+       }
 }
 
 # lei(1) calls this when it can't connect
@@ -1411,6 +1414,7 @@ sub lazy_start {
                        $exit_code //= eval("POSIX::SIG$_[0] + 128") if @_;
                        $dir_idle->close if $dir_idle; # EPOLL_CTL_DEL
                        $dir_idle = undef; # let RC take care of it
+                       $PublicInbox::Search::XHC = undef; # ditto
                        eval 'PublicInbox::LeiNoteEvent::flush_task()';
                        my $lis = $pil or exit($exit_code // 0);
                        # closing eof_p triggers \&noop wakeup
index 787cd3a51a9b77cedca73923cc818650811db3b4..402edab664293b60816bba52165ea3b3d7104abb 100644 (file)
@@ -76,6 +76,7 @@ sub cfg_ext ($) {
 
 sub lxs_prepare {
        my ($self) = @_;
+       $self->spawn_tmp_xh;
        require PublicInbox::LeiXSearch;
        # prepare any number of LeiXSearch || LeiSearch || Inbox || URL
        my $lxs = $self->{lxs} = PublicInbox::LeiXSearch->new;
index fcaf48d6632a160ca4179f0fccfdee20d713a596..f595d46ef7b1b3aab2e51c14bcf3718a60821eed 100644 (file)
@@ -55,8 +55,7 @@ sub lei_sucks {
        } else {
                push @out, "Xapian not available: $@\n";
        }
-       require PublicInbox::XapClient;
-       my $xhc = PublicInbox::XapClient::start_helper(qw(-l -j0));
+       my $xhc = $lei->spawn_tmp_xh;
        push(@out, $xhc ? "xap_helper: $xhc->{impl}\n"
                        : "xap_helper not available\n");
        push @out, "public-inbox blob OIDs of loaded features:\n";
index 3fe7841c54a3d7ab724e6f0cdf4ae9cbf92c05a7..c586d64d6a595ba0ad24c9244ed4c9b57ca25264 100644 (file)
@@ -31,6 +31,7 @@ sub up1 ($$) {
        my $lse = $lei->{lse} // die 'BUG: {lse} missing';
        my $rawstr = $lss->{-cfg}->{'lei.internal.rawstr'} //
                (scalar(@$q) == 1 && substr($q->[0], -1) eq "\n");
+       $lei->spawn_tmp_xh; # for query_approxidate|query_argv_to_string
        if ($rawstr) {
                die <<EOM if scalar(@$q) > 1;
 $f: lei.q has multiple values (@$q) (out=$out)
index c480ddf4f4dc1b03af5f2ca510502d98da0b7991..b781c77f6d966a0f33984418b8cdf232a8c5c8bb 100644 (file)
@@ -192,7 +192,6 @@ sub query_one_mset { # for --threads and l2m w/o sort
        ref($min) and return warn("$maxk=$min has multiple values\n");
        ($min =~ /[^0-9]/) and return warn("$maxk=$min not numeric\n");
        my $first_ids;
-       $lei->spawn_tmp_xh; # per-worker
        do {
                $mset = eval { sync_mset $srch, $mo };
                return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl
@@ -255,7 +254,6 @@ sub query_combined_mset { # non-parallel for non-"--threads" users
                attach_external($self, $loc);
        }
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
-       $lei->spawn_tmp_xh; # per-worker
        do {
                $mset = eval { sync_mset $self, $mo };
                return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl
index c37dcf3b99b1635f98f6b5bf2280919ec4c252e3..89f21faeca414de7d359aa9f910613311d11b01a 100644 (file)
@@ -407,24 +407,36 @@ sub date_parse_finalize {
                $1 eq '%s' ? $t : strftime($1, gmtime($t))/sge;
 }
 
-# n.b. argv never has NUL, though we'll need to filter it out
+# Try to make raw lei CLI args work more like the HTML <input> element.
+# That is, that phrases quoted to keep the CLI happy (e.g. sh/bash/zsh)
+# don't need extra (nested) quoting to make Xapian see the quotes for a phrase.
+# n.b. argv never has NULL, though we'll need to filter it out
 # if this $argv isn't from a command execution
 sub query_argv_to_string {
        my (undef, $git, $argv) = @_;
-       return "@$argv" if $XHC && $XHC->{impl} eq 'PublicInbox::XapHelperCxx';
-       my $to_parse;
-       my $tmp = join(' ', map {;
-               if (s!\b(d|rt|dt):(\S+)\z!date_parse_prepare(
+       if ($XHC && $XHC->{impl} eq 'PublicInbox::XapHelperCxx') {
+               join ' ', map {;
+                       if (/\s/) { # n.b. $1 may be `('
+                               s/(.*?)\b(\w+:)// ? qq{$1$2"$_"} : qq{"$_"};
+                       } else {
+                               $_
+                       }
+               } @$argv;
+       } else {
+               my $to_parse;
+               my $tmp = join ' ', map {;
+                       if (s!\b(d|rt|dt):(\S+)\z!date_parse_prepare(
                                                $to_parse //= [], $1, $2)!sge) {
-                       $_;
-               } elsif (/\s/) {
-                       s/(.*?)\b(\w+:)// ? qq{$1$2"$_"} : qq{"$_"};
-               } else {
-                       $_
-               }
-       } @$argv);
-       date_parse_finalize($git, $to_parse, $tmp) if $to_parse;
-       $tmp
+                               $_;
+                       } elsif (/\s/) { # n.b. $1 may be `('
+                               s/(.*?)\b(\w+:)// ? qq{$1$2"$_"} : qq{"$_"};
+                       } else {
+                               $_
+                       }
+               } @$argv;
+               date_parse_finalize($git, $to_parse, $tmp) if $to_parse;
+               $tmp
+       }
 }
 
 # this is for the WWW "q=" query parameter and "lei q --stdin"
index 3e108b957fc0e851fb6e1d9e7c4f3b717aa0ab3a..3fe9d65b7d812f1fb29b1daec9ecc03ad6843307 100644 (file)
  * instead of their equivalents in the C++ stdlib :P
  * Everything here is an unstable internal API of public-inbox and
  * NOT intended for ordinary users; only public-inbox hackers
+ *
+ * As of public-inbox 2.2+, lei forks a subprocess on every request
+ * to scale to local demand.  Public-facing use is prefork-only,
+ * since we expect public-facing servers to have dedicated resources
+ * allocated for it and local user software to have more dynamic
+ * resource usage.
  */
 #ifndef _ALL_SOURCE
 #      define _ALL_SOURCE
@@ -570,6 +576,7 @@ again:
        } else if (r < 0) {
                switch (errno) {
                case EINTR: goto again;
+               case EAGAIN: exit(0); // spurious wakeup for lei dynfork
                case EBADF: if (sock_fd < 0) exit(0);
                        // fall-through
                default: err(EXIT_FAILURE, "recvmsg");
@@ -927,9 +934,30 @@ static void reopen_logs(void)
        }
 }
 
-static void recv_loop(void) // worker process loop
+static bool recv_once(void)
 {
        static char rbuf[4096 * 33]; // per-process
+       size_t len = sizeof(rbuf);
+       CLEANUP_REQ struct req req = {};
+
+       if (!recv_req(&req, rbuf, &len))
+               return false;
+       if (req.fp[1])
+               stderr_set(req.fp[1]);
+
+       // MY_ARG_MAX limits the return value of SPLIT2ARGV
+       req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
+       dispatch(&req);
+       ERR_CLOSE(req.fp[0], 0);
+       if (req.fp[1]) {
+               stderr_restore();
+               ERR_CLOSE(req.fp[1], 0);
+       }
+       return true;
+}
+
+static void recv_loop(void) // worker process loop
+{
        struct sigaction sa = {};
        sa.sa_handler = sigw;
 
@@ -937,22 +965,8 @@ static void recv_loop(void) // worker process loop
        CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
 
        while (sock_fd == 0) {
-               size_t len = sizeof(rbuf);
-               CLEANUP_REQ struct req req = {};
-
-               if (!recv_req(&req, rbuf, &len))
+               if (!recv_once())
                        continue;
-               if (req.fp[1])
-                       stderr_set(req.fp[1]);
-
-               // MY_ARG_MAX limits the return value of SPLIT2ARGV
-               req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
-               dispatch(&req);
-               ERR_CLOSE(req.fp[0], 0);
-               if (req.fp[1]) {
-                       stderr_restore();
-                       ERR_CLOSE(req.fp[1], 0);
-               }
                if (worker_needs_reopen) {
                        worker_needs_reopen = 0;
                        reopen_logs();
@@ -966,6 +980,11 @@ static void insert_pid(pid_t pid, unsigned nr)
        worker_pids[nr] = pid;
 }
 
+#define XSIGNAL(signame, handler) do { \
+       if (signal(signame, handler) == SIG_ERR) \
+               err(EXIT_FAILURE, "signal(" #signame ", " #handler ")"); \
+} while (0)
+
 static void start_worker(unsigned nr)
 {
        pid_t pid = fork();
@@ -977,12 +996,9 @@ static void start_worker(unsigned nr)
                cleanup_pids();
                xclose(pipefds[0]);
                xclose(pipefds[1]);
-               if (signal(SIGCHLD, SIG_DFL) == SIG_ERR)
-                       err(EXIT_FAILURE, "signal CHLD");
-               if (signal(SIGTTIN, SIG_IGN) == SIG_ERR)
-                       err(EXIT_FAILURE, "signal TTIN");
-               if (signal(SIGTTOU, SIG_IGN) == SIG_ERR)
-                       err(EXIT_FAILURE, "signal TTIN");
+               XSIGNAL(SIGCHLD, SIG_DFL);
+               XSIGNAL(SIGTTIN, SIG_IGN);
+               XSIGNAL(SIGTTOU, SIG_IGN);
                recv_loop();
                exit(0);
        }
@@ -1052,7 +1068,7 @@ static void sigp(int sig) // parent signal handler
        _exit(EXIT_FAILURE);
 }
 
-static void reaped_worker(pid_t pid, int st)
+static void reaped_prefork_worker(pid_t pid, int st)
 {
        unsigned long nr = 0;
        for (; nr < nworker_hwm; nr++) {
@@ -1073,13 +1089,25 @@ static void reaped_worker(pid_t pid, int st)
                start_workers();
 }
 
+static void reaped_dynfork_worker(pid_t pid, int st) // lei-only
+{
+       if (WIFEXITED(st) && WEXITSTATUS(st) == EX_NOINPUT)
+               alive = false;
+       else if (st)
+               warnx("worker died $?=%d alive=%d", st, (int)alive);
+}
+
 static void do_sigchld(void)
 {
        while (1) {
                int st;
                pid_t pid = waitpid(-1, &st, WNOHANG);
+
                if (pid > 0) {
-                       reaped_worker(pid, st);
+                       if (lei)
+                               reaped_dynfork_worker(pid, st);
+                       else
+                               reaped_prefork_worker(pid, st);
                } else if (pid == 0) {
                        return;
                } else {
@@ -1135,25 +1163,135 @@ static size_t living_workers(void)
        return ret;
 }
 
+static void prefork_loop(void)
+{
+       struct rlimit rl;
+
+       if (getrlimit(RLIMIT_NOFILE, &rl))
+               err(EXIT_FAILURE, "getrlimit");
+       my_fd_max = rl.rlim_cur;
+       if (my_fd_max < 72)
+               warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max);
+       my_fd_max -= 64;
+       nworker_hwm = nworker;
+       worker_pids = (pid_t *)xcalloc(nworker, sizeof(pid_t));
+
+       start_workers();
+
+       while (alive || living_workers()) {
+               char sbuf[64];
+               ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf));
+               if (n < 0) {
+                       if (errno == EINTR) continue;
+                       err(EXIT_FAILURE, "read");
+               } else if (n == 0) {
+                       errx(EXIT_FAILURE, "read EOF");
+               }
+               do_sigchld();
+               for (ssize_t i = 0; i < n; i++) {
+                       switch (sbuf[i]) {
+                       case '.': break; // do_sigchld already called
+                       case '-': do_sigttou(); break;
+                       case '+': do_sigttin(); break;
+                       case '#': parent_reopen_logs(); break;
+                       default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
+                       }
+               }
+       }
+}
+
+static void xsetfl(int fd, int newflags)
+{
+       int fl = fcntl(fd, F_GETFL);
+       if (fl == -1) err(EXIT_FAILURE, "F_GETFL");
+       if (fcntl(fd, F_SETFL, fl | newflags))
+               err(EXIT_FAILURE, "F_SETFL");
+}
+
+static void dynfork_dispatch(void) // lei-only
+{
+       sigset_t old;
+       CHECK(int, 0, sigprocmask(SIG_SETMASK, &fullset, &old));
+       pid_t pid = fork();
+       if (pid < 0)
+               err(EXIT_FAILURE, "fork");
+       if (pid > 0) {
+               CHECK(int, 0, sigprocmask(SIG_SETMASK, &old, NULL));
+               return; // don't even care about the PID
+       }
+       if (pid == 0) {
+               xclose(pipefds[0]);
+               xclose(pipefds[1]);
+               XSIGNAL(SIGCHLD, SIG_DFL);
+               XSIGNAL(SIGTTIN, SIG_IGN);
+               XSIGNAL(SIGTTOU, SIG_IGN);
+               XSIGNAL(SIGUSR1, SIG_DFL);
+               XSIGNAL(SIGTERM, SIG_DFL);
+               (void)recv_once();
+               exit(0);
+       }
+}
+
+static void dynfork_check_sigs(void)
+{
+       char sbuf[64];
+       ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf));
+       if (n < 0) {
+               if (errno == EAGAIN) return; // spurious wakeup
+               err(EXIT_FAILURE, "read");
+       } else if (n == 0) {
+               errx(EXIT_FAILURE, "read EOF");
+       }
+       do_sigchld();
+       for (ssize_t i = 0; i < n; i++) {
+               switch (sbuf[i]) {
+               case '.': break; // do_sigchld already called
+               case '-': break; // ignore (TTOU|TTIN)
+               case '+': break;
+               case '#': reopen_logs(); break;
+               default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
+               }
+       }
+}
+
+static void dynfork_loop(void) // for lei
+{
+       struct pollfd pfd[2];
+       pfd[0].fd = sock_fd;
+       pfd[1].fd = pipefds[0];
+       pfd[0].events = pfd[1].events = POLLIN;
+
+       xsetfl(pipefds[0], O_NONBLOCK);
+       xsetfl(sock_fd, O_NONBLOCK);
+       my_fd_max = 128; // just keep srch_init happy
+
+       while (alive) {
+               int rc = poll(pfd, MY_ARRAY_SIZE(pfd), -1);
+
+               if (rc < 0) {
+                       if (errno == EINTR) continue;
+                       err(EXIT_FAILURE, "poll");
+               } else {
+                       if (pfd[0].revents & POLLIN)
+                               dynfork_dispatch();
+                       if (pfd[1].revents & POLLIN)
+                               dynfork_check_sigs();
+               }
+       }
+}
+
 int main(int argc, char *argv[])
 {
        int c;
        socklen_t slen = (socklen_t)sizeof(c);
-       stdout_path = getenv("STDOUT_PATH");
-       stderr_path = getenv("STDERR_PATH");
-       struct rlimit rl;
 
        if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen))
                err(EXIT_FAILURE, "getsockopt");
        if (c != SOCK_SEQPACKET)
                errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET");
 
-       if (getrlimit(RLIMIT_NOFILE, &rl))
-               err(EXIT_FAILURE, "getrlimit");
-       my_fd_max = rl.rlim_cur;
-       if (my_fd_max < 72)
-               warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max);
-       my_fd_max -= 64;
+       stdout_path = getenv("STDOUT_PATH");
+       stderr_path = getenv("STDERR_PATH");
 
        thread_fp = new ThreadFieldProcessor();
        xh_date_init();
@@ -1222,15 +1360,9 @@ int main(int argc, char *argv[])
        }
        CHECK(int, 0, sigdelset(&workerset, SIGTERM));
        CHECK(int, 0, sigdelset(&workerset, SIGCHLD));
-       nworker_hwm = nworker;
-       worker_pids = (pid_t *)xcalloc(nworker, sizeof(pid_t));
 
        if (pipe(pipefds)) err(EXIT_FAILURE, "pipe");
-       int fl = fcntl(pipefds[1], F_GETFL);
-       if (fl == -1) err(EXIT_FAILURE, "F_GETFL");
-       if (fcntl(pipefds[1], F_SETFL, fl | O_NONBLOCK))
-               err(EXIT_FAILURE, "F_SETFL");
-
+       xsetfl(pipefds[1], O_NONBLOCK);
        CHECK(int, 0, sigdelset(&pset, SIGCHLD));
        CHECK(int, 0, sigdelset(&pset, SIGTTIN));
        CHECK(int, 0, sigdelset(&pset, SIGTTOU));
@@ -1246,29 +1378,9 @@ int main(int argc, char *argv[])
        CHECK(int, 0, sigaction(SIGCHLD, &sa, NULL));
 
        CHECK(int, 0, sigprocmask(SIG_SETMASK, &pset, NULL));
-
-       start_workers();
-
-       char sbuf[64];
-       while (alive || living_workers()) {
-               ssize_t n = read(pipefds[0], &sbuf, sizeof(sbuf));
-               if (n < 0) {
-                       if (errno == EINTR) continue;
-                       err(EXIT_FAILURE, "read");
-               } else if (n == 0) {
-                       errx(EXIT_FAILURE, "read EOF");
-               }
-               do_sigchld();
-               for (ssize_t i = 0; i < n; i++) {
-                       switch (sbuf[i]) {
-                       case '.': break; // do_sigchld already called
-                       case '-': do_sigttou(); break;
-                       case '+': do_sigttin(); break;
-                       case '#': parent_reopen_logs(); break;
-                       default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
-                       }
-               }
-       }
-
+       if (lei)
+               dynfork_loop();
+       else
+               prefork_loop();
        return 0;
 }
index 96fedbb9e9b9960708db71375bc77866dea2943a..7455428dc808ce1ca0b69d177173b98ce38a2858 100644 (file)
@@ -115,7 +115,12 @@ my $test = sub {
        $r = $doreq->($s, qw(test_inspect -d), $int[0], @ciol);
        my %cinfo = map { split(/=/, $_, 2) } split(/ /, do { local $/; <$r> });
        is($cinfo{has_threadid}, '0', 'has_threadid false for cindex');
-       is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex');
+       my $lei_mode = grep /\A-l\z/, @cmd;
+       if ($lei_mode) {
+               isnt $cinfo{pid}, $info{pid}, 'PID changed in lei mode';
+       } else {
+               is $cinfo{pid}, $info{pid}, 'PID unchanged for cindex';
+       }
 
        my @dump = (qw(dump_ibx -A XDFID), @ibx_shard_args, qw(13 z:0..));
        $r = $doreq->($s, @dump);
@@ -132,7 +137,7 @@ my $test = sub {
        is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported') or
                diag "res=$res";
 
-       return wantarray ? ($ar, $s) : $ar if $cinfo{pid} == $pid;
+       return wantarray ? ($ar, $s) : $ar if $cinfo{pid} == $pid || $lei_mode;
 
        # test worker management:
        kill('TERM', $cinfo{pid});
@@ -201,6 +206,7 @@ SKIP: {
        @NO_CXX = $ENV{TEST_XH_CXX_ONLY} ? (0) : (0, 1);
        my $ar = $test->(@$cmd, '-j0');
        $ar = $test->(@$cmd, '-j1');
+       $ar = $test->(@$cmd, qw(-l));
 };
 
 require PublicInbox::CodeSearch;