From: Eric Wong Date: Tue, 7 Nov 2023 13:01:47 +0000 (+0000) Subject: lei: fix SIGPIPE on large result sets to pager X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e97a30e7624dfb8645aa33a94844bcf28c7e1379;p=thirdparty%2Fpublic-inbox.git lei: fix SIGPIPE on large result sets to pager When dealing with large search results, we need to deal with EPIPE not just from the pager, but also EPIPE or ECONNRESET between lei_xsearch and lei2mail processes. Without this fix, lei_xsearch processes could linger and get stuck writing to dead lei2mail processes if a user aborts the pager early during a large result set. To ensure lei_xsearch processes don't linger around after lei2mail workers all die, we must close $l2m->{-wq_s2} before spawning lei_xsearch processes, since $l2m->{-wq_s2} is only used in lei2mail workers. For `git cat-file' processes, we also need to trigger PublicInbox::Git->close to handle unpredictable destructor ordering to avoid using uninitialized IO refs. This combines with the `git_to_mail' change to deal with process cleanup handling from premature shutdowns. To test all this, we can't just rely on a single message being large, but also need to rely on the result set being large enough to saturate the lei_xsearch -> lei2mail socket so we rely on GIANT_INBOX_DIR once again. --- diff --git a/lib/PublicInbox/Git.pm b/lib/PublicInbox/Git.pm index 11712db29..292c359ab 100644 --- a/lib/PublicInbox/Git.pm +++ b/lib/PublicInbox/Git.pm @@ -276,6 +276,7 @@ sub cat_async_step ($$) { sub cat_async_wait ($) { my ($self) = @_; + $self->close if !$self->{sock}; my $inflight = $self->{inflight} or return; while (scalar(@$inflight)) { cat_async_step($self, $inflight); @@ -331,6 +332,7 @@ sub check_async_wait ($) { my ($self) = @_; return cat_async_wait($self) if $self->{-bc}; my $ck = $self->{ck} or return; + $ck->close if !$ck->{sock}; my $inflight = $ck->{inflight} or return; check_async_step($ck, $inflight) while (scalar(@$inflight)); } diff --git a/lib/PublicInbox/LeiOverview.pm b/lib/PublicInbox/LeiOverview.pm index 066c40bd9..129dabf8c 100644 --- a/lib/PublicInbox/LeiOverview.pm +++ b/lib/PublicInbox/LeiOverview.pm @@ -212,7 +212,8 @@ sub ovv_each_smsg_cb { # runs in wq worker usually sub { my ($smsg, $mitem, $eml) = @_; $smsg->{pct} = get_pct($mitem) if $mitem; - $l2m->wq_io_do('write_mail', [], $smsg, $eml); + eval { $l2m->wq_io_do('write_mail', [], $smsg, $eml) }; + $lei->fail($@) if $@ && !$!{ECONNRESET} && !$!{EPIPE}; } } elsif ($self->{fmt} =~ /\A(concat)?json\z/ && $lei->{opt}->{pretty}) { my $EOR = ($1//'') eq 'concat' ? "\n}" : "\n},"; diff --git a/lib/PublicInbox/LeiToMail.pm b/lib/PublicInbox/LeiToMail.pm index e80163e23..b73af68aa 100644 --- a/lib/PublicInbox/LeiToMail.pm +++ b/lib/PublicInbox/LeiToMail.pm @@ -8,11 +8,13 @@ use v5.10.1; use parent qw(PublicInbox::IPC); use PublicInbox::Eml; use PublicInbox::IO; +use PublicInbox::Git; use PublicInbox::Spawn qw(spawn); use IO::Handle; # ->autoflush use Fcntl qw(SEEK_SET SEEK_END O_CREAT O_EXCL O_WRONLY); use PublicInbox::Syscall qw(rename_noreplace); use autodie qw(open seek close); +use Carp qw(croak); my %kw2char = ( # Maildir characters draft => 'D', @@ -132,8 +134,12 @@ sub eml2mboxcl2 { sub git_to_mail { # git->cat_async callback my ($bref, $oid, $type, $size, $smsg) = @_; - my $self = delete $smsg->{l2m} // die "BUG: no l2m"; $type // return; # called by PublicInbox::Git::close + my $self = delete $smsg->{l2m}; + if (!defined($self)) { + return if $PublicInbox::Git::in_cleanup; + croak "BUG: no l2m (type=$type)"; + } eval { if ($type eq 'missing' && ($bref = $self->{-lms_rw}->local_blob($oid, 1))) { diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm index 5443188d7..6c8dfe107 100644 --- a/lib/PublicInbox/LeiXSearch.pm +++ b/lib/PublicInbox/LeiXSearch.pm @@ -20,7 +20,7 @@ use PublicInbox::LEI; use Fcntl qw(SEEK_SET F_SETFL O_APPEND O_RDWR); use PublicInbox::ContentHash qw(git_sha); use POSIX qw(strftime); -use autodie qw(open read seek truncate); +use autodie qw(close open read seek truncate); use PublicInbox::Syscall qw($F_SETPIPE_SZ); sub new { @@ -543,6 +543,7 @@ sub do_query { pipe($lei->{startq}, $lei->{au_done}) or die "pipe: $!"; fcntl($lei->{startq}, $F_SETPIPE_SZ, 4096) if $F_SETPIPE_SZ; delete $l2m->{au_peers}; + close(delete $l2m->{-wq_s2}); # share wq_s1 with lei_xsearch } $self->wq_workers_start('lei_xsearch', undef, $lei->oldset, { lei => $lei }, diff --git a/t/lei-sigpipe.t b/t/lei-sigpipe.t index 622598a40..1aa700e93 100644 --- a/t/lei-sigpipe.t +++ b/t/lei-sigpipe.t @@ -7,6 +7,19 @@ use PublicInbox::TestCommon; use POSIX qw(WTERMSIG WIFSIGNALED SIGPIPE); use PublicInbox::OnDestroy; use PublicInbox::Syscall qw($F_SETPIPE_SZ); +use autodie qw(close open pipe seek sysread); +use PublicInbox::IO qw(write_file); +my $inboxdir = $ENV{GIANT_INBOX_DIR}; +SKIP: { + $inboxdir // skip 'GIANT_INBOX_DIR unset to test large results'; + require PublicInbox::Inbox; + my $ibx = PublicInbox::Inbox->new({ + name => 'unconfigured-test', + address => [ "test\@example.com" ], + inboxdir => $inboxdir, + }); + $ibx->search or xbail "GIANT_INBOX_DIR=$inboxdir has no search"; +} # undo systemd (and similar) ignoring SIGPIPE, since lei expects to be run # from an interactive terminal: @@ -21,30 +34,30 @@ test_lei(sub { my $f = "$ENV{HOME}/big.eml"; my $imported; for my $out ([], [qw(-f mboxcl2)], [qw(-f text)]) { - pipe(my ($r, $w)) or BAIL_OUT $!; + pipe(my $r, my $w); my $size = $F_SETPIPE_SZ && fcntl($w, $F_SETPIPE_SZ, 4096) ? 4096 : 65536; unless (-f $f) { - open my $fh, '>', $f or xbail "open $f: $!"; - print $fh <<'EOM' or xbail; + my $fh = write_file '>', $f, <<'EOM'; From: big@example.com Message-ID: EOM print $fh 'Subject:'; print $fh (' '.('x' x 72)."\n") x (($size / 73) + 1); print $fh "\nbody\n"; - close $fh or xbail "close: $!"; + close $fh; } lei_ok(qw(import), $f) if $imported++ == 0; - open my $errfh, '+>>', "$ENV{HOME}/stderr.log" or xbail $!; + open my $errfh, '+>>', "$ENV{HOME}/stderr.log"; my $opt = { run_mode => 0, 2 => $errfh, 1 => $w }; my $cmd = [qw(lei q -q -t), @$out, 'z:1..']; + push @$cmd, '--only='.$inboxdir if defined $inboxdir; my $tp = start_script($cmd, undef, $opt); close $w; vec(my $rvec = '', fileno($r), 1) = 1; if (!select($rvec, undef, undef, 30)) { - seek($errfh, 0, 0) or xbail $!; + seek($errfh, 0, 0); my $s = do { local $/; <$errfh> }; xbail "lei q had no output after 30s, stderr=$s"; } @@ -53,7 +66,7 @@ EOM $tp->join; ok(WIFSIGNALED($?), "signaled @$out"); is(WTERMSIG($?), SIGPIPE, "got SIGPIPE @$out"); - seek($errfh, 0, 0) or xbail $!; + seek($errfh, 0, 0); my $s = do { local $/; <$errfh> }; is($s, '', "quiet after sigpipe @$out"); }