--- /dev/null
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# For generic PSGI servers (not public-inbox-httpd/netd) which assumes their
+# getline response bodies can be backpressure-aware for slow clients
+# This depends on rpipe being _blocking_ on getline.
+package PublicInbox::GetlineResponse;
+use v5.12;
+
+sub response {
+ my ($qsp) = @_;
+ my ($res, $rbuf);
+ do { # read header synchronously
+ sysread($qsp->{rpipe}, $rbuf, 65536);
+ $res = $qsp->parse_hdr_done($rbuf); # fills $bref
+ } until defined($res);
+ my ($wcb, $filter) = $qsp->yield_pass(undef, $res) or return;
+ my $self = $res->[2] = bless {
+ qsp => $qsp,
+ filter => $filter,
+ }, __PACKAGE__;
+ my ($bref) = @{delete $qsp->{yield_parse_hdr}};
+ $self->{rbuf} = $$bref if $$bref ne '';
+ $wcb->($res);
+}
+
+sub getline {
+ my ($self) = @_;
+ my $rpipe = $self->{qsp}->{rpipe} // do {
+ delete($self->{qsp})->finish;
+ return; # EOF was set on previous call
+ };
+ my $buf = delete($self->{rbuf}) // $rpipe->getline;
+ $buf // delete($self->{qsp}->{rpipe}); # set EOF for next call
+ $self->{filter} ? $self->{filter}->translate($buf) : $buf;
+}
+
+sub close {}
+
+1;
if ($@) { # regular file (but not w/ select|IO::Poll backends)
$self->{-need_rq} = 1;
$self->requeue;
- } elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes
+ } elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF
+ } elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes
$in->blocking(0);
} elsif (-t $in) { # isatty(3) can't use `_' stat cache
unblock_tty($self);
}
+ $self;
}
-sub close {
+sub close { # idempotent
my ($self) = @_;
if (my $t = delete($self->{restore_termios})) {
my $fd = fileno($self->{sock} // return);
my $r = sysread($self->{sock} // return, my $rbuf, 65536);
eval {
if ($r) {
- $self->{cb}->(@{$self->{args}}, $rbuf);
+ $self->{cb}->($self, @{$self->{args}}, $rbuf);
$self->requeue if $self->{-need_rq};
} elsif (defined($r)) { # EOF
- $self->{cb}->(@{$self->{args}}, '');
+ $self->{cb}->($self, @{$self->{args}}, '');
$self->close
} elsif ($!{EAGAIN}) { # rely on EPOLLIN
} elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty
$self->requeue if $self->{-need_rq};
} else { # another error
- $self->{cb}->(@{$self->{args}}, undef);
+ $self->{cb}->($self, @{$self->{args}}, undef);
$self->close;
}
};
use Scalar::Util qw(blessed);
use PublicInbox::Limiter;
use PublicInbox::Aspawn qw(run_await);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::InputPipe;
+use Carp qw(carp confess);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
use Errno qw(EAGAIN EINTR);
if ($start_cb) {
eval { # popen_rd may die on EMFILE, ENFILE
$self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
- \&waitpid_err, $self);
+ \&waitpid_err, $self, \%o);
$start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
};
} else {
waitpid_err($pid, $self, $opt);
}
+sub yield_chunk { # $_[-1] is sysread buffer (or undef)
+ my ($self, $ipipe) = @_;
+ if (!defined($_[-1])) {
+ warn "error reading body: $!";
+ } elsif ($_[-1] eq '') { # normal EOF
+ $self->finish;
+ $self->{qfh}->close;
+ } elsif (defined($self->{qfh}->write($_[-1]))) {
+ return; # continue while HTTP client is reading our writes
+ } # else { # HTTP client disconnected
+ delete $self->{rpipe};
+ $ipipe->close;
+}
+
sub finish ($;$) {
my ($self, $err) = @_;
$self->{_err} //= $err; # only for $@
$ret;
}
+sub yield_pass {
+ my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+ my $env = $self->{psgi_env};
+ my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
+ if (ref($res) eq 'CODE') { # chain another command
+ delete $self->{rpipe};
+ $ipipe->close if $ipipe;
+ $res->($wcb);
+ $self->{passed} = 1;
+ return; # all done
+ }
+ confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';
+
+ my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
+ pop(@$res) : delete($env->{'qspawn.filter'});
+ $filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);
+
+ if (scalar(@$res) == 3) { # done early (likely error or static file)
+ delete $self->{rpipe};
+ $ipipe->close if $ipipe;
+ $wcb->($res); # all done
+ return;
+ }
+ scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
+ return ($wcb, $filter) if !$ipipe; # generic PSGI
+ # streaming response
+ my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
+ $qfh = $filter->attach($qfh) if $filter;
+ my ($bref) = @{delete $self->{yield_parse_hdr}};
+ $qfh->write($$bref) if $$bref ne '';
+ $self->{qfh} = $qfh; # keep $ipipe open
+}
+
sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
my ($self) = @_;
my $r = rd_hdr($self) or return; # incomplete
}
}
+sub r500 () { [ 500, [], [ "Internal error\n" ] ] }
+
+sub parse_hdr_done ($$) {
+ my ($self) = @_;
+ my $ret;
+ if (defined $_[-1]) {
+ my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
+ $$bref .= $_[-1];
+ $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
+ if ($@) {
+ carp "parse_hdr (@{$self->{cmd}}): $@\n";
+ $ret = r500();
+ } elsif (!$ret && $_[-1] eq '') {
+ carp <<EOM;
+EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+ $ret = r500();
+ }
+ } else {
+ carp <<EOM;
+E: parsing headers: $! from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+ $ret = r500();
+ }
+ $ret; # undef if headers incomplete
+}
+
+sub ipipe_cb { # InputPipe callback
+ my ($ipipe, $self) = @_; # $_[-1] rbuf
+ if ($self->{qfh}) { # already streaming
+ yield_chunk($self, $ipipe, $_[-1]);
+ } elsif (my $res = parse_hdr_done($self, $_[-1])) {
+ yield_pass($self, $ipipe, $res);
+ } # else: headers incomplete, keep reading
+}
+
+sub _yield_start { # may run later, much later...
+ my ($self) = @_;
+ if ($self->{psgi_env}->{'pi-httpd.async'}) {
+ require PublicInbox::ProcessIONBF;
+ my $rpipe = $self->{rpipe};
+ PublicInbox::ProcessIONBF->replace($rpipe);
+ PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self);
+ } else {
+ require PublicInbox::GetlineResponse;
+ PublicInbox::GetlineResponse::response($self);
+ }
+}
+
# Used for streaming the stdout of one process as a PSGI response.
#
# $env is the PSGI env.
}
}
+sub psgi_yield {
+ my ($self, $env, $limiter, @parse_hdr_arg)= @_;
+ $self->{psgi_env} = $env;
+ $self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
+ $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
+
+ # the caller already captured the PSGI write callback from
+ # the PSGI server, so we can call ->start, here:
+ $env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
+ # the caller will return this sub to the PSGI server, so
+ # it can set the response callback (that is, for
+ # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+ # but other HTTP servers are supported:
+ $env->{'qspawn.wcb'} = $_[0];
+ start($self, $limiter, \&_yield_start);
+ }
+}
+
1;