# fields:
# sock: underlying socket
# rbuf: scalarref, usually undef
-# wbuf: arrayref of coderefs or tmpio (autovivified))
-# (tmpio = [ GLOB, offset, [ length ] ])
+# wbuf: arrayref of coderefs or [ CODE, ARGS ] arrayref (autovivified))
package PublicInbox::DS;
use strict;
use v5.10.1;
}
# portable, non-thread-safe sendfile emulation (no pread, yet)
-sub send_tmpio ($$) {
- my ($sock, $tmpio) = @_;
-
- sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or return;
- my $n = $tmpio->[2] // 65536;
- $n = 65536 if $n > 65536;
- my $to_write = sysread($tmpio->[0], my $buf, $n) // return;
- my $total = 0;
- while ($to_write > 0) {
- if (defined(my $w = syswrite($sock, $buf, $to_write, $total))) {
- $total += $w;
- $to_write -= $w;
- } else {
- $total ? last : return;
- }
- }
- $tmpio->[1] += $total; # offset
- $tmpio->[2] -= $total if defined($tmpio->[2]); # length
- $total;
+# usage: push @{$self->{wbuf}}, [ \&send_io, [ $fh, $offset, $length ] ];
+# ($length is optional)
+sub send_io {
+ my ($self, $tmpio) = @_; # tmpio = [ GLOB, offset, [ length ] ]
+ my $sock = $self->{sock} // return;
+ sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or
+ return drop($self, "seek($tmpio->[0], $tmpio->[1]): $!");
+ my ($n, $buf, $to_write, $w, $off, $ev, $eagain);
+ do {
+ $n = ($tmpio->[2] // 65536) || return; # tmpio->[2] == 0 is EOF
+ $n = 65536 if $n > 65536;
+ $to_write = sysread($tmpio->[0], $buf, $n) //
+ return drop($self, "read($tmpio->[0], $n): $!");
+ $to_write or return defined($tmpio->[2]) ?
+ drop($self, "read($tmpio->[0], $n): EOF") :
+ undef;
+ $off = 0;
+ do {
+ $w = syswrite $sock, $buf, $to_write, $off;
+ if (defined $w) {
+ $off += $w;
+ $to_write -= $w;
+ } elsif ($! == EAGAIN &&
+ ($ev = epbit($sock, EPOLLOUT))) {
+ epwait($sock, $ev | EPOLLONESHOT);
+ unshift @{$self->{wbuf}}, [ \&send_io, $tmpio ];
+ $eagain = 1;
+ } else { # common, unrecoverable error
+ return $self->close;
+ }
+ } until ($to_write == 0 || $eagain);
+ $tmpio->[1] += $off;
+ $tmpio->[2] -= $off if defined($tmpio->[2]); # [2]: length
+ } until ($eagain);
}
sub epbit ($$) { # (sock, default)
my ($self) = @_;
my $sock = $self->{sock} or return;
my $wbuf = $self->{wbuf} or return 1;
-
-next_buf:
- while (my $bref = $wbuf->[0]) {
- if (ref($bref) ne 'CODE') { # bref is tmpio
- while ($sock) {
- my $w = send_tmpio $sock, $bref;
- if (defined $w) {
- if ($w == 0) {
- shift @$wbuf;
- goto next_buf;
- }
- } elsif ($! == EAGAIN && (my $ev = epbit
- $sock, EPOLLOUT)) {
- epwait $sock, $ev | EPOLLONESHOT;
- return 0;
- } else {
- return $self->close;
- }
- }
- } else { #(ref($bref) eq 'CODE') {
- shift @$wbuf;
- my $before = scalar(@$wbuf);
- $bref->($self);
- # bref may be enqueueing more CODE to call
+ while (my $cb = shift @$wbuf) {
+ my $before = scalar(@$wbuf);
+ if (ref($cb) eq 'ARRAY') { # $cb->[0] example: send_io
+ $cb->[0]->($self, @$cb[1..$#$cb]);
+ } else { # (ref($cb) eq 'CODE') {
+ $cb->($self);
+ # cb may be enqueueing more CODE to call
# (see accept_tls_step)
- return 0 if (scalar(@$wbuf) > $before);
- $sock = $self->{sock} // return;
}
+ return 0 if (scalar(@$wbuf) > $before); # got EAGAIN
+ $sock = $self->{sock} // return;
} # while @$wbuf
delete $self->{wbuf};
if ($ref eq 'CODE') {
push @$wbuf, $bref;
} else {
- my $tmpio = $wbuf->[-1];
- if (ref($tmpio) eq 'ARRAY' && !defined($tmpio->[2])) {
- # append to tmp file buffer
+ my ($cb, $tmpio);
+ if (ref($wbuf->[-1]) eq 'ARRAY' &&
+ (($cb, $tmpio) = @{$wbuf->[-1]}) &&
+ $cb == \&send_io &&
+ !defined($tmpio->[2])) {
+ # append to existing tmp file buffer
print { $tmpio->[0] } $$bref or
return drop($self, "print: $!");
} else {
$tmpio = tmpio $self, $bref, 0 or return 0;
- push @$wbuf, $tmpio;
+ push @$wbuf, [ \&send_io, $tmpio ];
}
}
0;
# wbuf may be an empty array if we're being called inside
# ->flush_write via CODE bref:
- push @{$self->{wbuf}}, $tmpio; # autovivifies
+ push @{$self->{wbuf}}, [ \&send_io, $tmpio ]; # autovivifies
0;
}
}
} else { # client disconnected
return $self->close;
}
- push @{$self->{wbuf}}, $tmpio; # autovivifies
+ push @{$self->{wbuf}}, [ \&send_io, $tmpio ]; # autovivifies
epwait $self->{sock}, EPOLLOUT|EPOLLONESHOT;
0;
}