# local-ized in parent before fork
$TXN_BYTES = $batch_bytes;
local $self->{git} = $git; # for patchid
+ return if $DO_QUIT;
my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
close $in or die "close: $!";
my $nr = 0;
my $len;
my $cmt = {};
local $/ = $FS;
- my $buf = <$rd> // return; # leading $FS
+ my $buf = <$rd> // return close($rd); # leading $FS
$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
$self->begin_txn_lazy;
- while (defined($buf = <$rd>)) {
+ while (!$DO_QUIT && defined($buf = <$rd>)) {
chomp($buf);
$/ = "\n";
$len = length($buf);
$TXN_BYTES = $batch_bytes - $len;
}
add_commit($self, $cmt);
- last if $DO_QUIT;
++$nr;
if ($TXN_BYTES <= 0) {
cidx_ckpoint($self, "[$n] $nr");
sub need_reap { # post_loop_do
my (undef, $jobs) = @_;
+ return if !$LIVE || $DO_QUIT;
scalar(keys(%$LIVE)) > $jobs;
}
sub partition_refs ($$$) {
my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
- my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
+ my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
close $refs or die "close: $!";
my ($seen, $nchange) = (0, 0);
my @shard_in = map {
$fh;
} @RDONLY_SHARDS;
- while (defined(my $cmt = <$fh>)) {
+ while (defined(my $cmt = <$rfh>)) {
chomp $cmt;
my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
if (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
++$nchange;
$seen = 0;
}
+ if ($DO_QUIT) {
+ close($rfh);
+ return ();
+ }
}
- close($fh);
+ close($rfh);
+ return () if $DO_QUIT;
if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
$self->{nchange} += $nchange;
progress($self, "$git->{git_dir}: $nchange commits");
sub consumers_open { # post_loop_do
my (undef, $consumers) = @_;
+ return if $DO_QUIT;
scalar(grep { $_->{sock} } values %$consumers);
}
+sub wait_consumers ($$$) {
+ my ($self, $git, $consumers) = @_;
+ local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+ PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
+ my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+ die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
+}
+
sub commit_used_shards ($$$) {
my ($self, $git, $consumers) = @_;
local $self->{-shard_ok} = {};
$IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
$consumers->{$n} = $c;
}
- local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
- PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
- my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
- die "E: $git->{git_dir} $n shards failed" if $n;
+ wait_consumers($self, $git, $consumers);
}
sub index_repo { # cidx_await cb
my ($self, $git, $roots) = @_;
- return if $git->{-cidx_err};
+ return if $git->{-cidx_err} || $DO_QUIT;
my $repo = delete $git->{-repo} or return;
seek($roots, 0, SEEK_SET) or die "seek: $!";
chomp(my @roots = <$roots>);
local $self->{current_info} = $git->{git_dir};
my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
- my %CONSUMERS;
+ my $consumers = {};
for my $n (0..$#shard_in) {
-s $shard_in[$n] or next;
+ last if $DO_QUIT;
my ($c, $p) = PublicInbox::PktOp->pair;
$c->{ops}->{shard_done} = [ $self ];
$IDX_SHARDS[$n]->wq_io_do('shard_index',
[ $shard_in[$n], $p->{op_p} ],
$git, $n, \@roots);
- $CONSUMERS{$n} = $c;
+ $consumers->{$n} = $c;
}
@shard_in = ();
- local @PublicInbox::DS::post_loop_do = (\&consumers_open, \%CONSUMERS);
- PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
- my $n = grep { ! $self->{-shard_ok}->{$_} } keys %CONSUMERS;
- die "E: $git->{git_dir} $n shards failed" if $n;
+ wait_consumers($self, $git, $consumers);
if ($DO_QUIT) {
- commit_used_shards($self, $git, \%CONSUMERS);
+ commit_used_shards($self, $git, $consumers);
progress($self, "$git->{git_dir}: done");
return;
}
$repo->{git_dir} = $git->{git_dir};
my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
if ($id > 0) {
- $CONSUMERS{$repo->{shard_n}} = undef;
- commit_used_shards($self, $git, \%CONSUMERS);
+ $consumers->{$repo->{shard_n}} = undef;
+ commit_used_shards($self, $git, $consumers);
progress($self, "$git->{git_dir}: done");
return run_todo($self);
}
$self, $git);
fp_start($self, $git, $prep_repo);
ct_start($self, $git, $prep_repo);
+ last if $DO_QUIT;
}
cidx_reap($self, 0);
}
sub shard_done_wait { # awaitpid cb via ipc_worker_reap
my ($pid, $shard, $self) = @_;
+ my $quit_req = delete($shard->{-cidx_quit});
+ return if $DO_QUIT || !$LIVE;
if ($? == 0) { # success
- delete($shard->{-cidx_quit}) // warn 'BUG: {-cidx_quit} unset';
+ $quit_req // warn 'BUG: {-cidx_quit} unset';
return;
}
warn "PID:$pid $shard->{shard} exited with \$?=$?\n";