@PublicInbox::DS::post_loop_do = (\&has_busy_clients, { -w => 0 })
}
+sub spawn_xh () {
+ $xh_workers // return;
+ require PublicInbox::XhcMset;
+ local $) = $gid if defined $gid;
+ local $( = $gid if defined $gid;
+ local $> = $uid if defined $uid;
+ local $< = $uid if defined $uid;
+ $PublicInbox::Search::XHC = eval {
+ local $ENV{STDERR_PATH} = $stderr;
+ local $ENV{STDOUT_PATH} = $stdout;
+ PublicInbox::XapClient::start_helper('-j', $xh_workers)
+ };
+ warn "E: $@" if $@;
+ awaitpid($PublicInbox::Search::XHC->{io}->attached_pid, \&respawn_xh)
+ if $PublicInbox::Search::XHC;
+}
+
sub reopen_logs {
+ my ($sig) = @_;
$logs{$stdout} //= \*STDOUT if defined $stdout;
$logs{$stderr} //= \*STDERR if defined $stderr;
while (my ($p, $fh) = each %logs) { open_log_path($fh, $p) }
+ ($sig && defined($xh_workers) && $PublicInbox::Search::XHC) and
+ kill('USR1', $PublicInbox::Search::XHC->{io}->attached_pid);
}
sub sockname ($) {
my $pid = PublicInbox::DS::fork_persist;
if ($pid == 0) {
undef %WORKERS;
+ undef $xh_workers;
local $PublicInbox::DS::Poller; # allow epoll/kqueue
$set_user->() if $set_user;
PublicInbox::EOFpipe->new($parent_pipe, \&worker_quit);
pipe($parent_pipe, my $p1) or die "failed to create parent-pipe: $!";
my $set_workers = $nworker; # for SIGWINCH
reopen_logs();
+ spawn_xh;
my $msig = {
- USR1 => sub { reopen_logs(); kill_workers($_[0]); },
+ USR1 => sub { reopen_logs($_[0]); kill_workers($_[0]); },
USR2 => \&upgrade,
QUIT => \&master_quit,
INT => \&master_quit,
sub worker_loop {
$uid = $gid = undef;
reopen_logs();
+ spawn_xh; # only for -W0
@listeners = map {;
my $l = sockname($_);
my $tls_cb = $POST_ACCEPT{$l};
my ($pid) = @_;
return unless @listeners;
warn "W: xap_helper PID:$pid died: \$?=$?, respawning...\n";
- $PublicInbox::Search::XHC =
- PublicInbox::XapClient::start_helper('-j', $xh_workers);
+ spawn_xh;
}
sub run {
local $SIG{__WARN__} = PublicInbox::Eml::warn_ignore_cb();
local %WORKER_SIG = %WORKER_SIG;
local $PublicInbox::XapClient::tries = 0;
-
- local $PublicInbox::Search::XHC = PublicInbox::XapClient::start_helper(
- '-j', $xh_workers) if defined($xh_workers);
- if ($PublicInbox::Search::XHC) {
- require PublicInbox::XhcMset;
- awaitpid($PublicInbox::Search::XHC->{io}->attached_pid,
- \&respawn_xh);
- }
+ local $PublicInbox::Search::XHC if defined($xh_workers);
daemon_loop();
# $unlink_on_leave runs
$ENV{XDG_CACHE_HOME} //= "$ENV{HOME}/.cache"; # reuse C++ xap_helper builds
$_ = File::Spec->rel2abs($_) for (grep(!m!^/!, @INC));
-
+our $CURRENT_DAEMON;
BEGIN {
@EXPORT = qw(tmpdir tcp_server tcp_connect require_git require_mods
run_script start_script key2sub xsys xsys_e xqx eml_load tick
my $run_mode = $ENV{TEST_RUN_MODE} // $opt->{run_mode} // 2;
my $sub = $run_mode == 0 ? undef : key2sub($key);
my $tail;
- my $xh = $ENV{TEST_DAEMON_XH};
- $xh && $key =~ /-(?:imapd|netd|httpd|pop3d|nntpd)\z/ and
- push @argv, split(/\s+/, $xh);
+ my @xh = split(/\s+/, $ENV{TEST_DAEMON_XH} // '');
+ @xh = () if $key !~ /-(?:imapd|netd|httpd|pop3d|nntpd)\z/;
+ push @argv, @xh;
if ($tail_cmd) {
my @paths;
for (@argv) {
$ENV{LISTEN_FDS} = $fds;
}
if ($opt->{-C}) { chdir($opt->{-C}) }
- $0 = join(' ', @$cmd);
+ $0 = join(' ', @$cmd, @xh);
local @SIG{keys %SIG} = map { undef } values %SIG;
local $SIG{FPE} = 'IGNORE'; # Perl default
undef $tmp_mask;
local $ENV{PLACK_TEST_EXTERNALSERVER_URI} = "http://$h:$p";
my $ua = LWP::UserAgent->new;
$ua->max_redirect(0);
+ local $CURRENT_DAEMON = $td;
Plack::Test::ExternalServer::test_psgi(client => $client,
ua => $ua);
$cb->() if $cb;
local $SIG{__WARN__} = sub { print $stderr @_ };
my $rbuf;
local $SIG{TERM} = sub { undef $in };
+ local $SIG{USR1} = \&reopen_logs;
while (defined($in)) {
PublicInbox::DS::sig_setmask($workerset);
my @fds = eval { # we undef $in in SIG{TERM}
}
}
+sub reopen_logs {
+ my $p = $ENV{STDOUT_PATH};
+ defined($p) && open(STDOUT, '>>', $p) and STDOUT->autoflush(1);
+ $p = $ENV{STDERR_PATH};
+ defined($p) && open(STDERR, '>>', $p) and STDERR->autoflush(1);
+}
+
+sub parent_reopen_logs {
+ reopen_logs();
+ kill('USR1', values %WORKERS);
+}
+
sub xh_alive { $in || scalar(keys %WORKERS) }
sub start (@) {
die 'bad args';
local $workerset = POSIX::SigSet->new;
$workerset->fillset or die "fillset: $!";
- for (@PublicInbox::DS::UNBLOCKABLE) {
+ for (@PublicInbox::DS::UNBLOCKABLE, POSIX::SIGUSR1) {
$workerset->delset($_) or die "delset($_): $!";
}
},
TTOU => \&do_sigttou,
CHLD => \&PublicInbox::DS::enqueue_reap,
+ USR1 => \&parent_reopen_logs,
};
PublicInbox::DS::block_signals();
start_workers();
#define WORKER_MAX USHRT_MAX
static unsigned long nworker, nworker_hwm;
static int pipefds[2];
+static const char *stdout_path, *stderr_path; // for SIGUSR1
+static sig_atomic_t worker_needs_reopen;
// PublicInbox::Search and PublicInbox::CodeSearch generate these:
static void mail_nrp_init(void);
clearerr(stderr);
}
-static void sigw(int sig) // SIGTERM handler for worker
+static void sigw(int sig) // SIGTERM+SIGUSR1 handler for worker
{
- sock_fd = -1; // break out of recv_loop
+ switch (sig) {
+ case SIGUSR1: worker_needs_reopen = 1; break;
+ default: sock_fd = -1; // break out of recv_loop
+ }
}
#define CLEANUP_REQ __attribute__((__cleanup__(req_cleanup)))
free(req->lenv);
}
+static void reopen_logs(void)
+{
+ if (stdout_path && *stdout_path && !freopen(stdout_path, "a", stdout))
+ err(EXIT_FAILURE, "freopen %s", stdout_path);
+ if (stderr_path && *stderr_path) {
+ if (!freopen(stderr_path, "a", stderr))
+ err(EXIT_FAILURE, "freopen %s", stderr_path);
+ if (my_setlinebuf(stderr))
+ err(EXIT_FAILURE, "setlinebuf(stderr)");
+ }
+}
+
static void recv_loop(void) // worker process loop
{
static char rbuf[4096 * 33]; // per-process
sa.sa_handler = sigw;
CHECK(int, 0, sigaction(SIGTERM, &sa, NULL));
+ CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
while (sock_fd == 0) {
size_t len = sizeof(rbuf);
stderr_restore(req.fp[1]);
ERR_CLOSE(req.fp[1], 0);
}
+ if (worker_needs_reopen) {
+ worker_needs_reopen = 0;
+ reopen_logs();
+ }
}
}
#endif
}
+static void parent_reopen_logs(void)
+{
+ reopen_logs();
+ for (unsigned long nr = nworker; nr < nworker_hwm; nr++) {
+ pid_t pid = worker_pids[nr];
+ if (pid != 0 && kill(pid, SIGUSR1))
+ warn("BUG?: kill(%d, SIGUSR1)", (int)pid);
+ }
+}
+
static void sigp(int sig) // parent signal handler
{
static const char eagain[] = "signals coming in too fast";
case SIGCHLD: c = '.'; break;
case SIGTTOU: c = '-'; break;
case SIGTTIN: c = '+'; break;
+ case SIGUSR1: c = '#'; break;
default:
write(STDERR_FILENO, bad_sig, sizeof(bad_sig) - 1);
_exit(EXIT_FAILURE);
{
int c;
socklen_t slen = (socklen_t)sizeof(c);
+ stdout_path = getenv("STDOUT_PATH");
+ stderr_path = getenv("STDERR_PATH");
if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen))
err(EXIT_FAILURE, "getsockopt");
DELSET(SIGXCPU);
DELSET(SIGXFSZ);
#undef DELSET
+ CHECK(int, 0, sigdelset(&workerset, SIGUSR1));
if (nworker == 0) { // no SIGTERM handling w/o workers
recv_loop();
CHECK(int, 0, sigdelset(&pset, SIGCHLD));
CHECK(int, 0, sigdelset(&pset, SIGTTIN));
CHECK(int, 0, sigdelset(&pset, SIGTTOU));
+ CHECK(int, 0, sigdelset(&pset, SIGUSR1));
struct sigaction sa = {};
sa.sa_handler = sigp;
+ CHECK(int, 0, sigaction(SIGUSR1, &sa, NULL));
CHECK(int, 0, sigaction(SIGTTIN, &sa, NULL));
CHECK(int, 0, sigaction(SIGTTOU, &sa, NULL));
sa.sa_flags = SA_NOCLDSTOP;
case '.': break; // do_sigchld already called
case '-': do_sigttou(); break;
case '+': do_sigttin(); break;
+ case '#': parent_reopen_logs(); break;
default: errx(EXIT_FAILURE, "BUG: c=%c", sbuf[i]);
}
}
use PublicInbox::Eml;
use PublicInbox::Config;
use PublicInbox::MID qw(mids);
+use autodie qw(kill rename);
require_mods(qw(DBD::SQLite Xapian HTTP::Request::Common Plack::Test
URI::Escape Plack::Builder HTTP::Date));
use_ok($_) for (qw(HTTP::Request::Common Plack::Test));
test_psgi(sub { $www->call(@_) }, $client3);
test_httpd($env, $client3, 4);
+if ($^O eq 'linux' && -r "/proc/$$/stat") {
+ my $args;
+ my $search_xh_pid = sub {
+ my ($pid) = @_;
+ for my $f (glob('/proc/*/stat')) {
+ open my $fh, '<', $f or next;
+ my @s = split /\s+/, readline($fh) // next;
+ next if $s[3] ne $pid; # look for matching PPID
+ open $fh, '<', "/proc/$s[0]/cmdline" or next;
+ my $cmdline = readline($fh) // next;
+ if ($cmdline =~ /\0-MPublicInbox::XapHelper\0-e\0/ ||
+ $cmdline =~ m!/xap_helper\0!) {
+ return $s[0];
+ }
+ }
+ undef;
+ };
+ my $usr1_test = sub {
+ my ($cb) = @_;
+ my $td = $PublicInbox::TestCommon::CURRENT_DAEMON;
+ my $pid = $td->{pid};
+ my $res = $cb->(GET('/v2test/?q=m:a-mid@b'));
+ is $res->code, 200, '-httpd is running w/ search';
+
+ $search_xh_pid->($pid);
+ my $xh_pid = $search_xh_pid->($pid) or
+ BAIL_OUT "can't find XH pid with $args";
+ my $xh_err = readlink "/proc/$xh_pid/fd/2";
+ is $xh_err, "$env->{TMPDIR}/stderr.log",
+ "initial stderr expected ($args)";
+ rename "$env->{TMPDIR}/stderr.log",
+ "$env->{TMPDIR}/stderr.old";
+ $xh_err = readlink "/proc/$xh_pid/fd/2";
+ is $xh_err, "$env->{TMPDIR}/stderr.old",
+ "stderr followed rename ($args)";
+ kill 'USR1', $pid;
+ tick;
+ $res = $cb->(GET('/v2test/?q=m:a-mid@b'));
+ is $res->code, 200, '-httpd still running w/ search';
+ my $new_xh_pid = $search_xh_pid->($pid) or
+ BAIL_OUT "can't find new XH pid with $args";
+ is $new_xh_pid, $xh_pid, "XH pid unchanged ($args)";
+ $xh_err = readlink "/proc/$new_xh_pid/fd/2";
+ is $xh_err, "$env->{TMPDIR}/stderr.log",
+ "stderr updated ($args)";
+ };
+ for my $x ('-X0', '-X1', '-X0 -W1', '-X1 -W1') {
+ $args = $x;
+ local $ENV{TEST_DAEMON_XH} = $args;
+ test_httpd($env, $usr1_test);
+ }
+}
+
done_testing;