use PublicInbox::Config;
use PublicInbox::Syscall qw(EPOLLIN);
use PublicInbox::Spawn qw(run_wait popen_rd run_qx);
+use PublicInbox::DS qw(awaitpid);
eval { require PublicInbox::Lg2 }; # placate FindBin
use PublicInbox::Lock;
use PublicInbox::Eml;
use PublicInbox::ContentHash qw(git_sha);
use PublicInbox::OnDestroy;
use PublicInbox::IPC;
+use PublicInbox::Search;
use PublicInbox::IO qw(poll_in);
+use PublicInbox::XapHelperCxx;
use Time::HiRes qw(stat); # ctime comparisons for config cache
use File::Path ();
use File::Spec;
$n + scalar(keys(%PublicInbox::DS::AWAIT_PIDS));
}
+sub clear_tmp_xh { # awaitpid cb, called in lei worker
+ my ($pid) = @_;
+ my ($e, $s, @m) = ($? >> 8, $? & 127);
+ push @m, " status=$e" if $e && $e != 66; # EX_NOINPUT ok
+ push @m, " signal=$s" if $s;
+ warn "W: xap_helper PID:$pid died: ", @m, "\n" if @m;
+ undef $PublicInbox::Search::XHC;
+}
+
+sub spawn_tmp_xh { # called in lei worker processes
+ $PublicInbox::Search::XHC //= eval {
+ my $xhc = PublicInbox::XapClient::start_helper(qw(-l -j0));
+ awaitpid($xhc->{io}->attached_pid, \&clear_tmp_xh) if $xhc;
+ $xhc;
+ } || warn("E: $@ (will attempt to continue w/o Xapian helper)\n");
+}
+
# lei(1) calls this when it can't connect
sub lazy_start {
my ($path, $errno, $narg) = @_;
- local ($errors_log, $listener);
+ local ($errors_log, $listener, $PublicInbox::Search::XHC);
+
+ # no point in using xap_helper w/o C++ features for local clients
+ my $xh_cmd = eval { PublicInbox::XapHelperCxx::cmd() };
+ $PublicInbox::Search::XHC = $xh_cmd ? undef : 0;
+ if ($xh_cmd) {
+ require PublicInbox::XapClient;
+ require PublicInbox::XhcMset;
+ }
my ($sock_dir) = ($path =~ m!\A(.+?)/[^/]+\z!);
$errors_log = "$sock_dir/errors.log";
my $addr = pack_sockaddr_un($path);
return warn("$desc not indexed for Xapian ($@ $!)\n");
my @shards = $srch->xdb_shards_flat or
return warn("$desc has no Xapian shards\n");
+ my @dirs = $srch->shard_dirs or
+ return warn("$desc has no Xapian shard dirs\n");
if (delete $self->{xdb}) { # XXX: do we need this?
# clobber existing {xdb} if amending
"BUG: reloaded $nr shards, expected $expect"
}
push @{$self->{shards_flat}}, @shards;
+ push @{$self->{shard_dirs}}, @dirs;
push(@{$self->{shard2ibx}}, $ibxish) for (@shards);
}
+sub shard_dirs { @{$_[0]->{shard_dirs}} }
+
# returns a list of local inboxes (or count in scalar context)
sub locals { @{$_[0]->{locals} // []} }
sub _mitem_kw { # retry_reopen callback
my ($srch, $smsg, $mitem, $flagged) = @_;
- my $doc = $mitem->get_document;
+ my $doc = $srch->{xdb}->get_document($mitem->get_docid);
my $kw = xap_terms('K', $doc);
$kw->{flagged} = 1 if $flagged;
my @L = xap_terms('L', $doc);
}
}
+sub sync_mset_cb { # async_mset cb
+ my ($ret, $mset, $err) = @_;
+ @$ret = ($mset, $err);
+}
+
+sub sync_mset ($$) {
+ my ($srch, $mo) = @_;
+ local $PublicInbox::DS::in_loop; # force synchronous
+ $srch->async_mset($mo->{qstr}, $mo, \&sync_mset_cb, my $ret = []);
+ my ($mset, $err) = @$ret;
+ die $err if $err;
+ $mset;
+}
+
sub query_one_mset { # for --threads and l2m w/o sort
my ($self, $ibxish) = @_;
my $lei = $self->{lei};
ref($min) and return warn("$maxk=$min has multiple values\n");
($min =~ /[^0-9]/) and return warn("$maxk=$min not numeric\n");
my $first_ids;
+ $lei->spawn_tmp_xh; # per-worker
do {
- $mset = eval { $srch->mset($mo->{qstr}, $mo) };
+ $mset = eval { sync_mset $srch, $mo };
return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl
mset_progress($lei, $dir, $mo->{offset} + $mset->size,
$mset->get_matches_estimated);
attach_external($self, $loc);
}
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
+ $lei->spawn_tmp_xh; # per-worker
do {
- $mset = eval { $self->mset($mo->{qstr}, $mo) };
+ $mset = eval { sync_mset $self, $mo };
return $lei->child_error(22 << 8, "E: $@") if $@; # 22 from curl
mset_progress($lei, 'xsearch', $mo->{offset} + $mset->size,
$mset->get_matches_estimated);
# TODO: arbitrary header indexing goes here
[ sort keys %dedupe ];
};
- ((map { ('-d', $_) } shard_dirs($self)), map { ('-Q', $_) } @$apfx);
+ ((map { ('-d', $_) } $self->shard_dirs), map { ('-Q', $_) } @$apfx);
}
sub docids_by_postlist ($$) {
require PublicInbox::Spawn;
require PublicInbox::Config;
require File::Path;
- eval { # use XDG_CACHE_HOME, first:
+ state $xh_cmd = eval { # use XDG_CACHE_HOME, first:
require PublicInbox::XapHelperCxx;
- PublicInbox::XapHelperCxx::check_build();
+ PublicInbox::XapHelperCxx::cmd();
};
local %ENV = %ENV;
delete $ENV{XDG_DATA_HOME};
my $home = "$tmpdir/lei-daemon";
mkdir($home, 0700);
local $ENV{HOME} = $home;
+ if ($xh_cmd && $xh_cmd->[0] =~ m!\A(.+)/+[^/]+\z!) {
+ # avoid repeated rebuilds by copying
+ my $src = $1;
+ my $dst = "$home/.cache/public-inbox/jaot";
+ File::Path::make_path($dst);
+ xsys_e([qw(/bin/cp -Rp), $src, $dst ]);
+ }
my $persist;
if ($persist_xrd && !$test_opt->{daemon_only}) {
$persist = $daemon_xrd = $persist_xrd;
my $env;
my $cmd = eval "require $cls; ${cls}::cmd()";
if ($@) { # fall back to Perl + XS|SWIG
+ return if "@argv" =~ /\b-l\b/; # no point w/o C++ in lei
$cls = 'PublicInbox::XapHelper';
# ensure the child process has the same @INC we do:
$env = { PERL5LIB => join(':', @INC) };
static volatile int sock_fd = STDIN_FILENO;
static sigset_t fullset, workerset;
static bool alive = true;
+static bool lei; // support kw: and L: prefixes, FLAG_PHRASE always
#if STDERR_ASSIGNABLE
static FILE *orig_err = stderr;
#endif
i = 0;
try {
srch->db = new Xapian::Database(req->dirv[i]);
- if (is_chert(req->dirv[0]))
+ if (!lei && is_chert(req->dirv[0]))
srch->qp_flags &= ~FLAG_PHRASE;
for (i = 1; i < req->dirc; i++) {
const char *dir = req->dirv[i];
- if (srch->qp_flags & FLAG_PHRASE &&
+ if (!lei && srch->qp_flags & FLAG_PHRASE &&
is_chert(dir))
srch->qp_flags &= ~FLAG_PHRASE;
srch->db->add_database(Xapian::Database(dir));
} else {
qp_init_mail_search(srch->qp); // Search.pm
srch->qp->add_boolean_prefix("thread", thread_fp);
+ if (lei) {
+ srch->qp->add_boolean_prefix("kw", "K");
+ srch->qp->add_boolean_prefix("L", "L");
+ }
}
}
if (my_setlinebuf(stderr))
err(EXIT_FAILURE, "setlinebuf(stderr)");
// not using -W<workers> like Daemon.pm, since -W is reserved (glibc)
- while ((c = getopt(argc, argv, "j:")) != -1) {
+ while ((c = getopt(argc, argv, "lj:")) != -1) {
char *end;
switch (c) {
if (*end != 0 || nworker > WORKER_MAX)
errx(EXIT_FAILURE, "-j %s invalid", optarg);
break;
+ case 'l':
+ lei = true;
+ break;
case ':':
errx(EXIT_FAILURE, "missing argument: `-%c'", optopt);
case '?':