$LIVE_JOBS, # integer
$MY_SIG, # like %SIG
$SIGSET,
+ $TXN_BYTES, # number of bytes in current shard transaction
+ $DO_QUIT, # signal number
@RDONLY_SHARDS, # Xapian::Database
@IDX_SHARDS # clones of self
);
sub shard_index { # via wq_io_do
my ($self, $git, $n, $roots) = @_;
local $self->{current_info} = "$git->{git_dir} [$n]";
- my ($quit, $cmt);
+ my $cmt;
local $self->{roots} = $roots;
my $in = delete($self->{0}) // die 'BUG: no {0} input';
my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
my $batch_bytes = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
- my $max = $batch_bytes;
- my $set_quit = sub { $quit = shift };
- local $SIG{USR1} = sub { $max = -1 }; # similar to `git fast-import'
- local $SIG{QUIT} = $set_quit;
- local $SIG{TERM} = $set_quit;
- local $SIG{INT} = $set_quit;
+ # local-ized in parent before fork
+ $TXN_BYTES = $batch_bytes;
local $self->{git} = $git; # for patchid
my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
close $in or die "close: $!";
$self->begin_txn_lazy;
while (defined($buf = <$rd>)) {
chomp($buf);
- $max -= length($buf);
+ $TXN_BYTES -= length($buf);
@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
$/ = "\n";
add_commit($self, $cmt);
- last if $quit; # likely SIGPIPE
+ last if $DO_QUIT;
++$nr;
- if ($max <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
+ if ($TXN_BYTES <= 0 && !$PublicInbox::Search::X{CLOEXEC_UNSET}) {
progress($self, "[$n] $nr");
$self->{xdb}->commit_transaction;
- $max = $batch_bytes;
+ $TXN_BYTES = $batch_bytes;
$self->{xdb}->begin_transaction;
}
$/ = $FS;
}
close($rd);
- if (!$? || ($quit && ($? & 127) == POSIX::SIGPIPE)) {
+ if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+ ($? & 127) == POSIX::SIGPIPE))) {
send($op_p, "shard_done $n", MSG_EOR);
} else {
warn "E: git @LOG_STDIN: \$?=$?\n";
sub cidx_reap ($$) {
my ($self, $jobs) = @_;
while (run_todo($self)) {}
- local @PublicInbox::DS::post_loop_do = \(&need_reap, $jobs);
+ local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs);
while (need_reap(undef, $jobs)) {
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
}
sub cidx_await_cb { # awaitpid cb
my ($pid, $cb, $self, $git, @args) = @_;
- return if !$LIVE; # premature shutdown
+ return if !$LIVE || $DO_QUIT;
my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
if ($?) {
# only care about --heads (branches) and --tags, and not even their names
sub fp_start ($$$) {
my ($self, $git, $prep_repo) = @_;
- return if !$LIVE; # premature exit
+ return if !$LIVE || $DO_QUIT;
cidx_reap($self, $LIVE_JOBS);
open my $refs, '+>', undef or die "open: $!";
my $cmd = ['git', "--git-dir=$git->{git_dir}",
sub ct_start ($$$) {
my ($self, $git, $prep_repo) = @_;
- return if !$LIVE; # premature exit
+ return if !$LIVE || $DO_QUIT;
cidx_reap($self, $LIVE_JOBS);
my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
qw[for-each-ref --sort=-committerdate
# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
sub prep_repo ($$) {
my ($self, $git) = @_;
- return if !$LIVE || $git->{-cidx_err}; # premature exit
+ return if !$LIVE || $DO_QUIT || $git->{-cidx_err};
my $repo = $git->{-repo} // die 'BUG: no {-repo}';
if (!defined($repo->{ct})) {
warn "W: $git->{git_dir} has no commits, skipping\n";
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
die "E: $git->{git_dir} $n shards failed" if $n;
+ if ($DO_QUIT) {
+ commit_used_shards($self, $git, \%CONSUMERS);
+ progress($self, "$git->{git_dir}: done");
+ return;
+ }
$repo->{git_dir} = $git->{git_dir};
my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
if ($id > 0) {
sub get_roots ($$) {
my ($self, $git) = @_;
- return if !$LIVE; # premature exit
+ return if !$LIVE || $DO_QUIT;
my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
sysseek($refs, 0, SEEK_SET) or die "seek: $!";
open my $roots, '+>', undef or die "open: $!";
@$dirs = grep { !$uniq{$_}++ } @$dirs;
}
+# SIG handlers installed in each shard worker by ipc_atfork_child:
+sub shard_quit { $DO_QUIT = POSIX->can("SIG$_[0]")->() } # INT/TERM/QUIT: store the signal's number from POSIX; shard_index tests $DO_QUIT after each add_commit
+sub shard_usr1 { $TXN_BYTES = -1 } # USR1: zero out the byte budget so shard_index's `$TXN_BYTES <= 0' check fires on the next commit
+
sub cidx_init ($) {
my ($self) = @_;
my $dir = $self->{cidx_dir};
}
$self->lock_acquire;
my @shards;
+ local $TXN_BYTES;
for my $n (0..($self->{nshard} - 1)) {
my $shard = bless { %$self, shard => $n }, ref($self);
delete @$shard{qw(lockfh lock_path)};
$shard->idx_acquire;
$shard->idx_release;
- $shard->wq_workers_start("shard[$n]", 1, undef, {
+ $shard->wq_workers_start("shard[$n]", 1, $SIGSET, {
siblings => \@shards, # for ipc_atfork_child
}, \&shard_done_wait, $self);
push @shards, $shard;
scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
}
+# signal handlers for the parent process (registered in $MY_SIG by cidx_run)
+sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS } # hands its args (the signal it was invoked for) to wq_kill on every shard
+
+sub parent_quit { # TERM/QUIT/INT handler in the parent (see $MY_SIG in cidx_run)
+ $DO_QUIT = $_[0]; # any true value makes fp_start/ct_start/prep_repo/get_roots return early
+ kill_shards(@_); # pass the same signal on to every shard worker
+ warn "# SIG$_[0] received, quitting...\n";
+}
+
sub cidx_run { # main entry point
my ($self) = @_;
local $self->{todo} = [];
my $restore = PublicInbox::OnDestroy->new($$,
\&PublicInbox::DS::sig_setmask, $SIGSET);
local $LIVE = {};
+ local $DO_QUIT;
local @IDX_SHARDS = cidx_init($self);
local $self->{current_info} = '';
- my $cb = $SIG{__WARN__} || \&CORE::warn;
local $MY_SIG = {
CHLD => \&PublicInbox::DS::enqueue_reap,
- INT => sub { exit },
+ USR1 => \&kill_shards,
};
+ $MY_SIG->{$_} = \&parent_quit for qw(TERM QUIT INT);
+ my $cb = $SIG{__WARN__} || \&CORE::warn;
local $SIG{__WARN__} = sub {
my $m = shift @_;
$self->{current_info} eq '' or
sub ipc_atfork_child {
my ($self) = @_;
$self->SUPER::ipc_atfork_child;
+ $SIG{USR1} = \&shard_usr1; # USR1 sets $TXN_BYTES to -1 (see shard_usr1)
+ $SIG{$_} = \&shard_quit for qw(INT TERM QUIT); # record the signal number in $DO_QUIT instead of dying mid-transaction
my $x = delete $self->{siblings} // die 'BUG: no {siblings}';
$_->wq_close for @$x;
+ undef; # explicit return value rather than whatever the loop above leaves behind; NOTE(review): confirm what the caller expects
}
sub shard_done_wait { # awaitpid cb via ipc_worker_reap
my ($pid, $shard, $self) = @_;
- delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
- return unless $?;
+ if ($? == 0) { # worker exited cleanly
+ delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset'; # flag is cleared only here, so a failed shard keeps {-cidx_quit} set
+ return;
+ }
warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
++$self->{shard_err} if defined($self->{shard_err});
}