]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
ipc: require fork+SOCK_SEQPACKET for wq_* functions
authorEric Wong <e@80x24.org>
Sat, 7 Oct 2023 21:24:05 +0000 (21:24 +0000)
committerEric Wong <e@80x24.org>
Sun, 8 Oct 2023 18:54:44 +0000 (18:54 +0000)
None of the lei internals works properly without forking and
sockets.  The fallback code increases the potential to accidentally
call subs in the wrong process during the teardown phase.

We'll still support ipc_do w/o forking for now since it
forking doesn't benefit small indexing runs from -mda and
such.

lib/PublicInbox/IPC.pm
t/ipc.t

index 068c5623dfed6e6290d53136441b80b5340b4516..ba8b5739c402ff931726edfa6e49bd2047db8678 100644 (file)
@@ -256,16 +256,12 @@ sub do_sock_stream { # via wq_io_do, for big requests
 
 sub wq_broadcast {
        my ($self, $sub, @args) = @_;
-       if (my $wkr = $self->{-wq_workers}) {
-               my $buf = ipc_freeze([$sub, @args]);
-               for my $bcast1 (values %$wkr) {
-                       my $sock = $bcast1 // $self->{-wq_s1} // next;
-                       send($sock, $buf, 0) // croak "send: $!";
-                       # XXX shouldn't have to deal with EMSGSIZE here...
-               }
-       } else {
-               eval { $self->$sub(@args) };
-               warn "wq_broadcast: $@" if $@;
+       my $wkr = $self->{-wq_workers} or Carp::confess('no -wq_workers');
+       my $buf = ipc_freeze([$sub, @args]);
+       for my $bcast1 (values %$wkr) {
+               my $sock = $bcast1 // $self->{-wq_s1} // next;
+               send($sock, $buf, 0) // croak "send: $!";
+               # XXX shouldn't have to deal with EMSGSIZE here...
        }
 }
 
@@ -291,24 +287,17 @@ sub stream_in_full ($$$) {
 
 sub wq_io_do { # always async
        my ($self, $sub, $ios, @args) = @_;
-       if (my $s1 = $self->{-wq_s1}) { # run in worker
-               my $fds = [ map { fileno($_) } @$ios ];
-               my $buf = ipc_freeze([$sub, @args]);
-               if (length($buf) > $MY_MAX_ARG_STRLEN) {
-                       stream_in_full($s1, $fds, $buf);
-               } else {
-                       my $n = $send_cmd->($s1, $fds, $buf, 0);
-                       return if defined($n); # likely
-                       $!{ETOOMANYREFS} and
-                               croak "sendmsg: $! (check RLIMIT_NOFILE)";
-                       $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
-                               croak("sendmsg: $!");
-               }
+       my $s1 = $self->{-wq_s1} or Carp::confess('no -wq_s1');
+       my $fds = [ map { fileno($_) } @$ios ];
+       my $buf = ipc_freeze([$sub, @args]);
+       if (length($buf) > $MY_MAX_ARG_STRLEN) {
+               stream_in_full($s1, $fds, $buf);
        } else {
-               @$self{0..$#$ios} = @$ios;
-               eval { $self->$sub(@args) };
-               warn "wq_io_do: $@" if $@;
-               delete @$self{0..$#$ios}; # don't close
+               my $n = $send_cmd->($s1, $fds, $buf, 0);
+               return if defined($n); # likely
+               $!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)";
+               $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+                       croak("sendmsg: $!");
        }
 }
 
diff --git a/t/ipc.t b/t/ipc.t
index 7bdf22189b954d125063a4e72796d26538bcf3b2..519ef089bfbec57edfbe9cd5d48738f839e50836 100644 (file)
--- a/t/ipc.t
+++ b/t/ipc.t
@@ -1,9 +1,7 @@
 #!perl -w
 # Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict;
-use v5.10.1;
-use Test::More;
+use v5.12;
 use PublicInbox::TestCommon;
 use Fcntl qw(SEEK_SET);
 use PublicInbox::SHA qw(sha1_hex);
@@ -108,7 +106,9 @@ open my $agpl, '<', 'COPYING' or BAIL_OUT "AGPL-3 missing: $!";
 my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!";
 close $agpl or BAIL_OUT "close: $!";
 
-for my $t ('local', 'worker', 'worker again') {
+for my $t ('worker', 'worker again') {
+       my $ppid = $ipc->wq_workers_start('wq', 1);
+       push(@ppids, $ppid);
        $ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
        my $i = 0;
        for my $fh ($ra, $rb, $rc) {
@@ -132,14 +132,12 @@ for my $t ('local', 'worker', 'worker again') {
                $exp = sha1_hex($bigger)."\n";
                is(readline($rb), $exp, "SHA WQWorker limit ($t)");
        }
-       my $ppid = $ipc->wq_workers_start('wq', 1);
-       push(@ppids, $ppid);
 }
 
 # wq_io_do works across fork (siblings can feed)
 SKIP: {
        skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
-       is_deeply(\@ppids, [$$, undef, undef],
+       is_xdeeply(\@ppids, [$$, undef],
                'parent pid returned in wq_workers_start');
        my $pid = fork // BAIL_OUT $!;
        if ($pid == 0) {
@@ -173,10 +171,9 @@ SKIP: {
        skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0];
        seek($warn, 0, SEEK_SET) or BAIL_OUT;
        my @warn = <$warn>;
-       is(scalar(@warn), 3, 'warned 3 times');
-       like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do');
-       like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
-       is($warn[2], $warn[1], 'worker did not die');
+       is(scalar(@warn), 2, 'warned 3 times');
+       like($warn[0], qr/ wq_worker: /, '2nd warned from wq_worker');
+       is($warn[0], $warn[1], 'worker did not die');
 
        $SIG{__WARN__} = 'DEFAULT';
        is($ipc->wq_workers_start('wq', 2), $$, 'workers started again');