]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
ds: support enqueued CODE refs with arguments
authorEric Wong <e@80x24.org>
Sat, 28 Jun 2025 11:20:10 +0000 (11:20 +0000)
committerEric Wong <e@80x24.org>
Mon, 30 Jun 2025 08:20:37 +0000 (08:20 +0000)
While we inherited support for enqueuing CODE refs from
Danga::Socket, it makes sense to support enqueuing CODE
refs with extra arguments specific to that CODE ref[1]
instead of relying on anonymous CODE refs to capture its
args..

Supporting args with CODE refs allows our ->flush_write
implementation to be more generic and no longer specific to
tmpio write buffering.  This change may eventually allow us
to chain multiple long_response calls together and eliminate
the {long_cb} field.

[1] we prefer CODE refs point to named subs with arguments
    passed to it to save RAM as opposed to constantly allocating
    new anonymous subs to capture local variables.

lib/PublicInbox/DS.pm
lib/PublicInbox/WwwStatic.pm

index f3fdd1d5517cab44dbcfbdf33eb1dba103ec7b43..162ebad1743a6132d14f9cc2ce7b5c1a97ef0179 100644 (file)
@@ -17,8 +17,7 @@
 # fields:
 # sock: underlying socket
 # rbuf: scalarref, usually undef
-# wbuf: arrayref of coderefs or tmpio (autovivified))
-#      (tmpio = [ GLOB, offset, [ length ] ])
+# wbuf: arrayref of coderefs or [ CODE, ARGS ] arrayref (autovivified))
 package PublicInbox::DS;
 use strict;
 use v5.10.1;
@@ -368,25 +367,40 @@ sub ds_close ($) {
 }
 
 # portable, non-thread-safe sendfile emulation (no pread, yet)
-sub send_tmpio ($$) {
-       my ($sock, $tmpio) = @_;
-
-       sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or return;
-       my $n = $tmpio->[2] // 65536;
-       $n = 65536 if $n > 65536;
-       my $to_write = sysread($tmpio->[0], my $buf, $n) // return;
-       my $total = 0;
-       while ($to_write > 0) {
-               if (defined(my $w = syswrite($sock, $buf, $to_write, $total))) {
-                       $total += $w;
-                       $to_write -= $w;
-               } else {
-                       $total ? last : return;
-               }
-       }
-       $tmpio->[1] += $total; # offset
-       $tmpio->[2] -= $total if defined($tmpio->[2]); # length
-       $total;
+# usage: push @{$self->{wbuf}}, [ \&send_io, [ $fh, $offset, $length ] ];
+# ($length is optional)
+sub send_io {
+       my ($self, $tmpio) = @_; # tmpio = [ GLOB, offset, [ length ] ]
+       my $sock = $self->{sock} // return;
+       sysseek($tmpio->[0], $tmpio->[1], SEEK_SET) or
+               return drop($self, "seek($tmpio->[0], $tmpio->[1]): $!");
+       my ($n, $buf, $to_write, $w, $off, $ev, $eagain);
+       do {
+               $n = ($tmpio->[2] // 65536) || return; # tmpio->[2] == 0 is EOF
+               $n = 65536 if $n > 65536;
+               $to_write = sysread($tmpio->[0], $buf, $n) //
+                       return drop($self, "read($tmpio->[0], $n): $!");
+               $to_write or return defined($tmpio->[2]) ?
+                               drop($self, "read($tmpio->[0], $n): EOF") :
+                               undef;
+               $off = 0;
+               do {
+                       $w = syswrite $sock, $buf, $to_write, $off;
+                       if (defined $w) {
+                               $off += $w;
+                               $to_write -= $w;
+                       } elsif ($! == EAGAIN &&
+                                       ($ev = epbit($sock, EPOLLOUT))) {
+                               epwait($sock, $ev | EPOLLONESHOT);
+                               unshift @{$self->{wbuf}}, [ \&send_io, $tmpio ];
+                               $eagain = 1;
+                       } else { # common, unrecoverable error
+                               return $self->close;
+                       }
+               } until ($to_write == 0 || $eagain);
+               $tmpio->[1] += $off;
+               $tmpio->[2] -= $off if defined($tmpio->[2]); # [2]: length
+       } until ($eagain);
 }
 
 sub epbit ($$) { # (sock, default)
@@ -403,34 +417,17 @@ sub flush_write {
        my ($self) = @_;
        my $sock = $self->{sock} or return;
        my $wbuf = $self->{wbuf} or return 1;
-
-next_buf:
-       while (my $bref = $wbuf->[0]) {
-               if (ref($bref) ne 'CODE') { # bref is tmpio
-                       while ($sock) {
-                               my $w = send_tmpio $sock, $bref;
-                               if (defined $w) {
-                                       if ($w == 0) {
-                                               shift @$wbuf;
-                                               goto next_buf;
-                                       }
-                               } elsif ($! == EAGAIN && (my $ev = epbit
-                                                       $sock, EPOLLOUT)) {
-                                       epwait $sock, $ev | EPOLLONESHOT;
-                                       return 0;
-                               } else {
-                                       return $self->close;
-                               }
-                       }
-               } else { #(ref($bref) eq 'CODE') {
-                       shift @$wbuf;
-                       my $before = scalar(@$wbuf);
-                       $bref->($self);
-                       # bref may be enqueueing more CODE to call
+       while (my $cb = shift @$wbuf) {
+               my $before = scalar(@$wbuf);
+               if (ref($cb) eq 'ARRAY') { # $cb->[0] example: send_io
+                       $cb->[0]->($self, @$cb[1..$#$cb]);
+               } else { # (ref($cb) eq 'CODE') {
+                       $cb->($self);
+                       # cb may be enqueueing more CODE to call
                        # (see accept_tls_step)
-                       return 0 if (scalar(@$wbuf) > $before);
-                       $sock = $self->{sock} // return;
                }
+               return 0 if (scalar(@$wbuf) > $before); # got EAGAIN
+               $sock = $self->{sock} // return;
        } # while @$wbuf
 
        delete $self->{wbuf};
@@ -509,14 +506,17 @@ sub write {
                if ($ref eq 'CODE') {
                        push @$wbuf, $bref;
                } else {
-                       my $tmpio = $wbuf->[-1];
-                       if (ref($tmpio) eq 'ARRAY' && !defined($tmpio->[2])) {
-                               # append to tmp file buffer
+                       my ($cb, $tmpio);
+                       if (ref($wbuf->[-1]) eq 'ARRAY' &&
+                                       (($cb, $tmpio) = @{$wbuf->[-1]}) &&
+                                       $cb == \&send_io &&
+                                       !defined($tmpio->[2])) {
+                               # append to existing tmp file buffer
                                print { $tmpio->[0] } $$bref or
                                        return drop($self, "print: $!");
                        } else {
                                $tmpio = tmpio $self, $bref, 0 or return 0;
-                               push @$wbuf, $tmpio;
+                               push @$wbuf, [ \&send_io, $tmpio ];
                        }
                }
                0;
@@ -543,7 +543,7 @@ sub write {
 
                # wbuf may be an empty array if we're being called inside
                # ->flush_write via CODE bref:
-               push @{$self->{wbuf}}, $tmpio; # autovivifies
+               push @{$self->{wbuf}}, [ \&send_io, $tmpio ]; # autovivifies
                0;
        }
 }
@@ -571,7 +571,7 @@ sub _iov_write ($$@) {
        } else { # client disconnected
                return $self->close;
        }
-       push @{$self->{wbuf}}, $tmpio; # autovivifies
+       push @{$self->{wbuf}}, [ \&send_io, $tmpio ]; # autovivifies
        epwait $self->{sock}, EPOLLOUT|EPOLLONESHOT;
        0;
 }
index d54f4cdbd061951270976190f60c379e1307a401..69b658ce0d2cbf0b587517ce6f653e97fe077521 100644 (file)
@@ -170,7 +170,7 @@ sub getline {
        # avoid buffering, by becoming the buffer! (public-inbox-httpd)
        if (my $tmpio = delete $self->{bypass}) {
                my $http = pop @$tmpio; # PublicInbox::HTTP
-               push @{$http->{wbuf}}, $tmpio; # [ $in, $off, $len ]
+               push @{$http->{wbuf}}, [ $http->can('send_io'), $tmpio ];
                $http->flush_write;
                return; # undef, EOF
        }