From: Eric Wong Date: Tue, 29 Apr 2025 17:16:46 +0000 (+0000) Subject: rename InputPipe to InputStream X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=94e89a7b4623c497e7b5d6386dac7d2c71e000df;p=thirdparty%2Fpublic-inbox.git rename InputPipe to InputStream Calling it a "stream" is more accurate since it can be a stream socket, FIFO, or even a regular file; not just a pipe. --- diff --git a/MANIFEST b/MANIFEST index 7602c39df..35201a17c 100644 --- a/MANIFEST +++ b/MANIFEST @@ -232,7 +232,7 @@ lib/PublicInbox/InboxWritable.pm lib/PublicInbox/IndexHeader.pm lib/PublicInbox/Inotify.pm lib/PublicInbox/Inotify3.pm -lib/PublicInbox/InputPipe.pm +lib/PublicInbox/InputStream.pm lib/PublicInbox/Isearch.pm lib/PublicInbox/KQNotify.pm lib/PublicInbox/LEI.pm diff --git a/lib/PublicInbox/InputPipe.pm b/lib/PublicInbox/InputStream.pm similarity index 97% rename from lib/PublicInbox/InputPipe.pm rename to lib/PublicInbox/InputStream.pm index 77eae4edb..e2b5a9b8e 100644 --- a/lib/PublicInbox/InputPipe.pm +++ b/lib/PublicInbox/InputStream.pm @@ -2,7 +2,7 @@ # License: AGPL-3.0+ # for reading pipes, sockets, and TTYs off the DS event loop -package PublicInbox::InputPipe; +package PublicInbox::InputStream; use v5.12; use parent qw(PublicInbox::DS); use PublicInbox::Syscall qw(EPOLLIN); diff --git a/lib/PublicInbox/LEI.pm b/lib/PublicInbox/LEI.pm index 1446ad500..afe78e07f 100644 --- a/lib/PublicInbox/LEI.pm +++ b/lib/PublicInbox/LEI.pm @@ -1626,7 +1626,7 @@ sub request_umask { # assumes client is trusted and fast $u eq 'u' or warn "E: recv $v has no umask"; } -sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin +sub _stdin_cb { # PublicInbox::InputStream::consume callback for --stdin my (undef, $lei, $cb) = @_; # $_[-1] = $rbuf $_[1] // return $lei->fail("error reading stdin: $!"); $lei->{stdin_buf} .= $_[-1]; @@ -1635,7 +1635,7 @@ sub _stdin_cb { # PublicInbox::InputPipe::consume callback for --stdin sub slurp_stdin { my ($lei, $cb) = @_; - require PublicInbox::InputPipe; + require PublicInbox::InputStream; my $in = $lei->{0}; if (-t $in) { # run cat via script/lei and read from it $in = undef; @@ -1643,7 +1643,7 @@ sub slurp_stdin { say { $lei->{2} } '# enter query, Ctrl-D when done'; send_exec_cmd($lei, [ $lei->{0}, $wr ], ['cat'], {}); } - PublicInbox::InputPipe::consume($in, \&_stdin_cb, $lei, $cb); + PublicInbox::InputStream::consume($in, \&_stdin_cb, $lei, $cb); } 1; diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index 690860c17..bd8cd35b0 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -32,7 +32,7 @@ use Scalar::Util qw(blessed); use PublicInbox::Limiter; use PublicInbox::Aspawn qw(run_await); use PublicInbox::Syscall qw(EPOLLIN); -use PublicInbox::InputPipe; +use PublicInbox::InputStream; use Carp qw(carp confess); # n.b.: we get EAGAIN with public-inbox-httpd, and EINTR on other PSGI servers @@ -114,7 +114,7 @@ sub wait_await { # run_await cb } sub yield_chunk { # $_[-1] is sysread buffer (or undef) - my ($self, $ipipe) = @_; + my ($self, $istream) = @_; if (!defined($_[-1])) { warn "error reading body: $!"; } elsif ($_[-1] eq '') { # normal EOF @@ -124,7 +124,7 @@ sub yield_chunk { # $_[-1] is sysread buffer (or undef) return; # continue while HTTP client is reading our writes } # else { # HTTP client disconnected delete $self->{rpipe}; - $ipipe->close; + $istream->close; } sub finish ($;$) { @@ -168,12 +168,12 @@ sub psgi_qx { } sub yield_pass { - my ($self, $ipipe, $res) = @_; # $ipipe = InputPipe + my ($self, $istream, $res) = @_; # $istream = InputStream my $env = $self->{env}; my $wcb = delete $env->{'qspawn.wcb'} // confess('BUG: no qspawn.wcb'); if (ref($res) eq 'CODE') { # chain another command delete $self->{rpipe}; - $ipipe->close if $ipipe; + $istream->close if $istream; $res->($wcb); $self->{passed} = 1; return; # all done @@ -186,18 +186,18 @@ sub yield_pass { if (scalar(@$res) == 3) { # done early (likely error or static file) delete $self->{rpipe}; - $ipipe->close if $ipipe; + $istream->close if $istream; $wcb->($res); # all done return; } scalar(@$res) == 2 or confess("BUG: scalar(res) != 2: @$res"); - return ($wcb, $filter) if !$ipipe; # generic PSGI + return ($wcb, $filter) if !$istream; # generic PSGI # streaming response my $qfh = $wcb->($res); # get PublicInbox::HTTP::(Chunked|Identity) $qfh = $filter->attach($qfh) if $filter; my ($bref) = @{delete $self->{yield_parse_hdr}}; $qfh->write($$bref) if $$bref ne ''; - $self->{qfh} = $qfh; # keep $ipipe open + $self->{qfh} = $qfh; # keep $istream open } sub parse_hdr_done ($$) { @@ -223,12 +223,12 @@ EOM $ret; # undef if headers incomplete } -sub ipipe_cb { # InputPipe callback - my ($ipipe, $self) = @_; # $_[-1] rbuf +sub istream_cb { # InputStream callback + my ($istream, $self) = @_; # $_[-1] rbuf if ($self->{qfh}) { # already streaming - yield_chunk($self, $ipipe, $_[-1]); + yield_chunk($self, $istream, $_[-1]); } elsif (my $res = parse_hdr_done($self, $_[-1])) { - yield_pass($self, $ipipe, $res); + yield_pass($self, $istream, $res); } # else: headers incomplete, keep reading } @@ -236,7 +236,7 @@ sub _yield_start { # may run later, much later... my ($self) = @_; if ($self->{env}->{'pi-httpd.async'}) { my $rpipe = $self->{rpipe}; - PublicInbox::InputPipe::consume($rpipe, \&ipipe_cb, $self); + PublicInbox::InputStream::consume($rpipe, \&istream_cb, $self); } else { require PublicInbox::GetlineResponse; PublicInbox::GetlineResponse::response($self);