# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-# for reading pipes and sockets off the DS event loop
+# for reading pipes, sockets, and TTYs off the DS event loop
package PublicInbox::InputPipe;
use v5.12;
use parent qw(PublicInbox::DS);
-use PublicInbox::Syscall qw(EPOLLIN EPOLLET);
+use PublicInbox::Syscall qw(EPOLLIN);
sub consume {
my ($in, $cb, @args) = @_;
my $self = bless { cb => $cb, args => \@args }, __PACKAGE__;
- eval { $self->SUPER::new($in, EPOLLIN|EPOLLET) };
- return $self->requeue if $@; # regular file
- $in->blocking(0); # pipe or socket
+ eval { $self->SUPER::new($in, EPOLLIN) };
+ if ($@) { # regular file (but not w/ select|IO::Poll backends)
+ $self->{-need_rq} = 1;
+ $self->requeue;
+ } elsif (-p $in || -s _) { # O_NONBLOCK for sockets and pipes
+ $in->blocking(0);
+ } # TODO: tty
+}
+
+sub close {
+ my ($self) = @_;
+ $self->{-need_rq} ? delete($self->{sock}) : $self->SUPER::close
}
sub event_step {
my ($self) = @_;
my $r = sysread($self->{sock} // return, my $rbuf, 65536);
- if ($r) {
- $self->{cb}->(@{$self->{args}}, $rbuf);
- return $self->requeue; # may be regular file or pipe
- }
- if (defined($r)) { # EOF
- $self->{cb}->(@{$self->{args}}, '');
- } elsif ($!{EAGAIN}) {
- return;
- } else { # another error
- $self->{cb}->(@{$self->{args}}, undef)
+ eval {
+ if ($r) {
+ $self->{cb}->(@{$self->{args}}, $rbuf);
+ $self->requeue if $self->{-need_rq};
+ } elsif (defined($r)) { # EOF
+ $self->{cb}->(@{$self->{args}}, '');
+ $self->close
+ } elsif ($!{EAGAIN}) { # rely on EPOLLIN
+ } elsif ($!{EINTR}) { # rely on EPOLLIN for sockets/pipes/tty
+ $self->requeue if $self->{-need_rq};
+ } else { # another error
+ $self->{cb}->(@{$self->{args}}, undef);
+ $self->close;
+ }
+ };
+ if ($@) {
+ warn "E: $@";
+ $self->close;
}
- $self->{sock}->blocking ? delete($self->{sock}) : $self->close
}
1;