$pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
}
-sub store_repo { # wq_do - returns docid
+sub store_repo { # wq_io_do, sends docid back
my ($self, $repo) = @_;
+ my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
$self->begin_txn_lazy;
$self->{xdb}->delete_document($_) for @{$repo->{to_delete}};
my $doc = $PublicInbox::Search::X{Document}->new;
$doc->add_boolean_term('T'.'r');
$doc->add_boolean_term('G'.$_) for @{$repo->{roots}};
$doc->set_data($repo->{fp}); # \n delimited
- if ($repo->{docid}) {
- $self->{xdb}->replace_document($repo->{docid}, $doc);
- $repo->{docid};
- } else {
- $self->{xdb}->add_document($doc);
- }
+ my $did = $repo->{docid};
+ $did ? $self->{xdb}->replace_document($did, $doc)
+ : ($did = $self->{xdb}->add_document($doc));
+ send($op_p, "repo_stored $did", 0);
}
sub cidx_ckpoint ($;$) {
$repo_ctx->{shard_ok}->{$n} = 1;
}
+sub repo_stored {
+ my ($self, $repo_ctx, $did) = @_;
+ $did > 0 or die "BUG: $repo_ctx->{repo}->{git_dir}: docid=$did";
+ my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx);
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{shard_done} = [ $self, $repo_ctx, $next ];
+ # shard_done fires when all shards are committed
+ my @active = keys %{$repo_ctx->{active}};
+ $IDX_SHARDS[$_]->wq_io_do('shard_commit', [ $p->{op_p} ]) for @active;
+}
+
sub prune_done { # called via prune_do completion
my ($self, $n) = @_;
return if $DO_QUIT || !$PRUNE_DONE;
sub next_repos { # OnDestroy cb
my ($repo_ctx) = @_;
- progress($repo_ctx->{self}, "$repo_ctx->{repo}->{git_dir}: done");
- return if $DO_QUIT;
- if ($REPO_CTX) {
- $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx";
- $REPO_CTX = undef;
- index_next($repo_ctx->{self});
- }
+ my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
+ progress($self, "$repo->{git_dir}: done");
+ return if $DO_QUIT || !$REPO_CTX;
+ my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active;
+ die "E: $repo->{git_dir} $n shards failed" if $n;
+ $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx";
+ $REPO_CTX = undef;
+ index_next($self);
}
-sub commit_shard { # OnDestroy cb
+sub index_done { # OnDestroy cb called when done indexing each code repo
my ($repo_ctx) = @_;
my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
+ return if $DO_QUIT;
my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active;
- die "E: $repo->{git_dir} $n shards failed" if $n && !$DO_QUIT;
-
- $repo_ctx->{shard_ok} = {};
- if (!$DO_QUIT) {
- my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo',
- $repo);
- (!defined($id) || $id <= 0) and
- die "E: store_repo $repo->{git_dir}: id=$id";
- $active->{$repo->{shard_n}} = undef;
- }
- my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx);
+ die "E: $repo->{git_dir} $n shards failed" if $n;
+ $repo_ctx->{shard_ok} = {}; # reset for future shard_done
+ $n = $repo->{shard_n};
+ $active->{$n} = undef;
my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $repo_ctx->{self}, $repo_ctx, $next ];
- for my $n (keys %$active) {
- $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
- }
- # shard_done fires when all shards are committed
+ $c->{ops}->{repo_stored} = [ $self, $repo_ctx ];
+ $IDX_SHARDS[$n]->wq_io_do('store_repo', [ $p->{op_p} ], $repo);
+ # repo_stored will fire once store_repo is done
}
sub index_repo { # run_git cb
$repo->{git_dir} = $git->{git_dir};
my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo };
delete $git->{-cidx_gits_fini}; # may fire gits_fini
- my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard,
+ my $index_done = PublicInbox::OnDestroy->new($$, \&index_done,
$repo_ctx);
my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $self, $repo_ctx, $commit_shard ];
+ $c->{ops}->{shard_done} = [ $self, $repo_ctx, $index_done ];
for my $n (0..$#shard_in) {
$shard_in[$n]->flush or die "flush shard[$n]: $!";
-s $shard_in[$n] or next;