]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
lei: use C++ xap_helper if available
authorEric Wong <e@80x24.org>
Wed, 5 Mar 2025 07:18:36 +0000 (07:18 +0000)
committerEric Wong <e@80x24.org>
Fri, 7 Mar 2025 19:23:14 +0000 (19:23 +0000)
The C++ version of xap_helper allows `thread:' subqueries,
non-fragile parsing of git approxidate with `d:', `dt:' and
`rt:' prefixes, and possibly more features in the future not
available in the SWIG or XS Xapian bindings.

There's no point in lei supporting the Perl version of XapHelper
since lei isn't expected to deal with abusive clients making
expensive queries, so lei will only use the C++ version.

We spawn xap_helper in every lei worker process to guarantee
resource availability without having to resort to preforking.

While pre-forking and on-demand thread||process creation was
considered, I decided having a lingering xap_helper process
probably wasn't worth the startup time improvement and instead
prefer to minimize idle memory use.

The whole implementation is a bit strange since lei support was
an after-thought for xap_helper and we wrap the async_mset API
to make it synchronous once again to reduce the code impact for
code shared with public-facing daemons.

Finally, test_lei in TestCommon gets tweaked to avoid repeatedly
rebuilding in a new directory with every test_lei use in our test
suite.

lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/Search.pm
lib/PublicInbox/TestCommon.pm
lib/PublicInbox/XapClient.pm
lib/PublicInbox/xap_helper.h

index 0a779c4fbfd513c1538f1a0accc22e0181cde668..1446ad500eb8ee715c6902c2363c24b9943caa75 100644 (file)
@@ -20,6 +20,7 @@ use Fcntl qw(SEEK_SET);
 use PublicInbox::Config;
 use PublicInbox::Syscall qw(EPOLLIN);
 use PublicInbox::Spawn qw(run_wait popen_rd run_qx);
+use PublicInbox::DS qw(awaitpid);
 eval { require PublicInbox::Lg2 }; # placate FindBin
 use PublicInbox::Lock;
 use PublicInbox::Eml;
@@ -28,7 +29,9 @@ use PublicInbox::Import;
 use PublicInbox::ContentHash qw(git_sha);
 use PublicInbox::OnDestroy;
 use PublicInbox::IPC;
+use PublicInbox::Search;
 use PublicInbox::IO qw(poll_in);
+use PublicInbox::XapHelperCxx;
 use Time::HiRes qw(stat); # ctime comparisons for config cache
 use File::Path ();
 use File::Spec;
@@ -1330,10 +1333,35 @@ sub can_stay_alive { # PublicInbox::DS::post_loop_do cb
        $n + scalar(keys(%PublicInbox::DS::AWAIT_PIDS));
 }
 
+sub clear_tmp_xh { # awaitpid cb, called in lei worker
+       my ($pid) = @_;
+       my ($e, $s, @m) = ($? >> 8, $? & 127);
+       push @m, " status=$e" if $e && $e != 66; # EX_NOINPUT ok
+       push @m, " signal=$s" if $s;
+       warn "W: xap_helper PID:$pid died: ", @m, "\n" if @m;
+       undef $PublicInbox::Search::XHC;
+}
+
+sub spawn_tmp_xh { # called in lei worker processes
+       $PublicInbox::Search::XHC //= eval {
+               my $xhc = PublicInbox::XapClient::start_helper(qw(-l -j0));
+               awaitpid($xhc->{io}->attached_pid, \&clear_tmp_xh) if $xhc;
+               $xhc;
+       } || warn("E: $@ (will attempt to continue w/o Xapian helper)\n");
+}
+
 # lei(1) calls this when it can't connect
 sub lazy_start {
        my ($path, $errno, $narg) = @_;
-       local ($errors_log, $listener);
+       local ($errors_log, $listener, $PublicInbox::Search::XHC);
+
+       # no point in using xap_helper w/o C++ features for local clients
+       my $xh_cmd = eval { PublicInbox::XapHelperCxx::cmd() };
+       $PublicInbox::Search::XHC = $xh_cmd ? undef : 0;
+       if ($xh_cmd) {
+               require PublicInbox::XapClient;
+               require PublicInbox::XhcMset;
+       }
        my ($sock_dir) = ($path =~ m!\A(.+?)/[^/]+\z!);
        $errors_log = "$sock_dir/errors.log";
        my $addr = pack_sockaddr_un($path);
index b1f2fe5b97917d3c95bd1f135a6ada37fad468e2..5b47515ab4d69eca3b66c018dd7acf9a6e24290e 100644 (file)
@@ -40,6 +40,8 @@ sub attach_external {
                return warn("$desc not indexed for Xapian ($@ $!)\n");
        my @shards = $srch->xdb_shards_flat or
                return warn("$desc has no Xapian shards\n");
+       my @dirs = $srch->shard_dirs or
+               return warn("$desc has no Xapian shard dirs\n");
 
        if (delete $self->{xdb}) { # XXX: do we need this?
                # clobber existing {xdb} if amending
@@ -60,9 +62,12 @@ sub attach_external {
                        "BUG: reloaded $nr shards, expected $expect"
        }
        push @{$self->{shards_flat}}, @shards;
+       push @{$self->{shard_dirs}}, @dirs;
        push(@{$self->{shard2ibx}}, $ibxish) for (@shards);
 }
 
+sub shard_dirs { @{$_[0]->{shard_dirs}} }
+
 # returns a list of local inboxes (or count in scalar context)
 sub locals { @{$_[0]->{locals} // []} }
 
@@ -73,7 +78,7 @@ sub xdb_shards_flat { @{$_[0]->{shards_flat} // []} }
 
 sub _mitem_kw { # retry_reopen callback
        my ($srch, $smsg, $mitem, $flagged) = @_;
-       my $doc = $mitem->get_document;
+       my $doc = $srch->{xdb}->get_document($mitem->get_docid);
        my $kw = xap_terms('K', $doc);
        $kw->{flagged} = 1 if $flagged;
        my @L = xap_terms('L', $doc);
@@ -147,6 +152,20 @@ sub mset_progress {
        }
 }
 
+sub sync_mset_cb { # async_mset cb
+       my ($ret, $mset, $err) = @_;
+       @$ret = ($mset, $err);
+}
+
+sub sync_mset ($$) {
+       my ($srch, $mo) = @_;
+       local $PublicInbox::DS::in_loop; # force synchronous
+       $srch->async_mset($mo->{qstr}, $mo, \&sync_mset_cb, my $ret = []);
+       my ($mset, $err) = @$ret;
+       die $err if $err;
+       $mset;
+}
+
 sub query_one_mset { # for --threads and l2m w/o sort
        my ($self, $ibxish) = @_;
        my $lei = $self->{lei};
@@ -170,8 +189,9 @@ sub query_one_mset { # for --threads and l2m w/o sort
        ref($min) and return warn("$maxk=$min has multiple values\n");
        ($min =~ /[^0-9]/) and return warn("$maxk=$min not numeric\n");
        my $first_ids;
+       $lei->spawn_tmp_xh; # per-worker
        do {
-               $mset = eval { $srch->mset($mo->{qstr}, $mo) };
+               $mset = eval { sync_mset $srch, $mo };
                return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl
                mset_progress($lei, $dir, $mo->{offset} + $mset->size,
                                $mset->get_matches_estimated);
@@ -232,8 +252,9 @@ sub query_combined_mset { # non-parallel for non-"--threads" users
                attach_external($self, $loc);
        }
        my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+       $lei->spawn_tmp_xh; # per-worker
        do {
-               $mset = eval { $self->mset($mo->{qstr}, $mo) };
+               $mset = eval { sync_mset $self, $mo };
                return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl
                mset_progress($lei, 'xsearch', $mo->{offset} + $mset->size,
                                $mset->get_matches_estimated);
index 535d6b83eccadf14c0182460ca1a4a63bebfce66..afcaabc7f9e30f1c8e1d1d7a27786c8ff31002f6 100644 (file)
@@ -750,7 +750,7 @@ sub xh_args { # prep getopt args to feed to xap_helper.h socket
                # TODO: arbitrary header indexing goes here
                [ sort keys %dedupe ];
        };
-       ((map { ('-d', $_) } shard_dirs($self)), map { ('-Q', $_) } @$apfx);
+       ((map { ('-d', $_) } $self->shard_dirs), map { ('-Q', $_) } @$apfx);
 }
 
 sub docids_by_postlist ($$) {
index a9967735fafad0276f437828629ca0c67f24cda3..162cefed2b4da3e4ff165b30c3d1344d10a0c147 100644 (file)
@@ -747,9 +747,9 @@ SKIP: {
        require PublicInbox::Spawn;
        require PublicInbox::Config;
        require File::Path;
-       eval { # use XDG_CACHE_HOME, first:
+       state $xh_cmd = eval { # use XDG_CACHE_HOME, first:
                require PublicInbox::XapHelperCxx;
-               PublicInbox::XapHelperCxx::check_build();
+               PublicInbox::XapHelperCxx::cmd();
        };
        local %ENV = %ENV;
        delete $ENV{XDG_DATA_HOME};
@@ -773,6 +773,13 @@ SKIP: {
                my $home = "$tmpdir/lei-daemon";
                mkdir($home, 0700);
                local $ENV{HOME} = $home;
+               if ($xh_cmd && $xh_cmd->[0] =~ m!\A(.+)/+[^/]+\z!) {
+                       # avoid repeated rebuilds by copying
+                       my $src = $1;
+                       my $dst = "$home/.cache/public-inbox/jaot";
+                       File::Path::make_path($dst);
+                       xsys_e([qw(/bin/cp -Rp), $src, $dst ]);
+               }
                my $persist;
                if ($persist_xrd && !$test_opt->{daemon_only}) {
                        $persist = $daemon_xrd = $persist_xrd;
index 786982d87238e32067230fce50590cd1184e6638..b54e9be56febc38f71edf8276572e16d4a1001fb 100644 (file)
@@ -34,6 +34,7 @@ sub start_helper (@) {
        my $env;
        my $cmd = eval "require $cls; ${cls}::cmd()";
        if ($@) { # fall back to Perl + XS|SWIG
+               return if "@argv" =~ /\b-l\b/; # no point w/o C++ in lei
                $cls = 'PublicInbox::XapHelper';
                # ensure the child process has the same @INC we do:
                $env = { PERL5LIB => join(':', @INC) };
index 816f7df5095d6cf7e78e78384e17b3f20cd8ca92..f6da52fb7daa405f22db2cfbc6cf81e5eb975374 100644 (file)
@@ -146,6 +146,7 @@ static long my_fd_max, shard_nfd;
 static volatile int sock_fd = STDIN_FILENO;
 static sigset_t fullset, workerset;
 static bool alive = true;
+static bool lei; // support kw: and L: prefixes, FLAG_PHRASE always
 #if STDERR_ASSIGNABLE
 static FILE *orig_err = stderr;
 #endif
@@ -614,11 +615,11 @@ static void srch_init(struct req *req)
                i = 0;
                try {
                        srch->db = new Xapian::Database(req->dirv[i]);
-                       if (is_chert(req->dirv[0]))
+                       if (!lei && is_chert(req->dirv[0]))
                                srch->qp_flags &= ~FLAG_PHRASE;
                        for (i = 1; i < req->dirc; i++) {
                                const char *dir = req->dirv[i];
-                               if (srch->qp_flags & FLAG_PHRASE &&
+                               if (!lei && srch->qp_flags & FLAG_PHRASE &&
                                                is_chert(dir))
                                        srch->qp_flags &= ~FLAG_PHRASE;
                                srch->db->add_database(Xapian::Database(dir));
@@ -653,6 +654,10 @@ static void srch_init(struct req *req)
        } else {
                qp_init_mail_search(srch->qp); // Search.pm
                srch->qp->add_boolean_prefix("thread", thread_fp);
+               if (lei) {
+                       srch->qp->add_boolean_prefix("kw", "K");
+                       srch->qp->add_boolean_prefix("L", "L");
+               }
        }
 }
 
@@ -1101,7 +1106,7 @@ int main(int argc, char *argv[])
        if (my_setlinebuf(stderr))
                err(EXIT_FAILURE, "setlinebuf(stderr)");
        // not using -W<workers> like Daemon.pm, since -W is reserved (glibc)
-       while ((c = getopt(argc, argv, "j:")) != -1) {
+       while ((c = getopt(argc, argv, "lj:")) != -1) {
                char *end;
 
                switch (c) {
@@ -1110,6 +1115,9 @@ int main(int argc, char *argv[])
                        if (*end != 0 || nworker > WORKER_MAX)
                                errx(EXIT_FAILURE, "-j %s invalid", optarg);
                        break;
+               case 'l':
+                       lei = true;
+                       break;
                case ':':
                        errx(EXIT_FAILURE, "missing argument: `-%c'", optopt);
                case '?':