sub wq_broadcast {
my ($self, $sub, @args) = @_;
- if (my $wkr = $self->{-wq_workers}) {
- my $buf = ipc_freeze([$sub, @args]);
- for my $bcast1 (values %$wkr) {
- my $sock = $bcast1 // $self->{-wq_s1} // next;
- send($sock, $buf, 0) // croak "send: $!";
- # XXX shouldn't have to deal with EMSGSIZE here...
- }
- } else {
- eval { $self->$sub(@args) };
- warn "wq_broadcast: $@" if $@;
+ my $wkr = $self->{-wq_workers} or Carp::confess('no -wq_workers');
+ my $buf = ipc_freeze([$sub, @args]);
+ for my $bcast1 (values %$wkr) {
+ my $sock = $bcast1 // $self->{-wq_s1} // next;
+ send($sock, $buf, 0) // croak "send: $!";
+ # XXX shouldn't have to deal with EMSGSIZE here...
}
}
sub wq_io_do { # always async
my ($self, $sub, $ios, @args) = @_;
- if (my $s1 = $self->{-wq_s1}) { # run in worker
- my $fds = [ map { fileno($_) } @$ios ];
- my $buf = ipc_freeze([$sub, @args]);
- if (length($buf) > $MY_MAX_ARG_STRLEN) {
- stream_in_full($s1, $fds, $buf);
- } else {
- my $n = $send_cmd->($s1, $fds, $buf, 0);
- return if defined($n); # likely
- $!{ETOOMANYREFS} and
- croak "sendmsg: $! (check RLIMIT_NOFILE)";
- $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
- croak("sendmsg: $!");
- }
+ my $s1 = $self->{-wq_s1} or Carp::confess('no -wq_s1');
+ my $fds = [ map { fileno($_) } @$ios ];
+ my $buf = ipc_freeze([$sub, @args]);
+ if (length($buf) > $MY_MAX_ARG_STRLEN) {
+ stream_in_full($s1, $fds, $buf);
} else {
- @$self{0..$#$ios} = @$ios;
- eval { $self->$sub(@args) };
- warn "wq_io_do: $@" if $@;
- delete @$self{0..$#$ios}; # don't close
+ my $n = $send_cmd->($s1, $fds, $buf, 0);
+ return if defined($n); # likely
+ $!{ETOOMANYREFS} and croak "sendmsg: $! (check RLIMIT_NOFILE)";
+ $!{EMSGSIZE} ? stream_in_full($s1, $fds, $buf) :
+ croak("sendmsg: $!");
}
}
#!perl -w
# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict;
-use v5.10.1;
-use Test::More;
+use v5.12;
use PublicInbox::TestCommon;
use Fcntl qw(SEEK_SET);
use PublicInbox::SHA qw(sha1_hex);
my $big = do { local $/; <$agpl> } // BAIL_OUT "read: $!";
close $agpl or BAIL_OUT "close: $!";
-for my $t ('local', 'worker', 'worker again') {
+for my $t ('worker', 'worker again') {
+ my $ppid = $ipc->wq_workers_start('wq', 1);
+ push(@ppids, $ppid);
$ipc->wq_io_do('test_write_each_fd', [ $wa, $wb, $wc ], 'hello world');
my $i = 0;
for my $fh ($ra, $rb, $rc) {
$exp = sha1_hex($bigger)."\n";
is(readline($rb), $exp, "SHA WQWorker limit ($t)");
}
- my $ppid = $ipc->wq_workers_start('wq', 1);
- push(@ppids, $ppid);
}
# wq_io_do works across fork (siblings can feed)
SKIP: {
skip 'Socket::MsgHdr or Inline::C missing', 3 if !$ppids[0];
- is_deeply(\@ppids, [$$, undef, undef],
+ is_xdeeply(\@ppids, [$$, undef],
'parent pid returned in wq_workers_start');
my $pid = fork // BAIL_OUT $!;
if ($pid == 0) {
skip 'Socket::MsgHdr or Inline::C missing', 11 if !$ppids[0];
seek($warn, 0, SEEK_SET) or BAIL_OUT;
my @warn = <$warn>;
- is(scalar(@warn), 3, 'warned 3 times');
- like($warn[0], qr/ wq_io_do: /, '1st warned from wq_do');
- like($warn[1], qr/ wq_worker: /, '2nd warned from wq_worker');
- is($warn[2], $warn[1], 'worker did not die');
+ is(scalar(@warn), 2, 'warned 3 times');
+ like($warn[0], qr/ wq_worker: /, '2nd warned from wq_worker');
+ is($warn[0], $warn[1], 'worker did not die');
$SIG{__WARN__} = 'DEFAULT';
is($ipc->wq_workers_start('wq', 2), $$, 'workers started again');