]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
psgi_qx: use a temporary file rather than pipe
authorEric Wong <e@80x24.org>
Wed, 25 Oct 2023 00:29:26 +0000 (00:29 +0000)
committerEric Wong <e@80x24.org>
Wed, 25 Oct 2023 07:28:31 +0000 (07:28 +0000)
A pipe requires more context switches, syscalls, and code to
deal with unpredictable pipe EOF vs waitpid ordering.  So just
use the new spawn/aspawn features to automatically handle
slurping output into a string.

MANIFEST
lib/PublicInbox/Aspawn.pm [new file with mode: 0644]
lib/PublicInbox/Qspawn.pm

index dcce801cecf9a11aea7b3d1c67a878c321eecb42..f087621cf2572985d8d7a72a8d48c0ccbe4dae36 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -161,6 +161,7 @@ lib/PublicInbox/AddressPP.pm
 lib/PublicInbox/Admin.pm
 lib/PublicInbox/AdminEdit.pm
 lib/PublicInbox/AltId.pm
+lib/PublicInbox/Aspawn.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
 lib/PublicInbox/CidxComm.pm
diff --git a/lib/PublicInbox/Aspawn.pm b/lib/PublicInbox/Aspawn.pm
new file mode 100644 (file)
index 0000000..49f8651
--- /dev/null
@@ -0,0 +1,34 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# async system()/qx() which takes callback
+package PublicInbox::Aspawn;
+use v5.12;
+use parent qw(Exporter);
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::Spawn qw(spawn);
+our @EXPORT_OK = qw(run_await);
+
+sub _await_cb { # awaitpid cb
+       my ($pid, $cmd, $env, $opt, $cb, @args) = @_;
+       PublicInbox::Spawn::read_out_err($opt);
+       if ($? && !$opt->{quiet}) {
+               my ($status, $sig) = ($? >> 8, $? & 127);
+               my $msg = '';
+               $msg .= " (-C=$opt->{-C})" if defined $opt->{-C};
+               $msg .= " status=$status" if $status;
+               $msg .= " signal=$sig" if $sig;
+               warn "E: @$cmd", $msg, "\n";
+       }
+       $cb->($pid, $cmd, $env, $opt, @args) if $cb;
+}
+
+sub run_await {
+       my ($cmd, $env, $opt, $cb, @args) = @_;
+       $opt->{1} //= \(my $out);
+       my $pid = spawn($cmd, $env, $opt);
+       awaitpid($pid, \&_await_cb, $cmd, $env, $opt, $cb, @args);
+       awaitpid($pid); # synchronous for non-$in_loop
+}
+
+1;
index a4d78e495e5f4014116c04f722d7d1c3d16868a6..59d5ed40285d41cc8dca35c91b65fa14f3d1df92 100644 (file)
@@ -30,6 +30,7 @@ use PublicInbox::Spawn qw(popen_rd);
 use PublicInbox::GzipFilter;
 use Scalar::Util qw(blessed);
 use PublicInbox::Limiter;
+use PublicInbox::Aspawn qw(run_await);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -48,29 +49,30 @@ sub new {
 
 sub _do_spawn {
        my ($self, $start_cb, $limiter) = @_;
-       my $err;
        my ($cmd, $cmd_env, $opt) = @{delete $self->{args}};
        my %o = %{$opt || {}};
        $self->{limiter} = $limiter;
-       foreach my $k (@PublicInbox::Spawn::RLIMITS) {
-               if (defined(my $rlimit = $limiter->{$k})) {
-                       $o{$k} = $rlimit;
-               }
+       for my $k (@PublicInbox::Spawn::RLIMITS) {
+               $o{$k} = $limiter->{$k} // next;
        }
        $self->{cmd} = $cmd;
        $self->{-quiet} = 1 if $o{quiet};
-       eval {
-               # popen_rd may die on EMFILE, ENFILE
-               $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
-                                       \&waitpid_err, $self);
-               $limiter->{running}++;
-               $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
-       };
+       $limiter->{running}++;
+       if ($start_cb) {
+               eval { # popen_rd may die on EMFILE, ENFILE
+                       $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
+                                               \&waitpid_err, $self);
+                       $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
+               };
+       } else {
+               eval { run_await($cmd, $cmd_env, \%o, \&wait_await, $self) };
+               warn "E: $@" if $@;
+       }
        finish($self, $@) if $@;
 }
 
-sub finalize ($) {
-       my ($self) = @_;
+sub finalize ($;$) {
+       my ($self, $opt) = @_;
 
        # process is done, spawn whatever's in the queue
        my $limiter = delete $self->{limiter} or return;
@@ -89,10 +91,10 @@ sub finalize ($) {
                warn "@{$self->{cmd}}: $err\n" if !$self->{-quiet};
        }
 
-       my ($env, $qx_cb, $qx_arg, $qx_buf) =
-               delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
-       if ($qx_cb) {
-               eval { $qx_cb->($qx_buf, $qx_arg) };
+       my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
+       if ($qx_cb_arg) {
+               my $cb = shift @$qx_cb_arg;
+               eval { $cb->($opt->{1}, @$qx_cb_arg) };
                return unless $@;
                warn "E: $@"; # hope qspawn.wcb can handle it
        }
@@ -108,15 +110,20 @@ sub finalize ($) {
 sub DESTROY { finalize($_[0]) } # ->finalize is idempotent
 
 sub waitpid_err { # callback for awaitpid
-       my (undef, $self) = @_; # $_[0]: pid
+       my (undef, $self, $opt) = @_; # $_[0]: pid
        $self->{_err} = ''; # for defined check in ->finish
-       if ($?) {
+       if ($?) { # FIXME: redundant
                my $status = $? >> 8;
                my $sig = $? & 127;
                $self->{_err} .= "exit status=$status";
                $self->{_err} .= " signal=$sig" if $sig;
        }
-       finalize($self) if !$self->{rpipe};
+       finalize($self, $opt) if !$self->{rpipe};
+}
+
+sub wait_await { # run_await cb
+       my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
+       waitpid_err($pid, $self, $opt);
 }
 
 sub finish ($;$) {
@@ -140,52 +147,16 @@ sub start ($$$) {
        }
 }
 
-sub psgi_qx_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
-       my ($self) = @_;
-       my ($r, $buf);
-reread:
-       $r = sysread($self->{rpipe}, $buf, 65536);
-       if (!defined($r)) {
-               return if $! == EAGAIN; # try again when notified
-               goto reread if $! == EINTR;
-               event_step($self, $!);
-       } elsif (my $as = delete $self->{async}) { # PublicInbox::HTTPD::Async
-               $as->async_pass($self->{psgi_env}->{'psgix.io'},
-                               $self->{qx_fh}, \$buf);
-       } elsif ($r) { # generic PSGI:
-               print { $self->{qx_fh} } $buf;
-       } else { # EOF
-               event_step($self, undef);
-       }
-}
-
-sub psgi_qx_start {
-       my ($self) = @_;
-       if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
-               # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj)
-               $self->{async} = $async->($self->{rpipe},
-                                       \&psgi_qx_init_cb, $self, $self);
-               # init_cb will call ->async_pass or ->close
-       } else { # generic PSGI
-               psgi_qx_init_cb($self) while $self->{qx_fh};
-       }
-}
-
-# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
+# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
 # the stdout of the given command when done; but respects the given limiter
 # $env is the PSGI env.  As with ``/qx; only use this when output is small
 # and safe to slurp.
 sub psgi_qx {
-       my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+       my ($self, $env, $limiter, @qx_cb_arg) = @_;
        $self->{psgi_env} = $env;
-       my $qx_buf = '';
-       open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
-       $self->{qx_cb} = $qx_cb;
-       $self->{qx_arg} = $qx_arg;
-       $self->{qx_fh} = $qx_fh;
-       $self->{qx_buf} = \$qx_buf;
+       $self->{qx_cb_arg} = \@qx_cb_arg;
        $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
-       start($self, $limiter, \&psgi_qx_start);
+       start($self, $limiter, undef);
 }
 
 # this is called on pipe EOF to reap the process, may be called
@@ -195,7 +166,7 @@ sub event_step {
        my ($self, $err) = @_; # $err: $!
        warn "psgi_{return,qx} $err" if defined($err);
        finish($self);
-       my ($fh, $qx_fh) = delete(@$self{qw(qfh qx_fh)});
+       my $fh = delete $self->{qfh};
        $fh->close if $fh; # async-only (psgi_return)
 }