@JOIN_DT, # YYYYmmddHHMMSS for dt:
$QRY_STR, # common query string for both code and inbox associations
$DUMP_IBX_WPIPE, # goes to sort(1)
+ $ANY_SHARD, # shard round-robin for scan fingerprinting
@OFF2ROOT,
);
# this is different from the grokmirror-compatible fingerprint since we
# only care about --heads (branches) and --tags, and not even their names
-sub fp_start ($$$) {
- my ($self, $git, $prep_repo) = @_;
+sub fp_start ($$) {
+ my ($self, $git) = @_;
return if $DO_QUIT;
open my $refs, '+>', undef;
$git->{-repo}->{refs} = $refs;
- run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs },
- \&fp_fini, $self, $git, $prep_repo);
-}
-
-sub fp_fini { # run_git cb
- my (undef, $self, $git, $prep_repo) = @_;
- my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
- sysseek($refs, 0, SEEK_SET);
- $git->{-repo}->{fp} = sha_all(256, $refs)->hexdigest;
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ my $next_on_err = PublicInbox::OnDestroy->new(\&index_next, $self);
+ $c->{ops}->{fp_done} = [ $self, $git, $next_on_err ];
+ $IDX_SHARDS[++$ANY_SHARD % scalar(@IDX_SHARDS)]->wq_io_do('fp_async',
+ [ $p->{op_p}, $refs ], $git->{git_dir})
}
-sub ct_start ($$$) {
- my ($self, $git, $prep_repo) = @_;
- return if $DO_QUIT;
- run_git([ qw[for-each-ref --sort=-committerdate
- --format=%(committerdate:raw) --count=1
- refs/heads/ refs/tags/] ], undef, # capture like qx
- \&ct_fini, $self, $git, $prep_repo);
+sub fp_async { # via wq_io_do in worker
+ my ($self, $git_dir) = @_;
+ my $op_p = delete $self->{0} // die 'BUG: no {0} op_p';
+ my $refs = delete $self->{1} // die 'BUG: no {1} refs';
+ my $git = PublicInbox::Git->new($git_dir);
+ run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs },
+ \&fp_async_done, $self, $git, $op_p);
}
-sub ct_fini { # run_git cb
- my ($opt, $self, $git, $prep_repo) = @_;
- my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF
- $git->{-repo}->{ct} = $ct + 0;
+sub fp_async_done { # run_git cb from worker
+ my ($opt, $self, $git, $op_p) = @_;
+ my $refs = delete $opt->{1} // 'BUG: no {-repo}->{refs}';
+ sysseek($refs, 0, SEEK_SET);
+ send($op_p, 'fp_done '.sha_all(256, $refs)->hexdigest, 0);
}
-# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
-sub prep_repo ($$) {
- my ($self, $git) = @_;
+sub fp_done { # called parent via PktOp by fp_async_done
+ my ($self, $git, $next_on_err, $hex) = @_;
+ $next_on_err->cancel;
return if $DO_QUIT;
- return index_next($self) if $git->{-cidx_err};
- my $repo = $git->{-repo} // die 'BUG: no {-repo}';
- if (!defined($repo->{ct})) {
- warn "W: $git->{git_dir} has no commits, skipping\n";
- delete $git->{-repo};
- return index_next($self);
- }
+ $git->{-repo}->{fp} = $hex;
my $n = git_dir_hash($git->{git_dir}) % scalar(@RDONLY_XDB);
my $shard = bless { %$self, shard => $n }, ref($self);
- $repo->{shard_n} = $n;
+ $git->{-repo}->{shard_n} = $n;
delete @$shard{qw(lockfh lock_path)};
local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef";
$shard->retry_reopen(\&check_existing, $self, $git);
sub check_existing { # retry_reopen callback
my ($shard, $self, $git) = @_;
my @docids = $shard->docids_of_git_dir($git->{git_dir});
- my $docid = shift(@docids) // return get_roots($self, $git);
+ my $docid = shift(@docids) // return prep_repo($self, $git); # new repo
my $doc = $shard->get_doc($docid) //
die "BUG: no #$docid ($git->{git_dir})";
my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data;
warn "BUG: $git->{git_dir} indexed multiple times, culling\n";
$git->{-repo}->{to_delete} = \@docids; # XXX needed?
}
- get_roots($self, $git);
+ prep_repo($self, $git);
}
sub partition_refs ($$$) {
my ($self) = @_;
return if $DO_QUIT;
if ($IDXQ && @$IDXQ) {
- index_repo(undef, $self, shift @$IDXQ);
+ index_repo($self, shift @$IDXQ);
} elsif ($SCANQ && @$SCANQ) {
- my $git = shift @$SCANQ;
- my $prep_repo = PublicInbox::OnDestroy->new(\&prep_repo,
- $self, $git);
- fp_start($self, $git, $prep_repo);
- ct_start($self, $git, $prep_repo);
+ fp_start $self, shift @$SCANQ;
} elsif ($TMPDIR) {
delete $TODO{dump_roots_start};
delete $TODO{dump_ibx_start}; # runs OnDestroy once
# repo_stored will fire once store_repo is done
}
-sub index_repo { # run_git cb
- my (undef, $self, $git) = @_;
+sub index_repo {
+ my ($self, $git) = @_;
return if $DO_QUIT;
+ my $repo = $git->{-repo} // die 'BUG: no {-repo}';
return index_next($self) if $git->{-cidx_err};
+ if (!defined($repo->{ct})) {
+ warn "W: $git->{git_dir} has no commits, skipping\n";
+ return index_next($self);
+ }
return push(@$IDXQ, $git) if $REPO_CTX; # busy
- my $repo = delete $git->{-repo} or return index_next($self);
+ delete $git->{-repo};
my $roots_fh = delete $repo->{roots_fh} // die 'BUG: no {roots_fh}';
seek($roots_fh, 0, SEEK_SET);
chomp(my @roots = PublicInbox::IO::read_all $roots_fh);
# shard_done fires when shard_index is done
}
-sub get_roots ($$) {
+sub ct_fini { # run_git cb
+ my ($opt, $self, $git, $index_repo) = @_;
+ my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF
+ $git->{-repo}->{ct} = $ct + 0;
+}
+
+# TODO: also index gitweb.owner and the full fingerprint for grokmirror?
+sub prep_repo ($$) {
my ($self, $git) = @_;
return if $DO_QUIT;
+ my $index_repo = PublicInbox::OnDestroy->new(\&index_repo, $self, $git);
my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
sysseek($refs, 0, SEEK_SET);
open my $roots_fh, '+>', undef;
$git->{-repo}->{roots_fh} = $roots_fh;
run_git([ qw(rev-list --stdin --max-parents=0) ],
- { 0 => $refs, 1 => $roots_fh }, \&index_repo, $self, $git)
+ { 0 => $refs, 1 => $roots_fh }, \&PublicInbox::Config::noop,
+ $self, $git, $index_repo);
+ run_git([ qw[for-each-ref --sort=-committerdate
+ --format=%(committerdate:raw) --count=1
+ refs/heads/ refs/tags/] ], undef, # capture like qx
+ \&ct_fini, $self, $git, $index_repo);
}
# for PublicInbox::SearchIdx `git patch-id' call and with_umask
init_join_prefork($self)
}
local @IDX_SHARDS = cidx_init($self); # forks workers
+ local $ANY_SHARD = -1;
local $self->{current_info} = '';
local $MY_SIG = {
CHLD => \&PublicInbox::DS::enqueue_reap,