$n;
}
+sub need_reap { # post_loop_do
+ my (undef, $jobs) = @_;
+ scalar(keys(%$LIVE)) > $jobs;
+}
+
sub cidx_reap ($$) {
my ($self, $jobs) = @_;
while (run_todo($self)) {}
- my $cb = sub { keys(%$LIVE) > $jobs };
- PublicInbox::DS->SetPostLoopCallback($cb);
- PublicInbox::DS::event_loop($MY_SIG, $SIGSET) while $cb->();
+ local @PublicInbox::DS::post_loop_do = \(&need_reap, $jobs);
+ while (need_reap(undef, $jobs)) {
+ PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+ }
while (!$jobs && run_todo($self)) {}
}
send($op_p, "shard_done $n", MSG_EOR);
}
+sub consumers_open { # post_loop_do
+ my (undef, $consumers) = @_;
+ scalar(grep { $_->{sock} } values %$consumers);
+}
+
sub commit_used_shards ($$$) {
my ($self, $git, $consumers) = @_;
local $self->{-shard_ok} = {};
$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
$consumers->{$n} = $c;
}
- PublicInbox::DS->SetPostLoopCallback(sub {
- scalar(grep { $_->{sock} } values %$consumers);
- });
+ local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
die "E: $git->{git_dir} $n shards failed" if $n;
$CONSUMERS{$n} = $c;
}
@shard_in = ();
- PublicInbox::DS->SetPostLoopCallback(sub {
- scalar(grep { $_->{sock} } values %CONSUMERS);
- });
+ local @PublicInbox::DS::post_loop_do = (\&consumers_open, \%CONSUMERS);
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
die "E: $git->{git_dir} $n shards failed" if $n;
cidx_reap($self, 0);
}
-sub shards_active { # PostLoopCallback
+sub shards_active { # post_loop_do
scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
}
$s->wq_close;
}
- PublicInbox::DS->SetPostLoopCallback(\&shards_active);
+ local @PublicInbox::DS::post_loop_do = (\&shards_active);
PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
$self->lock_release(!!$self->{nchange});
}
$Epoll, # Global epoll fd (or DSKQXS ref)
$ep_io, # IO::Handle for Epoll
- $PostLoopCallback, # subref to call at the end of each loop, if defined (global)
+ @post_loop_do, # subref + args to call at the end of each loop
$LoopTimeout, # timeout of event loop in milliseconds
@Timers, # timers
%DescriptorMap = ();
@Timers = ();
%UniqTimer = ();
- $PostLoopCallback = undef;
+ @post_loop_do = ();
# we may be iterating inside one of these on our stack
my @q = delete @Stack{keys %Stack};
$Epoll = undef; # may call DSKQXS::DESTROY
} while (@Timers || keys(%Stack) || $nextq || $AWAIT_PIDS ||
$ToClose || keys(%DescriptorMap) ||
- $PostLoopCallback || keys(%UniqTimer));
+ @post_loop_do || keys(%UniqTimer));
$reap_armed = undef;
$LoopTimeout = -1; # no timeout by default
}
# by default we keep running, unless a postloop callback cancels it
- $PostLoopCallback ? $PostLoopCallback->(\%DescriptorMap) : 1;
+ @post_loop_do ? $post_loop_do[0]->(\%DescriptorMap,
+ @post_loop_do[1..$#post_loop_do])
+ : 1
}
# Start processing IO events. In most daemon programs this never exits. See
-# C<PostLoopCallback> for how to exit the loop.
+# C<post_loop_do> for how to exit the loop.
sub event_loop (;$$) {
my ($sig, $oldset) = @_;
$Epoll //= _InitPoller();
} while (PostEventLoop());
}
-=head2 C<< CLASS->SetPostLoopCallback( CODEREF ) >>
-
-Sets post loop callback function. Pass a subref and it will be
-called every time the event loop finishes.
-
-Return 1 (or any true value) from the sub to make the loop continue, 0 or false
-and it will exit.
-
-The callback function will be passed two parameters: \%DescriptorMap
-
-=cut
-sub SetPostLoopCallback {
- my ($class, $ref) = @_;
-
- # global callback
- $PostLoopCallback = (defined $ref && ref $ref eq 'CODE') ? $ref : undef;
-}
-
#####################################################################
### PublicInbox::DS-the-object code
#####################################################################
my $proc_name;
my $warn = 0;
# drop idle connections and try to quit gracefully
- PublicInbox::DS->SetPostLoopCallback(sub {
+ @PublicInbox::DS::post_loop_do = (sub {
my ($dmap, undef) = @_;
my $n = 0;
my $now = now();
my $quit = PublicInbox::SearchIdx::quit_cb($sync);
$sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
- PublicInbox::DS->SetPostLoopCallback(sub { !$sync->{quit} });
+ local @PublicInbox::DS::post_loop_do = (sub { !$sync->{quit} });
$pr->("initial scan complete, entering event loop\n") if $pr;
# calls InboxIdle->event_step:
PublicInbox::DS::event_loop($sig, $oldset);
$n;
}
+sub sock_defined {
+ my (undef, $wqw) = @_;
+ defined($wqw->{sock});
+}
+
sub wq_worker_loop ($$) {
my ($self, $bcast2) = @_;
my $wqw = PublicInbox::WQWorker->new($self, $self->{-wq_s2});
PublicInbox::WQWorker->new($self, $bcast2) if $bcast2;
- PublicInbox::DS->SetPostLoopCallback(sub { $wqw->{sock} });
+ local @PublicInbox::DS::post_loop_do = (\&sock_defined, $wqw);
PublicInbox::DS::event_loop();
PublicInbox::DS->Reset;
}
};
require PublicInbox::DirIdle;
local $dir_idle = PublicInbox::DirIdle->new(sub {
- # just rely on wakeup to hit PostLoopCallback set below
+ # just rely on wakeup to hit post_loop_do
dir_idle_handler($_[0]) if $_[0]->fullname ne $path;
});
$dir_idle->add_watches([$sock_dir]);
- PublicInbox::DS->SetPostLoopCallback(sub {
+ local @PublicInbox::DS::post_loop_do = (sub {
my ($dmap, undef) = @_;
if (@st = defined($path) ? stat($path) : ()) {
if ($dev_ino_expect ne pack('dd', $st[0], $st[1])) {
add_timer(0, \&poll_fetch_fork, $self, $intvl, $uris);
}
watch_fs_init($self) if $self->{mdre};
- PublicInbox::DS->SetPostLoopCallback(sub { !$self->quit_done });
+ local @PublicInbox::DS::post_loop_do = (sub { !$self->quit_done });
PublicInbox::DS::event_loop($sig, $oldset); # calls ->event_step
_done_for_now($self);
}
#!perl -w
-# Copyright (C) 2020-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use v5.10.1; use strict; use PublicInbox::TestCommon;
+use v5.12; use strict; use PublicInbox::TestCommon;
use PublicInbox::DS qw(now);
use File::Path qw(make_path);
use_ok 'PublicInbox::DirIdle';
$di->add_watches(["$tmpdir/a", "$tmpdir/c"], 1);
PublicInbox::DS->SetLoopTimeout(1000);
my $end = 3 + now;
-PublicInbox::DS->SetPostLoopCallback(sub { scalar(@x) == 0 && now < $end });
+local @PublicInbox::DS::post_loop_do = (sub { scalar(@x) == 0 && now < $end });
tick(0.011);
rmdir("$tmpdir/a/b") or xbail "rmdir $!";
PublicInbox::DS::event_loop();
-# Copyright (C) 2019-2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# Licensed the same as Danga::Socket (and Perl5)
# License: GPL-1.0+ or Artistic-1.0-Perl
# <https://www.gnu.org/licenses/gpl-1.0.txt>
# <https://dev.perl.org/licenses/artistic.html>
-use strict; use v5.10.1; use PublicInbox::TestCommon;
+use v5.12; use PublicInbox::TestCommon;
use_ok 'PublicInbox::DS';
if ('close-on-exec for epoll and kqueue') {
my $evfd_re = qr/(?:kqueue|eventpoll)/i;
PublicInbox::DS->SetLoopTimeout(0);
- PublicInbox::DS->SetPostLoopCallback(sub { 0 });
+ local @PublicInbox::DS::post_loop_do = (sub { 0 });
# make sure execve closes if we're using fork()
my ($r, $w);
my $cb = sub {};
for my $i (0..$n) {
PublicInbox::DS->SetLoopTimeout(0);
- PublicInbox::DS->SetPostLoopCallback($cb);
+ local @PublicInbox::DS::post_loop_do = ($cb);
PublicInbox::DS::event_loop();
PublicInbox::DS->Reset;
}
my $cfg = PublicInbox::Config->new;
PublicInbox::DS->Reset;
my $ii = PublicInbox::InboxIdle->new($cfg);
- my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) };
+ my $cb = sub { @PublicInbox::DS::post_loop_do = (sub {}) };
my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup';
$cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) });
my $watcherr = "$tmpdir/watcherr";
'delivered a message for IDLE to kick -watch') or
diag "mda error \$?=$?";
diag 'waiting for IMAP IDLE wakeup';
- PublicInbox::DS->SetPostLoopCallback(undef);
+ @PublicInbox::DS::post_loop_do = ();
PublicInbox::DS::event_loop();
diag 'inbox unlocked on IDLE wakeup';
'delivered a message for -watch PollInterval');
diag 'waiting for PollInterval wakeup';
- PublicInbox::DS->SetPostLoopCallback(undef);
+ @PublicInbox::DS::post_loop_do = ();
PublicInbox::DS::event_loop();
diag 'inbox unlocked (poll)';
$w->kill;
my $cfg = PublicInbox::Config->new;
PublicInbox::DS->Reset;
my $ii = PublicInbox::InboxIdle->new($cfg);
- my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) };
+ my $cb = sub { @PublicInbox::DS::post_loop_do = (sub {}) };
my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup';
$cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) });
my $watcherr = "$tmpdir/watcherr";
-# Copyright (C) 2019-2021 all contributors <meta@public-inbox.org>
-use strict;
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+use v5.12;
use Test::More;
use IO::Handle;
use POSIX qw(:signal_h);
is($nbsig->wait_once, undef, 'nonblocking ->wait_once');
ok($! == Errno::EAGAIN, 'got EAGAIN');
kill('HUP', $$) or die "kill $!";
- PublicInbox::DS->SetPostLoopCallback(sub {}); # loop once
+ local @PublicInbox::DS::post_loop_do = (sub {}); # loop once
PublicInbox::DS::event_loop();
is($hit->{HUP}->{sigfd}, 2, 'HUP sigfd fired in event loop') or
diag explain($hit); # sometimes fails on FreeBSD 11.x
-# Copyright (C) 2016-2021 all contributors <meta@public-inbox.org>
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-use strict;
-use Test::More;
+use v5.12;
use PublicInbox::Eml;
use Cwd;
use PublicInbox::Config;
my $ii = PublicInbox::InboxIdle->new($cfg);
my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup';
$cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) });
- PublicInbox::DS->SetPostLoopCallback(sub { $delivered == 0 });
+ local @PublicInbox::DS::post_loop_do = (sub { $delivered == 0 });
# wait for -watch to setup inotify watches
my $sleep = 1;
# setup the event loop so that it exits at every step
# while we're still doing connect(2)
PublicInbox::DS->SetLoopTimeout(0);
-PublicInbox::DS->SetPostLoopCallback(\&once);
+local @PublicInbox::DS::post_loop_do = (\&once);
my $pid = $td->{pid};
if ($^O eq 'linux' && open(my $f, '<', "/proc/$pid/status")) {
diag(grep(/RssAnon/, <$f>));
if (!($n % 128) && $DONE != $n) {
diag("nr: ($n) $DONE/$nfd");
PublicInbox::DS->SetLoopTimeout(-1);
- PublicInbox::DS->SetPostLoopCallback(sub { $DONE != $n });
+ local @PublicInbox::DS::post_loop_do = (sub { $DONE != $n });
# clear the backlog:
PublicInbox::DS::event_loop();
# resume looping
PublicInbox::DS->SetLoopTimeout(0);
- PublicInbox::DS->SetPostLoopCallback(\&once);
}
}
diag "done?: @".time." $DONE/$nfd";
if ($DONE != $nfd) {
PublicInbox::DS->SetLoopTimeout(-1);
- PublicInbox::DS->SetPostLoopCallback(sub { $DONE != $nfd });
+ local @PublicInbox::DS::post_loop_do = (sub { $DONE != $nfd });
PublicInbox::DS::event_loop();
}
is($nfd, $DONE, "$nfd/$DONE done");
# setup the event loop so that it exits at every step
# while we're still doing connect(2)
PublicInbox::DS->SetLoopTimeout(0);
-PublicInbox::DS->SetPostLoopCallback(\&once);
+local @PublicInbox::DS::post_loop_do = (\&once);
foreach my $n (1..$nfd) {
my $io = tcp_connect($nntps, Blocking => 0);
if (!($n % 128) && $n != $DONE) {
diag("nr: ($n) $DONE/$nfd");
PublicInbox::DS->SetLoopTimeout(-1);
- PublicInbox::DS->SetPostLoopCallback(sub { $DONE != $n });
+ @PublicInbox::DS::post_loop_do = (sub { $DONE != $n });
# clear the backlog:
PublicInbox::DS::event_loop();
# resume looping
PublicInbox::DS->SetLoopTimeout(0);
- PublicInbox::DS->SetPostLoopCallback(\&once);
+ @PublicInbox::DS::post_loop_do = (\&once);
}
}
my $pid = $td->{pid};
# run the event loop normally, now:
if ($DONE != $nfd) {
PublicInbox::DS->SetLoopTimeout(-1);
- PublicInbox::DS->SetPostLoopCallback(sub {
+ @PublicInbox::DS::post_loop_do = (sub {
diag "done: ".time." $DONE";
$DONE != $nfd;
});
#!perl -w
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
use strict; use v5.10.1; use PublicInbox::TestCommon;
use Sys::Hostname qw(hostname);
my $pub_cfg = PublicInbox::Config->new;
PublicInbox::DS->Reset;
my $ii = PublicInbox::InboxIdle->new($pub_cfg);
- my $cb = sub { PublicInbox::DS->SetPostLoopCallback(sub {}) };
+ my $cb = sub { @PublicInbox::DS::post_loop_do = (sub {}) };
my $obj = bless \$cb, 'PublicInbox::TestCommon::InboxWakeup';
$pub_cfg->each_inbox(sub { $_[0]->subscribe_unlock('ident', $obj) });
my $w = start_script(['-watch'], undef, { 2 => $err_wr });