]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
treewide: drop MSG_EOR with AF_UNIX+SOCK_SEQPACKET
authorEric Wong <e@80x24.org>
Wed, 30 Aug 2023 05:10:39 +0000 (05:10 +0000)
committerEric Wong <e@80x24.org>
Wed, 30 Aug 2023 05:27:32 +0000 (05:27 +0000)
It's apparently not needed for AF_UNIX + SOCK_SEQPACKET as our
receivers never check for MSG_EOR in "struct msghdr".msg_flags
anyways.  I don't believe POSIX is clear on the exact semantics
of MSG_EOR on this socket type.  This works around truncation
problems on OpenBSD recvmsg when MSG_EOR is used by the sender.

Link: https://marc.info/?i=20230826020759.M335788@dcvr
lib/PublicInbox/CodeSearchIdx.pm
lib/PublicInbox/IPC.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/PktOp.pm
lib/PublicInbox/WQBlocked.pm
lib/PublicInbox/XapClient.pm
script/lei
t/check-www-inbox.perl
t/cmd_ipc.t
t/lei-daemon.t
t/xap_helper.t

index f9251be5eab19c1a6d7d47ec216e537e4c73f9ea..2e46e30ae654ac6f279b5f1c96b87ef5b1f3628b 100644 (file)
@@ -55,7 +55,7 @@ use PublicInbox::CidxLogP;
 use PublicInbox::CidxComm;
 use PublicInbox::Git qw(%OFMT2HEXLEN);
 use PublicInbox::Compat qw(uniqstr);
-use Socket qw(MSG_EOR AF_UNIX SOCK_SEQPACKET);
+use Socket qw(AF_UNIX SOCK_SEQPACKET);
 use Carp ();
 our (
        $LIVE, # pid => cmd
@@ -253,7 +253,7 @@ sub cidx_reap_log { # awaitpid cb
        my ($pid, $self, $op_p) = @_;
        if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
                                ($? & 127) == POSIX::SIGPIPE))) {
-               send($op_p, "shard_done $self->{shard}", MSG_EOR);
+               send($op_p, "shard_done $self->{shard}", 0);
        } else {
                warn "E: git @LOG_STDIN: \$?=$?\n";
                $self->{xdb}->cancel_transaction;
@@ -501,7 +501,7 @@ sub shard_commit { # via wq_io_do
        my ($self) = @_;
        my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
        $self->commit_txn_lazy;
-       send($op_p, "shard_done $self->{shard}", MSG_EOR);
+       send($op_p, "shard_done $self->{shard}", 0);
 }
 
 sub assoc_max_init ($) {
@@ -782,7 +782,7 @@ sub prune_commit { # via wq_io_do in IDX_SHARDS
        my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p';
        my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef';
        cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr;
-       send($prune_op_p, "prune_done $self->{shard}", MSG_EOR);
+       send($prune_op_p, "prune_done $self->{shard}", 0);
 }
 
 sub shards_active { # post_loop_do
index 84765748402092a60df908365a05eb60b02ab544..766c377f83479d9f80909cfdd41ebde0ddf52dcf 100644 (file)
@@ -16,7 +16,7 @@ use PublicInbox::DS qw(awaitpid);
 use PublicInbox::Spawn;
 use PublicInbox::OnDestroy;
 use PublicInbox::WQWorker;
-use Socket qw(AF_UNIX MSG_EOR SOCK_STREAM);
+use Socket qw(AF_UNIX SOCK_STREAM);
 my $MY_MAX_ARG_STRLEN = 4096 * 33; # extra 4K for serialization
 my $SEQPACKET = eval { Socket::SOCK_SEQPACKET() }; # portable enough?
 our @EXPORT_OK = qw(ipc_freeze ipc_thaw nproc_shards);
@@ -279,7 +279,7 @@ sub wq_broadcast {
                my $buf = ipc_freeze([$sub, @args]);
                for my $bcast1 (values %$wkr) {
                        my $sock = $bcast1 // $self->{-wq_s1} // next;
-                       send($sock, $buf, MSG_EOR) // croak "send: $!";
+                       send($sock, $buf, 0) // croak "send: $!";
                        # XXX shouldn't have to deal with EMSGSIZE here...
                }
        } else {
@@ -294,7 +294,7 @@ sub stream_in_full ($$$) {
                croak "socketpair: $!";
        my $n = send_cmd($s1, [ fileno($r) ],
                        ipc_freeze(['do_sock_stream', length($buf)]),
-                       MSG_EOR) // croak "sendmsg: $!";
+                       0) // croak "sendmsg: $!";
        undef $r;
        $n = send_cmd($w, $fds, $buf, 0) // croak "sendmsg: $!";
        while ($n < length($buf)) {
@@ -316,7 +316,7 @@ sub wq_io_do { # always async
                if (length($buf) > $MY_MAX_ARG_STRLEN) {
                        stream_in_full($s1, $fds, $buf);
                } else {
-                       my $n = send_cmd $s1, $fds, $buf, MSG_EOR;
+                       my $n = send_cmd $s1, $fds, $buf, 0;
                        return if defined($n); # likely
                        $!{ETOOMANYREFS} and
                                croak "sendmsg: $! (check RLIMIT_NOFILE)";
@@ -365,7 +365,7 @@ sub wq_nonblock_do { # always async
        if ($self->{wqb}) { # saturated once, assume saturated forever
                $self->{wqb}->flush_send($buf);
        } else {
-               $send_cmd->($self->{-wq_s1}, [], $buf, MSG_EOR) //
+               $send_cmd->($self->{-wq_s1}, [], $buf, 0) //
                        ($!{EAGAIN} ? PublicInbox::WQBlocked->new($self, $buf)
                                        : croak("sendmsg: $!"));
        }
index 578686e2b0d1216e93244d2d55bcbd0d64b482dc..5fbb1211961b25a38d416d9e3e8bb676f720122a 100644 (file)
@@ -10,7 +10,7 @@ use v5.12;
 use parent qw(PublicInbox::DS PublicInbox::LeiExternal
        PublicInbox::LeiQuery);
 use Getopt::Long ();
-use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un);
 use Errno qw(EPIPE EAGAIN ECONNREFUSED ENOENT ECONNRESET);
 use Cwd qw(getcwd);
 use POSIX qw(strftime);
@@ -481,7 +481,7 @@ sub x_it ($$) {
        if ($self->{pkt_op_p}) { # worker => lei-daemon
                $self->{pkt_op_p}->pkt_do('x_it', $code);
        } elsif ($self->{sock}) { # lei->daemon => lei(1) client
-               send($self->{sock}, "x_it $code", MSG_EOR);
+               send($self->{sock}, "x_it $code", 0);
        } elsif ($quit == \&CORE::exit) { # an admin (one-shot) command
                exit($code >> 8);
        } # else ignore if client disconnected
@@ -548,7 +548,7 @@ sub child_error { # passes non-fatal curl exit codes to user
        if ($self->{pkt_op_p}) { # to top lei-daemon
                $self->{pkt_op_p}->pkt_do('child_error', $child_error);
        } elsif ($self->{sock}) { # to lei(1) client
-               send($self->{sock}, "child_error $child_error", MSG_EOR);
+               send($self->{sock}, "child_error $child_error", 0);
        } # else noop if client disconnected
 }
 
@@ -1017,7 +1017,7 @@ sub send_exec_cmd { # tell script/lei to execute a command
        PublicInbox::IPC::send_cmd(
                        $self->{sock} // die('lei client gone'),
                        [ map { fileno($_) } @$io ],
-                       exec_buf($cmd, $env), MSG_EOR) //
+                       exec_buf($cmd, $env), 0) //
                Carp::croak("sendmsg: $!");
 }
 
@@ -1028,7 +1028,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
        while (my $op = shift(@$alerts)) {
                if ($op eq ':WINCH') {
                        # hit the process group that started the MUA
-                       send($sock, '-WINCH', MSG_EOR) if $sock;
+                       send($sock, '-WINCH', 0) if $sock;
                } elsif ($op eq ':bell') {
                        out($self, "\a");
                } elsif ($op =~ /(?<!\\),/) { # bare ',' (not ',,')
@@ -1037,7 +1037,7 @@ sub poke_mua { # forces terminal MUAs to wake up and hopefully notice new mail
                        my $cmd = $1; # run an arbitrary command
                        require Text::ParseWords;
                        $cmd = [ Text::ParseWords::shellwords($cmd) ];
-                       send($sock, exec_buf($cmd, {}), MSG_EOR) if $sock;
+                       send($sock, exec_buf($cmd, {}), 0) if $sock;
                } else {
                        warn("W: unsupported --alert=$op\n"); # non-fatal
                }
@@ -1093,7 +1093,7 @@ sub pgr_err {
        print { $self->{2} } @msg;
        $self->{2}->autoflush(1);
        stop_pager($self);
-       send($self->{sock}, 'wait', MSG_EOR); # wait for user to quit pager
+       send($self->{sock}, 'wait', 0); # wait for user to quit pager
 }
 
 sub stop_pager {
@@ -1110,25 +1110,25 @@ sub accept_dispatch { # Listener {post_accept} callback
        my $self = bless { sock => $sock }, __PACKAGE__;
        vec(my $rvec = '', fileno($sock), 1) = 1;
        select($rvec, undef, undef, 60) or
-               return send($sock, 'timed out waiting to recv FDs', MSG_EOR);
+               return send($sock, 'timed out waiting to recv FDs', 0);
        # (4096 * 33) >MAX_ARG_STRLEN
        my @fds = PublicInbox::IPC::recv_cmd($sock, my $buf, 4096 * 33) or
                return; # EOF
        if (!defined($fds[0])) {
                warn(my $msg = "recv_cmd failed: $!");
-               return send($sock, $msg, MSG_EOR);
+               return send($sock, $msg, 0);
        } else {
                my $i = 0;
                for my $fd (@fds) {
                        open($self->{$i++}, '+<&=', $fd) and next;
-                       send($sock, "open(+<&=$fd) (FD=$i): $!", MSG_EOR);
+                       send($sock, "open(+<&=$fd) (FD=$i): $!", 0);
                }
-               $i == 4 or return send($sock, 'not enough FDs='.($i-1), MSG_EOR)
+               $i == 4 or return send($sock, 'not enough FDs='.($i-1), 0)
        }
        # $ENV_STR = join('', map { "\0$_=$ENV{$_}" } keys %ENV);
        # $buf = "$argc\0".join("\0", @ARGV).$ENV_STR."\0\0";
        substr($buf, -2, 2, '') eq "\0\0" or  # s/\0\0\z//
-               return send($sock, 'request command truncated', MSG_EOR);
+               return send($sock, 'request command truncated', 0);
        my ($argc, @argv) = split(/\0/, $buf, -1);
        undef $buf;
        my %env = map { split(/=/, $_, 2) } splice(@argv, $argc);
@@ -1174,7 +1174,7 @@ sub event_step {
                        die "unrecognized client signal: $buf";
                }
                my $s = $self->{-socks} // []; # lei up --all
-               @$s = grep { send($_, $buf, MSG_EOR) } @$s;
+               @$s = grep { send($_, $buf, 0) } @$s;
        };
        if (my $err = $@) {
                eval { $self->fail($err) };
@@ -1534,7 +1534,7 @@ sub cfg_dump ($$) {
 sub request_umask {
        my ($lei) = @_;
        my $s = $lei->{sock} // return;
-       send($s, 'umask', MSG_EOR) // die "send: $!";
+       send($s, 'umask', 0) // die "send: $!";
        vec(my $rvec = '', fileno($s), 1) = 1;
        select($rvec, undef, undef, 2) or die 'timeout waiting for umask';
        recv($s, my $v, 5, 0) // die "recv: $!";
index 4c434566d31f90104d921b76adf1844a9d6aefde..dc432307cc76c22d468471e6881627e1fbd0f3cf 100644 (file)
@@ -11,7 +11,7 @@ use v5.10.1;
 use parent qw(PublicInbox::DS);
 use Errno qw(EAGAIN ECONNRESET);
 use PublicInbox::Syscall qw(EPOLLIN);
-use Socket qw(AF_UNIX MSG_EOR SOCK_SEQPACKET);
+use Socket qw(AF_UNIX SOCK_SEQPACKET);
 use PublicInbox::IPC qw(ipc_freeze ipc_thaw);
 use Scalar::Util qw(blessed);
 
@@ -32,7 +32,7 @@ sub pair {
 
 sub pkt_do { # for the producer to trigger event_step in consumer
        my ($self, $cmd, @args) = @_;
-       send($self->{op_p}, @args ? "$cmd\0".ipc_freeze(\@args) : $cmd, MSG_EOR)
+       send($self->{op_p}, @args ? "$cmd\0".ipc_freeze(\@args) : $cmd, 0)
 }
 
 sub event_step {
index fbb436004ffd37fd27b91cd336724a6d2fac1a11..8d931fa9d1ad8af9b941eceb9ad9e2dbf37ad68e 100644 (file)
@@ -8,7 +8,6 @@ use parent qw(PublicInbox::DS);
 use PublicInbox::Syscall qw(EPOLLOUT EPOLLONESHOT);
 use PublicInbox::IPC;
 use Carp ();
-use Socket qw(MSG_EOR);
 
 sub new {
        my ($cls, $wq, $buf) = @_;
@@ -25,7 +24,7 @@ sub flush_send {
                } else {
                        my $wq_s1 = $self->{sock};
                        my $n = $PublicInbox::IPC::send_cmd->($wq_s1, [], $buf,
-                                                               MSG_EOR);
+                                                               0);
                        next if defined($n);
                        Carp::croak("sendmsg: $!") unless $!{EAGAIN};
                        PublicInbox::DS::epwait($wq_s1, EPOLLOUT|EPOLLONESHOT);
index 56e3c3b4391a6778fc6afda9eaf6a3f94564e7d4..f6c09c3ba049e38c410e41cdf8bf5ff21d587069 100644 (file)
@@ -9,7 +9,7 @@
 package PublicInbox::XapClient;
 use v5.12;
 use PublicInbox::Spawn qw(spawn);
-use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR);
+use Socket qw(AF_UNIX SOCK_SEQPACKET);
 use PublicInbox::IPC;
 
 sub mkreq {
@@ -21,7 +21,7 @@ sub mkreq {
        }
        my @fds = map fileno($_), @$ios;
        my $buf = join("\0", @arg, '');
-       $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, MSG_EOR) //
+       $n = PublicInbox::IPC::send_cmd($self->{io}, \@fds, $buf, 0) //
                die "send_cmd: $!";
        $n == length($buf) or die "send_cmd: $n != ".length($buf);
        $r;
index 5feb77515d0f4a10c9fe8b67f859f8232f11e113..a77ea880d57919617c8eb0a6e9906b09adccaa5d 100755 (executable)
@@ -2,7 +2,7 @@
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 use v5.12;
-use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un);
 use PublicInbox::CmdIPC4;
 my $narg = 5;
 my $sock;
@@ -109,9 +109,9 @@ open my $dh, '<', '.' or die "open(.) $!";
 my $buf = join("\0", scalar(@ARGV), @ARGV);
 while (my ($k, $v) = each %ENV) { $buf .= "\0$k=$v" }
 $buf .= "\0\0";
-$send_cmd->($sock, [0, 1, 2, fileno($dh)], $buf, MSG_EOR) or die "sendmsg: $!";
-$SIG{TSTP} = sub { send($sock, 'STOP', MSG_EOR); kill 'STOP', $$ };
-$SIG{CONT} = sub { send($sock, 'CONT', MSG_EOR) };
+$send_cmd->($sock, [0, 1, 2, fileno($dh)], $buf, 0) or die "sendmsg: $!";
+$SIG{TSTP} = sub { send($sock, 'STOP', 0); kill 'STOP', $$ };
+$SIG{CONT} = sub { send($sock, 'CONT', 0) };
 
 my $x_it_code = 0;
 while (1) {
@@ -126,7 +126,7 @@ while (1) {
        } elsif ($buf eq '-WINCH') {
                kill($buf, @parent); # for MUA
        } elsif ($buf eq 'umask') {
-               send($sock, 'u'.pack('V', umask), MSG_EOR) or die "send: $!"
+               send($sock, 'u'.pack('V', umask), 0) or die "send: $!"
        } elsif ($buf =~ /\Ax_it ([0-9]+)\z/) {
                $x_it_code ||= $1 + 0;
                last;
index 033b90d174dc16d4ccca8bc2a1c2ab189b2ca0af..46f9ce1e8b9e71242ec9c1c493f83c30b57613b0 100644 (file)
@@ -123,7 +123,7 @@ while (keys %workers) { # reacts to SIGCHLD
                }
        }
        while ($u = shift @queue) {
-               my $s = $todo[1]->send($u, MSG_EOR);
+               my $s = $todo[1]->send($u, 0);
                if ($!{EAGAIN}) {
                        unshift @queue, $u;
                        last;
@@ -177,7 +177,7 @@ sub worker_loop {
                foreach my $l (@links, "DONE\t$u") {
                        next if $l eq '' || $l =~ /\.mbox(?:\.gz)\z/;
                        do {
-                               $s = $done_wr->send($l, MSG_EOR);
+                               $s = $done_wr->send($l, 0);
                        } while (!defined $s && $!{EINTR});
                        die "$$ send $!\n" unless defined $s;
                        my $n = length($l);
index cd76d5e896d160a015e19a9f9e119bfa0ffaa188..403d0eed68c57834f759f3988abd1a2e84c7a853 100644 (file)
@@ -5,7 +5,7 @@ use strict;
 use v5.10.1;
 use Test::More;
 use PublicInbox::TestCommon;
-use Socket qw(AF_UNIX SOCK_STREAM MSG_EOR);
+use Socket qw(AF_UNIX SOCK_STREAM);
 pipe(my ($r, $w)) or BAIL_OUT;
 my ($send, $recv);
 require_ok 'PublicInbox::Spawn';
@@ -122,7 +122,7 @@ SKIP: {
        $send = $send_ic;
        $recv = $recv_ic;
        $do_test->(SOCK_STREAM, 0, 'Inline::C stream');
-       $do_test->($SOCK_SEQPACKET, MSG_EOR, 'Inline::C seqpacket');
+       $do_test->($SOCK_SEQPACKET, 0, 'Inline::C seqpacket');
 }
 
 SKIP: {
@@ -131,7 +131,7 @@ SKIP: {
        $send = PublicInbox::CmdIPC4->can('send_cmd4');
        $recv = PublicInbox::CmdIPC4->can('recv_cmd4');
        $do_test->(SOCK_STREAM, 0, 'MsgHdr stream');
-       $do_test->($SOCK_SEQPACKET, MSG_EOR, 'MsgHdr seqpacket');
+       $do_test->($SOCK_SEQPACKET, 0, 'MsgHdr seqpacket');
        SKIP: {
                ($send_ic && $recv_ic) or
                        skip 'Inline::C not installed/enabled', 12;
@@ -149,7 +149,7 @@ SKIP: {
        $recv = PublicInbox::Syscall->can('recv_cmd4') or
                skip 'recv_cmd4 not defined for arch';
        $do_test->(SOCK_STREAM, 0, 'PP Linux stream');
-       $do_test->($SOCK_SEQPACKET, MSG_EOR, 'PP Linux seqpacket');
+       $do_test->($SOCK_SEQPACKET, 0, 'PP Linux seqpacket');
 }
 
 done_testing;
index e11105bc7040c080f81483396d1a3d0ff1bbb0fd..78ed265e24a9f7a2abe2030e744e1af7e20c2074 100644 (file)
@@ -2,7 +2,7 @@
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 use strict; use v5.10.1; use PublicInbox::TestCommon;
-use Socket qw(AF_UNIX SOCK_SEQPACKET MSG_EOR pack_sockaddr_un);
+use Socket qw(AF_UNIX SOCK_SEQPACKET pack_sockaddr_un);
 
 test_lei({ daemon_only => 1 }, sub {
        my $send_cmd = PublicInbox::Spawn->can('send_cmd4') // do {
@@ -40,7 +40,7 @@ test_lei({ daemon_only => 1 }, sub {
                        socket(my $c, AF_UNIX, SOCK_SEQPACKET, 0) or
                                                        BAIL_OUT "socket: $!";
                        connect($c, $addr) or BAIL_OUT "connect: $!";
-                       $send_cmd->($c, \@fds, 'hi',  MSG_EOR);
+                       $send_cmd->($c, \@fds, 'hi',  0);
                }
                lei_ok('daemon-pid');
                chomp($pid = $lei_out);
index 3646cf9786ee5d4dd411cdfa7821bb97ddb83205..73c1c849a0817463d33acf653e4bd92d06b2bec9 100644 (file)
@@ -7,7 +7,7 @@ require_mods(qw(DBD::SQLite Search::Xapian));
 my $msg = no_scm_rights;
 plan(skip_all => $msg) if $msg; # TODO: FIFO support?
 use PublicInbox::Spawn qw(spawn);
-use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM MSG_EOR);
+use Socket qw(AF_UNIX SOCK_SEQPACKET SOCK_STREAM);
 require PublicInbox::AutoReap;
 require PublicInbox::IPC;
 require PublicInbox::XapClient;
@@ -54,7 +54,7 @@ my $doreq = sub {
        my $buf = join("\0", @arg, '');
        my @fds = fileno($y);
        push @fds, fileno($err) if $err;
-       my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, MSG_EOR);
+       my $n = PublicInbox::IPC::send_cmd($s, \@fds, $buf, 0);
        $n // xbail "send: $!";
        my $arg = "@arg";
        $arg =~ s/\Q$tmp\E/\$TMP/gs;