use PublicInbox::Compat qw(uniqstr);
use PublicInbox::Aspawn qw(run_await);
use Compress::Zlib qw(compress);
-use Carp ();
+use Carp qw(croak);
use Time::Local qw(timegm);
use autodie qw(close pipe open sysread seek sysseek send);
our $DO_QUIT = 15; # signal number
$NPROC,
$XHC, # XapClient
$REPO_CTX, # current repo being indexed in shards
- $IDX_TODO, # PublicInbox::Git object arrayref
- $GIT_TODO, # PublicInbox::Git object arrayref
+ $IDXQ, # PublicInbox::Git object arrayref
+ $SCANQ, # PublicInbox::Git object arrayref
%ALT_FH, # hexlen => tmp IO for TMPDIR git alternates
$TMPDIR, # File::Temp->newdir object for prune
- @PRUNE_QUEUE, # GIT_DIRs to prepare for pruning
+ @PRUNEQ, # GIT_DIRs to prepare for pruning
%TODO, @IBXQ, @IBX,
@JOIN, # join(1) command for --join
$CMD_ENV, # env for awk(1), comm(1), sort(1) commands during prune
# window for commits/emails to determine a inbox <-> coderepo association
my $JOIN_WINDOW = 50000;
-our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check);
+our @PRUNE_BATCH = qw(cat-file --batch-all-objects --batch-check);
# TODO: do we care about committer name + email? or tree OID?
my @FMT = qw(H P ct an ae at s b); # (b)ody must be last
$pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
}
+sub check_objfmt_status ($$$) {
+ my ($git, $chld_err, $fmt) = @_;
+ my ($status, $sig) = ($chld_err >> 8, $chld_err & 127);
+ if (!$sig && $status == 1) { # unset, default is '' (SHA-1)
+ $fmt = 'sha1';
+ } elsif (!$sig && $status == 0) {
+ chomp($fmt ||= 'sha1');
+ }
+ $fmt // warn("git --git-dir=$git->{git_dir} config \$?=$chld_err");
+ $fmt;
+}
+
sub store_repo { # wq_io_do, sends docid back
my ($self, $repo) = @_;
my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
+ my $git = bless $repo, 'PublicInbox::Git';
+ my $rd = $git->popen(qw(config extensions.objectFormat));
$self->begin_txn_lazy;
$self->{xdb}->delete_document($_) for @{$repo->{to_delete}};
my $doc = $PublicInbox::Search::X{Document}->new;
$doc->add_boolean_term('T'.'r');
$doc->add_boolean_term('G'.$_) for @{$repo->{roots}};
$doc->set_data($repo->{fp}); # \n delimited
+ my $fmt = readline($rd);
+ $rd->close;
+ $fmt = check_objfmt_status $git, $?, $fmt;
+ $OFMT2HEXLEN{$fmt} // warn <<EOM; # store unknown formats anyways
+E: unknown extensions.objectFormat=$fmt in $repo->{git_dir}
+EOM
+ $doc->add_boolean_term('H'.$fmt);
my $did = $repo->{docid};
$did ? $self->{xdb}->replace_document($did, $doc)
: ($did = $self->{xdb}->add_document($doc));
sub _cb { # run_await cb
my ($pid, $cmd, undef, $opt, $cb, $self, $git, @arg) = @_;
return if $DO_QUIT;
+ return $cb->($opt, $self, $git, @arg) if $opt->{quiet};
$? ? ($git->{-cidx_err} = warn("W: @$cmd (\$?=$?)\n")) :
$cb->($opt, $self, $git, @arg);
}
delete $git->{-repo};
return index_next($self);
}
- my $n = git_dir_hash($git->{git_dir}) % $self->{nshard};
+ my $n = git_dir_hash($git->{git_dir}) % scalar(@RDONLY_XDB);
my $shard = bless { %$self, shard => $n }, ref($self);
$repo->{shard_n} = $n;
delete @$shard{qw(lockfh lock_path)};
sub check_existing { # retry_reopen callback
my ($shard, $self, $git) = @_;
- my @docids = $shard->docids_by_postlist('P'.$git->{git_dir});
+ my @docids = $shard->docids_of_git_dir($git->{git_dir});
my $docid = shift(@docids) // return get_roots($self, $git);
my $doc = $shard->get_doc($docid) //
die "BUG: no #$docid ($git->{git_dir})";
sub index_next ($) {
my ($self) = @_;
return if $DO_QUIT;
- if ($IDX_TODO && @$IDX_TODO) {
- index_repo(undef, $self, shift @$IDX_TODO);
- } elsif ($GIT_TODO && @$GIT_TODO) {
- my $git = shift @$GIT_TODO;
+ if ($IDXQ && @$IDXQ) {
+ index_repo(undef, $self, shift @$IDXQ);
+ } elsif ($SCANQ && @$SCANQ) {
+ my $git = shift @$SCANQ;
my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
$self, $git);
fp_start($self, $git, $prep_repo);
my (undef, $self, $git) = @_;
return if $DO_QUIT;
return index_next($self) if $git->{-cidx_err};
- return push(@$IDX_TODO, $git) if $REPO_CTX; # busy
+ return push(@$IDXQ, $git) if $REPO_CTX; # busy
my $repo = delete $git->{-repo} or return index_next($self);
my $roots_fh = delete $repo->{roots_fh} // die 'BUG: no {roots_fh}';
seek($roots_fh, 0, SEEK_SET);
sub scan_git_dirs ($) {
my ($self) = @_;
- @$GIT_TODO = map { PublicInbox::Git->new($_) } @{$self->{git_dirs}};
- $GITS_NR = @$GIT_TODO;
+ @$SCANQ = () unless $self->{-opt}->{scan};
+ $GITS_NR = @$SCANQ or return;
my $gits_fini = PublicInbox::OnDestroy->new($$, \&gits_fini);
- $_->{-cidx_gits_fini} = $gits_fini for @$GIT_TODO;
+ $_->{-cidx_gits_fini} = $gits_fini for @$SCANQ;
if (my $drs = $TODO{dump_roots_start}) {
- $_->{-cidx_dump_roots_start} = $drs for @$GIT_TODO;
+ $_->{-cidx_dump_roots_start} = $drs for @$SCANQ;
}
progress($self, "scanning $GITS_NR code repositories...");
}
sub shards_active { # post_loop_do
return if $DO_QUIT;
- return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO) != 3;
+ return if grep(defined, $PRUNE_DONE, $SCANQ, $IDXQ) != 3;
return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
- return 1 if $GITS_NR || scalar(@$IDX_TODO) || $REPO_CTX;
+ return 1 if $GITS_NR || scalar(@$IDXQ) || $REPO_CTX;
return 1 if @IBXQ || keys(%TODO);
for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
$s->{-cidx_quit} = 1 if defined($s->{-wq_s1});
}
}
-sub prep_alternate_end { # run_await cb for config extensions.objectFormat
- my ($pid, $cmd, undef, $opt, $objdir, $run_prune) = @_;
- my ($status, $sig) = ($? >> 8, $? & 127);
- my $next_dir = shift(@PRUNE_QUEUE);
- prep_alternate_start($next_dir, $run_prune) if defined($next_dir);
- my $fmt;
- if (!$sig && $status == 1) { # unset, default is '' (SHA-1)
- $fmt = 'sha1';
- } elsif (!$sig && $status == 0) {
- chomp($fmt = ${$opt->{1}} || 'sha1');
- }
- $fmt // return warn("git config \$?=$? for objdir=$objdir");
+sub prep_alternate_end ($$) {
+ my ($objdir, $fmt) = @_;
my $hexlen = $OFMT2HEXLEN{$fmt} // return warn <<EOM;
E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt
EOM
say { $ALT_FH{$hexlen} } $objdir;
}
+sub store_objfmt { # via wq_do - make early cidx users happy
+ my ($self, $docid, $git_dir, $fmt) = @_;
+ $self->begin_txn_lazy;
+ my $doc = $self->get_doc($docid) // return
+ warn "BUG? #$docid for $git_dir missing";
+ my @p = xap_terms('P', $doc) or return
+ warn "BUG? #$docid for $git_dir has no P(ath)";
+ @p == 1 or return warn "BUG? #$docid $git_dir multi: @p";
+ $p[0] eq $git_dir or return warn "BUG? #$docid $git_dir != @p";
+ $doc->add_boolean_term('H'.$fmt);
+ $self->{xdb}->replace_document($docid, $doc);
+ # wait for prune_commit to commit...
+}
+
+# TODO: remove prep_alternate_read and store_objfmt 1-2 years after 2.0 is out
+# they are for compatibility with pre-release indices
+sub prep_alternate_read { # run_git cb for config extensions.objectFormat
+ my ($opt, $self, $git, $objdir, $docid, $shard_n, $run_prune) = @_;
+ return if $DO_QUIT;
+ my $chld_err = $?;
+ prep_alternate_start($self, shift(@PRUNEQ), $run_prune) if @PRUNEQ;
+ my $fmt = check_objfmt_status $git, $chld_err, ${$opt->{1}};
+ $IDX_SHARDS[$shard_n]->wq_do('store_objfmt', # async
+ $docid, $git->{git_dir}, $fmt);
+ prep_alternate_end $objdir, $fmt;
+}
+
sub prep_alternate_start {
- my ($git_dir, $run_prune) = @_;
- my $o = $git_dir.'/objects';
+ my ($self, $git, $run_prune) = @_;
+ my $o = $git->git_path('objects');
while (!-d $o) {
- $git_dir = shift(@PRUNE_QUEUE) // return;
- $o = $git_dir.'/objects';
+ $git = shift(@PRUNEQ) // return;
+ $o = $git->git_path('objects');
+ }
+ my $n = git_dir_hash($git->{git_dir}) % scalar(@RDONLY_XDB);
+ local $self->{xdb} = $RDONLY_XDB[$n] // croak("BUG: no shard[$n]");
+ my @ids = $self->docids_by_postlist('P'.$git->{git_dir});
+ my @fmt = @ids ? xap_terms('H', $self->{xdb}, $ids[0]) : ();
+ @fmt > 1 and warn "BUG? multi `H' for shard[$n] #$ids[0]: @fmt";
+
+ if (@fmt) { # cache hit
+ @PRUNEQ and
+ prep_alternate_start($self, shift(@PRUNEQ), $run_prune);
+ prep_alternate_end $o, $fmt[0];
+ } else { # compatibility w/ early cidx format
+ run_git([qw(config extensions.objectFormat)], { quiet => 1 },
+ \&prep_alternate_read, $self, $git, $o, $ids[0], $n,
+ $run_prune);
}
- my $cmd = [ 'git', "--git-dir=$git_dir",
- qw(config extensions.objectFormat) ];
- my $opt = { quiet => 1 };
- run_await($cmd, undef, $opt, \&prep_alternate_end, $o, $run_prune);
}
sub cmd_done { # run_await cb for sort, xapian-delve, sed failures
run_await([@SORT, '-u'], $CMD_ENV, $sort_opt, \&cmd_done, $run_prune);
run_await(\@sed, $CMD_ENV, $sed_opt, \&cmd_done, $run_prune);
run_await(\@delve, undef, $delve_opt, \&cmd_done, $run_prune);
- @PRUNE_QUEUE = @{$self->{git_dirs}};
+ @PRUNEQ = @$SCANQ;
for (1..$LIVE_JOBS) {
- prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune);
+ prep_alternate_start($self, shift(@PRUNEQ) // last, $run_prune);
}
}
sub dump_git_commits { # run_await cb
- my ($pid, undef, undef, $batch_opt) = @_;
- (defined($pid) && $?) and die "E: @PRUNE_BATCH: \$?=$?";
+ my ($pid, $cmd, undef, $batch_opt, $self) = @_;
+ (defined($pid) && $?) and die "E: @$cmd \$?=$?";
return if $DO_QUIT;
- my ($hexlen) = keys(%ALT_FH) or return; # done
+ my ($hexlen) = keys(%ALT_FH) or return; # done, DESTROY batch_opt->{1}
close(delete $ALT_FH{$hexlen}); # flushes `say' buffer
-
- $PRUNE_BATCH[1] = "--git-dir=$TMPDIR/hexlen$hexlen.git";
- run_await(\@PRUNE_BATCH, undef, $batch_opt, \&dump_git_commits);
+ progress($self, "preparing $hexlen-byte hex OID commits for prune...");
+ my $g = PublicInbox::Git->new("$TMPDIR/hexlen$hexlen.git");
+ run_await($g->cmd(@PRUNE_BATCH), undef, $batch_opt,
+ \&dump_git_commits, $self);
}
sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
warn(sprintf(<<EOM, $git_ver)) if $git_ver lt v2.19;
W: git v2.19+ recommended for high-latency storage (have git v%vd)
EOM
- dump_git_commits(undef, undef, undef, $batch_opt);
+ dump_git_commits(undef, undef, undef, $batch_opt, $self);
}
sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
my ($self, $comm_rd, $drs) = @_;
return if $DO_QUIT;
+ progress($self, 'starting prune...');
$_->wq_do('prune_init') for @IDX_SHARDS;
while (defined(my $cmt = <$comm_rd>)) {
chop($cmt) eq "\n" or die "BUG: no LF in comm output ($cmt)";
for my $git_dir (@GIT_DIR_GONE) {
my $n = git_dir_hash($git_dir) % scalar(@IDX_SHARDS);
$IDX_SHARDS[$n]->wq_do('prune_one', 'P'.$git_dir);
+ last if $DO_QUIT;
}
my ($c, $p) = PublicInbox::PktOp->pair;
$c->{ops}->{prune_done} = [ $self, $drs ];
sub do_inits { # called via PublicInbox::DS::add_timer
my ($self) = @_;
- init_join_postfork($self);
- init_prune($self);
- scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+ grep !!$_, @{$self->{-opt}}{qw(scan prune)} and
+ @$SCANQ = map PublicInbox::Git->new($_), @{$self->{git_dirs}};
+ init_join_postfork $self;
+ init_prune $self;
+ scan_git_dirs $self;
my $max = $TODO{do_join} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
index_next($self) for (1..$max);
}
my $restore = PublicInbox::OnDestroy->new($$,
\&PublicInbox::DS::sig_setmask, $SIGSET);
local $PRUNE_DONE = [];
- local $IDX_TODO = [];
- local $GIT_TODO = [];
- local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
+ local $IDXQ = [];
+ local $SCANQ = [];
+ local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNEQ,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
%TODO, @IBXQ, @IBX, @JOIN, %JOIN, @JOIN_PFX,
@JOIN_DT, $DUMP_IBX_WPIPE, @OFF2ROOT, $XHC, @SORT, $GITS_NR);