]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
use sendmsg w/ MSG_MORE to reduce syscalls
authorEric Wong <e@80x24.org>
Wed, 19 Jun 2024 23:41:02 +0000 (23:41 +0000)
committerEric Wong <e@80x24.org>
Thu, 20 Jun 2024 22:41:57 +0000 (22:41 +0000)
In places where we made multiple send(..., MSG_MORE) calls in
quick succession, we now use sendmsg(2) to provide the same
semantics with fewer syscalls.  While this may be less efficient
inside the kernel for small messages, syscalls are expensive
nowadays and we can avoid userspace copies and large allocations
when streaming large HTTP chunks in /T/, /t/, and t.mbox.gz
endpoints.

This allows *BSD systems lacking MSG_MORE to save some syscalls
when writing HTTP chunked encoding, among other things.

MANIFEST
lib/PublicInbox/DS.pm
lib/PublicInbox/DSdeflate.pm
lib/PublicInbox/HTTP.pm
lib/PublicInbox/IMAP.pm
lib/PublicInbox/Syscall.pm
t/syscall.t [new file with mode: 0644]

index 5796e05b6774b1a57f91b37db36882776df1e6a5..3fc01a435422f4152c261d5d410db12cae3e8183 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -610,6 +610,7 @@ t/solve/bare.patch
 t/solver_git.t
 t/spamcheck_spamc.t
 t/spawn.t
+t/syscall.t
 t/tail_notify.t
 t/thread-cycle.t
 t/thread-index-gap.t
index 17a8a1df51803aa3e2092999f952ba0f885a064c..c5f448606f3d352d1bf9e3bec2f79b951c122250 100644 (file)
@@ -35,7 +35,9 @@ use PublicInbox::Select;
 use PublicInbox::OnDestroy;
 use Errno qw(EAGAIN EINVAL ECHILD);
 use Carp qw(carp croak);
+use List::Util qw(sum);
 our @EXPORT_OK = qw(now msg_more awaitpid add_timer add_uniq_timer);
+my $sendmsg_more = PublicInbox::Syscall->can('sendmsg_more');
 
 my $nextq; # queue for next_tick
 my $reap_armed;
@@ -471,8 +473,8 @@ sub drop ($@) {
        undef;
 }
 
-sub tmpio ($$$) {
-       my ($self, $bref, $off) = @_;
+sub tmpio ($$$;@) {
+       my ($self, $bref, $off, @rest) = @_;
        my $fh = tmpfile 'wbuf', $self->{sock}, 1 or
                return drop $self, "tmpfile $!";
        $fh->autoflush(1);
@@ -480,6 +482,7 @@ sub tmpio ($$$) {
        my $n = syswrite($fh, $$bref, $len, $off) //
                return drop $self, "write ($len): $!";
        $n == $len or return drop $self, "wrote $n < $len bytes";
+       @rest and (print $fh @rest or return drop $self, "print rest: $!");
        [ $fh, 0 ] # [1] = offset, [2] = length, not set by us
 }
 
@@ -548,30 +551,43 @@ sub write {
        }
 }
 
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-
-sub msg_more ($$) {
-       my $self = $_[0]; # $_[1] = buf
-       my ($sock, $wbuf, $n, $nlen, $tmpio);
+sub msg_more ($@) {
+       my $self = shift;
+       my ($sock, $wbuf);
        $sock = $self->{sock} or return 1;
        $wbuf = $self->{wbuf};
 
-       if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) &&
+       if ($sendmsg_more && (!defined($wbuf) || !scalar(@$wbuf)) &&
                !$sock->can('stop_SSL')) {
-               $n = send($sock, $_[1], MSG_MORE);
-               if (defined $n) {
-                       $nlen = length($_[1]) - $n;
-                       return 1 if $nlen == 0; # all done!
-                       # queue up the unwritten substring:
-                       $tmpio = tmpio($self, \($_[1]), $n) or return 0;
-                       push @{$self->{wbuf}}, $tmpio; # autovivifies
-                       epwait $sock, EPOLLOUT|EPOLLONESHOT;
-                       return 0;
+               my ($s, $tip, $tmpio);
+               $s = $sendmsg_more->($sock, @_);
+               if (defined $s) {
+                       my $exp = sum(map length, @_);
+                       return 1 if $s == $exp;
+                       while (@_) {
+                               $tip = shift;
+                               if ($s >= length($tip)) { # fully written
+                                       $s -= length($tip);
+                               } else { # first partial write
+                                       $tmpio = tmpio $self, \$tip, $s, @_
+                                               or return 0;
+                                       last;
+                               }
+                       }
+                       $tmpio // return drop $self, "BUG: tmpio on $s != $exp";
+               } elsif ($! == EAGAIN) {
+                       $tip = shift;
+                       $tmpio = tmpio $self, \$tip, 0, @_ or return 0;
+               } else { # client disconnected
+                       return $self->close;
                }
+               push @{$self->{wbuf}}, $tmpio; # autovivifies
+               epwait $sock, EPOLLOUT|EPOLLONESHOT;
+               0;
+       } else {
+               # don't redispatch into NNTPdeflate::write
+               PublicInbox::DS::write($self, join('', @_));
        }
-
-       # don't redispatch into NNTPdeflate::write
-       PublicInbox::DS::write($self, \($_[1]));
 }
 
 # return true if complete, false if incomplete (or failure)
index 539adf0fbfd47f1097b6ae89952534835cac7006..a9f9693bda809981a6f71fdef756f5a75f9a0bef 100644 (file)
@@ -91,12 +91,15 @@ sub do_read ($$$$) {
 }
 
 # override PublicInbox::DS::msg_more
-sub msg_more ($$) {
-       my $self = $_[0];
+sub msg_more ($@) {
+       my $self = shift;
 
        # $_[1] may be a reference or not for ->deflate
-       my $err = $zout->deflate($_[1], $zbuf);
-       $err == Z_OK or die "->deflate failed $err";
+       my $err;
+       for (@_) {
+               $err = $zout->deflate($_, $zbuf);
+               $err == Z_OK or die "->deflate failed $err";
+       }
        1;
 }
 
index b7728bd252136b604725221f34d086826e23f9eb..3ca0d18c4892f72e6d99fe623bebe2c08e95817b 100644 (file)
@@ -213,14 +213,9 @@ sub response_header_write {
 
 # middlewares such as Deflater may write empty strings
 sub chunked_write ($$) {
-       my $self = $_[0];
-       return if $_[1] eq '';
-       msg_more($self, sprintf("%x\r\n", length($_[1])));
-       msg_more($self, $_[1]);
-
-       # use $self->write(\"\n\n") if you care about real-time
-       # streaming responses, public-inbox WWW does not.
-       msg_more($self, "\r\n");
+       my ($self, $buf) = @_;
+       $buf eq '' or
+               msg_more $self, sprintf("%x\r\n", length($buf)), $buf, "\r\n";
 }
 
 sub identity_write ($$) {
index b12533cb5f290fb6e88a676df4d26b7f61de495c..0eb21b6a2def11b7b2733504ea00d26e198984f5 100644 (file)
@@ -605,8 +605,7 @@ sub fetch_blob_cb { # called by git->cat_async via ibx_async_cat
 
 sub emit_rfc822 {
        my ($self, $k, undef, $bref) = @_;
-       $self->msg_more(" $k {" . length($$bref)."}\r\n");
-       $self->msg_more($$bref);
+       $self->msg_more(" $k {" . length($$bref)."}\r\n", $$bref);
 }
 
 # Mail::IMAPClient::message_string cares about this by default,
@@ -626,20 +625,18 @@ sub emit_flags { $_[0]->msg_more(' FLAGS ()') }
 
 sub emit_envelope {
        my ($self, undef, undef, undef, $eml) = @_;
-       $self->msg_more(' ENVELOPE '.eml_envelope($eml));
+       $self->msg_more(' ENVELOPE 'eml_envelope($eml));
 }
 
 sub emit_rfc822_header {
        my ($self, $k, undef, undef, $eml) = @_;
-       $self->msg_more(" $k {".length(${$eml->{hdr}})."}\r\n");
-       $self->msg_more(${$eml->{hdr}});
+       $self->msg_more(" $k {".length(${$eml->{hdr}})."}\r\n", ${$eml->{hdr}});
 }
 
 # n.b. this is sorted to be after any emit_eml_new ops
 sub emit_rfc822_text {
        my ($self, $k, undef, $bref) = @_;
-       $self->msg_more(" $k {".length($$bref)."}\r\n");
-       $self->msg_more($$bref);
+       $self->msg_more(" $k {".length($$bref)."}\r\n", $$bref);
 }
 
 sub emit_bodystructure {
@@ -970,8 +967,7 @@ sub partial_emit ($$$) {
                } else {
                        $len = length($str);
                }
-               $self->msg_more(" $k {$len}\r\n");
-               $self->msg_more($str);
+               $self->msg_more(" $k {$len}\r\n", $str);
        }
 }
 
index 4cbe9623d61954a0ab97a441bd98c74e33dba6f3..80b46c19cfb22797054b760f15ab73ebdaa481ba 100644 (file)
@@ -22,7 +22,8 @@ use POSIX qw(ENOENT ENOSYS EINVAL O_NONBLOCK);
 use Socket qw(SOL_SOCKET SCM_RIGHTS);
 use Config;
 our %SIGNUM = (WINCH => 28); # most Linux, {Free,Net,Open}BSD, *Darwin
-our ($INOTIFY, %PACK);
+our ($INOTIFY, %CONST);
+use List::Util qw(sum);
 
 # $VERSION = '0.25'; # Sys::Syscall version
 our @EXPORT_OK = qw(epoll_ctl epoll_create epoll_wait
@@ -290,7 +291,8 @@ EOM
 
 BEGIN {
        if ($^O eq 'linux') {
-               %PACK = (
+               %CONST = (
+                       MSG_MORE => 0x8000,
                        TMPL_cmsg_len => TMPL_size_t,
                        # cmsg_len, cmsg_level, cmsg_type
                        SIZEOF_cmsghdr => SIZEOF_int * 2 + SIZEOF_size_t,
@@ -303,7 +305,7 @@ BEGIN {
                                'i', # msg_flags
                );
        } elsif ($^O =~ /\A(?:freebsd|openbsd|netbsd|dragonfly)\z/) {
-               %PACK = (
+               %CONST = (
                        TMPL_cmsg_len => 'L', # socklen_t
                        SIZEOF_cmsghdr => SIZEOF_int * 3,
                        CMSG_DATA_off => SIZEOF_ptr == 8 ? '@16' : '',
@@ -316,11 +318,12 @@ BEGIN {
 
                )
        }
-       $PACK{CMSG_ALIGN_size} = SIZEOF_size_t;
-       $PACK{SIZEOF_cmsghdr} //= 0;
-       $PACK{TMPL_cmsg_len} //= undef;
-       $PACK{CMSG_DATA_off} //= undef;
-       $PACK{TMPL_msghdr} //= undef;
+       $CONST{CMSG_ALIGN_size} = SIZEOF_size_t;
+       $CONST{SIZEOF_cmsghdr} //= 0;
+       $CONST{TMPL_cmsg_len} //= undef;
+       $CONST{CMSG_DATA_off} //= undef;
+       $CONST{TMPL_msghdr} //= undef;
+       $CONST{MSG_MORE} //= 0;
 }
 
 # SFD_CLOEXEC is arch-dependent, so IN_CLOEXEC may be, too
@@ -455,7 +458,7 @@ sub nodatacow_dir {
        if (open my $fh, '<', $_[0]) { nodatacow_fh($fh) }
 }
 
-use constant \%PACK;
+use constant \%CONST;
 sub CMSG_ALIGN ($) { ($_[0] + CMSG_ALIGN_size - 1) & ~(CMSG_ALIGN_size - 1) }
 use constant CMSG_ALIGN_SIZEOF_cmsghdr => CMSG_ALIGN(SIZEOF_cmsghdr);
 sub CMSG_SPACE ($) { CMSG_ALIGN($_[0]) + CMSG_ALIGN_SIZEOF_cmsghdr }
@@ -527,6 +530,22 @@ require PublicInbox::CmdIPC4;
        }
        @ret;
 };
+
+*sendmsg_more = sub ($@) {
+       use bytes qw(length substr);
+       my $sock = shift;
+       my $iov = join('', map { pack 'P'.TMPL_size_t, $_, length } @_);
+       my $mh = pack(TMPL_msghdr,
+                       undef, 0, # msg_name, msg_namelen (unused)
+                       $iov, scalar(@_), # msg_iov, msg_iovlen
+                       undef, 0, # msg_control, msg_controllen (unused),
+                       0); # msg_flags (unused)
+       my $s;
+       do {
+               $s = syscall($SYS_sendmsg, fileno($sock), $mh, MSG_MORE);
+       } while ($s < 0 && $!{EINTR});
+       $s < 0 ? undef : $s;
+};
 }
 
 1;
diff --git a/t/syscall.t b/t/syscall.t
new file mode 100644 (file)
index 0000000..19390d0
--- /dev/null
@@ -0,0 +1,14 @@
+use v5.12;
+use autodie;
+use Test::More;
+use PublicInbox::Syscall;
+use Socket qw(AF_UNIX SOCK_STREAM);
+my $sendmsg_more = PublicInbox::Syscall->can('sendmsg_more') or
+       plan skip_all => "sendmsg syscalls not defined on $^O";
+
+socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0);
+is $sendmsg_more->($s1, 'hello', 'world'), 10, 'sendmsg_more expected size';
+is sysread($s2, my $buf, 11), 10, 'reader got expected size from sendmsg_more';
+is $buf, 'helloworld', 'sendmsg_more sent expected message';
+
+done_testing;