]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
treewide: handle EINTR for non-(signalfd|kevent)
authorEric Wong <e@80x24.org>
Tue, 20 Aug 2024 10:35:17 +0000 (10:35 +0000)
committerEric Wong <e@80x24.org>
Wed, 21 Aug 2024 18:16:23 +0000 (18:16 +0000)
We may encounter new architectures in Linux without syscall
number definitions or *BSD systems without IO::KQueue or kevent
support at all, so be prepared to handle signals anywhere within
the event loop in such cases.

lib/PublicInbox/CodeSearchIdx.pm
lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/PktOp.pm
lib/PublicInbox/SHA.pm

index 6d777bf62a3e2f0b06bb9d3457810f377ed2aa74..ff3db8badf0ca994f99bcb4671c2b7fb351da457 100644 (file)
@@ -72,7 +72,8 @@ use PublicInbox::Aspawn qw(run_await);
 use Compress::Zlib qw(compress);
 use Carp qw(croak);
 use Time::Local qw(timegm);
-use autodie qw(close pipe open sysread seek sysseek send);
+use Errno qw(EINTR);
+use autodie qw(close pipe open seek sysseek);
 our $DO_QUIT = 15; # signal number
 our (
        $LIVE_JOBS, # integer
@@ -225,6 +226,17 @@ sub check_objfmt_status ($$$) {
        $fmt;
 }
 
+sub xsend ($$) { # move to PerlIO if we need to
+       my ($s, $buf) = @_;
+       my $n;
+       while (1) {
+               $n = send $s, $buf, 0;
+               return $n if defined $n;
+               next if $! == EINTR;
+               croak "send: $!";
+       }
+}
+
 sub store_repo { # wq_io_do, sends docid back
        my ($self, $repo) = @_;
        my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
@@ -248,7 +260,7 @@ EOM
        my $did = $repo->{docid};
        $did ? $self->{xdb}->replace_document($did, $doc)
                : ($did = $self->{xdb}->add_document($doc));
-       send($op_p, "repo_stored $did", 0);
+       xsend $op_p, "repo_stored $did";
 }
 
 sub cidx_ckpoint ($;$) {
@@ -293,7 +305,7 @@ sub cidx_reap_log { # awaitpid cb
        my ($pid, $cmd, $self, $op_p) = @_;
        if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
                                ($? & 127) == POSIX::SIGPIPE))) {
-               send($op_p, "shard_done $self->{shard}", 0);
+               xsend $op_p, "shard_done $self->{shard}";
        } else {
                warn "W: @$cmd (\$?=$?)\n";
                $self->{xdb}->cancel_transaction;
@@ -444,7 +456,7 @@ sub fp_async_done { # run_git cb from worker
        my ($opt, $self, $git, $op_p) = @_;
        my $refs = delete $opt->{1} // 'BUG: no {-repo}->{refs}';
        sysseek($refs, 0, SEEK_SET);
-       send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0);
+       xsend $op_p, 'fp_done '.sha_all(256, $refs)->hexdigest;
 }
 
 sub fp_done { # called parent via PktOp by fp_async_done
@@ -523,7 +535,7 @@ sub shard_commit { # via wq_io_do
        my ($self) = @_;
        my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
        $self->commit_txn_lazy;
-       send($op_p, "shard_done $self->{shard}", 0);
+       xsend $op_p, "shard_done $self->{shard}";
 }
 
 sub dump_roots_start {
@@ -818,7 +830,7 @@ sub prune_commit { # via wq_io_do in IDX_SHARDS
        my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p';
        my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef';
        cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr;
-       send($prune_op_p, "prune_done $self->{shard}", 0);
+       xsend $prune_op_p, "prune_done $self->{shard}";
 }
 
 sub shards_active { # post_loop_do
index ed6d27fd489b13180afd4bd16a6dbe972c943d17..13a897be3658fd480a1f9f1080d9f3d093e87061 100644 (file)
@@ -10,7 +10,8 @@
 package PublicInbox::IPC;
 use v5.12;
 use parent qw(Exporter);
-use autodie qw(close pipe read socketpair sysread);
+use autodie qw(close pipe read socketpair);
+use Errno qw(EAGAIN EINTR);
 use Carp qw(croak);
 use PublicInbox::DS qw(awaitpid);
 use PublicInbox::Spawn;
@@ -215,8 +216,17 @@ sub recv_and_run {
        }
        while ($full_stream && $n < $len) {
                my $r = sysread($s2, $buf, $len - $n, $n);
-               croak "read EOF after $n/$len bytes" if $r == 0;
-               $n = length($buf);
+               if ($r) {
+                       $n = length($buf); # keep looping
+               } elsif (!defined $r) {
+                       if ($! == EAGAIN) {
+                               poll_in($s2)
+                       } elsif ($! != EINTR) {
+                               croak "sysread: $!";
+                       } # next on EINTR
+               } else { # ($r == 0)
+                       croak "read EOF after $n/$len bytes";
+               }
        }
        # Sereal dies on truncated data, Storable returns undef
        my $args = ipc_thaw($buf) // die "thaw error on buffer of size: $n";
index c5146428303eadb15c4e5d4ab59a07bce08c1e3b..637cc8b10a01ed4bff56236c6a9c03aa3444f8d9 100644 (file)
@@ -12,7 +12,7 @@ use parent qw(PublicInbox::DS PublicInbox::LeiExternal
 use autodie qw(bind chdir open pipe socket socketpair syswrite unlink);
 use Getopt::Long ();
 use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un);
-use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET);
+use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET EINTR);
 use Cwd qw(getcwd);
 use POSIX qw(strftime);
 use IO::Handle ();
@@ -27,6 +27,7 @@ use PublicInbox::Import;
 use PublicInbox::ContentHash qw(git_sha);
 use PublicInbox::OnDestroy;
 use PublicInbox::IPC;
+use PublicInbox::IO qw(poll_in);
 use Time::HiRes qw(stat); # ctime comparisons for config cache
 use File::Path ();
 use File::Spec;
@@ -488,6 +489,15 @@ sub _drop_wq {
        }
 }
 
+sub send_gently ($$) {
+       my ($s, $buf) = @_;
+       my $n;
+       while (1) {
+               $n = send $s, $buf, 0;
+               return $n if defined($n) || $! != EINTR;
+       }
+}
+
 # pronounced "exit": x_it(1 << 8) => exit(1); x_it(13) => SIGPIPE
 sub x_it ($$) {
        my ($self, $code) = @_;
@@ -498,7 +508,7 @@ sub x_it ($$) {
                $self->{pkt_op_p}->pkt_do('x_it', $code);
                exit($code >> 8) if $$ != $daemon_pid;
        } elsif ($self->{sock}) { # lei->daemon => lei(1) client
-               send($self->{sock}, "x_it $code", 0);
+               send_gently $self->{sock}, "x_it $code";
        } elsif ($quit == \&CORE::exit) { # an admin (one-shot) command
                exit($code >> 8);
        } # else ignore if client disconnected
@@ -569,7 +579,7 @@ sub child_error { # passes non-fatal curl exit codes to user
        if ($self->{pkt_op_p}) { # to top lei-daemon
                $self->{pkt_op_p}->pkt_do('child_error', $child_error);
        } elsif ($self->{sock}) { # to lei(1) client
-               send($self->{sock}, "child_error $child_error", 0);
+               send_gently $self->{sock}, "child_error $child_error";
        } # else noop if client disconnected
 }
 
@@ -1066,7 +1076,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
        while (my $op = shift(@$alerts)) {
                if ($op eq ':WINCH') {
                        # hit the process group that started the MUA
-                       send($sock, '-WINCH', 0) if $sock;
+                       send_gently $sock, '-WINCH' if $sock;
                } elsif ($op eq ':bell') {
                        out($self, "\a");
                } elsif ($op =~ /(?<!\\),/) { # bare ',' (not ',,')
@@ -1075,7 +1085,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
                        my $cmd = $1; # run an arbitrary command
                        require Text::ParseWords;
                        $cmd = [ Text::ParseWords::shellwords($cmd) ];
-                       send($sock, exec_buf($cmd, {}), 0) if $sock;
+                       send_gently $sock, exec_buf($cmd, {}) if $sock;
                } else {
                        warn("W: unsupported --alert=$op\n"); # non-fatal
                }
@@ -1130,7 +1140,7 @@ sub pgr_err {
        say { $self->{2} } @msg, '# -quit pager to continue-';
        $self->{2}->autoflush(1);
        stop_pager($self);
-       send($self->{sock}, 'wait', 0); # wait for user to quit pager
+       send_gently $self->{sock}, 'wait'; # wait for user to quit pager
 }
 
 sub stop_pager {
@@ -1147,22 +1157,22 @@ sub accept_dispatch { # Listener {post_accept} callback
        my $self = bless { sock => $sock }, __PACKAGE__;
        vec(my $rvec = '', fileno($sock), 1) = 1;
        select($rvec, undef, undef, 60) or
-               return send($sock, 'timed out waiting to recv FDs', 0);
+               return send_gently $sock, 'timed out waiting to recv FDs';
        # (4096 * 33) >MAX_ARG_STRLEN
        my @fds = $PublicInbox::IPC::recv_cmd->($sock, my $buf, 4096 * 33) or
                return; # EOF
        if (!defined($fds[0])) {
                warn(my $msg = "recv_cmd failed: $!");
-               return send($sock, $msg, 0);
+               return send_gently $sock, $msg;
        } else {
                my $i = 0;
                open($self->{$i++}, '+<&=', $_) for @fds;
-               $i == 4 or return send($sock, 'not enough FDs='.($i-1), 0)
+               $i == 4 or return send_gently $sock, 'not enough FDs='.($i-1);
        }
        # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
        # $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
        substr($buf, -2, 2, '') eq "\0\0" or  # s/\0\0\z//
-               return send($sock, 'request command truncated', 0);
+               return send_gently $sock, 'request command truncated';
        my ($argc, @argv) = split(/\0/, $buf, -1);
        undef $buf;
        my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
@@ -1207,7 +1217,7 @@ sub event_step {
                        die "unrecognized client signal: $buf";
                }
                my $s = $self->{-socks} // []; # lei up --all
-               @$s = grep { send($_, $buf, 0) } @$s;
+               @$s = grep { send_gently $_, $buf } @$s;
        };
        if (my $err = $@) {
                eval { $self->fail($err) };
@@ -1572,14 +1582,21 @@ sub cfg_dump ($$) {
        undef;
 }
 
-sub request_umask {
+sub request_umask { # assumes client is trusted and fast
        my ($lei) = @_;
        my $s = $lei->{sock} // return;
        send($s, 'umask', 0) // die "send: $!";
-       vec(my $rvec = '', fileno($s), 1) = 1;
-       select($rvec, undef, undef, 2) or die 'timeout waiting for umask';
-       recv($s, my $v, 5, 0) // die "recv: $!";
-       (my $u, $lei->{client_umask}) = unpack('AV', $v);
+       my ($v, $r, $u);
+       do { # n.b. poll_in returns -1 on EINTR
+               vec($v = '', fileno($s), 1) = 1;
+               $r = poll_in($s, 2) or
+                       die 'timeout waiting for umask';
+       } while ($r < 0 && $! == EINTR);
+       do {
+               $r = recv $s, $v, 5, 0;
+               die "recv: $!" if !defined($r) && $! != EINTR;
+       } while (!defined $r);
+       ($u, $lei->{client_umask}) = unpack('AV', $v);
        $u eq 'u' or warn "E: recv $v has no umask";
 }
 
index 1bcdd79927b3d68644656dd2dec992a9d1c025bb..c3ee36aa431e5a617113988d49c470fd1a8b1ace 100644 (file)
@@ -8,7 +8,7 @@
 package PublicInbox::PktOp;
 use v5.12;
 use parent qw(PublicInbox::DS);
-use Errno qw(EAGAIN ECONNRESET);
+use Errno qw(EAGAIN ECONNRESET EINTR);
 use PublicInbox::Syscall qw(EPOLLIN);
 use Socket qw(AF_UNIX SOCK_SEQPACKET);
 use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
@@ -37,12 +37,15 @@ sub pkt_do { # for the producer to trigger event_step in consumer
 sub event_step {
        my ($self) = @_;
        my $c = $self->{sock};
-       my $n = recv($c, my $msg, 4096, 0);
-       unless (defined $n) {
-               return if $! == EAGAIN;
-               die "recv: $!" if $! != ECONNRESET; # we may be bidirectional
-       }
-       my ($cmd, @pargs);
+       my ($msg, $n, $cmd, @pargs);
+       do {
+               $n = recv($c, $msg, 4096, 0);
+               unless (defined $n) {
+                       next if $! == EINTR;
+                       return if $! == EAGAIN;
+                       die "recv: $!" if $! != ECONNRESET; # we may be bidirectional
+               }
+       } until (defined $n);
        if (index($msg, "\0") > 0) {
                ($cmd, my $pargs) = split(/\0/, $msg, 2);
                @pargs = @{ipc_thaw($pargs)};
index 3fa8530e61d39539ce695583611d76d4bc0c4424..5eb882c642adc4bf1763ac80683c608e9cd617fe 100644 (file)
 package PublicInbox::SHA;
 use v5.12;
 require Exporter;
+use Errno qw(EAGAIN EINTR);
+use PublicInbox::IO qw(poll_in);
+use Carp qw(croak);
 our @EXPORT_OK = qw(sha1_hex sha256_hex sha256 sha_all);
-use autodie qw(sysread);
 our @ISA;
 
 BEGIN {
@@ -59,9 +61,21 @@ EOM
 
 sub sha_all ($$) {
        my ($n, $fh) = @_;
-       my ($dig, $buf) = (PublicInbox::SHA->new($n));
-       while (sysread($fh, $buf, 65536)) { $dig->add($buf) }
-       $dig
+       my ($dig, $buf, $r) = (PublicInbox::SHA->new($n));
+       while (1) {
+               $r = sysread($fh, $buf, 65536);
+               if ($r) {
+                       $dig->add($buf);
+               } elsif (!defined $r) {
+                       if ($! == EAGAIN) {
+                               poll_in($fh);
+                       } elsif ($! != EINTR) {
+                               croak "sysread: $!";
+                       } # next on EINTR
+               } else { # EOF:
+                       return $dig;
+               }
+       }
 }
 
 1;