]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
introduce ProcessIONBF for multiplexed non-blocking IO
authorEric Wong <e@80x24.org>
Sun, 8 Oct 2023 22:11:48 +0000 (22:11 +0000)
committerEric Wong <e@80x24.org>
Sun, 8 Oct 2023 23:45:36 +0000 (23:45 +0000)
This is required for reliable epoll/kevent/poll/select
wakeup notifications, since we have no visibility into
the buffer states used internally by Perl.

We can safely use sysread here since we never use the :utf8
nor any :encoding Perl IO layers for readable pipes.

I suspect this fixes occasional failures from t/solver_git.t
when retrieving the WwwCoderepo summary.

MANIFEST
lib/PublicInbox/Git.pm
lib/PublicInbox/HTTPD/Async.pm
lib/PublicInbox/ProcessIONBF.pm [new file with mode: 0644]

index c972818faad283e075529a75c0661c7d67e30366..791d91a7422c23b7744025ec5d0543b51fe65d2d 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -319,6 +319,7 @@ lib/PublicInbox/POP3.pm
 lib/PublicInbox/POP3D.pm
 lib/PublicInbox/PktOp.pm
 lib/PublicInbox/ProcessIO.pm
+lib/PublicInbox/ProcessIONBF.pm
 lib/PublicInbox/Qspawn.pm
 lib/PublicInbox/Reply.pm
 lib/PublicInbox/RepoAtom.pm
index 94d5dcee89ec6cda81681e3c2ca834094c7f3bc5..448cfaf7085e244351e167807e7deaf67b6f428e 100644 (file)
@@ -12,7 +12,6 @@ use v5.10.1;
 use parent qw(Exporter PublicInbox::DS);
 use autodie qw(socketpair);
 use POSIX ();
-use IO::Handle; # ->blocking
 use Socket qw(AF_UNIX SOCK_STREAM);
 use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
 use Errno qw(EINTR EAGAIN);
@@ -20,6 +19,7 @@ use File::Glob qw(bsd_glob GLOB_NOSORT);
 use File::Spec ();
 use Time::HiRes qw(stat);
 use PublicInbox::Spawn qw(spawn popen_rd which);
+use PublicInbox::ProcessIONBF;
 use PublicInbox::Tmpfile;
 use IO::Poll qw(POLLIN);
 use Carp qw(croak carp);
@@ -146,7 +146,6 @@ sub _sock_cmd {
        my ($self, $batch, $err_c) = @_;
        $self->{sock} and Carp::confess('BUG: {sock} exists');
        socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0);
-       $s1->blocking(0);
        my $opt = { pgid => 0, 0 => $s2, 1 => $s2 };
        my $gd = $self->{git_dir};
        if ($gd =~ s!/([^/]+/[^/]+)\z!/!) {
@@ -165,7 +164,7 @@ sub _sock_cmd {
                                                $self->fail("tmpfile($id): $!");
        }
        my $pid = spawn(\@cmd, undef, $opt);
-       $self->{sock} = PublicInbox::ProcessIO->maybe_new($pid, $s1);
+       $self->{sock} = PublicInbox::ProcessIONBF->new($pid, $s1);
 }
 
 sub poll_in ($) { IO::Poll::_poll($RDTIMEO, fileno($_[0]), my $ev = POLLIN) }
@@ -626,8 +625,8 @@ sub cleanup_if_unlinked {
        my $ret = 0;
        for my $obj ($self, ($self->{ck} // ())) {
                my $sock = $obj->{sock} // next;
-               my PublicInbox::ProcessIO $pp = tied *$sock; # ProcessIO
-               my $pid = $pp->{pid} // next;
+               my PublicInbox::ProcessIONBF $p = tied *$sock; # ProcessIONBF
+               my $pid = $p->{pid} // next;
                open my $fh, '<', "/proc/$pid/maps" or return cleanup($self, 1);
                while (<$fh>) {
                        # n.b. we do not restart for unlinked multi-pack-index
index b9d2159c6caba023bd262561ff83dbdcff6a7214..b73d0c4b9f334cfd159ca69a095a0144fca365e5 100644 (file)
@@ -18,6 +18,7 @@ use v5.12;
 use parent qw(PublicInbox::DS);
 use Errno qw(EAGAIN);
 use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::ProcessIONBF;
 
 # This is called via: $env->{'pi-httpd.async'}->()
 # $io is a read-only pipe ($rpipe) for now, but may be a
@@ -37,8 +38,7 @@ sub new {
                arg => $arg, # arg for $cb
                end_obj => $end_obj, # like END{}, can ->event_step
        }, $class;
-       my $pp = tied *$io; # ProcessIO
-       $pp->{fh}->blocking(0) // die "$io->blocking(0): $!";
+       PublicInbox::ProcessIONBF->replace($io);
        $self->SUPER::new($io, EPOLLIN);
 }
 
diff --git a/lib/PublicInbox/ProcessIONBF.pm b/lib/PublicInbox/ProcessIONBF.pm
new file mode 100644 (file)
index 0000000..490e200
--- /dev/null
@@ -0,0 +1,25 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# used to support unbuffered partial reads
+package PublicInbox::ProcessIONBF;
+use v5.12;
+use parent qw(PublicInbox::ProcessIO);
+use IO::Handle; # ->blocking
+
+sub new {
+       my ($cls, $pid, $fh, @cb_arg) = @_;
+       $fh->blocking(0) // die "$fh->blocking(0): $!";
+       my $io = $cls->SUPER::maybe_new($pid, $fh, @cb_arg);
+}
+
+sub replace {
+       my ($cls, $orig) = @_;
+       my $pio = tied *$orig; # ProcessIO
+       $pio->{fh}->blocking(0) // die "$pio->{fh}->blocking(0): $!";
+       bless $pio, $cls;
+}
+
+sub READ { sysread($_[0]->{fh}, $_[1], $_[2], $_[3] // 0) }
+
+1;