From: Eric Wong Date: Wed, 25 Oct 2023 00:29:48 +0000 (+0000) Subject: cindex: use run_await wrapper for git commands X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bd838e51061b121ea95ce49715678aafae148bee;p=thirdparty%2Fpublic-inbox.git cindex: use run_await wrapper for git commands Instead of keeping track of live processes ourselves in a hash table, we'll rely on OnDestroy here to notify us of git command completions. --- diff --git a/lib/PublicInbox/CodeSearchIdx.pm b/lib/PublicInbox/CodeSearchIdx.pm index 2356164b1..e17cba39b 100644 --- a/lib/PublicInbox/CodeSearchIdx.pm +++ b/lib/PublicInbox/CodeSearchIdx.pm @@ -58,14 +58,14 @@ use PublicInbox::Compat qw(uniqstr); use PublicInbox::Aspawn qw(run_await); use Carp (); use autodie qw(pipe open seek sysseek send); +our $DO_QUIT = 15; # signal number our ( - $LIVE, # pid => cmd $LIVE_JOBS, # integer + $GITS_NR, # number of coderepos $MY_SIG, # like %SIG $SIGSET, $TXN_BYTES, # number of bytes in current shard transaction $BATCH_BYTES, - $DO_QUIT, # signal number @RDONLY_XDB, # Xapian::Database @IDX_SHARDS, # clones of self $MAX_SIZE, @@ -359,39 +359,31 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search @ids; } -sub cidx_await_cb { # awaitpid cb - my ($pid, $cb, $self, $git, @args) = @_; - return if !$LIVE || $DO_QUIT; - my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd'; - PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC - if ($?) { - $git->{-cidx_err} = 1; - return warn("@$cmd error: \$?=$?\n"); - } - $cb->($self, $git, @args); +sub _cb { # run_await cb + my ($pid, $cmd, undef, $opt, $cb, $self, $git, @arg) = @_; + return if $DO_QUIT; + ($git->{-cidx_err} = $?) ? warn("@$cmd error: \$?=$?\n") : + $cb->($opt, $self, $git, @arg); } -sub cidx_await ($$$$$@) { - my ($pid, $cmd, $cb, $self, $git, @args) = @_; - $LIVE->{$pid} = $cmd; - awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args); +sub run_git { + my ($cmd, $opt, $cb, $self, $git, @arg) = @_; + run_await($git->cmd(@$cmd), undef, $opt, \&_cb, $cb, $self, $git, @arg) } # this is different from the grokmirror-compatible fingerprint since we # only care about --heads (branches) and --tags, and not even their names sub fp_start ($$$) { my ($self, $git, $prep_repo) = @_; - return if !$LIVE || $DO_QUIT; + return if $DO_QUIT; open my $refs, '+>', undef; - my $cmd = ['git', "--git-dir=$git->{git_dir}", - qw(show-ref --heads --tags --hash)]; - my $pid = spawn($cmd, undef, { 1 => $refs }); $git->{-repo}->{refs} = $refs; - cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo); + run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs }, + \&fp_fini, $self, $git, $prep_repo); } -sub fp_fini { # cidx_await cb - my ($self, $git, $prep_repo) = @_; +sub fp_fini { # run_git cb + my (undef, $self, $git, $prep_repo) = @_; my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; seek($refs, 0, SEEK_SET); my $buf; @@ -402,26 +394,23 @@ sub fp_fini { # cidx_await cb sub ct_start ($$$) { my ($self, $git, $prep_repo) = @_; - return if !$LIVE || $DO_QUIT; - my $cmd = [ 'git', "--git-dir=$git->{git_dir}", - qw[for-each-ref --sort=-committerdate + return if $DO_QUIT; + run_git([ qw[for-each-ref --sort=-committerdate --format=%(committerdate:raw) --count=1 - refs/heads/ refs/tags/] ]; - my ($rd, $pid) = popen_rd($cmd); - cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo); + refs/heads/ refs/tags/] ], undef, # capture like qx + \&ct_fini, $self, $git, $prep_repo); } -sub ct_fini { # cidx_await cb - my ($self, $git, $rd, $prep_repo) = @_; - defined(my $ct = <$rd>) or return; - $ct =~ s/\s+.*\z//s; # drop TZ + LF +sub ct_fini { # run_git cb + my ($opt, $self, $git, $prep_repo) = @_; + my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF $git->{-repo}->{ct} = $ct + 0; } # TODO: also index gitweb.owner and the full fingerprint for grokmirror? sub prep_repo ($$) { my ($self, $git) = @_; - return if !$LIVE || $DO_QUIT; + return if $DO_QUIT; return index_next($self) if $git->{-cidx_err}; my $repo = $git->{-repo} // die 'BUG: no {-repo}'; if (!defined($repo->{ct})) { @@ -578,9 +567,9 @@ sub index_next ($) { my ($self) = @_; return if $DO_QUIT; if ($IDX_TODO && @$IDX_TODO) { - index_repo($self, shift @$IDX_TODO); + index_repo(undef, $self, shift @$IDX_TODO); } elsif ($GIT_TODO && @$GIT_TODO) { - my $git = PublicInbox::Git->new(shift @$GIT_TODO); + my $git = shift @$GIT_TODO; my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo, $self, $git); fp_start($self, $git, $prep_repo); @@ -589,7 +578,8 @@ sub index_next ($) { return if delete($TODO{dump_roots_start}); delete $TODO{dump_ibx_start}; # runs OnDestroy once return dump_ibx($self, shift @IBXQ) if @IBXQ; - undef $DUMP_IBX_WPIPE; # done dumping inboxes, dump roots + undef $DUMP_IBX_WPIPE; # done dumping inboxes + undef $XHC; delete $TODO{associate}; } # else: wait for shards_active (post_loop_do) callback @@ -630,8 +620,8 @@ sub commit_shard { # OnDestroy cb # shard_done fires when all shards are committed } -sub index_repo { # cidx_await cb - my ($self, $git) = @_; +sub index_repo { # run_git cb + my (undef, $self, $git) = @_; return if $DO_QUIT; return index_next($self) if $git->{-cidx_err}; return push(@$IDX_TODO, $git) if $REPO_CTX; # busy @@ -649,6 +639,7 @@ sub index_repo { # cidx_await cb my @shard_in = partition_refs($self, $git, delete($repo->{refs})); $repo->{git_dir} = $git->{git_dir}; my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo }; + delete $git->{-cidx_gits_fini}; # may fire gits_fini my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard, $repo_ctx); my ($c, $p) = PublicInbox::PktOp->pair; @@ -667,15 +658,13 @@ sub index_repo { # cidx_await cb sub get_roots ($$) { my ($self, $git) = @_; - return if !$LIVE || $DO_QUIT; + return if $DO_QUIT; my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}'; sysseek($refs, 0, SEEK_SET); open my $roots_fh, '+>', undef; - my $cmd = [ 'git', "--git-dir=$git->{git_dir}", - qw(rev-list --stdin --max-parents=0) ]; - my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots_fh }); $git->{-repo}->{roots_fh} = $roots_fh; - cidx_await($pid, $cmd, \&index_repo, $self, $git); + run_git([ qw(rev-list --stdin --max-parents=0) ], + { 0 => $refs, 1 => $roots_fh }, \&index_repo, $self, $git) } # for PublicInbox::SearchIdx::patch_id and with_umask @@ -751,10 +740,18 @@ EOM @shards; } +sub gits_fini { + undef $GITS_NR; + PublicInbox::DS::enqueue_reap(); # kick @post_loop_do +} + sub scan_git_dirs ($) { my ($self) = @_; - my $n = @$GIT_TODO = @{$self->{git_dirs}}; - progress($self, "scanning $n code repositories..."); + @$GIT_TODO = map { PublicInbox::Git->new($_) } @{$self->{git_dirs}}; + $GITS_NR = @$GIT_TODO; + my $gits_fini = PublicInbox::OnDestroy->new($$, \&gits_fini); + $_->{-cidx_gits_fini} = $gits_fini for @$GIT_TODO; + progress($self, "scanning $GITS_NR code repositories..."); } sub prune_init { # via wq_io_do in IDX_SHARDS @@ -786,10 +783,10 @@ sub prune_commit { # via wq_io_do in IDX_SHARDS sub shards_active { # post_loop_do return if $DO_QUIT; - return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4; + return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO) != 3; return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS; - return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX; - return 1 if keys(%$LIVE) || @IBXQ || keys(%TODO); + return 1 if $GITS_NR || scalar(@$IDX_TODO) || $REPO_CTX; + return 1 if @IBXQ || keys(%TODO); for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) { $s->{-cidx_quit} = 1 if defined($s->{-wq_s1}); $s->wq_close; # may recurse via awaitpid outside of event_loop @@ -1105,14 +1102,13 @@ sub cidx_run { # main entry point POSIX::SIGTSTP, POSIX::SIGCONT); my $restore = PublicInbox::OnDestroy->new($$, \&PublicInbox::DS::sig_setmask, $SIGSET); - local $LIVE = {}; local $PRUNE_DONE = []; local $IDX_TODO = []; local $GIT_TODO = []; local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE, $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV, %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE, - @ID2ROOT, $XHC, @SORT); + @ID2ROOT, $XHC, @SORT, $GITS_NR); local $BATCH_BYTES = $self->{-opt}->{batch_size} // $PublicInbox::SearchIdx::BATCH_BYTES; local $self->{ASSOC_PFX} = \@ASSOC_PFX; @@ -1202,14 +1198,13 @@ sub ipc_atfork_child { # @IDX_SHARDS sub shard_done_wait { # awaitpid cb via ipc_worker_reap my ($pid, $shard, $self) = @_; my $quit_req = delete($shard->{-cidx_quit}); - return if $DO_QUIT || !$LIVE; + return if $DO_QUIT; if ($? == 0) { # success $quit_req // warn 'BUG: {-cidx_quit} unset'; } else { warn "PID:$pid $shard->{shard} exited with \$?=$?\n"; ++$self->{shard_err} if defined($self->{shard_err}); } - PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC } 1;