# and designed for handling thousands of untrusted clients over slow
# and/or lossy connections.
package PublicInbox::Daemon;
-use strict;
-use v5.10.1;
+use v5.12;
use Getopt::Long qw(:config gnu_getopt no_ignore_case auto_abbrev);
use IO::Handle; # ->autoflush
use IO::Socket;
use Socket qw(IPPROTO_TCP SOL_SOCKET);
STDOUT->autoflush(1);
STDERR->autoflush(1);
-use PublicInbox::DS qw(now);
+use PublicInbox::DS qw(now awaitpid);
use PublicInbox::Listener;
use PublicInbox::EOFpipe;
-use PublicInbox::Sigfd;
use PublicInbox::Git;
use PublicInbox::GitAsyncCat;
use PublicInbox::Eml;
# File-scoped daemon state shared by the subs below.
# @CMD holds the original command line ($0 + @ARGV) so upgrade() can exec it.
my @CMD;
my ($set_user, $oldset);
my (@cfg_listen, $stdout, $stderr, $group, $user, $pid_file, $daemonize);
-my $worker_processes = 1;
-my @listeners;
-my (%pids, %logs);
# $nworker:  number of worker processes wanted (-W/--worker-processes)
# @listeners: listen sockets; emptied by master_quit to mark shutdown
# %WORKERS:  worker slot number => PID of the process filling that slot
+my ($nworker, @listeners, %WORKERS, %logs);
my %tls_opt; # scheme://sockname => args for IO::Socket::SSL::SSL_Context->new
# PID of the child forked by upgrade(); reset to undef in upgrade_aborted
my $reexec_pid;
my ($uid, $gid);
for (keys %KNOWN_STARTTLS) { $SCHEME2PORT{$KNOWN_STARTTLS{$_}} = $_ + 0 }
$SCHEME2PORT{http} = 80;
# Package globals (rather than lexicals) so that run() and master_loop()
# can `local'-ize them:
#   $parent_pipe - read end of the master's pipe, watched by each worker
#   %POST_ACCEPT - sockname => TLS post-accept callback (see daemon_loop)
#   %XNETD       - sockname => per-listener module state from load_mod()
+our ($parent_pipe, %POST_ACCEPT, %XNETD);
# Signal dispatch table handed to the event loop by worker_loop().
# daemon_loop() adds a HUP entry and, when running without workers,
# replaces the USR2 entry.
+our %WORKER_SIG = (
+ INT => \&worker_quit,
+ QUIT => \&worker_quit,
+ TERM => \&worker_quit,
+ TTIN => 'IGNORE',
+ TTOU => 'IGNORE',
+ USR1 => \&reopen_logs,
+ USR2 => 'IGNORE',
+ WINCH => 'IGNORE',
+ CHLD => \&PublicInbox::DS::enqueue_reap,
+);
+
sub listener_opt ($) {
my ($str) = @_; # opt1=val1,opt2=val2 (opt may repeat for multi-value)
my $o = {};
\%xn;
}
-sub daemon_prepare ($$) {
- my ($default_listen, $xnetd) = @_;
+sub daemon_prepare ($) {
+ my ($default_listen) = @_;
my $listener_names = {}; # sockname => IO::Handle
$oldset = PublicInbox::DS::block_signals();
@CMD = ($0, @ARGV);
'l|listen=s' => \@cfg_listen,
'1|stdout=s' => \$stdout,
'2|stderr=s' => \$stderr,
- 'W|worker-processes=i' => \$worker_processes,
+ 'W|worker-processes=i' => \$nworker,
'P|pid-file=s' => \$pid_file,
'u|user=s' => \$user,
'g|group=s' => \$group,
die "$orig specified w/o cert=\n";
}
if ($listener_names->{$l}) { # already inherited
- $xnetd->{$l} = load_mod($scheme, $opt, $l);
+ $XNETD{$l} = load_mod($scheme, $opt, $l);
next;
}
my (%o, $sock_pkg);
$s->blocking(0);
my $sockname = sockname($s);
warn "# bound $scheme://$sockname\n";
- $xnetd->{$sockname} //= load_mod($scheme, $opt);
+ $XNETD{$sockname} //= load_mod($scheme, $opt);
$listener_names->{$sockname} = $s;
push @listeners, $s;
}
for my $x (@inherited_names) {
$x =~ /:([0-9]+)\z/ or next; # no TLS for AF_UNIX
if (my $scheme = $KNOWN_TLS{$1}) {
- $xnetd->{$x} //= load_mod($scheme);
+ $XNETD{$x} //= load_mod($scheme);
$tls_opt{"$scheme://$x"} ||= accept_tls_opt('');
} elsif (($scheme = $KNOWN_STARTTLS{$1})) {
- $xnetd->{$x} //= load_mod($scheme);
+ $XNETD{$x} //= load_mod($scheme);
$tls_opt{"$scheme://$x"} ||= accept_tls_opt('');
} elsif (defined $stls) {
$tls_opt{"$stls://$x"} ||= accept_tls_opt('');
}
if (defined $default_scheme) {
for my $x (@inherited_names) {
- $xnetd->{$x} //= load_mod($default_scheme);
+ $XNETD{$x} //= load_mod($default_scheme);
}
}
die "No listeners bound\n" unless @listeners;
write_pid($pid_file);
}
my $pid = fork;
- unless (defined $pid) {
+ if (!defined($pid)) {
warn "fork failed: $!\n";
- return;
- }
- if ($pid == 0) {
+ } elsif ($pid == 0) {
$ENV{LISTEN_FDS} = scalar @listeners;
$ENV{LISTEN_PID} = $$;
foreach my $s (@listeners) {
}
exec @CMD;
die "Failed to exec: $!\n";
+ } else {
+ awaitpid($pid, \&upgrade_aborted);
+ $reexec_pid = $pid;
}
- $reexec_pid = $pid;
}
# Send the signal named in $_[0] to every worker PID recorded in %WORKERS.
# The result of kill() is returned to the caller; master_quit() uses it to
# detect that there was no worker left to signal.
-sub kill_workers ($) {
- my ($sig) = @_;
- kill $sig, keys(%pids);
-}
+sub kill_workers ($) { kill $_[0], values(%WORKERS) }
-sub upgrade_aborted ($) {
- my ($p) = @_;
- warn "reexec PID($p) died with: $?\n";
+sub upgrade_aborted {
+ my ($pid) = @_;
+ warn "reexec PID($pid) died with: $?\n";
$reexec_pid = undef;
return unless $pid_file;
warn $@, "\n" if $@;
}
# reap_children is deleted by this change: the CHLD handlers now point at
# PublicInbox::DS::enqueue_reap, and per-PID callbacks registered through
# awaitpid() (upgrade_aborted, reap_worker) replace the waitpid(-1) loop.
-sub reap_children { # $_[0] = 'CHLD'
- while (1) {
- my $p = waitpid(-1, WNOHANG) or return;
- if (defined $reexec_pid && $p == $reexec_pid) {
- upgrade_aborted($p);
- } elsif (defined(my $id = delete $pids{$p})) {
- warn "worker[$id] PID($p) died with: $?\n";
- } elsif ($p > 0) {
- warn "unknown PID($p) reaped: $?\n";
- } else {
- return;
- }
- }
-}
-
sub unlink_pid_file_safe_ish ($$) {
my ($unlink_pid, $file) = @_;
return unless defined $unlink_pid && $unlink_pid == $$;
# Signal handler for QUIT/INT/TERM in the master process.
# $_[0] is the signal name, which is forwarded to the workers.
sub master_quit ($) {
# listeners already dropped by an earlier call: quit right away
exit unless @listeners;
@listeners = ();
- kill_workers($_[0]);
# exit immediately when kill_workers() signalled nobody; otherwise
# reap_worker() exits once the last worker has been reaped
+ exit unless kill_workers($_[0]);
+}
+
+# awaitpid callback registered by start_worker: ($pid, $nr) where $nr is
+# the slot number the reaped worker occupied.
+sub reap_worker { # awaitpid CB
+	my (undef, $nr) = @_; # the PID itself is not needed here
+	$? and warn "worker[$nr] died \$?=$?\n";
+	delete $WORKERS{$nr};
+	# shutting down (no listeners) and this was the last worker: done
+	exit unless @listeners || keys(%WORKERS);
+	# otherwise let the event loop refill any empty slots
+	PublicInbox::DS::requeue(\&start_workers);
+}
+
+# Fork one worker process for slot number $nr.
+# Does nothing once @listeners has been emptied (shutdown in progress).
+# In the parent, the new PID is stored in $WORKERS{$nr} and reap_worker
+# is registered for it via awaitpid.  The child never returns from here:
+# it runs worker_loop() and then exits.
+sub start_worker ($) {
+	my ($nr) = @_;
+	# taken in the parent so the child can reseed with a value of its own
+	my $seed = rand(0xffffffff);
+	return unless @listeners;
+	my $pid = fork;
+	if (!defined($pid)) {
+		# name the slot and end with "\n", matching the message this
+		# code emitted before the rewrite and the one in upgrade()
+		warn "failed to fork worker[$nr]: $!\n";
+	} elsif ($pid == 0) {
+		undef %WORKERS; # sibling PIDs belong to the master only
+		# NOTE(review): presumably discards event-loop state
+		# inherited from the master -- confirm against DS::Reset
+		PublicInbox::DS::Reset();
+		srand($seed);
+		# best-effort: eval swallows any failure of this call
+		eval { Net::SSLeay::randomize() };
+		$set_user->() if $set_user;
+		# worker_quit is the callback attached to the master's pipe
+		PublicInbox::EOFpipe->new($parent_pipe, \&worker_quit);
+		worker_loop();
+		exit 0;
+	} else {
+		$WORKERS{$nr} = $pid;
+		awaitpid($pid, \&reap_worker, $nr);
+	}
+}
+
+# Start a worker for every slot in 0..$nworker-1 that has no PID recorded
+# in %WORKERS.
+sub start_workers {
+	for my $nr (0..($nworker - 1)) {
+		start_worker($nr) unless defined($WORKERS{$nr});
+	}
+}
+
+# Send TERM to every worker whose slot number is no longer below $nworker.
+sub trim_workers {
+	my @pids = map { $_ >= $nworker ? $WORKERS{$_} : () } keys %WORKERS;
+	kill('TERM', @pids);
}
# Master-process main routine: creates the parent pipe, installs the
# master's signal handlers, starts the initial workers and then runs the
# event loop.  The trailing `exit' documents that it is not expected to
# return.
sub master_loop {
- pipe(my ($p0, $p1)) or die "failed to create parent-pipe: $!";
- my $set_workers = $worker_processes;
# read end goes into the (localized) global so start_worker() can watch it
# NOTE(review): the write end $p1 is never closed explicitly and stays
# alive in this frame, from which workers are forked -- confirm workers
# still see EOF on $parent_pipe when the master goes away
+ local $parent_pipe;
+ pipe($parent_pipe, my $p1) or die "failed to create parent-pipe: $!";
+ my $set_workers = $nworker; # for SIGWINCH
reopen_logs();
- my $ignore_winch;
- my $sig = {
# signal name => handler for the master; passed to event_loop() below
+ my $msig = {
USR1 => sub { reopen_logs(); kill_workers($_[0]); },
USR2 => \&upgrade,
QUIT => \&master_quit,
INT => \&master_quit,
TERM => \&master_quit,
WINCH => sub {
- return if $ignore_winch || !@listeners;
- if (-t STDIN || -t STDOUT || -t STDERR) {
- $ignore_winch = 1;
- warn <<EOF;
-ignoring SIGWINCH since we are not daemonized
-EOF
- } else {
- $worker_processes = 0;
- }
+ $nworker = 0;
+ trim_workers();
},
HUP => sub {
- return unless @listeners;
- $worker_processes = $set_workers;
+ $nworker = $set_workers; # undo WINCH
kill_workers($_[0]);
+ PublicInbox::DS::requeue(\&start_workers)
},
TTIN => sub {
- return unless @listeners;
- if ($set_workers > $worker_processes) {
- ++$worker_processes;
+ if ($set_workers > $nworker) {
+ ++$nworker;
} else {
- $worker_processes = ++$set_workers;
+ $nworker = ++$set_workers;
}
+ PublicInbox::DS::requeue(\&start_workers);
},
TTOU => sub {
- $worker_processes = --$set_workers if $set_workers > 0;
+ return if $nworker <= 0;
+ --$nworker;
+ trim_workers();
},
- CHLD => \&reap_children,
+ CHLD => \&PublicInbox::DS::enqueue_reap,
};
- my $sigfd = PublicInbox::Sigfd->new($sig);
- local @SIG{keys %$sig} = values(%$sig) unless $sigfd;
- PublicInbox::DS::sig_setmask($oldset) if !$sigfd;
- while (1) { # main loop
- my $n = scalar keys %pids;
- unless (@listeners) {
- exit if $n == 0;
- $set_workers = $worker_processes = $n = 0;
- }
-
- if ($n > $worker_processes) {
- while (my ($k, $v) = each %pids) {
- kill('TERM', $k) if $v >= $worker_processes;
- }
- $n = $worker_processes;
- }
- my $want = $worker_processes - 1;
- if ($n <= $want) {
- PublicInbox::DS::block_signals() if !$sigfd;
- for my $i ($n..$want) {
- my $seed = rand(0xffffffff);
- my $pid = fork;
- if (!defined $pid) {
- warn "failed to fork worker[$i]: $!\n";
- } elsif ($pid == 0) {
- srand($seed);
- eval { Net::SSLeay::randomize() };
- $set_user->() if $set_user;
- return $p0; # run normal work code
- } else {
- warn "PID=$pid is worker[$i]\n";
- $pids{$pid} = $i;
- }
- }
- PublicInbox::DS::sig_setmask($oldset) if !$sigfd;
- }
-
- if ($sigfd) { # Linux and IO::KQueue users:
- $sigfd->wait_once;
- } else { # wake up every second
- sleep(1);
- }
- }
# when any standard stream is a terminal, WINCH only warns instead of
# stopping the workers
+ $msig->{WINCH} = sub {
+ warn "ignoring SIGWINCH since we are not daemonized\n";
+ } if -t STDIN || -t STDOUT || -t STDERR;
+ start_workers();
+ PublicInbox::DS::event_loop($msig, $oldset);
exit # never gets here, just for documentation
}
}
}
# Finish per-listener setup and enter the main loop.  With $nworker > 0
# control is handed to master_loop(); otherwise this process becomes the
# only worker and calls worker_loop() itself.
-sub daemon_loop ($) {
- my ($xnetd) = @_;
+sub daemon_loop () {
local $PublicInbox::Config::DEDUPE = {}; # enable dedupe cache
# $refresh doubles as the workers' HUP handler: it empties the dedupe
# cache, drops each listener's cached ssl_ctx and invokes the
# per-listener refresh callback, warning when one of them dies
- my $refresh = sub {
+ my $refresh = $WORKER_SIG{HUP} = sub {
my ($sig) = @_;
%$PublicInbox::Config::DEDUPE = (); # clear cache
- for my $xn (values %$xnetd) {
+ for my $xn (values %XNETD) {
delete $xn->{tlsd}->{ssl_ctx}; # PublicInbox::TLS::start
eval { $xn->{refresh}->($sig) };
warn "refresh $@\n" if $@;
}
};
- my %post_accept;
# copy TLS context options onto the matching listener state and, for the
# https/imaps/nntps/pop3s schemes, register a TLS post-accept callback
while (my ($k, $ctx_opt) = each %tls_opt) {
$ctx_opt // next;
my ($scheme, $l) = split(m!://!, $k, 2);
- my $xn = $xnetd->{$l} // die "BUG: no xnetd for $k";
+ my $xn = $XNETD{$l} // die "BUG: no xnetd for $k";
$xn->{tlsd}->{ssl_ctx_opt} //= $ctx_opt;
$scheme =~ m!\A(?:https|imaps|nntps|pop3s)! and
- $post_accept{$l} = tls_cb(@$xn{qw(post_accept tlsd)});
+ $POST_ACCEPT{$l} = tls_cb(@$xn{qw(post_accept tlsd)});
}
undef %tls_opt;
- my $sig = {
- HUP => $refresh,
- INT => \&worker_quit,
- QUIT => \&worker_quit,
- TERM => \&worker_quit,
- TTIN => 'IGNORE',
- TTOU => 'IGNORE',
- USR1 => \&reopen_logs,
- USR2 => 'IGNORE',
- WINCH => 'IGNORE',
- CHLD => \&PublicInbox::DS::enqueue_reap,
- };
- if ($worker_processes > 0) {
+ if ($nworker > 0) {
$refresh->(); # preload by default
- my $fh = master_loop(); # returns if in child process
- PublicInbox::EOFpipe->new($fh, \&worker_quit);
+ return master_loop();
} else {
# no separate master: this process handles USR2 (upgrade) itself and
# quits as a worker when upgrade() returns true
reopen_logs();
$set_user->() if $set_user;
- $sig->{USR2} = sub { worker_quit() if upgrade() };
+ $WORKER_SIG{USR2} = sub { worker_quit() if upgrade() };
$refresh->();
}
+ worker_loop();
+}
+
# Body of a worker process: replaces each socket in @listeners with a
# PublicInbox::Listener object wrapping it, then runs the event loop with
# the %WORKER_SIG handlers.  Dies if a socket has no entry in %XNETD.
+sub worker_loop {
$uid = $gid = undef;
reopen_logs();
@listeners = map {;
my $l = sockname($_);
- my $tls_cb = $post_accept{$l};
- my $xn = $xnetd->{$l} // die "BUG: no xnetd for $l";
+ my $tls_cb = $POST_ACCEPT{$l};
+ my $xn = $XNETD{$l} // die "BUG: no xnetd for $l";
# NNTPS, HTTPS, HTTP, IMAPS and POP3S are client-first traffic
# IMAP, NNTP and POP3 are server-first
# a TLS callback from %POST_ACCEPT takes precedence over the
# listener's plain post_accept callback
PublicInbox::Listener->new($_, $tls_cb || $xn->{post_accept},
$xn->{'multi-accept'})
} @listeners;
- PublicInbox::DS::event_loop($sig, $oldset);
+ PublicInbox::DS::event_loop(\%WORKER_SIG, $oldset);
}
# Entry point: prepare listeners from $default_listen and the command
# line, daemonize, run the daemon loop and reset PublicInbox::DS
# afterwards.  The package globals touched along the way are localized
# for the duration of the call.
sub run {
my ($default_listen) = @_;
- daemon_prepare($default_listen, my $xnetd = {});
# default worker count; -W/--worker-processes in daemon_prepare overrides
+ $nworker = 1;
+ local (%XNETD, %POST_ACCEPT);
+ daemon_prepare($default_listen);
my $for_destroy = daemonize();
# localize GCF2C for tests:
local $PublicInbox::GitAsyncCat::GCF2C;
local $PublicInbox::Git::async_warn = 1;
local $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
# work on a copy: daemon_loop() adds/replaces entries in %WORKER_SIG
+ local %WORKER_SIG = %WORKER_SIG;
# NOTE(review): %POST_ACCEPT was already localized together with %XNETD
# above; this second `local' looks redundant -- confirm before removing
+ local %POST_ACCEPT;
- daemon_loop($xnetd);
+ daemon_loop();
PublicInbox::DS->Reset;
# ->DESTROY runs when $for_destroy goes out-of-scope
}
# Copyright (C) all contributors <meta@public-inbox.org>
# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
# Tests for binding Unix domain sockets
-use strict;
-use Test::More;
+use v5.12;
use PublicInbox::TestCommon;
use Errno qw(EADDRINUSE);
use Cwd qw(abs_path);
require_mods(qw(Plack::Util Plack::Builder HTTP::Date HTTP::Status));
use IO::Socket::UNIX;
use POSIX qw(mkfifo);
+require PublicInbox::Sigfd;
# scratch directory, Unix socket path and PSGI app used by the tests below
my ($tmpdir, $for_destroy) = tmpdir();
my $unix = "$tmpdir/unix.sock";
my $psgi = './t/httpd-corner.psgi';
# portable Perl can delay or miss signal dispatches due to races,
# so disable some tests on systems lacking signalfd(2) or EVFILT_SIGNAL
-my $has_sigfd = PublicInbox::Sigfd->new({}, 0) ? 1 : $ENV{TEST_UNRELIABLE};
+my $has_sigfd = PublicInbox::Sigfd->new({}) ? 1 : $ENV{TEST_UNRELIABLE};
# NOTE(review): presumably undoes whatever the probe object above
# registered with PublicInbox::DS -- confirm against Sigfd->new
+PublicInbox::DS::Reset() if $has_sigfd;
# Poll $cond (a code ref) until it returns true, calling tick(0.012)
# between attempts.  Gives up once time() passes 30 seconds from the start
# and dies with a stack trace carrying $msg, or 'condition failed' when no
# message was supplied.
sub delay_until {
- my $cond = shift;
+ my ($cond, $msg) = @_;
my $end = time + 30;
do {
return if $cond->();
tick(0.012);
} until (time > $end);
- Carp::confess('condition failed');
+ Carp::confess($msg // 'condition failed');
}
SKIP: {
is(select($rvec, undef, undef, 1), 1, 'timeout for pipe HUP');
is(my $undef = <$p0>, undef, 'process closed pipe writer at exit');
ok(!-e $pid_file, "$w pid file unlinked at exit");
+ delay_until(sub { !kill(0, $pid) },
+ "daemonized $w really not running");
}
my $httpd = abs_path('blib/script/public-inbox-httpd');
delay_until(sub {
$pid == (eval { $read_pid->($pid_file) } // 0)
});
+
+ delay_until(sub { !kill(0, $new_pid) }, 'new PID really died');
+
is($read_pid->($pid_file), $pid, 'old PID file restored');
ok(!-f "$pid_file.oldbin", '.oldbin PID file gone');
# drop the old parent
kill('QUIT', $old_pid) or die "QUIT failed: $!";
- delay_until(sub { !kill(0, $old_pid) }); # UGH
+ delay_until(sub { !kill(0, $old_pid) }, 'old PID really died');
ok(!-f "$pid_file.oldbin", '.oldbin PID file gone');
is(my $u = <$p0>, undef, 'process closed pipe writer at exit');
ok(!-f $pid_file, 'PID file is gone');
+ delay_until(sub { !kill(0, $new_pid) }, 'new PID really died');
}
if ('try USR2 without workers (-W0)') {
is(select($rvec, undef, undef, 1), 1, 'timeout for pipe HUP');
is(my $u = <$p0>, undef, 'process closed pipe writer at exit');
ok(!-f $pid_file, 'PID file is gone');
+ delay_until(sub { !kill(0, $pid) }, '-W0 daemon is gone');
}
}