From: Eric Wong Date: Wed, 5 Mar 2025 07:18:36 +0000 (+0000) Subject: lei: use C++ xap_helper if available X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=22ae93b85758b82f2e387bb347c52d2222d89cea;p=thirdparty%2Fpublic-inbox.git lei: use C++ xap_helper if available The C++ version of xap_helper allows `thread:' subqueries, non-fragile parsing of git approxidate with `d:', `dt:' and `rt:' prefixes, and possibly more features in the future not available in the SWIG or XS Xapian bindings. There's no point in lei supporting the Perl version of XapHelper since lei isn't expected to deal with abusive clients making expensive queries, so lei will only use the C++ version. We spawn xap_helper in every lei worker process to guarantee resource availability without having to resort to preforking. While preforking and on-demand thread or process creation were considered, I decided having a lingering xap_helper process probably wasn't worth the startup time improvement and instead prefer to minimize idle memory use. The whole implementation is a bit strange since lei support was an afterthought for xap_helper and we wrap the async_mset API to make it synchronous once again to reduce the code impact for code shared with public-facing daemons. Finally, test_lei in TestCommon gets tweaked to avoid repeatedly rebuilding in a new directory with every test_lei use in our test suite. 
--- diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 0a779c4fb..1446ad500 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -20,6 +20,7 @@ use Fcntl qw(SEEK_SET); use PublicInbox::Config; use PublicInbox::Syscall qw(EPOLLIN); use PublicInbox::Spawn qw(run_wait popen_rd run_qx); +use PublicInbox::DS qw(awaitpid); eval { require PublicInbox::Lg2 }; # placate FindBin use PublicInbox::Lock; use PublicInbox::Eml; @@ -28,7 +29,9 @@ use PublicInbox::Import; use PublicInbox::ContentHash qw(git_sha); use PublicInbox::OnDestroy; use PublicInbox::IPC; +use PublicInbox::Search; use PublicInbox::IO qw(poll_in); +use PublicInbox::XapHelperCxx; use Time::HiRes qw(stat); # ctime comparisons for config cache use File::Path (); use File::Spec; @@ -1330,10 +1333,35 @@ sub can_stay_alive { # PublicInbox::DS::post_loop_do cb $n + scalar(keys(%PublicInbox::DS::AWAIT_PIDS)); } +sub clear_tmp_xh { # awaitpid cb, called in lei worker + my ($pid) = @_; + my ($e, $s, @m) = ($? >> 8, $? & 127); + push @m, " status=$e" if $e && $e != 66; # EX_NOINPUT ok + push @m, " signal=$s" if $s; + warn "W: xap_helper PID:$pid died: ", @m, "\n" if @m; + undef $PublicInbox::Search::XHC; +} + +sub spawn_tmp_xh { # called in lei worker processes + $PublicInbox::Search::XHC //= eval { + my $xhc = PublicInbox::XapClient::start_helper(qw(-l -j0)); + awaitpid($xhc->{io}->attached_pid, \&clear_tmp_xh) if $xhc; + $xhc; + } || warn("E: $@ (will attempt to continue w/o Xapian helper)\n"); +} + # lei(1) calls this when it can't connect sub lazy_start { my ($path, $errno, $narg) = @_; - local ($errors_log, $listener); + local ($errors_log, $listener, $PublicInbox::Search::XHC); + + # no point in using xap_helper w/o C++ features for local clients + my $xh_cmd = eval { PublicInbox::XapHelperCxx::cmd() }; + $PublicInbox::Search::XHC = $xh_cmd ? 
undef : 0; + if ($xh_cmd) { + require PublicInbox::XapClient; + require PublicInbox::XhcMset; + } my ($sock_dir) = ($path =~ m!\A(.+?)/[^/]+\z!); $errors_log = "$sock_dir/errors.log"; my $addr = pack_sockaddr_un($path); diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index b1f2fe5b9..5b47515ab 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -40,6 +40,8 @@ sub attach_external { return warn("$desc not indexed for Xapian ($@ $!)\n"); my @shards = $srch->xdb_shards_flat or return warn("$desc has no Xapian shards\n"); + my @dirs = $srch->shard_dirs or + return warn("$desc has no Xapian shard dirs\n"); if (delete $self->{xdb}) { # XXX: do we need this? # clobber existing {xdb} if amending @@ -60,9 +62,12 @@ sub attach_external { "BUG: reloaded $nr shards, expected $expect" } push @{$self->{shards_flat}}, @shards; + push @{$self->{shard_dirs}}, @dirs; push(@{$self->{shard2ibx}}, $ibxish) for (@shards); } +sub shard_dirs { @{$_[0]->{shard_dirs}} } + # returns a list of local inboxes (or count in scalar context) sub locals { @{$_[0]->{locals} // []} } @@ -73,7 +78,7 @@ sub xdb_shards_flat { @{$_[0]->{shards_flat} // []} } sub _mitem_kw { # retry_reopen callback my ($srch, $smsg, $mitem, $flagged) = @_; - my $doc = $mitem->get_document; + my $doc = $srch->{xdb}->get_document($mitem->get_docid); my $kw = xap_terms('K', $doc); $kw->{flagged} = 1 if $flagged; my @L = xap_terms('L', $doc); @@ -147,6 +152,20 @@ sub mset_progress { } } +sub sync_mset_cb { # async_mset cb + my ($ret, $mset, $err) = @_; + @$ret = ($mset, $err); +} + +sub sync_mset ($$) { + my ($srch, $mo) = @_; + local $PublicInbox::DS::in_loop; # force synchronous + $srch->async_mset($mo->{qstr}, $mo, \&sync_mset_cb, my $ret = []); + my ($mset, $err) = @$ret; + die $err if $err; + $mset; +} + sub query_one_mset { # for --threads and l2m w/o sort my ($self, $ibxish) = @_; my $lei = $self->{lei}; @@ -170,8 +189,9 @@ sub query_one_mset { # for --threads and 
l2m w/o sort ref($min) and return warn("$maxk=$min has multiple values\n"); ($min =~ /[^0-9]/) and return warn("$maxk=$min not numeric\n"); my $first_ids; + $lei->spawn_tmp_xh; # per-worker do { - $mset = eval { $srch->mset($mo->{qstr}, $mo) }; + $mset = eval { sync_mset $srch, $mo }; return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl mset_progress($lei, $dir, $mo->{offset} + $mset->size, $mset->get_matches_estimated); @@ -232,8 +252,9 @@ sub query_combined_mset { # non-parallel for non-"--threads" users attach_external($self, $loc); } my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei); + $lei->spawn_tmp_xh; # per-worker do { - $mset = eval { $self->mset($mo->{qstr}, $mo) }; + $mset = eval { sync_mset $self, $mo }; return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl mset_progress($lei, 'xsearch', $mo->{offset} + $mset->size, $mset->get_matches_estimated); diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index 535d6b83e..afcaabc7f 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -750,7 +750,7 @@ sub xh_args { # prep getopt args to feed to xap_helper.h socket # TODO: arbitrary header indexing goes here [ sort keys %dedupe ]; }; - ((map { ('-d', $_) } shard_dirs($self)), map { ('-Q', $_) } @$apfx); + ((map { ('-d', $_) } $self->shard_dirs), map { ('-Q', $_) } @$apfx); } sub docids_by_postlist ($$) { diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index a9967735f..162cefed2 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -747,9 +747,9 @@ SKIP: { require PublicInbox::Spawn; require PublicInbox::Config; require File::Path; - eval { # use XDG_CACHE_HOME, first: + state $xh_cmd = eval { # use XDG_CACHE_HOME, first: require PublicInbox::XapHelperCxx; - PublicInbox::XapHelperCxx::check_build(); + PublicInbox::XapHelperCxx::cmd(); }; local %ENV = %ENV; delete $ENV{XDG_DATA_HOME}; @@ -773,6 +773,13 @@ SKIP: { my $home = "$tmpdir/lei-daemon"; 
mkdir($home, 0700); local $ENV{HOME} = $home; + if ($xh_cmd && $xh_cmd->[0] =~ m!\A(.+)/+[^/]+\z!) { + # avoid repeated rebuilds by copying + my $src = $1; + my $dst = "$home/.cache/public-inbox/jaot"; + File::Path::make_path($dst); + xsys_e([qw(/bin/cp -Rp), $src, $dst ]); + } my $persist; if ($persist_xrd && !$test_opt->{daemon_only}) { $persist = $daemon_xrd = $persist_xrd; diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm index 786982d87..b54e9be56 100644 --- a/lib/PublicInbox/XapClient.pm +++ b/lib/PublicInbox/XapClient.pm @@ -34,6 +34,7 @@ sub start_helper (@) { my $env; my $cmd = eval "require $cls; ${cls}::cmd()"; if ($@) { # fall back to Perl + XS|SWIG + return if "@argv" =~ /\b-l\b/; # no point w/o C++ in lei $cls = 'PublicInbox::XapHelper'; # ensure the child process has the same @INC we do: $env = { PERL5LIB => join(':', @INC) }; diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 816f7df50..f6da52fb7 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -146,6 +146,7 @@ static long my_fd_max, shard_nfd; static volatile int sock_fd = STDIN_FILENO; static sigset_t fullset, workerset; static bool alive = true; +static bool lei; // support kw: and L: prefixes, FLAG_PHRASE always #if STDERR_ASSIGNABLE static FILE *orig_err = stderr; #endif @@ -614,11 +615,11 @@ static void srch_init(struct req *req) i = 0; try { srch->db = new Xapian::Database(req->dirv[i]); - if (is_chert(req->dirv[0])) + if (!lei && is_chert(req->dirv[0])) srch->qp_flags &= ~FLAG_PHRASE; for (i = 1; i < req->dirc; i++) { const char *dir = req->dirv[i]; - if (srch->qp_flags & FLAG_PHRASE && + if (!lei && srch->qp_flags & FLAG_PHRASE && is_chert(dir)) srch->qp_flags &= ~FLAG_PHRASE; srch->db->add_database(Xapian::Database(dir)); @@ -653,6 +654,10 @@ static void srch_init(struct req *req) } else { qp_init_mail_search(srch->qp); // Search.pm srch->qp->add_boolean_prefix("thread", thread_fp); + if (lei) { + 
srch->qp->add_boolean_prefix("kw", "K"); + srch->qp->add_boolean_prefix("L", "L"); + } } } @@ -1101,7 +1106,7 @@ int main(int argc, char *argv[]) if (my_setlinebuf(stderr)) err(EXIT_FAILURE, "setlinebuf(stderr)"); // not using -W like Daemon.pm, since -W is reserved (glibc) - while ((c = getopt(argc, argv, "j:")) != -1) { + while ((c = getopt(argc, argv, "lj:")) != -1) { char *end; switch (c) { @@ -1110,6 +1115,9 @@ int main(int argc, char *argv[]) if (*end != 0 || nworker > WORKER_MAX) errx(EXIT_FAILURE, "-j %s invalid", optarg); break; + case 'l': + lei = true; + break; case ':': errx(EXIT_FAILURE, "missing argument: `-%c'", optopt); case '?':