our @EXPORT_OK = qw(retry_reopen int_val get_pct xap_terms);
use List::Util qw(max);
use POSIX qw(strftime);
+use autodie qw(open pipe);
use Carp ();
our $XHC = 0; # defined but false
xdb($self); # populate {nshards}
my @margs = ($self->xh_args, xh_opt($self, $opt), '--');
my $ret = eval {
- my $rd = $XHC->mkreq(undef, 'mset', @margs, $qry_str);
- PublicInbox::XhcMset->maybe_new($rd, $self, $cb, @args);
+ pipe my $out_rd, my $out_wr;
+ open my $err_rw, '+>', undef;
+ $XHC->mkreq([ $out_wr, $err_rw ],
+ 'mset', @margs, $qry_str);
+ undef $out_wr;
+ PublicInbox::XhcMset->maybe_new($out_rd, $err_rw,
+ $self, $cb, @args);
};
$cb->(@args, undef, $@) if $@;
$ret;
sub err_txt {
my ($ctx, $err) = @_;
my $u = $ctx->{ibx}->base_url($ctx->{env}) . '_/text/help/';
- $err =~ s/^\s*Exception:\s*//; # bad word to show users :P
+ $err =~ s/\s*\bException:\s*/ /; # bad word to show users :P
sanitize_local_paths $err;
$err = ascii_html($err);
"\nBad query: <b>$err</b>\n" .
sub mkreq {
my ($self, $ios, @arg) = @_;
- my ($r, $n);
- pipe($r, $ios->[0]) if !defined($ios->[0]);
my @fds = map fileno($_), @$ios;
my $buf = join("\0", @arg, '');
- $n = $PublicInbox::IPC::send_cmd->($self->{io}, \@fds, $buf, 0, $tries)
- // die "send_cmd: $!";
+ my $n = $PublicInbox::IPC::send_cmd->($self->{io},
+ \@fds, $buf, 0, $tries) // die "send_cmd: $!";
$n == length($buf) or die "send_cmd: $n != ".length($buf);
- $r;
}
sub start_helper (@) {
}
}
-sub dispatch {
+sub dispatch (@) {
my ($req, $cmd, @argv) = @_;
my $fn = $req->can("cmd_$cmd") or return;
$GLP->getoptionsfromarray(\@argv, $req, @PublicInbox::Search::XH_SPEC)
my $req = bless {}, __PACKAGE__;
my $i = 0;
open($req->{$i++}, '+<&=', $_) for @fds;
+ $req->{1}->autoflush(1) if $req->{1};
local $stderr = $req->{1} // \*STDERR;
die "not NUL-terminated" if chop($rbuf) ne "\0";
- my @argv = split(/\0/, $rbuf);
+ my ($cmd, @argv) = split(/\0/, $rbuf);
$req->{nr_out} = 0;
- $req->dispatch(@argv) if @argv;
+ if (defined $cmd) {
+ eval { dispatch $req, $cmd, @argv };
+ warn "$cmd: $@" if $@;
+ }
}
}
package PublicInbox::XhcMset;
use v5.12;
use parent qw(PublicInbox::DS);
+use autodie qw(seek);
+use PublicInbox::IO qw(read_all);
use PublicInbox::XhcMsetIterator;
use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+use Fcntl qw(SEEK_SET);
+use Carp qw(croak);
+use PublicInbox::Git qw(git_quote);
+
+sub die_err ($@) {
+ my ($self, @msg) = @_;
+ my $err_rw = delete $self->{err_rw};
+ seek $err_rw, 0, SEEK_SET;
+ my $s = read_all $err_rw;
+ chomp $s;
+ croak $s, @msg;
+}
sub event_step {
my ($self) = @_;
my ($cb, @args) = @{delete $self->{cb_args} // return};
my $rd = $self->{sock};
eval {
- my $hdr = <$rd> // die "E: reading mset header: $!";
+ my $hdr = <$rd> //
+ die_err $self, $! ? ("reading mset header: $!") : ();
for (split /\s+/, $hdr) { # read mset.size + estimated_matches
my ($k, $v) = split /=/, $_, 2;
$k =~ s/\A[^\.]*\.//; # s/(mset)?\./
$self->{$k} = $v;
}
- my $size = $self->{size} // die "E: bad xhc header: `$hdr'";
+ my $size = $self->{size} //
+ die_err $self, 'bad xhc header: ', git_quote($hdr);
my @it = map { PublicInbox::XhcMsetIterator::make($_) } <$rd>;
$self->{items} = \@it;
- scalar(@it) == $size or die
- 'E: got ',scalar(@it),", expected mset.size=$size";
+ scalar(@it) == $size or die_err $self,
+ 'got ', scalar(@it), ', expected mset.size=', $size;
};
my $err = $@;
$self->close;
}
sub maybe_new {
- my (undef, $rd, $srch, @cb_args) = @_;
- my $self = bless { cb_args => \@cb_args, srch => $srch }, __PACKAGE__;
+ my (undef, $out_rd, $err_rw, $srch, @cb_args) = @_;
+ my $self = bless {
+ cb_args => \@cb_args,
+ srch => $srch,
+ err_rw => $err_rw,
+ }, __PACKAGE__;
if ($PublicInbox::DS::in_loop) { # async
- $self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
+ $self->SUPER::new($out_rd, EPOLLIN|EPOLLONESHOT);
} else { # synchronous
- $self->{sock} = $rd;
+ $self->{sock} = $out_rd;
event_step($self);
undef;
}
static bool cmd_mset(struct req *req)
{
if (optind >= req->argc) ABORT("usage: mset [OPTIONS] WANT QRY_STR");
- if (req->fp[1]) ABORT("mset only accepts 1 FD");
const char *qry_str = req->argv[optind];
CLEANUP_FBUF struct fbuf wbuf = {};
Xapian::MSet mset = req->code_search ? commit_mset(req, qry_str) :
use PublicInbox::TestCommon;
use Cwd qw(getcwd);
use List::Util qw(sum);
-use autodie qw(close mkdir open rename);
+use autodie qw(close mkdir open pipe rename);
require_mods(qw(json Xapian +SCM_RIGHTS DBD::SQLite));
use_ok 'PublicInbox::CodeSearchIdx';
use PublicInbox::Import;
my ($xhc) = @_;
my $csrch = PublicInbox::CodeSearch->new("$tmp/ext");
my $impl = $xhc->{impl};
- my ($r, @l);
- $r = $xhc->mkreq([], qw(mset -c -g), $zp_git, @xh_args, 'NUL');
+ my ($r, $w, @l);
+ pipe $r, $w;
+ $xhc->mkreq([$w], qw(mset -c -g), $zp_git, @xh_args, 'NUL');
+ close $w;
chomp(@l = <$r>);
like shift(@l), qr/\bmset\.size=2\b/, "got expected header $impl";
my %docid2data;
} @l;
is_deeply(\@got, $exp, "expected doc_data $impl");
- $r = $xhc->mkreq([], qw(mset -c -g), "$tmp/wt0/.git", @xh_args, 'NUL');
+ pipe $r, $w;
+ $xhc->mkreq([$w], qw(mset -c -g), "$tmp/wt0/.git", @xh_args, 'NUL');
+ close $w;
chomp(@l = <$r>);
like shift(@l), qr/\bmset.size=0\b/, "got miss in wrong dir $impl";
is_deeply(\@l, [], "no extra lines $impl");
# git patch-id --stable <t/data/0001.patch | awk '{print $1}'
my $dfid = '91ee6b761fc7f47cad9f2b09b10489f313eb5b71';
my $mid = '20180720072141.GA15957@example';
- my $r = $xhc->mkreq([ undef, $err_w ], qw(dump_ibx -A XDFID -A Q),
+
+ pipe my $r, my $w;
+ $xhc->mkreq([ $w, $err_w ], qw(dump_ibx -A XDFID -A Q),
(map { ('-d', $_) } @ibx_idx),
9, "mid:$mid");
close $err_w;
+ close $w;
my $res = do { local $/; <$r> };
is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
my $err = do { local $/; <$err_r> };
is($err, "mset.size=1 nr_out=2\n", "got expected status ($xhc->{impl})");
pipe($err_r, $err_w);
- $r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A XDFID),
+ pipe $r, $w;
+ $xhc->mkreq([ $w, $err_w ], qw(dump_roots -c -A XDFID),
(map { ('-d', $_) } @int),
$root2id_file, 'dt:19700101'.'000000..');
close $err_w;
+ close $w;
my @res = <$r>;
is(scalar(@res), 5, 'got expected rows');
is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)),
$err = do { local $/; <$err_r> };
is $err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})";
- $r = $xhc->mkreq([], qw(mset), @ibx_shard_args,
+ pipe $r, $w;
+ $xhc->mkreq([$w], qw(mset), @ibx_shard_args,
'dfn:lib/PublicInbox/Search.pm');
+ close $w;
chomp((my $hdr, @res) = readline($r));
like $hdr, qr/\bmset\.size=1\b/,
"got expected header via mset ($xhc->{impl}";
ok $res[1] > 0 && $res[1] <= 100, 'pct > 0 && <= 100';
is scalar(@res), 3, 'only 3 columns in result';
- $r = $xhc->mkreq([], qw(mset), @ibx_shard_args,
+ pipe $r, $w;
+ $xhc->mkreq([$w], qw(mset), @ibx_shard_args,
'dt:19700101'.'000000..');
+ close $w;
chomp(($hdr, @res) = readline($r));
like $hdr, qr/\bmset\.size=6\b/,
"got expected header via multi-result mset ($xhc->{impl}";
my $nr;
for my $i (7, 8, 39, 40) {
pipe($err_r, $err_w);
- $r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A),
+ pipe($r, $w);
+ $xhc->mkreq([ $w, $err_w ], qw(dump_roots -c -A),
"XDFPOST$i", (map { ('-d', $_) } @int),
$root2id_file, 'dt:19700101'.'000000..');
close $err_w;
+ close $w;
@res = <$r>;
my @err = <$err_r>;
if (defined $nr) {
or diag explain(\@res, \@err);
}
pipe($err_r, $err_w);
- $r = $xhc->mkreq([ undef, $err_w ], qw(dump_ibx -A XDFPOST7),
+ pipe $r, $w;
+ $xhc->mkreq([ $w, $err_w ], qw(dump_ibx -A XDFPOST7),
@ibx_shard_args, qw(13 rt:0..));
close $err_w;
+ close $w;
@res = <$r>;
my @err = <$err_r>;
my ($nr_out) = ("@err" =~ /nr_out=(\d+)/);
my $capture = sub { ($mset, $err) = @_ };
my $retrieve = sub {
my ($qstr) = @_;
- $r = $xhc->mkreq(undef, 'mset', @thr_shard_args, $qstr);
- PublicInbox::XhcMset->maybe_new($r, undef, $capture);
+ pipe $r, $w;
+ $xhc->mkreq([ $w ], 'mset', @thr_shard_args, $qstr);
+ close $w;
+ open my $err_rw, '+>', undef;
+ PublicInbox::XhcMset->maybe_new($r, $err_rw,
+ undef, $capture);
map { $over->get_art($_->get_docid) } $mset->items;
};
@art = $retrieve->('thread:thread-root@example wildfires');
diag 'testing timeouts...';
for my $j (qw(0 1)) {
my $t0 = now;
- $r = $xhc->mkreq(undef, qw(test_sleep -K 1 -d),
+ pipe $r, $w;
+ $xhc->mkreq([ $w ], qw(test_sleep -K 1 -d),
$ibx_idx[0]);
+ close $w;
is readline($r), undef, 'got EOF';
my $diff = now - $t0;
ok $diff < 3, "timeout didn't take too long -j$j";
my $exp;
for (0..(PublicInbox::Search::ulimit_n() * $nr)) {
for my $xhc (@xhc) {
- my $r = $xhc->mkreq([], qw(mset -Q), "tst$n=XTST$n",
+ pipe my $r, my $w;
+ $xhc->mkreq([$w], qw(mset -Q), "tst$n=XTST$n",
@ibx_shard_args, qw(rt:0..));
+ close $w;
chomp(my @res = readline($r));
$exp //= $res[0];
$exp eq $res[0] or