]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
cindex: use run_await wrapper for git commands
authorEric Wong <e@80x24.org>
Wed, 25 Oct 2023 00:29:48 +0000 (00:29 +0000)
committerEric Wong <e@80x24.org>
Wed, 25 Oct 2023 07:28:49 +0000 (07:28 +0000)
Instead of keeping track of live processes ourselves in a hash
table, we'll rely on OnDestroy here to notify us of git command
completions.

lib/PublicInbox/CodeSearchIdx.pm

index 2356164b18310033c822f7660937cd2e5ae3a425..e17cba39bce914f97249bc491f36ceebf520f0f9 100644 (file)
@@ -58,14 +58,14 @@ use PublicInbox::Compat qw(uniqstr);
 use PublicInbox::Aspawn qw(run_await);
 use Carp ();
 use autodie qw(pipe open seek sysseek send);
+our $DO_QUIT = 15; # signal number
 our (
-       $LIVE, # pid => cmd
        $LIVE_JOBS, # integer
+       $GITS_NR, # number of coderepos
        $MY_SIG, # like %SIG
        $SIGSET,
        $TXN_BYTES, # number of bytes in current shard transaction
        $BATCH_BYTES,
-       $DO_QUIT, # signal number
        @RDONLY_XDB, # Xapian::Database
        @IDX_SHARDS, # clones of self
        $MAX_SIZE,
@@ -359,39 +359,31 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
        @ids;
 }
 
-sub cidx_await_cb { # awaitpid cb
-       my ($pid, $cb, $self, $git, @args) = @_;
-       return if !$LIVE || $DO_QUIT;
-       my $cmd = delete $LIVE->{$pid} // die 'BUG: no $cmd';
-       PublicInbox::DS::enqueue_reap() if !keys(%$LIVE); # once more for PLC
-       if ($?) {
-               $git->{-cidx_err} = 1;
-               return warn("@$cmd error: \$?=$?\n");
-       }
-       $cb->($self, $git, @args);
+sub _cb { # run_await cb
+       my ($pid, $cmd, undef, $opt, $cb, $self, $git, @arg) = @_;
+       return if $DO_QUIT;
+       ($git->{-cidx_err} = $?) ? warn("@$cmd error: \$?=$?\n") :
+                               $cb->($opt, $self, $git, @arg);
 }
 
-sub cidx_await ($$$$$@) {
-       my ($pid, $cmd, $cb, $self, $git, @args) = @_;
-       $LIVE->{$pid} = $cmd;
-       awaitpid($pid, \&cidx_await_cb, $cb, $self, $git, @args);
+sub run_git {
+       my ($cmd, $opt, $cb, $self, $git, @arg) = @_;
+       run_await($git->cmd(@$cmd), undef, $opt, \&_cb, $cb, $self, $git, @arg)
 }
 
 # this is different from the grokmirror-compatible fingerprint since we
 # only care about --heads (branches) and --tags, and not even their names
 sub fp_start ($$$) {
        my ($self, $git, $prep_repo) = @_;
-       return if !$LIVE || $DO_QUIT;
+       return if $DO_QUIT;
        open my $refs, '+>', undef;
-       my $cmd = ['git', "--git-dir=$git->{git_dir}",
-               qw(show-ref --heads --tags --hash)];
-       my $pid = spawn($cmd, undef, { 1 => $refs });
        $git->{-repo}->{refs} = $refs;
-       cidx_await($pid, $cmd, \&fp_fini, $self, $git, $prep_repo);
+       run_git([qw(show-ref --heads --tags --hash)], { 1 => $refs },
+               \&fp_fini, $self, $git, $prep_repo);
 }
 
-sub fp_fini { # cidx_await cb
-       my ($self, $git, $prep_repo) = @_;
+sub fp_fini { # run_git cb
+       my (undef, $self, $git, $prep_repo) = @_;
        my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
        seek($refs, 0, SEEK_SET);
        my $buf;
@@ -402,26 +394,23 @@ sub fp_fini { # cidx_await cb
 
 sub ct_start ($$$) {
        my ($self, $git, $prep_repo) = @_;
-       return if !$LIVE || $DO_QUIT;
-       my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
-               qw[for-each-ref --sort=-committerdate
+       return if $DO_QUIT;
+       run_git([ qw[for-each-ref --sort=-committerdate
                --format=%(committerdate:raw) --count=1
-               refs/heads/ refs/tags/] ];
-       my ($rd, $pid) = popen_rd($cmd);
-       cidx_await($pid, $cmd, \&ct_fini, $self, $git, $rd, $prep_repo);
+               refs/heads/ refs/tags/] ], undef, # capture like qx
+               \&ct_fini, $self, $git, $prep_repo);
 }
 
-sub ct_fini { # cidx_await cb
-       my ($self, $git, $rd, $prep_repo) = @_;
-       defined(my $ct = <$rd>) or return;
-       $ct =~ s/\s+.*\z//s; # drop TZ + LF
+sub ct_fini { # run_git cb
+       my ($opt, $self, $git, $prep_repo) = @_;
+       my ($ct) = split(/\s+/, ${$opt->{1}}); # drop TZ + LF
        $git->{-repo}->{ct} = $ct + 0;
 }
 
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
        my ($self, $git) = @_;
-       return if !$LIVE || $DO_QUIT;
+       return if $DO_QUIT;
        return index_next($self) if $git->{-cidx_err};
        my $repo = $git->{-repo} // die 'BUG: no {-repo}';
        if (!defined($repo->{ct})) {
@@ -578,9 +567,9 @@ sub index_next ($) {
        my ($self) = @_;
        return if $DO_QUIT;
        if ($IDX_TODO && @$IDX_TODO) {
-               index_repo($self, shift @$IDX_TODO);
+               index_repo(undef, $self, shift @$IDX_TODO);
        } elsif ($GIT_TODO && @$GIT_TODO) {
-               my $git = PublicInbox::Git->new(shift @$GIT_TODO);
+               my $git = shift @$GIT_TODO;
                my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
                                                        $self, $git);
                fp_start($self, $git, $prep_repo);
@@ -589,7 +578,8 @@ sub index_next ($) {
                return if delete($TODO{dump_roots_start});
                delete $TODO{dump_ibx_start}; # runs OnDestroy once
                return dump_ibx($self, shift @IBXQ) if @IBXQ;
-               undef $DUMP_IBX_WPIPE; # done dumping inboxes, dump roots
+               undef $DUMP_IBX_WPIPE; # done dumping inboxes
+               undef $XHC;
                delete $TODO{associate};
        }
        # else: wait for shards_active (post_loop_do) callback
@@ -630,8 +620,8 @@ sub commit_shard { # OnDestroy cb
        # shard_done fires when all shards are committed
 }
 
-sub index_repo { # cidx_await cb
-       my ($self, $git) = @_;
+sub index_repo { # run_git cb
+       my (undef, $self, $git) = @_;
        return if $DO_QUIT;
        return index_next($self) if $git->{-cidx_err};
        return push(@$IDX_TODO, $git) if $REPO_CTX; # busy
@@ -649,6 +639,7 @@ sub index_repo { # cidx_await cb
        my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
        $repo->{git_dir} = $git->{git_dir};
        my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo };
+       delete $git->{-cidx_gits_fini}; # may fire gits_fini
        my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard,
                                                        $repo_ctx);
        my ($c, $p) = PublicInbox::PktOp->pair;
@@ -667,15 +658,13 @@ sub index_repo { # cidx_await cb
 
 sub get_roots ($$) {
        my ($self, $git) = @_;
-       return if !$LIVE || $DO_QUIT;
+       return if $DO_QUIT;
        my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
        sysseek($refs, 0, SEEK_SET);
        open my $roots_fh, '+>', undef;
-       my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
-                       qw(rev-list --stdin --max-parents=0) ];
-       my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots_fh });
        $git->{-repo}->{roots_fh} = $roots_fh;
-       cidx_await($pid, $cmd, \&index_repo, $self, $git);
+       run_git([ qw(rev-list --stdin --max-parents=0) ],
+               { 0 => $refs, 1 => $roots_fh }, \&index_repo, $self, $git)
 }
 
 # for PublicInbox::SearchIdx::patch_id and with_umask
@@ -751,10 +740,18 @@ EOM
        @shards;
 }
 
+sub gits_fini {
+       undef $GITS_NR;
+       PublicInbox::DS::enqueue_reap(); # kick @post_loop_do
+}
+
 sub scan_git_dirs ($) {
        my ($self) = @_;
-       my $n = @$GIT_TODO = @{$self->{git_dirs}};
-       progress($self, "scanning $n code repositories...");
+       @$GIT_TODO = map { PublicInbox::Git->new($_) } @{$self->{git_dirs}};
+       $GITS_NR = @$GIT_TODO;
+       my $gits_fini = PublicInbox::OnDestroy->new($$, \&gits_fini);
+       $_->{-cidx_gits_fini} = $gits_fini for @$GIT_TODO;
+       progress($self, "scanning $GITS_NR code repositories...");
 }
 
 sub prune_init { # via wq_io_do in IDX_SHARDS
@@ -786,10 +783,10 @@ sub prune_commit { # via wq_io_do in IDX_SHARDS
 
 sub shards_active { # post_loop_do
        return if $DO_QUIT;
-       return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4;
+       return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO) != 3;
        return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
-       return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX;
-       return 1 if keys(%$LIVE) || @IBXQ || keys(%TODO);
+       return 1 if $GITS_NR || scalar(@$IDX_TODO) || $REPO_CTX;
+       return 1 if @IBXQ || keys(%TODO);
        for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
                $s->{-cidx_quit} = 1 if defined($s->{-wq_s1});
                $s->wq_close; # may recurse via awaitpid outside of event_loop
@@ -1105,14 +1102,13 @@ sub cidx_run { # main entry point
                                        POSIX::SIGTSTP, POSIX::SIGCONT);
        my $restore = PublicInbox::OnDestroy->new($$,
                \&PublicInbox::DS::sig_setmask, $SIGSET);
-       local $LIVE = {};
        local $PRUNE_DONE = [];
        local $IDX_TODO = [];
        local $GIT_TODO = [];
        local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
                $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
                %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
-               @ID2ROOT, $XHC, @SORT);
+               @ID2ROOT, $XHC, @SORT, $GITS_NR);
        local $BATCH_BYTES = $self->{-opt}->{batch_size} //
                                $PublicInbox::SearchIdx::BATCH_BYTES;
        local $self->{ASSOC_PFX} = \@ASSOC_PFX;
@@ -1202,14 +1198,13 @@ sub ipc_atfork_child { # @IDX_SHARDS
 sub shard_done_wait { # awaitpid cb via ipc_worker_reap
        my ($pid, $shard, $self) = @_;
        my $quit_req = delete($shard->{-cidx_quit});
-       return if $DO_QUIT || !$LIVE;
+       return if $DO_QUIT;
        if ($? == 0) { # success
                $quit_req // warn 'BUG: {-cidx_quit} unset';
        } else {
                warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
                ++$self->{shard_err} if defined($self->{shard_err});
        }
-       PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC
 }
 
 1;