use PublicInbox::Aspawn qw(run_await);
use Carp ();
use autodie qw(pipe open seek sysseek send);
+our $DO_QUIT = 15; # signal number
our (
- $LIVE, # pid => cmd
$LIVE_JOBS, # integer
+ $GITS_NR, # number of coderepos
$MY_SIG, # like %SIG
$SIGSET,
$TXN_BYTES, # number of bytes in current shard transaction
$BATCH_BYTES,
- $DO_QUIT, # signal number
@RDONLY_XDB, # Xapian::Database
@IDX_SHARDS, # clones of self
$MAX_SIZE,
@ids;
}
-sub cidx_await_cb { # awaitpid cb
- my ($pid, $cb, $self, $git, @args) = @_;
- return if !$LIVE || $DO_QUIT;
- my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
- PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
- if ($?) {
- $git->{-cidx_err} = 1;
- return warn("@$cmd error: \$?=$?\n");
- }
- $cb->($self, $git, @args);
+sub _cb { # run_await cb
+ my ($pid, $cmd, undef, $opt, $cb, $self, $git, @arg) = @_;
+ return if $DO_QUIT;
+ ($git->{-cidx_err} = $?) ? warn("@$cmd error: \$?=$?\n") :
+ $cb->($opt, $self, $git, @arg);
}
-sub cidx_await ($$$$$@) {
- my ($pid, $cmd, $cb, $self, $git, @args) = @_;
- $LIVE->{$pid} = $cmd;
- awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args);
+sub run_git {
+ my ($cmd, $opt, $cb, $self, $git, @arg) = @_;
+ run_await($git->cmd(@$cmd), undef, $opt, \&_cb, $cb, $self, $git, @arg)
}
# this is different from the grokmirror-compatible fingerprint since we
# only care about --heads (branches) and --tags, and not even their names
sub fp_start ($$$) {
my ($self, $git, $prep_repo) = @_;
- return if !$LIVE || $DO_QUIT;
+ return if $DO_QUIT;
open my $refs, '+>', undef;
- my $cmd = ['git', "--git-dir=$git->{git_dir}",
- qw(show-ref --heads --tags --hash)];
- my $pid = spawn($cmd, undef, { 1 => $refs });
$git->{-repo}->{refs} = $refs;
- cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo);
+ run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs },
+ \&fp_fini, $self, $git, $prep_repo);
}
-sub fp_fini { # cidx_await cb
- my ($self, $git, $prep_repo) = @_;
+sub fp_fini { # run_git cb
+ my (undef, $self, $git, $prep_repo) = @_;
my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
seek($refs, 0, SEEK_SET);
my $buf;
sub ct_start ($$$) {
my ($self, $git, $prep_repo) = @_;
- return if !$LIVE || $DO_QUIT;
- my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
- qw[for-each-ref --sort=-committerdate
+ return if $DO_QUIT;
+ run_git([ qw[for-each-ref --sort=-committerdate
--format=%(committerdate:raw) --count=1
- refs/heads/ refs/tags/] ];
- my ($rd, $pid) = popen_rd($cmd);
- cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo);
+ refs/heads/ refs/tags/] ], undef, # capture like qx
+ \&ct_fini, $self, $git, $prep_repo);
}
-sub ct_fini { # cidx_await cb
- my ($self, $git, $rd, $prep_repo) = @_;
- defined(my $ct = <$rd>) or return;
- $ct =~ s/\s+.*\z//s; # drop TZ + LF
+sub ct_fini { # run_git cb
+ my ($opt, $self, $git, $prep_repo) = @_;
+ my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF
$git->{-repo}->{ct} = $ct + 0;
}
# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
sub prep_repo ($$) {
my ($self, $git) = @_;
- return if !$LIVE || $DO_QUIT;
+ return if $DO_QUIT;
return index_next($self) if $git->{-cidx_err};
my $repo = $git->{-repo} // die 'BUG: no {-repo}';
if (!defined($repo->{ct})) {
my ($self) = @_;
return if $DO_QUIT;
if ($IDX_TODO && @$IDX_TODO) {
- index_repo($self, shift @$IDX_TODO);
+ index_repo(undef, $self, shift @$IDX_TODO);
} elsif ($GIT_TODO && @$GIT_TODO) {
- my $git = PublicInbox::Git->new(shift @$GIT_TODO);
+ my $git = shift @$GIT_TODO;
my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
$self, $git);
fp_start($self, $git, $prep_repo);
return if delete($TODO{dump_roots_start});
delete $TODO{dump_ibx_start}; # runs OnDestroy once
return dump_ibx($self, shift @IBXQ) if @IBXQ;
- undef $DUMP_IBX_WPIPE; # done dumping inboxes, dump roots
+ undef $DUMP_IBX_WPIPE; # done dumping inboxes
+ undef $XHC;
delete $TODO{associate};
}
# else: wait for shards_active (post_loop_do) callback
# shard_done fires when all shards are committed
}
-sub index_repo { # cidx_await cb
- my ($self, $git) = @_;
+sub index_repo { # run_git cb
+ my (undef, $self, $git) = @_;
return if $DO_QUIT;
return index_next($self) if $git->{-cidx_err};
return push(@$IDX_TODO, $git) if $REPO_CTX; # busy
my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
$repo->{git_dir} = $git->{git_dir};
my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo };
+ delete $git->{-cidx_gits_fini}; # may fire gits_fini
my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard,
$repo_ctx);
my ($c, $p) = PublicInbox::PktOp->pair;
sub get_roots ($$) {
my ($self, $git) = @_;
- return if !$LIVE || $DO_QUIT;
+ return if $DO_QUIT;
my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
sysseek($refs, 0, SEEK_SET);
open my $roots_fh, '+>', undef;
- my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
- qw(rev-list --stdin --max-parents=0) ];
- my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots_fh });
$git->{-repo}->{roots_fh} = $roots_fh;
- cidx_await($pid, $cmd, \&index_repo, $self, $git);
+ run_git([ qw(rev-list --stdin --max-parents=0) ],
+ { 0 => $refs, 1 => $roots_fh }, \&index_repo, $self, $git)
}
# for PublicInbox::SearchIdx::patch_id and with_umask
@shards;
}
+sub gits_fini {
+ undef $GITS_NR;
+ PublicInbox::DS::enqueue_reap(); # kick @post_loop_do
+}
+
sub scan_git_dirs ($) {
my ($self) = @_;
- my $n = @$GIT_TODO = @{$self->{git_dirs}};
- progress($self, "scanning $n code repositories...");
+ @$GIT_TODO = map { PublicInbox::Git->new($_) } @{$self->{git_dirs}};
+ $GITS_NR = @$GIT_TODO;
+ my $gits_fini = PublicInbox::OnDestroy->new($$, \&gits_fini);
+ $_->{-cidx_gits_fini} = $gits_fini for @$GIT_TODO;
+ progress($self, "scanning $GITS_NR code repositories...");
}
sub prune_init { # via wq_io_do in IDX_SHARDS
sub shards_active { # post_loop_do
return if $DO_QUIT;
- return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4;
+ return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO) != 3;
return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
- return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX;
- return 1 if keys(%$LIVE) || @IBXQ || keys(%TODO);
+ return 1 if $GITS_NR || scalar(@$IDX_TODO) || $REPO_CTX;
+ return 1 if @IBXQ || keys(%TODO);
for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
$s->{-cidx_quit} = 1 if defined($s->{-wq_s1});
$s->wq_close; # may recurse via awaitpid outside of event_loop
POSIX::SIGTSTP, POSIX::SIGCONT);
my $restore = PublicInbox::OnDestroy->new($$,
\&PublicInbox::DS::sig_setmask, $SIGSET);
- local $LIVE = {};
local $PRUNE_DONE = [];
local $IDX_TODO = [];
local $GIT_TODO = [];
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
%TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
- @ID2ROOT, $XHC, @SORT);
+ @ID2ROOT, $XHC, @SORT, $GITS_NR);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
sub shard_done_wait { # awaitpid cb via ipc_worker_reap
my ($pid, $shard, $self) = @_;
my $quit_req = delete($shard->{-cidx_quit});
- return if $DO_QUIT || !$LIVE;
+ return if $DO_QUIT;
if ($? == 0) { # success
$quit_req // warn 'BUG: {-cidx_quit} unset';
} else {
warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
++$self->{shard_err} if defined($self->{shard_err});
}
- PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC
}
1;