$NPROC,
$XHC, # XapClient
$REPO_CTX, # current repo being indexed in shards
- $IDX_TODO, # [ $git0, $root0, $git1, $root1, ...]
- $GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...]
+ $IDX_TODO, # PublicInbox::Git object arrayref
+ $GIT_TODO, # PublicInbox::Git object arrayref
%ALT_FH, # hexlen => tmp IO for TMPDIR git alternates
$TMPDIR, # File::Temp->newdir object for prune
@PRUNE_QUEUE, # GIT_DIRs to prepare for pruning
return if $DO_QUIT || !$PRUNE_DONE;
die "BUG: \$PRUNE_DONE->[$n] already defined" if $PRUNE_DONE->[$n];
$PRUNE_DONE->[$n] = 1;
- grep(defined, @$PRUNE_DONE) == @IDX_SHARDS and
- progress($self, 'prune done')
+ if (grep(defined, @$PRUNE_DONE) == @IDX_SHARDS) {
+ progress($self, 'prune done');
+ index_next($self); # may kick dump_roots_start
+ }
}
sub seen ($$) {
$max < 0 ? ((2 ** 31) - 1) : $max;
}
+sub start_xhc () {
+ my ($xhc, $pid) = PublicInbox::XapClient::start_helper("-j$NPROC");
+ awaitpid($pid, \&cmd_done, ['xap_helper', "-j$NPROC"]);
+ $xhc;
+}
+
sub dump_roots_start {
my ($self, $associate) = @_;
- ($XHC, my $pid) = PublicInbox::XapClient::start_helper("-j$NPROC");
- awaitpid($pid, \&cmd_done, ['xap_helper', "-j$NPROC"]);
+ $XHC //= start_xhc;
$associate // die 'BUG: no $associate';
$TODO{associating} = 1; # keep shards_active() happy
progress($self, 'dumping IDs from coderepos');
sub dump_ibx_start {
my ($self, $associate) = @_;
+ $XHC //= start_xhc;
my ($sort_opt, $fold_opt);
pipe(local $sort_opt->{0}, $DUMP_IBX_WPIPE);
pipe(local $fold_opt->{0}, local $sort_opt->{1});
fp_start($self, $git, $prep_repo);
ct_start($self, $git, $prep_repo);
} elsif ($TMPDIR) {
- return if delete($TODO{dump_roots_start});
+ delete $TODO{dump_roots_start};
delete $TODO{dump_ibx_start}; # runs OnDestroy once
return dump_ibx($self, shift @IBXQ) if @IBXQ;
undef $DUMP_IBX_WPIPE; # done dumping inboxes
- undef $XHC;
delete $TODO{associate};
}
# else: wait for shards_active (post_loop_do) callback
}
sub index_done { # OnDestroy cb called when done indexing each code repo
- my ($repo_ctx) = @_;
+ my ($repo_ctx, $drs) = @_;
my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
return if $DO_QUIT;
$active->{$n} = undef;
my ($c, $p) = PublicInbox::PktOp->pair;
$c->{ops}->{repo_stored} = [ $self, $repo_ctx ];
+ $c->{-cidx_dump_roots_start} = $drs if $drs;
$IDX_SHARDS[$n]->wq_io_do('store_repo', [ $p->{op_p} ], $repo);
# repo_stored will fire once store_repo is done
}
$repo->{git_dir} = $git->{git_dir};
my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo };
delete $git->{-cidx_gits_fini}; # may fire gits_fini
+ my $drs = delete $git->{-cidx_dump_roots_start};
my $index_done = PublicInbox::OnDestroy->new($$, \&index_done,
- $repo_ctx);
+ $repo_ctx, $drs);
my ($c, $p) = PublicInbox::PktOp->pair;
$c->{ops}->{shard_done} = [ $self, $repo_ctx, $index_done ];
for my $n (0..$#shard_in) {
@shards;
}
+# called when all git coderepos are done
sub gits_fini {
undef $GITS_NR;
PublicInbox::DS::enqueue_reap(); # kick @post_loop_do
$GITS_NR = @$GIT_TODO;
my $gits_fini = PublicInbox::OnDestroy->new($$, \&gits_fini);
$_->{-cidx_gits_fini} = $gits_fini for @$GIT_TODO;
+ if (my $drs = $TODO{dump_roots_start}) {
+ $_->{-cidx_dump_roots_start} = $drs for @$GIT_TODO;
+ }
progress($self, "scanning $GITS_NR code repositories...");
}
sub parent_quit {
$DO_QUIT = POSIX->can("SIG$_[0]")->();
- $XHC = undef;
+ $XHC = 0; # stops the process
kill_shards(@_);
warn "# SIG$_[0] received, quitting...\n";
}
sub associate {
my ($self) = @_;
return if $DO_QUIT;
+ $XHC = 0; # should not be recreated again
@IDX_SHARDS or return warn("# aborting on no shards\n");
unlink("$TMPDIR/root2id");
my @pending = keys %{$self->{PENDING}};
require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed,
comm => \@COMM, awk => \@AWK);
for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
- my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
+ my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self,
+ $TODO{dump_roots_start});
my ($sort_opt, $sed_opt, $delve_opt);
pipe(local $sed_opt->{0}, local $delve_opt->{1});
pipe(local $sort_opt->{0}, local $sed_opt->{1});
}
sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
- my ($self) = @_;
+ my ($self, $drs) = @_;
return if $DO_QUIT;
# setup the following pipeline: (
# git --git-dir=hexlen40.git cat-file \
run_await(\@AWK, $CMD_ENV, $awk_opt, \&cmd_done);
run_await([@SORT, '-u'], $CMD_ENV, $sort_opt, \&cmd_done);
my $comm_rd = popen_rd(\@COMM, $CMD_ENV, $comm_opt, \&cmd_done, \@COMM);
- PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
+ PublicInbox::CidxComm->new($comm_rd, $self, $drs); # ->cidx_read_comm
my $git_ver = PublicInbox::Git::git_version();
push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6;
}
sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
- my ($self, $comm_rd) = @_;
+ my ($self, $comm_rd, $drs) = @_;
return if $DO_QUIT;
$_->wq_do('prune_init') for @IDX_SHARDS;
while (defined(my $cmt = <$comm_rd>)) {
}
my ($c, $p) = PublicInbox::PktOp->pair;
$c->{ops}->{prune_done} = [ $self ];
+ $c->{-cidx_dump_roots_start} = $drs;
$_->wq_io_do('prune_commit', [ $p->{op_p} ]) for @IDX_SHARDS;
}
sub do_inits { # called via PublicInbox::DS::add_timer
my ($self) = @_;
- init_prune($self);
init_associate_postfork($self);
+ init_prune($self);
scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
index_next($self) for (1..$max);