]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
qspawn: introduce new psgi_yield API
authorEric Wong <e@80x24.org>
Wed, 25 Oct 2023 00:29:32 +0000 (00:29 +0000)
committerEric Wong <e@80x24.org>
Wed, 25 Oct 2023 07:28:36 +0000 (07:28 +0000)
This is intended to replace psgi_return and HTTPD/Async
entirely, hopefully making our code less convoluted while
maintaining the ability to handle slow clients on
memory-constrained systems

This was made possible by the philosophy shift in commit 21a539a2df0c
(httpd/async: switch to buffering-as-fast-as-possible, 2019-06-28).

We'll still support generic PSGI via the `pull' model with a
GetlineResponse class which is similar to the old GetlineBody.

MANIFEST
lib/PublicInbox/GetlineResponse.pm [new file with mode: 0644]
lib/PublicInbox/GitHTTPBackend.pm
lib/PublicInbox/GzipFilter.pm
lib/PublicInbox/HTTP.pm
lib/PublicInbox/InputPipe.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/Qspawn.pm

index f087621cf2572985d8d7a72a8d48c0ccbe4dae36..420b40a14403d941c3d0f8c0c17d8cbc9941a122 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -204,6 +204,7 @@ lib/PublicInbox/Filter/Vger.pm
 lib/PublicInbox/Gcf2.pm
 lib/PublicInbox/Gcf2Client.pm
 lib/PublicInbox/GetlineBody.pm
+lib/PublicInbox/GetlineResponse.pm
 lib/PublicInbox/Git.pm
 lib/PublicInbox/GitAsyncCat.pm
 lib/PublicInbox/GitCredential.pm
diff --git a/lib/PublicInbox/GetlineResponse.pm b/lib/PublicInbox/GetlineResponse.pm
new file mode 100644 (file)
index 0000000..290cce7
--- /dev/null
@@ -0,0 +1,40 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# For generic PSGI servers (not public-inbox-httpd/netd) which assumes their
+# getline response bodies can be backpressure-aware for slow clients
+# This depends on rpipe being _blocking_ on getline.
+package PublicInbox::GetlineResponse;
+use v5.12;
+
+sub response {
+       my ($qsp) = @_;
+       my ($res, $rbuf);
+       do { # read header synchronously
+               sysread($qsp->{rpipe}, $rbuf, 65536);
+               $res = $qsp->parse_hdr_done($rbuf); # fills $bref
+       } until defined($res);
+       my ($wcb, $filter) = $qsp->yield_pass(undef, $res) or return;
+       my $self = $res->[2] = bless {
+               qsp => $qsp,
+               filter => $filter,
+       }, __PACKAGE__;
+       my ($bref) = @{delete $qsp->{yield_parse_hdr}};
+       $self->{rbuf} = $$bref if $$bref ne '';
+       $wcb->($res);
+}
+
+sub getline {
+       my ($self) = @_;
+       my $rpipe = $self->{qsp}->{rpipe} // do {
+               delete($self->{qsp})->finish;
+               return; # EOF was set on previous call
+       };
+       my $buf = delete($self->{rbuf}) // $rpipe->getline;
+       $buf // delete($self->{qsp}->{rpipe}); # set EOF for next call
+       $self->{filter} ? $self->{filter}->translate($buf) : $buf;
+}
+
+sub close {}
+
+1;
index edbc0157c67020c6d6baad8d96fe0d1b40319665..d7e0bcedf4ba2cc7c6fa601c1b3fd3fae1ae93d0 100644 (file)
@@ -79,7 +79,7 @@ sub serve_dumb {
        PublicInbox::WwwStatic::response($env, $h, $path, $type);
 }
 
-sub git_parse_hdr { # {parse_hdr} for Qspawn
+sub ghb_parse_hdr { # header parser for Qspawn
        my ($r, $bref, @dumb_args) = @_;
        my $res = parse_cgi_headers($r, $bref) or return; # incomplete
        $res->[0] == 403 ? serve_dumb(@dumb_args) : $res;
@@ -106,7 +106,7 @@ sub serve_smart {
        $env{PATH_TRANSLATED} = "$git->{git_dir}/$path";
        my $rdr = input_prepare($env) or return r(500);
        my $qsp = PublicInbox::Qspawn->new([qw(git http-backend)], \%env, $rdr);
-       $qsp->psgi_return($env, $limiter, \&git_parse_hdr, $env, $git, $path);
+       $qsp->psgi_yield($env, $limiter, \&ghb_parse_hdr, $env, $git, $path);
 }
 
 sub input_prepare {
index db8e8397bcf83c979f4bc1f4d51d9a41dec11de3..d6ecd5ba65e044f4b5f8ab5471028e19adf70d86 100644 (file)
@@ -123,9 +123,10 @@ sub http_out ($) {
        };
 }
 
+# returns undef if HTTP client disconnected, may return 0
+# because ->translate can return ''
 sub write {
        my $self = shift;
-       # my $ret = bytes::length($_[1]); # XXX does anybody care?
        http_out($self)->write($self->translate(@_));
 }
 
index ca162939c9f1ea47ecefe47333139eff9e42a0ed..edc88fe8bf0b2a6a7b43c2df6457eacd8467c8b6 100644 (file)
@@ -455,11 +455,12 @@ sub next_step {
 # They may be exposed to the PSGI application when the PSGI app
 # returns a CODE ref for "push"-based responses
 package PublicInbox::HTTP::Chunked;
-use strict;
+use v5.12;
 
 sub write {
        # ([$http], $buf) = @_;
-       PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1])
+       PublicInbox::HTTP::chunked_write($_[0]->[0], $_[1]);
+       $_[0]->[0]->{sock} ? length($_[1]) : undef;
 }
 
 sub close {
@@ -468,12 +469,13 @@ sub close {
 }
 
 package PublicInbox::HTTP::Identity;
-use strict;
+use v5.12;
 our @ISA = qw(PublicInbox::HTTP::Chunked);
 
 sub write {
        # ([$http], $buf) = @_;
        PublicInbox::HTTP::identity_write($_[0]->[0], $_[1]);
+       $_[0]->[0]->{sock} ? length($_[1]) : undef;
 }
 
 1;
index b38d8270f68c27312753c258336bc4c425277cae..f4d57e7de69e47c46c02e36b8c0190cd745edf23 100644 (file)
@@ -39,14 +39,16 @@ sub consume {
        if ($@) { # regular file (but not w/ select|IO::Poll backends)
                $self->{-need_rq} = 1;
                $self->requeue;
-       } elsif (-p $in || -S _) { # O_NONBLOCK for sockets and pipes
+       } elsif (do { no warnings 'unopened'; !stat($in) }) { # ProcessIONBF
+       } elsif (-p _ || -S _) { # O_NONBLOCK for sockets and pipes
                $in->blocking(0);
        } elsif (-t $in) { # isatty(3) can't use `_' stat cache
                unblock_tty($self);
        }
+       $self;
 }
 
-sub close {
+sub close { # idempotent
        my ($self) = @_;
        if (my $t = delete($self->{restore_termios})) {
                my $fd = fileno($self->{sock} // return);
@@ -60,16 +62,16 @@ sub event_step {
        my $r = sysread($self->{sock} // return, my $rbuf, 65536);
        eval {
                if ($r) {
-                       $self->{cb}->(@{$self->{args}}, $rbuf);
+                       $self->{cb}->($self, @{$self->{args}}, $rbuf);
                        $self->requeue if $self->{-need_rq};
                } elsif (defined($r)) { # EOF
-                       $self->{cb}->(@{$self->{args}}, '');
+                       $self->{cb}->($self, @{$self->{args}}, '');
                        $self->close
                } elsif ($!{EAGAIN}) { # rely on EPOLLIN
                } elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty
                        $self->requeue if $self->{-need_rq};
                } else { # another error
-                       $self->{cb}->(@{$self->{args}}, undef);
+                       $self->{cb}->($self, @{$self->{args}}, undef);
                        $self->close;
                }
        };
index 56e4c001f17c9a2f46429e2e03f1a8b503af1ec8..7bc7b2dcf87fde2bb3aa536b8572a11f4f4a8531 100644 (file)
@@ -1567,7 +1567,7 @@ sub request_umask {
 }
 
 sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin
-       my ($lei, $cb) = @_; # $_[-1] = $rbuf
+       my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf
        $_[1] // return $lei->fail("error reading stdin: $!");
        $lei->{stdin_buf} .= $_[-1];
        do_env($lei, $cb) if $_[-1] eq '';
index 9a7e8734fb7699ed6881d2bddd12c91daf82dba1..203d8f4133d032f8f42ab07a6d79091bb6f78032 100644 (file)
@@ -31,6 +31,9 @@ use PublicInbox::GzipFilter;
 use Scalar::Util qw(blessed);
 use PublicInbox::Limiter;
 use PublicInbox::Aspawn qw(run_await);
+use PublicInbox::Syscall qw(EPOLLIN);
+use PublicInbox::InputPipe;
+use Carp qw(carp confess);
 
 # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
 use Errno qw(EAGAIN EINTR);
@@ -61,7 +64,7 @@ sub _do_spawn {
        if ($start_cb) {
                eval { # popen_rd may die on EMFILE, ENFILE
                        $self->{rpipe} = popen_rd($cmd, $cmd_env, \%o,
-                                               \&waitpid_err, $self);
+                                               \&waitpid_err, $self, \%o);
                        $start_cb->($self); # EPOLL_CTL_ADD may ENOSPC/ENOMEM
                };
        } else {
@@ -126,6 +129,20 @@ sub wait_await { # run_await cb
        waitpid_err($pid, $self, $opt);
 }
 
+sub yield_chunk { # $_[-1] is sysread buffer (or undef)
+       my ($self, $ipipe) = @_;
+       if (!defined($_[-1])) {
+               warn "error reading body: $!";
+       } elsif ($_[-1] eq '') { # normal EOF
+               $self->finish;
+               $self->{qfh}->close;
+       } elsif (defined($self->{qfh}->write($_[-1]))) {
+               return; # continue while HTTP client is reading our writes
+       } # else { # HTTP client disconnected
+       delete $self->{rpipe};
+       $ipipe->close;
+}
+
 sub finish ($;$) {
        my ($self, $err) = @_;
        $self->{_err} //= $err; # only for $@
@@ -201,6 +218,39 @@ EOM
        $ret;
 }
 
+sub yield_pass {
+       my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+       my $env = $self->{psgi_env};
+       my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
+       if (ref($res) eq 'CODE') { # chain another command
+               delete $self->{rpipe};
+               $ipipe->close if $ipipe;
+               $res->($wcb);
+               $self->{passed} = 1;
+               return; # all done
+       }
+       confess("BUG: $res unhandled") if ref($res) ne 'ARRAY';
+
+       my $filter = blessed($res->[2]) && $res->[2]->can('attach') ?
+                       pop(@$res) : delete($env->{'qspawn.filter'});
+       $filter //= PublicInbox::GzipFilter::qsp_maybe($res->[1], $env);
+
+       if (scalar(@$res) == 3) { # done early (likely error or static file)
+               delete $self->{rpipe};
+               $ipipe->close if $ipipe;
+               $wcb->($res); # all done
+               return;
+       }
+       scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
+       return ($wcb, $filter) if !$ipipe; # generic PSGI
+       # streaming response
+       my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
+       $qfh = $filter->attach($qfh) if $filter;
+       my ($bref) = @{delete $self->{yield_parse_hdr}};
+       $qfh->write($$bref) if $$bref ne '';
+       $self->{qfh} = $qfh; # keep $ipipe open
+}
+
 sub psgi_return_init_cb { # this may be PublicInbox::HTTPD::Async {cb}
        my ($self) = @_;
        my $r = rd_hdr($self) or return; # incomplete
@@ -257,6 +307,55 @@ sub psgi_return_start { # may run later, much later...
        }
 }
 
+sub r500 () { [ 500, [], [ "Internal error\n" ] ] }
+
+sub parse_hdr_done ($$) {
+       my ($self) = @_;
+       my $ret;
+       if (defined $_[-1]) {
+               my ($bref, $ph_cb, @ph_arg) = @{$self->{yield_parse_hdr}};
+               $$bref .= $_[-1];
+               $ret = eval { $ph_cb->(length($_[-1]), $bref, @ph_arg) };
+               if ($@) {
+                       carp "parse_hdr (@{$self->{cmd}}): $@\n";
+                       $ret = r500();
+               } elsif (!$ret && $_[-1] eq '') {
+                       carp <<EOM;
+EOF parsing headers from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+                       $ret = r500();
+               }
+       } else {
+               carp <<EOM;
+E: parsing headers: $! from @{$self->{cmd}} ($self->{psgi_env}->{REQUEST_URI})
+EOM
+               $ret = r500();
+       }
+       $ret; # undef if headers incomplete
+}
+
+sub ipipe_cb { # InputPipe callback
+       my ($ipipe, $self) = @_; # $_[-1] rbuf
+       if ($self->{qfh}) { # already streaming
+               yield_chunk($self, $ipipe, $_[-1]);
+       } elsif (my $res = parse_hdr_done($self, $_[-1])) {
+               yield_pass($self, $ipipe, $res);
+       } # else: headers incomplete, keep reading
+}
+
+sub _yield_start { # may run later, much later...
+       my ($self) = @_;
+       if ($self->{psgi_env}->{'pi-httpd.async'}) {
+               require PublicInbox::ProcessIONBF;
+               my $rpipe = $self->{rpipe};
+               PublicInbox::ProcessIONBF->replace($rpipe);
+               PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self);
+       } else {
+               require PublicInbox::GetlineResponse;
+               PublicInbox::GetlineResponse::response($self);
+       }
+}
+
 # Used for streaming the stdout of one process as a PSGI response.
 #
 # $env is the PSGI env.
@@ -302,4 +401,22 @@ sub psgi_return {
        }
 }
 
+sub psgi_yield {
+       my ($self, $env, $limiter, @parse_hdr_arg)= @_;
+       $self->{psgi_env} = $env;
+       $self->{yield_parse_hdr} = [ \(my $buf = ''), @parse_hdr_arg ];
+       $limiter ||= $def_limiter ||= PublicInbox::Limiter->new(32);
+
+       # the caller already captured the PSGI write callback from
+       # the PSGI server, so we can call ->start, here:
+       $env->{'qspawn.wcb'} ? start($self, $limiter, \&_yield_start) : sub {
+               # the caller will return this sub to the PSGI server, so
+               # it can set the response callback (that is, for
+               # PublicInbox::HTTP, the chunked_wcb or identity_wcb callback),
+               # but other HTTP servers are supported:
+               $env->{'qspawn.wcb'} = $_[0];
+               start($self, $limiter, \&_yield_start);
+       }
+}
+
 1;