]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
cindex: store coderepo data asynchronously
authorEric Wong <e@80x24.org>
Wed, 25 Oct 2023 15:33:47 +0000 (15:33 +0000)
committerEric Wong <e@80x24.org>
Wed, 25 Oct 2023 20:32:01 +0000 (20:32 +0000)
While it's typically fast to store coderepo data, pathological
latency on HDDs can let us use that delay to get other work
done.

lib/PublicInbox/CodeSearchIdx.pm

index aeee37c00def6ab8b8d1ccbe08fd3be1b487baaa..f2fd28e390586423cc11865e6be9ee6f983d9420 100644 (file)
@@ -193,8 +193,9 @@ sub progress {
        $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n");
 }
 
-sub store_repo { # wq_do - returns docid
+sub store_repo { # wq_io_do, sends docid back
        my ($self, $repo) = @_;
+       my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
        $self->begin_txn_lazy;
        $self->{xdb}->delete_document($_) for @{$repo->{to_delete}};
        my $doc = $PublicInbox::Search::X{Document}->new;
@@ -203,12 +204,10 @@ sub store_repo { # wq_do - returns docid
        $doc->add_boolean_term('T'.'r');
        $doc->add_boolean_term('G'.$_) for @{$repo->{roots}};
        $doc->set_data($repo->{fp}); # \n delimited
-       if ($repo->{docid}) {
-               $self->{xdb}->replace_document($repo->{docid}, $doc);
-               $repo->{docid};
-       } else {
-               $self->{xdb}->add_document($doc);
-       }
+       my $did = $repo->{docid};
+       $did ? $self->{xdb}->replace_document($did, $doc)
+               : ($did = $self->{xdb}->add_document($doc));
+       send($op_p, "repo_stored $did", 0);
 }
 
 sub cidx_ckpoint ($;$) {
@@ -322,6 +321,17 @@ sub shard_done { # called via PktOp on shard_index completion
        $repo_ctx->{shard_ok}->{$n} = 1;
 }
 
+sub repo_stored {
+       my ($self, $repo_ctx, $did) = @_;
+       $did > 0 or die "BUG: $repo_ctx->{repo}->{git_dir}: docid=$did";
+       my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx);
+       my ($c, $p) = PublicInbox::PktOp->pair;
+       $c->{ops}->{shard_done} = [ $self, $repo_ctx, $next ];
+       # shard_done fires when all shards are committed
+       my @active = keys %{$repo_ctx->{active}};
+       $IDX_SHARDS[$_]->wq_io_do('shard_commit', [ $p->{op_p} ]) for @active;
+}
+
 sub prune_done { # called via prune_do completion
        my ($self, $n) = @_;
        return if $DO_QUIT || !$PRUNE_DONE;
@@ -584,37 +594,30 @@ sub index_next ($) {
 
 sub next_repos { # OnDestroy cb
        my ($repo_ctx) = @_;
-       progress($repo_ctx->{self}, "$repo_ctx->{repo}->{git_dir}: done");
-       return if $DO_QUIT;
-       if ($REPO_CTX) {
-               $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx";
-               $REPO_CTX = undef;
-               index_next($repo_ctx->{self});
-       }
+       my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
+       progress($self, "$repo->{git_dir}: done");
+       return if $DO_QUIT || !$REPO_CTX;
+       my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active;
+       die "E: $repo->{git_dir} $n shards failed" if $n;
+       $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx";
+       $REPO_CTX = undef;
+       index_next($self);
 }
 
-sub commit_shard { # OnDestroy cb
+sub index_done { # OnDestroy cb called when done indexing each code repo
        my ($repo_ctx) = @_;
        my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
 
+       return if $DO_QUIT;
        my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active;
-       die "E: $repo->{git_dir} $n shards failed" if $n && !$DO_QUIT;
-
-       $repo_ctx->{shard_ok} = {};
-       if (!$DO_QUIT) {
-               my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo',
-                                                               $repo);
-               (!defined($id) || $id <= 0) and
-                       die "E: store_repo $repo->{git_dir}: id=$id";
-               $active->{$repo->{shard_n}} = undef;
-       }
-       my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx);
+       die "E: $repo->{git_dir} $n shards failed" if $n;
+       $repo_ctx->{shard_ok} = {}; # reset for future shard_done
+       $n = $repo->{shard_n};
+       $active->{$n} = undef;
        my ($c, $p) = PublicInbox::PktOp->pair;
-       $c->{ops}->{shard_done} = [ $repo_ctx->{self}, $repo_ctx, $next ];
-       for my $n (keys %$active) {
-               $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
-       }
-       # shard_done fires when all shards are committed
+       $c->{ops}->{repo_stored} = [ $self, $repo_ctx ];
+       $IDX_SHARDS[$n]->wq_io_do('store_repo', [ $p->{op_p} ], $repo);
+       # repo_stored will fire once store_repo is done
 }
 
 sub index_repo { # run_git cb
@@ -637,10 +640,10 @@ sub index_repo { # run_git cb
        $repo->{git_dir} = $git->{git_dir};
        my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo };
        delete $git->{-cidx_gits_fini}; # may fire gits_fini
-       my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard,
+       my $index_done = PublicInbox::OnDestroy->new($$, \&index_done,
                                                        $repo_ctx);
        my ($c, $p) = PublicInbox::PktOp->pair;
-       $c->{ops}->{shard_done} = [ $self, $repo_ctx, $commit_shard ];
+       $c->{ops}->{shard_done} = [ $self, $repo_ctx, $index_done ];
        for my $n (0..$#shard_in) {
                $shard_in[$n]->flush or die "flush shard[$n]: $!";
                -s $shard_in[$n] or next;