$u eq 'u' or warn "E: recv $v has no umask";
}
-sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin
+sub _stdin_cb { # PublicInbox::InputStream::consume callback for --stdin
my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf
$_[1] // return $lei->fail("error reading stdin: $!");
$lei->{stdin_buf} .= $_[-1];
sub slurp_stdin {
my ($lei, $cb) = @_;
- require PublicInbox::InputPipe;
+ require PublicInbox::InputStream;
my $in = $lei->{0};
if (-t $in) { # run cat via script/lei and read from it
$in = undef;
say { $lei->{2} } '# enter query, Ctrl-D when done';
send_exec_cmd($lei, [ $lei->{0}, $wr ], ['cat'], {});
}
- PublicInbox::InputPipe::consume($in, \&_stdin_cb, $lei, $cb);
+ PublicInbox::InputStream::consume($in, \&_stdin_cb, $lei, $cb);
}
1;
use PublicInbox::Limiter;
use PublicInbox::Aspawn qw(run_await);
use PublicInbox::Syscall qw(EPOLLIN);
-use PublicInbox::InputPipe;
+use PublicInbox::InputStream;
use Carp qw(carp confess);
# n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers
}
sub yield_chunk { # $_[-1] is sysread buffer (or undef)
- my ($self, $ipipe) = @_;
+ my ($self, $istream) = @_;
if (!defined($_[-1])) {
warn "error reading body: $!";
} elsif ($_[-1] eq '') { # normal EOF
return; # continue while HTTP client is reading our writes
} # else { # HTTP client disconnected
delete $self->{rpipe};
- $ipipe->close;
+ $istream->close;
}
sub finish ($;$) {
}
sub yield_pass {
- my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe
+ my ($self, $istream, $res) = @_; # $istream = InputStream
my $env = $self->{env};
my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb');
if (ref($res) eq 'CODE') { # chain another command
delete $self->{rpipe};
- $ipipe->close if $ipipe;
+ $istream->close if $istream;
$res->($wcb);
$self->{passed} = 1;
return; # all done
if (scalar(@$res) == 3) { # done early (likely error or static file)
delete $self->{rpipe};
- $ipipe->close if $ipipe;
+ $istream->close if $istream;
$wcb->($res); # all done
return;
}
scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res");
- return ($wcb, $filter) if !$ipipe; # generic PSGI
+ return ($wcb, $filter) if !$istream; # generic PSGI
# streaming response
my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity)
$qfh = $filter->attach($qfh) if $filter;
my ($bref) = @{delete $self->{yield_parse_hdr}};
$qfh->write($$bref) if $$bref ne '';
- $self->{qfh} = $qfh; # keep $ipipe open
+ $self->{qfh} = $qfh; # keep $istream open
}
sub parse_hdr_done ($$) {
$ret; # undef if headers incomplete
}
-sub ipipe_cb { # InputPipe callback
- my ($ipipe, $self) = @_; # $_[-1] rbuf
+sub istream_cb { # InputStream callback
+ my ($istream, $self) = @_; # $_[-1] rbuf
if ($self->{qfh}) { # already streaming
- yield_chunk($self, $ipipe, $_[-1]);
+ yield_chunk($self, $istream, $_[-1]);
} elsif (my $res = parse_hdr_done($self, $_[-1])) {
- yield_pass($self, $ipipe, $res);
+ yield_pass($self, $istream, $res);
} # else: headers incomplete, keep reading
}
my ($self) = @_;
if ($self->{env}->{'pi-httpd.async'}) {
my $rpipe = $self->{rpipe};
- PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self);
+ PublicInbox::InputStream::consume($rpipe, \&istream_cb, $self);
} else {
require PublicInbox::GetlineResponse;
PublicInbox::GetlineResponse::response($self);