]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
lei: fix SIGPIPE on large result sets to pager
authorEric Wong <e@80x24.org>
Tue, 7 Nov 2023 13:01:47 +0000 (13:01 +0000)
committerEric Wong <e@80x24.org>
Tue, 7 Nov 2023 13:23:36 +0000 (13:23 +0000)
When dealing with large search results, we need to deal with
EPIPE not just from the pager, but also EPIPE or ECONNRESET
between lei_xsearch and lei2mail processes.

Without this fix, lei_xsearch processes could linger and get
stuck writing to dead lei2mail processes if a user aborts the
pager early during a large result set.

To ensure lei_xsearch processes don't linger around after
lei2mail workers all die, we must close $l2m->{-wq_s2} before
spawning lei_xsearch processes, since $l2m->{-wq_s2} is only
used in lei2mail workers.

For `git cat-file' processes, we also need to trigger
PublicInbox::Git->close to handle unpredictable destructor
ordering to avoid using uninitialized IO refs.  This combines
with the `git_to_mail' change to deal with process cleanup
handling from premature shutdowns.

To test all this, we can't just rely on a single message being
large, but also need to rely on the result set being large
enough to saturate the lei_xsearch -> lei2mail socket so we
rely on GIANT_INBOX_DIR once again.

lib/PublicInbox/Git.pm
lib/PublicInbox/LeiOverview.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
t/lei-sigpipe.t

index 11712db295fa5e10ee90427f146adf5a14019771..292c359ab617c8d7aee2bbced2202e9d926e3060 100644 (file)
@@ -276,6 +276,7 @@ sub cat_async_step ($$) {
 
 sub cat_async_wait ($) {
        my ($self) = @_;
+       $self->close if !$self->{sock};
        my $inflight = $self->{inflight} or return;
        while (scalar(@$inflight)) {
                cat_async_step($self, $inflight);
@@ -331,6 +332,7 @@ sub check_async_wait ($) {
        my ($self) = @_;
        return cat_async_wait($self) if $self->{-bc};
        my $ck = $self->{ck} or return;
+       $ck->close if !$ck->{sock};
        my $inflight = $ck->{inflight} or return;
        check_async_step($ck, $inflight) while (scalar(@$inflight));
 }
index 066c40bd94eafdb4de8e37270b4a442f5f55e239..129dabf8c4023ee8f9eeb0ea86f26bb880930141 100644 (file)
@@ -212,7 +212,8 @@ sub ovv_each_smsg_cb { # runs in wq worker usually
                sub {
                        my ($smsg, $mitem, $eml) = @_;
                        $smsg->{pct} = get_pct($mitem) if $mitem;
-                       $l2m->wq_io_do('write_mail', [], $smsg, $eml);
+                       eval { $l2m->wq_io_do('write_mail', [], $smsg, $eml) };
+                       $lei->fail($@) if $@ && !$!{ECONNRESET} && !$!{EPIPE};
                }
        } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) {
                my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},";
index e80163e236165e0920aa4d082a32da6f73124fac..b73af68aa75e05d7e8d0bb92a22621f4301866fa 100644 (file)
@@ -8,11 +8,13 @@ use v5.10.1;
 use parent qw(PublicInbox::IPC);
 use PublicInbox::Eml;
 use PublicInbox::IO;
+use PublicInbox::Git;
 use PublicInbox::Spawn qw(spawn);
 use IO::Handle; # ->autoflush
 use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY);
 use PublicInbox::Syscall qw(rename_noreplace);
 use autodie qw(open seek close);
+use Carp qw(croak);
 
 my %kw2char = ( # Maildir characters
        draft => 'D',
@@ -132,8 +134,12 @@ sub eml2mboxcl2 {
 
 sub git_to_mail { # git->cat_async callback
        my ($bref, $oid, $type, $size, $smsg) = @_;
-       my $self = delete $smsg->{l2m} // die "BUG: no l2m";
        $type // return; # called by PublicInbox::Git::close
+       my $self = delete $smsg->{l2m};
+       if (!defined($self)) {
+               return if $PublicInbox::Git::in_cleanup;
+               croak "BUG: no l2m (type=$type)";
+       }
        eval {
                if ($type eq 'missing' &&
                          ($bref = $self->{-lms_rw}->local_blob($oid, 1))) {
index 5443188d79308587aef4c5d71caa4791a23d20ed..6c8dfe10722f6a4ea41f8546817a38b9ce3fcf8f 100644 (file)
@@ -20,7 +20,7 @@ use PublicInbox::LEI;
 use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR);
 use PublicInbox::ContentHash qw(git_sha);
 use POSIX qw(strftime);
-use autodie qw(open read seek truncate);
+use autodie qw(close open read seek truncate);
 use PublicInbox::Syscall qw($F_SETPIPE_SZ);
 
 sub new {
@@ -543,6 +543,7 @@ sub do_query {
                pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!";
                fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ;
                delete $l2m->{au_peers};
+               close(delete $l2m->{-wq_s2}); # share wq_s1 with lei_xsearch
        }
        $self->wq_workers_start('lei_xsearch', undef,
                                $lei->oldset, { lei => $lei },
index 622598a40d2e4a0094e8f5eb3e6c3437c4ecedfa..1aa700e93c1f2da0c8ef0dced5d08cb442a7d36c 100644 (file)
@@ -7,6 +7,19 @@ use PublicInbox::TestCommon;
 use POSIX qw(WTERMSIG WIFSIGNALED SIGPIPE);
 use PublicInbox::OnDestroy;
 use PublicInbox::Syscall qw($F_SETPIPE_SZ);
+use autodie qw(close open pipe seek sysread);
+use PublicInbox::IO qw(write_file);
+my $inboxdir = $ENV{GIANT_INBOX_DIR};
+SKIP: {
+       $inboxdir // skip 'GIANT_INBOX_DIR unset to test large results';
+       require PublicInbox::Inbox;
+       my $ibx = PublicInbox::Inbox->new({
+               name => 'unconfigured-test',
+               address => [ "test\@example.com" ],
+               inboxdir => $inboxdir,
+       });
+       $ibx->search or xbail "GIANT_INBOX_DIR=$inboxdir has no search";
+}
 
 # undo systemd (and similar) ignoring SIGPIPE, since lei expects to be run
 # from an interactive terminal:
@@ -21,30 +34,30 @@ test_lei(sub {
        my $f = "$ENV{HOME}/big.eml";
        my $imported;
        for my $out ([], [qw(-f mboxcl2)], [qw(-f text)]) {
-               pipe(my ($r, $w)) or BAIL_OUT $!;
+               pipe(my $r, my $w);
                my $size = $F_SETPIPE_SZ && fcntl($w, $F_SETPIPE_SZ, 4096) ?
                        4096 : 65536;
                unless (-f $f) {
-                       open my $fh, '>', $f or xbail "open $f: $!";
-                       print $fh <<'EOM' or xbail;
+                       my $fh = write_file '>', $f, <<'EOM';
 From: big@example.com
 Message-ID: <big@example.com>
 EOM
                        print $fh 'Subject:';
                        print $fh (' '.('x' x 72)."\n") x (($size / 73) + 1);
                        print $fh "\nbody\n";
-                       close $fh or xbail "close: $!";
+                       close $fh;
                }
 
                lei_ok(qw(import), $f) if $imported++ == 0;
-               open my $errfh, '+>>', "$ENV{HOME}/stderr.log" or xbail $!;
+               open my $errfh, '+>>', "$ENV{HOME}/stderr.log";
                my $opt = { run_mode => 0, 2 => $errfh, 1 => $w };
                my $cmd = [qw(lei q -q -t), @$out, 'z:1..'];
+               push @$cmd, '--only='.$inboxdir if defined $inboxdir;
                my $tp = start_script($cmd, undef, $opt);
                close $w;
                vec(my $rvec = '', fileno($r), 1) = 1;
                if (!select($rvec, undef, undef, 30)) {
-                       seek($errfh, 0, 0) or xbail $!;
+                       seek($errfh, 0, 0);
                        my $s = do { local $/; <$errfh> };
                        xbail "lei q had no output after 30s, stderr=$s";
                }
@@ -53,7 +66,7 @@ EOM
                $tp->join;
                ok(WIFSIGNALED($?), "signaled @$out");
                is(WTERMSIG($?), SIGPIPE, "got SIGPIPE @$out");
-               seek($errfh, 0, 0) or xbail $!;
+               seek($errfh, 0, 0);
                my $s = do { local $/; <$errfh> };
                is($s, '', "quiet after sigpipe @$out");
        }