From: Eric Wong Date: Wed, 25 Oct 2023 15:33:47 +0000 (+0000) Subject: cindex: store coderepo data asynchronously X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8967111262747ba72fba1141f772514c83dd66f5;p=thirdparty%2Fpublic-inbox.git cindex: store coderepo data asynchronously While it's typically fast to store coderepo data, pathological latency on HDDs can let us use that delay to get other work done. --- diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index aeee37c00..f2fd28e39 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -193,8 +193,9 @@ sub progress { $pr->($self->{git} ? ("$self->{git}->{git_dir}: ") : (), @msg, "\n"); } -sub store_repo { # wq_do - returns docid +sub store_repo { # wq_io_do, sends docid back my ($self, $repo) = @_; + my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p'; $self->begin_txn_lazy; $self->{xdb}->delete_document($_) for @{$repo->{to_delete}}; my $doc = $PublicInbox::Search::X{Document}->new; @@ -203,12 +204,10 @@ sub store_repo { # wq_do - returns docid $doc->add_boolean_term('T'.'r'); $doc->add_boolean_term('G'.$_) for @{$repo->{roots}}; $doc->set_data($repo->{fp}); # \n delimited - if ($repo->{docid}) { - $self->{xdb}->replace_document($repo->{docid}, $doc); - $repo->{docid}; - } else { - $self->{xdb}->add_document($doc); - } + my $did = $repo->{docid}; + $did ? $self->{xdb}->replace_document($did, $doc) + : ($did = $self->{xdb}->add_document($doc)); + send($op_p, "repo_stored $did", 0); } sub cidx_ckpoint ($;$) { @@ -322,6 +321,17 @@ sub shard_done { # called via PktOp on shard_index completion $repo_ctx->{shard_ok}->{$n} = 1; } +sub repo_stored { + my ($self, $repo_ctx, $did) = @_; + $did > 0 or die "BUG: $repo_ctx->{repo}->{git_dir}: docid=$did"; + my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx); + my ($c, $p) = PublicInbox::PktOp->pair; + $c->{ops}->{shard_done} = [ $self, $repo_ctx, $next ]; + # shard_done fires when all shards are committed + my @active = keys %{$repo_ctx->{active}}; + $IDX_SHARDS[$_]->wq_io_do('shard_commit', [ $p->{op_p} ]) for @active; +} + sub prune_done { # called via prune_do completion my ($self, $n) = @_; return if $DO_QUIT || !$PRUNE_DONE; @@ -584,37 +594,30 @@ sub index_next ($) { sub next_repos { # OnDestroy cb my ($repo_ctx) = @_; - progress($repo_ctx->{self}, "$repo_ctx->{repo}->{git_dir}: done"); - return if $DO_QUIT; - if ($REPO_CTX) { - $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx"; - $REPO_CTX = undef; - index_next($repo_ctx->{self}); - } + my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)}; + progress($self, "$repo->{git_dir}: done"); + return if $DO_QUIT || !$REPO_CTX; + my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active; + die "E: $repo->{git_dir} $n shards failed" if $n; + $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx"; + $REPO_CTX = undef; + index_next($self); } -sub commit_shard { # OnDestroy cb +sub index_done { # OnDestroy cb called when done indexing each code repo my ($repo_ctx) = @_; my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)}; + return if $DO_QUIT; my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active; - die "E: $repo->{git_dir} $n shards failed" if $n && !$DO_QUIT; - - $repo_ctx->{shard_ok} = {}; - if (!$DO_QUIT) { - my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', - $repo); - (!defined($id) || $id <= 0) and - die "E: store_repo $repo->{git_dir}: id=$id"; - $active->{$repo->{shard_n}} = undef; - } - my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx); + die "E: $repo->{git_dir} $n shards failed" if $n; + $repo_ctx->{shard_ok} = {}; # reset for future shard_done + $n = $repo->{shard_n}; + $active->{$n} = undef; my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $repo_ctx->{self}, $repo_ctx, $next ]; - for my $n (keys %$active) { - $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]); - } - # shard_done fires when all shards are committed + $c->{ops}->{repo_stored} = [ $self, $repo_ctx ]; + $IDX_SHARDS[$n]->wq_io_do('store_repo', [ $p->{op_p} ], $repo); + # repo_stored will fire once store_repo is done } sub index_repo { # run_git cb @@ -637,10 +640,10 @@ sub index_repo { # run_git cb $repo->{git_dir} = $git->{git_dir}; my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo }; delete $git->{-cidx_gits_fini}; # may fire gits_fini - my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard, + my $index_done = PublicInbox::OnDestroy->new($$, \&index_done, $repo_ctx); my ($c, $p) = PublicInbox::PktOp->pair; - $c->{ops}->{shard_done} = [ $self, $repo_ctx, $commit_shard ]; + $c->{ops}->{shard_done} = [ $self, $repo_ctx, $index_done ]; for my $n (0..$#shard_in) { $shard_in[$n]->flush or die "flush shard[$n]: $!"; -s $shard_in[$n] or next;