--- /dev/null
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# async system()/qx() which takes callback
+package PublicInbox::Aspawn;
+use v5.12;
+use parent qw(Exporter);
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::Spawn qw(spawn);
+our @EXPORT_OK = qw(run_await);
+
+sub _await_cb { # awaitpid cb
+ my ($pid, $cmd, $env, $opt, $cb, @args) = @_;
+ PublicInbox::Spawn::read_out_err($opt);
+ if ($? && !$opt->{quiet}) {
+ my ($status, $sig) = ($? >> 8, $? & 127);
+ my $msg = '';
+ $msg .= " (-C=$opt->{-C})" if defined $opt->{-C};
+ $msg .= " status=$status" if $status;
+ $msg .= " signal=$sig" if $sig;
+ warn "E: @$cmd", $msg, "\n";
+ }
+ $cb->($pid, $cmd, $env, $opt, @args) if $cb;
+}
+
+sub run_await {
+ my ($cmd, $env, $opt, $cb, @args) = @_;
+ $opt->{1} //= \(my $out);
+ my $pid = spawn($cmd, $env, $opt);
+ awaitpid($pid, \&_await_cb, $cmd, $env, $opt, $cb, @args);
+ awaitpid($pid); # synchronous for non-$in_loop
+}
+
+1;
use PublicInbox::GzipFilter;
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
+use PublicInbox::Aspawn qw(run_await);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
sub _do_spawn {
my ($self, $start_cb, $limiter) = @_;
- my $err;
my ($cmd, $cmd_env, $opt) = @{delete $self->{args}};
my %o = %{$opt || {}};
$self->{limiter} = $limiter;
- foreach my $k (@PublicInbox::Spawn::RLIMITS) {
- if (defined(my $rlimit = $limiter->{$k})) {
- $o{$k} = $rlimit;
- }
+ for my $k (@PublicInbox::Spawn::RLIMITS) {
+ $o{$k} = $limiter->{$k} // next;
}
$self->{cmd} = $cmd;
$self->{-quiet} = 1 if $o{quiet};
- eval {
- # popen_rd may die on EMFILE, ENFILE
- $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
- \&waitpid_err, $self);
- $limiter->{running}++;
- $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
- };
+ $limiter->{running}++;
+ if ($start_cb) {
+ eval { # popen_rd may die on EMFILE, ENFILE
+ $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
+ \&waitpid_err, $self);
+ $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
+ };
+ } else {
+ eval { run_await($cmd, $cmd_env, \%o, \&wait_await, $self) };
+ warn "E: $@" if $@;
+ }
finish($self, $@) if $@;
}
-sub finalize ($) {
- my ($self) = @_;
+sub finalize ($;$) {
+ my ($self, $opt) = @_;
# process is done, spawn whatever's in the queue
my $limiter = delete $self->{limiter} or return;
warn "@{$self->{cmd}}: $err\n" if !$self->{-quiet};
}
- my ($env, $qx_cb, $qx_arg, $qx_buf) =
- delete @$self{qw(psgi_env qx_cb qx_arg qx_buf)};
- if ($qx_cb) {
- eval { $qx_cb->($qx_buf, $qx_arg) };
+ my ($env, $qx_cb_arg) = delete @$self{qw(psgi_env qx_cb_arg)};
+ if ($qx_cb_arg) {
+ my $cb = shift @$qx_cb_arg;
+ eval { $cb->($opt->{1}, @$qx_cb_arg) };
return unless $@;
warn "E: $@"; # hope qspawn.wcb can handle it
}
sub DESTROY { finalize($_[0]) } # ->finalize is idempotent
sub waitpid_err { # callback for awaitpid
- my (undef, $self) = @_; # $_[0]: pid
+ my (undef, $self, $opt) = @_; # $_[0]: pid
$self->{_err} = ''; # for defined check in ->finish
- if ($?) {
+ if ($?) { # FIXME: redundant
my $status = $? >> 8;
my $sig = $? & 127;
$self->{_err} .= "exit status=$status";
$self->{_err} .= " signal=$sig" if $sig;
}
- finalize($self) if !$self->{rpipe};
+ finalize($self, $opt) if !$self->{rpipe};
+}
+
+sub wait_await { # run_await cb
+ my ($pid, $cmd, $cmd_env, $opt, $self) = @_;
+ waitpid_err($pid, $self, $opt);
}
sub finish ($;$) {
}
}
-sub psgi_qx_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
- my ($self) = @_;
- my ($r, $buf);
-reread:
- $r = sysread($self->{rpipe}, $buf, 65536);
- if (!defined($r)) {
- return if $! == EAGAIN; # try again when notified
- goto reread if $! == EINTR;
- event_step($self, $!);
- } elsif (my $as = delete $self->{async}) { # PublicInbox::HTTPD::Async
- $as->async_pass($self->{psgi_env}->{'psgix.io'},
- $self->{qx_fh}, \$buf);
- } elsif ($r) { # generic PSGI:
- print { $self->{qx_fh} } $buf;
- } else { # EOF
- event_step($self, undef);
- }
-}
-
-sub psgi_qx_start {
- my ($self) = @_;
- if (my $async = $self->{psgi_env}->{'pi-httpd.async'}) {
- # PublicInbox::HTTPD::Async->new(rpipe, $cb, cb_arg, $end_obj)
- $self->{async} = $async->($self->{rpipe},
- \&psgi_qx_init_cb, $self, $self);
- # init_cb will call ->async_pass or ->close
- } else { # generic PSGI
- psgi_qx_init_cb($self) while $self->{qx_fh};
- }
-}
-
-# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls $qx_cb with
+# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
# the stdout of the given command when done; but respects the given limiter
# $env is the PSGI env. As with ``/qx; only use this when output is small
# and safe to slurp.
sub psgi_qx {
- my ($self, $env, $limiter, $qx_cb, $qx_arg) = @_;
+ my ($self, $env, $limiter, @qx_cb_arg) = @_;
$self->{psgi_env} = $env;
- my $qx_buf = '';
- open(my $qx_fh, '+>', \$qx_buf) or die; # PerlIO::scalar
- $self->{qx_cb} = $qx_cb;
- $self->{qx_arg} = $qx_arg;
- $self->{qx_fh} = $qx_fh;
- $self->{qx_buf} = \$qx_buf;
+ $self->{qx_cb_arg} = \@qx_cb_arg;
$limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
- start($self, $limiter, \&psgi_qx_start);
+ start($self, $limiter, undef);
}
# this is called on pipe EOF to reap the process, may be called
my ($self, $err) = @_; # $err: $!
warn "psgi_{return,qx} $err" if defined($err);
finish($self);
- my ($fh, $qx_fh) = delete(@$self{qw(qfh qx_fh)});
+ my $fh = delete $self->{qfh};
$fh->close if $fh; # async-only (psgi_return)
}