use PublicInbox::Search qw(xap_terms);
use PublicInbox::SearchIdx qw(add_val);
use PublicInbox::Config;
-use PublicInbox::Spawn qw(run_die);
+use PublicInbox::Spawn qw(spawn);
+use PublicInbox::OnDestroy;
+our $LIVE; # pid => callback
+our $LIVE_JOBS;
# stop walking history if we see >$SEEN_MAX existing commits, this assumes
# branches don't diverge by more than this number of commits...
$pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
}
-sub store_repo ($$) {
- my ($self, $repo) = @_;
+sub store_repo ($$$) {
+ my ($self, $git, $repo) = @_;
my $xdb = delete($repo->{shard})->idx_acquire;
$xdb->begin_transaction;
+ for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
if (defined $repo->{id}) {
my $doc = $xdb->get_document($repo->{id}) //
- die "$self->{git}->{git_dir} doc #$repo->{id} gone";
+ die "$git->{git_dir} doc #$repo->{id} gone";
add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
- my %new = map { $_ => undef } @{$self->{roots}};
+ my %new = map { $_ => undef } @{$repo->{roots}};
my $old = xap_terms('G', $doc);
delete @new{keys %$old};
$doc->add_boolean_term('G'.$_) for keys %new;
- delete @$old{@{$self->{roots}}};
+ delete @$old{@{$repo->{roots}}};
$doc->remove_term('G'.$_) for keys %$old;
$doc->set_data($repo->{fp});
$xdb->replace_document($repo->{id}, $doc);
} else {
my $new = $PublicInbox::Search::X{Document}->new;
add_val($new, PublicInbox::CodeSearch::CT, $repo->{ct});
- $new->add_boolean_term("P$self->{git}->{git_dir}");
+ $new->add_boolean_term("P$git->{git_dir}");
$new->add_boolean_term('T'.'r');
$new->add_boolean_term('G'.$_) for @{$repo->{roots}};
$new->set_data($repo->{fp}); # \n delimited
@ids;
}
-sub get_roots ($$) {
- my ($self, $refs) = @_;
- my @roots = $self->{git}->qx([qw(rev-list --stdin --max-parents=0)],
- undef, { 0 => $refs });
- die "git rev-list \$?=$?" if $?;
- sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
- chomp(@roots);
- scalar(@roots) ? \@roots : undef;
+sub cidx_reap ($$) { # block in waitpid until fewer than $jobs PIDs remain in %$LIVE
+ my ($self, $jobs) = @_;
+ while (keys(%$LIVE) >= $jobs) { # with $jobs == 0 this only ends via the `last' below
+ my $pid = waitpid(-1, 0) // die "waitpid(-1): $!";
+ last if $pid < 0; # negative return: nothing left to wait for
+ if (my $x = delete $LIVE->{$pid}) { # $x is [ $callback, @args ]
+ my $cb = shift @$x;
+ $cb->(@$x) if $cb; # run the completion callback registered for this PID
+ } else {
+ warn "reaped unknown PID=$pid ($?)\n";
+ }
+ }
}
# this is different from the grokmirror-compatible fingerprint since we
# only care about --heads (branches) and --tags, and not even their names
-sub cidx_fp ($) {
- my ($self) = @_;
+sub fp_start ($$$) { # spawn `git show-ref' with stdout sent to $refs; fp_fini runs once it is reaped
+ my ($self, $git, $prep_repo) = @_;
+ return if !$LIVE; # premature exit
+ cidx_reap($self, $LIVE_JOBS); # wait until fewer than $LIVE_JOBS PIDs are tracked
open my $refs, '+>', undef or die "open: $!";
- run_die(['git', "--git-dir=$self->{git}->{git_dir}",
+ my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
qw(show-ref --heads --tags --hash)], undef, { 1 => $refs });
+ $git->{-repo}->{refs} = $refs; # read back later by fp_fini and get_roots
+ $LIVE->{$pid} = [ \&fp_fini, $self, $git, $prep_repo ]; # [ callback, @args ] consumed by cidx_reap
+}
+
+sub fp_fini { # reap callback registered by fp_start: digest the collected show-ref output
+ my ($self, $git, $prep_repo) = @_; # $prep_repo is unused here; it is only carried in the arg list
+ my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; # handle stored by fp_start
seek($refs, 0, SEEK_SET) or die "seek: $!";
my $buf;
my $dig = PublicInbox::SHA->new(256);
while (read($refs, $buf, 65536)) { $dig->add($buf) }
- sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
- ($dig->hexdigest, $refs);
+ $git->{-repo}->{fp} = $dig->hexdigest; # prep_repo compares this against the stored document data
}
-# TODO: should we also index gitweb.owner and the full fingerprint for grokmirror?
-sub prep_git_dir ($) {
- my ($self) = @_;
- my $git_dir = $self->{git}->{git_dir};
- my $ct = $self->{git}->qx([qw[for-each-ref
- --sort=-committerdate --format=%(committerdate:raw) --count=1
+sub ct_start ($$$) { # start `git for-each-ref' for the newest committer date; ct_fini runs once it is reaped
+ my ($self, $git, $prep_repo) = @_;
+ return if !$LIVE; # premature exit
+ cidx_reap($self, $LIVE_JOBS); # wait until fewer than $LIVE_JOBS PIDs are tracked
+ my ($rd, $pid) = $git->popen([qw[for-each-ref --sort=-committerdate
+ --format=%(committerdate:raw) --count=1
refs/heads/ refs/tags/]]);
- my $repo = {};
- @$repo{qw(fp refs)} = cidx_fp($self);
- $repo->{roots} = get_roots($self, $repo->{refs});
- if (!$repo->{roots} || !defined($ct)) {
- warn "W: $git_dir has no root commits, skipping\n";
+ $LIVE->{$pid} = [ \&ct_fini, $self, $git, $rd, $prep_repo ]; # [ callback, @args ] consumed by cidx_reap
+}
+
+sub ct_fini { # reap callback registered by ct_start: record the commit time read from $rd
+ my ($self, $git, $rd, $prep_repo) = @_; # $prep_repo is unused here; it is only carried in the arg list
+ defined(my $ct = <$rd>) or return; # no output line: {ct} stays unset, which prep_repo treats as "no commits"
+ $ct =~ s/\s+.*\z//s; # drop TZ + LF
+ $git->{-repo}->{ct} = $ct + 0; # store as a number
+}
+
+# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
+sub prep_repo ($$) { # decide whether $git needs (re)indexing; hands off to get_roots when it does
+ my ($self, $git) = @_;
+ return if !$LIVE; # premature exit
+ my $repo = $git->{-repo} // die 'BUG: no {-repo}'; # state filled in by fp_start/fp_fini/ct_fini
+ my $git_dir = $git->{git_dir};
+ if (!defined($repo->{ct})) { # ct_fini never stored a commit time
+ warn "W: $git_dir has no commits, skipping\n";
+ delete $git->{-repo}; # drop the per-repo state, including the {refs} handle
return;
}
- $ct =~ s/ .*\z//s; # drop TZ
- $repo->{ct} = $ct + 0;
my $n = git_dir_hash($git_dir) % $self->{nshard};
my $shard = $repo->{shard} = bless { %$self, shard => $n }, ref($self);
delete @$shard{qw(lockfh lock_path)};
local $shard->{xdb};
my $xdb = $shard->idx_acquire;
my @docids = docids_by_postlist($shard, 'P'.$git_dir);
- my $docid = shift(@docids) // return $repo;
+ my $docid = shift(@docids) // return get_roots($self, $git); # no existing document: index from scratch
if (@docids) {
warn "BUG: $git_dir indexed multiple times, culling\n";
- $xdb->begin_transaction;
- for (@docids) { $xdb->delete_document($_) }
- $xdb->commit_transaction;
+ $repo->{to_delete} = \@docids; # XXX needed?
}
my $doc = $xdb->get_document($docid) //
die "BUG: no #$docid ($git_dir)";
my $old_fp = $doc->get_data;
if ($old_fp eq $repo->{fp}) { # no change
- progress($self, 'unchanged');
+ progress($self, "$git_dir unchanged");
+ delete $git->{-repo}; # nothing to do: drop the per-repo state
return;
}
$repo->{id} = $docid;
- $repo;
+ get_roots($self, $git); # fingerprint differs: continue with root-commit discovery
}
-sub partition_refs ($$) {
- my ($self, $refs) = @_; # show-ref --heads --tags --hash output
- my $fh = $self->{git}->popen(qw(rev-list --stdin), undef,
- { 0 => $refs });
+sub partition_refs ($$$) {
+ my ($self, $git, $refs) = @_; # show-ref --heads --tags --hash output
+ sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
+ my $fh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
close $refs or die "close: $!";
local $self->{xdb};
my $xdb = $self->{-opt}->{reindex} ? undef : $self->xdb;
close($fh);
if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
$self->{nchange} += $nchange;
- progress($self, "$nchange commits");
+ progress($self, "$git->{git_dir}: $nchange commits");
for my $fh (@shard_in) {
$fh->flush or die "flush: $!";
sysseek($fh, 0, SEEK_SET) or die "seek: $!";
}
return @shard_in;
}
- die "git-rev-list: \$?=$?\n";
+ die "git --git-dir=$git->{git_dir} rev-list: \$?=$?\n";
}
-sub index_git_dir ($$) {
- my ($self, $git_dir) = @_;
- local $self->{git} = PublicInbox::Git->new($git_dir); # for ->patch_id
- my $repo = prep_git_dir($self) or return;
- local $self->{current_info} = $git_dir;
- my @shard_in = partition_refs($self, delete($repo->{refs}));
+sub index_repo {
+ my ($self, $git, $roots) = @_;
+ return if !$LIVE; # premature exit
+ my $repo = delete $git->{-repo} or return;
+ seek($roots, 0, SEEK_SET) or die "seek: $!";
+ chomp(my @roots = <$roots>);
+ close($roots) or die "close: $!";
+ @roots or return warn("E: $git->{git_dir} has no root commits\n");
+ $repo->{roots} = \@roots;
+ local $self->{current_info} = $git->{git_dir};
+ my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
my %pids;
my $fwd_kill = sub {
my ($sig) = @_;
my $pid = fork // die "fork: $!";
if ($pid == 0) { # no RNG use, here
$0 = "code index [$n]";
+ $self->{git} = $git;
$self->{shard} = $n;
$self->{current_info} = "$self->{current_info} [$n]";
delete @$self{qw(lockfh lock_path)};
my $in = $shard_in[$n];
@shard_in = ();
- $self->{roots} = delete $repo->{roots};
+ $self->{roots} = \@roots;
undef $repo;
eval { shard_worker($self, $in, $sigset) };
warn "E: $@" if $@;
}
PublicInbox::DS::sig_setmask($sigset);
@shard_in = ();
- my $err;
+ my ($err, @todo);
while (keys %pids) {
- my $pid = waitpid(-1, 0) or last;
- my $j = delete $pids{$pid} // "unknown PID:$pid";
- next if $? == 0;
- warn "PID:$pid $j exited with \$?=$?\n";
- $err = 1;
+ my $pid = waitpid(-1, 0) // die "waitpid: $!";
+ if (my $j = delete $pids{$pid}) {
+ next if $? == 0;
+ warn "PID:$pid $j exited with \$?=$?\n";
+ $err = 1;
+ } elsif (my $todo = delete $LIVE->{$pid}) {
+ warn "PID:$pid exited with \$?=$?\n" if $?;
+ push @todo, $todo;
+ } else {
+ warn "reaped unknown PID=$pid ($?)\n";
+ }
}
die "subprocess(es) failed\n" if $err;
- store_repo($self, $repo);
- progress($self, 'done');
+ store_repo($self, $git, $repo);
+ progress($self, "$git->{git_dir}: done");
# TODO: check fp afterwards?
+ while (my $x = shift @todo) {
+ my $cb = shift @$x;
+ $cb->(@$x) if $cb;
+ }
+}
+
+sub get_roots ($$) { # spawn `git rev-list --max-parents=0' over the saved refs; index_repo runs once it is reaped
+ my ($self, $git) = @_;
+ return if !$LIVE; # premature exit
+ cidx_reap($self, $LIVE_JOBS); # wait until fewer than $LIVE_JOBS PIDs are tracked
+ my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; # handle stored by fp_start
+ sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # rewind before handing $refs to the child
+ open my $roots, '+>', undef or die "open: $!"; # anonymous temporary file for the child's output
+ my $pid = spawn(['git', "--git-dir=$git->{git_dir}",
+ qw(rev-list --stdin --max-parents=0)],
+ undef, { 0 => $refs, 1 => $roots }); # presumably an fd => handle redirect map - confirm against PublicInbox::Spawn
+ $LIVE->{$pid} = [ \&index_repo, $self, $git, $roots ]; # [ callback, @args ] consumed by cidx_reap
}
# for PublicInbox::SearchIdx::patch_id and with_umask
EOM
}
+sub scan_git_dirs ($) { # start fp_start/ct_start for every entry of {git_dirs}, then reap everything
+ my ($self) = @_;
+ local $LIVE_JOBS = $self->{-opt}->{jobs} //
+ PublicInbox::IPC::detect_nproc() // 2; # {jobs} option wins, then detect_nproc(), then 2
+ local $LIVE = {}; # pid => [ callback, @args ]; previous value restored on return
+ for (@{$self->{git_dirs}}) {
+ my $git = PublicInbox::Git->new($_);
+ my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
+ $self, $git); # NOTE(review): presumably calls prep_repo($self, $git) once the last reference is dropped - confirm
+ fp_start($self, $git, $prep_repo);
+ ct_start($self, $git, $prep_repo);
+ }
+ cidx_reap($self, 0); # $jobs == 0: keep reaping until waitpid reports nothing left
+}
+
sub cidx_run {
my ($self) = @_;
cidx_init($self);
}
local $self->{nchange} = 0;
# do_prune($self) if $self->{-opt}->{prune}; TODO
- if ($self->{-opt}->{scan} // 1) {
- for my $gd (@{$self->{git_dirs}}) {
- index_git_dir($self, $gd);
- }
- }
+ scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
$self->lock_release(!!$self->{nchange});
}