From: Eric Wong Date: Sat, 28 Jun 2025 11:20:10 +0000 (+0000) Subject: ds: support enqueued CODE refs with arguments X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=45235e9a336b7682e64a08b43a6e5fc326888f8d;p=thirdparty%2Fpublic-inbox.git ds: support enqueued CODE refs with arguments While we inherited support for enqueuing CODE refs from Danga::Socket, it makes sense to support enqueuing CODE refs with extra arguments specific to that CODE ref[1] instead of relying on anonymous CODE refs to capture its args. Supporting args with CODE refs allows our ->flush_write implementation to be more generic and no longer specific to tmpio write buffering. This change may eventually allow us to chain multiple long_response calls together and eliminate the {long_cb} field. [1] we prefer CODE refs point to named subs with arguments passed to it to save RAM as opposed to constantly allocating new anonymous subs to capture local variables. --- diff --git a/lib/PublicInbox/DS.pm b/lib/PublicInbox/DS.pm index f3fdd1d55..162ebad17 100644 --- a/lib/PublicInbox/DS.pm +++ b/lib/PublicInbox/DS.pm @@ -17,8 +17,7 @@ # fields: # sock: underlying socket # rbuf: scalarref, usually undef -# wbuf: arrayref of coderefs or tmpio (autovivified)) -# (tmpio = [ GLOB, offset, [ length ] ]) +# wbuf: arrayref of coderefs or [ CODE, ARGS ] arrayref (autovivified)) package PublicInbox::DS; use strict; use v5.10.1; @@ -368,25 +367,40 @@ sub ds_close ($) { } # portable, non-thread-safe sendfile emulation (no pread, yet) -sub send_tmpio ($$) { - my ($sock, $tmpio) = @_; - - sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or return; - my $n = $tmpio->[2] // 65536; - $n = 65536 if $n > 65536; - my $to_write = sysread($tmpio->[0], my $buf, $n) // return; - my $total = 0; - while ($to_write > 0) { - if (defined(my $w = syswrite($sock, $buf, $to_write, $total))) { - $total += $w; - $to_write -= $w; - } else { - $total ? 
last : return; - } - } - $tmpio->[1] += $total; # offset - $tmpio->[2] -= $total if defined($tmpio->[2]); # length - $total; +# usage: push @{$self->{wbuf}}, [ \&send_io, [ $fh, $offset, $length ] ]; +# ($length is optional) +sub send_io { + my ($self, $tmpio) = @_; # tmpio = [ GLOB, offset, [ length ] ] + my $sock = $self->{sock} // return; + sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or + return drop($self, "seek($tmpio->[0], $tmpio->[1]): $!"); + my ($n, $buf, $to_write, $w, $off, $ev, $eagain); + do { + $n = ($tmpio->[2] // 65536) || return; # tmpio->[2] == 0 is EOF + $n = 65536 if $n > 65536; + $to_write = sysread($tmpio->[0], $buf, $n) // + return drop($self, "read($tmpio->[0], $n): $!"); + $to_write or return defined($tmpio->[2]) ? + drop($self, "read($tmpio->[0], $n): EOF") : + undef; + $off = 0; + do { + $w = syswrite $sock, $buf, $to_write, $off; + if (defined $w) { + $off += $w; + $to_write -= $w; + } elsif ($! == EAGAIN && + ($ev = epbit($sock, EPOLLOUT))) { + epwait($sock, $ev | EPOLLONESHOT); + unshift @{$self->{wbuf}}, [ \&send_io, $tmpio ]; + $eagain = 1; + } else { # common, unrecoverable error + return $self->close; + } + } until ($to_write == 0 || $eagain); + $tmpio->[1] += $off; + $tmpio->[2] -= $off if defined($tmpio->[2]); # [2]: length + } until ($eagain); } sub epbit ($$) { # (sock, default) @@ -403,34 +417,17 @@ sub flush_write { my ($self) = @_; my $sock = $self->{sock} or return; my $wbuf = $self->{wbuf} or return 1; - -next_buf: - while (my $bref = $wbuf->[0]) { - if (ref($bref) ne 'CODE') { # bref is tmpio - while ($sock) { - my $w = send_tmpio $sock, $bref; - if (defined $w) { - if ($w == 0) { - shift @$wbuf; - goto next_buf; - } - } elsif ($! 
== EAGAIN && (my $ev = epbit - $sock, EPOLLOUT)) { - epwait $sock, $ev | EPOLLONESHOT; - return 0; - } else { - return $self->close; - } - } - } else { #(ref($bref) eq 'CODE') { - shift @$wbuf; - my $before = scalar(@$wbuf); - $bref->($self); - # bref may be enqueueing more CODE to call + while (my $cb = shift @$wbuf) { + my $before = scalar(@$wbuf); + if (ref($cb) eq 'ARRAY') { # $cb->[0] example: send_io + $cb->[0]->($self, @$cb[1..$#$cb]); + } else { # (ref($cb) eq 'CODE') { + $cb->($self); + # cb may be enqueueing more CODE to call # (see accept_tls_step) - return 0 if (scalar(@$wbuf) > $before); - $sock = $self->{sock} // return; } + return 0 if (scalar(@$wbuf) > $before); # got EAGAIN + $sock = $self->{sock} // return; } # while @$wbuf delete $self->{wbuf}; @@ -509,14 +506,17 @@ sub write { if ($ref eq 'CODE') { push @$wbuf, $bref; } else { - my $tmpio = $wbuf->[-1]; - if (ref($tmpio) eq 'ARRAY' && !defined($tmpio->[2])) { - # append to tmp file buffer + my ($cb, $tmpio); + if (ref($wbuf->[-1]) eq 'ARRAY' && + (($cb, $tmpio) = @{$wbuf->[-1]}) && + $cb == \&send_io && + !defined($tmpio->[2])) { + # append to existing tmp file buffer print { $tmpio->[0] } $$bref or return drop($self, "print: $!"); } else { $tmpio = tmpio $self, $bref, 0 or return 0; - push @$wbuf, $tmpio; + push @$wbuf, [ \&send_io, $tmpio ]; } } 0; @@ -543,7 +543,7 @@ sub write { # wbuf may be an empty array if we're being called inside # ->flush_write via CODE bref: - push @{$self->{wbuf}}, $tmpio; # autovivifies + push @{$self->{wbuf}}, [ \&send_io, $tmpio ]; # autovivifies 0; } } @@ -571,7 +571,7 @@ sub _iov_write ($$@) { } else { # client disconnected return $self->close; } - push @{$self->{wbuf}}, $tmpio; # autovivifies + push @{$self->{wbuf}}, [ \&send_io, $tmpio ]; # autovivifies epwait $self->{sock}, EPOLLOUT|EPOLLONESHOT; 0; } diff --git a/lib/PublicInbox/WwwStatic.pm b/lib/PublicInbox/WwwStatic.pm index d54f4cdbd..69b658ce0 100644 --- a/lib/PublicInbox/WwwStatic.pm +++ 
b/lib/PublicInbox/WwwStatic.pm @@ -170,7 +170,7 @@ sub getline { # avoid buffering, by becoming the buffer! (public-inbox-httpd) if (my $tmpio = delete $self->{bypass}) { my $http = pop @$tmpio; # PublicInbox::HTTP - push @{$http->{wbuf}}, $tmpio; # [ $in, $off, $len ] + push @{$http->{wbuf}}, [ $http->can('send_io'), $tmpio ]; $http->flush_write; return; # undef, EOF }