]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
cindex: enter event loop once per run
authorEric Wong <e@80x24.org>
Wed, 5 Apr 2023 11:26:56 +0000 (11:26 +0000)
committerEric Wong <e@80x24.org>
Wed, 5 Apr 2023 20:12:19 +0000 (20:12 +0000)
This avoids needing to alter the sigmask for systems without
signalfd or EVFILT_SIGNAL.  This will also make it easier to
workaround FreeBSD (and likely *BSD) signal behavior in the
next commit.

lib/PublicInbox/CodeSearchIdx.pm

index 05007afd033d769510cfd08b6e47e10ccd2d318b..1000dc6f055b0510c25be44a5fc0e0fa073c1934 100644 (file)
@@ -35,7 +35,6 @@ use Socket qw(MSG_EOR);
 use Carp ();
 our (
        $LIVE, # pid => cmd
-       $DEFER, # [ [ cb, @args ], ... ]
        $LIVE_JOBS, # integer
        $MY_SIG, # like %SIG
        $SIGSET,
@@ -55,7 +54,10 @@ our (
        $PRUNE_NR, # total number pruned
        $PRUNE_DONE, # marks off prune completions
        $NCHANGE, # current number of changes
+       $REPO_CTX, # current repo being indexed in shards
        %ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune
+       $IDX_TODO, # [ $git0, $root0, $git1, $root1, ...]
+       $GIT_TODO, # [ GIT_DIR0, GIT_DIR1, ...]
 );
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -285,8 +287,8 @@ sub cidx_read_log_p {
 }
 
 sub shard_done { # called via PktOp on shard_index completion
-       my ($self, $n) = @_;
-       $self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok});
+       my ($self, $repo_ctx, $on_destroy, $n) = @_;
+       $repo_ctx->{shard_ok}->{$n} = 1;
 }
 
 sub prune_done { # called via PktOp->event_step completion
@@ -298,16 +300,6 @@ sub prune_done { # called via PktOp->event_step completion
                progress($self, 'prune done')
 }
 
-sub prune_busy { # post_loop_do
-       return if $DO_QUIT;
-       grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
-}
-
-sub await_prune () {
-       local @PublicInbox::DS::post_loop_do = (\&prune_busy);
-       PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if prune_busy();
-}
-
 sub seen ($$) {
        my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
        for (1..100) {
@@ -336,32 +328,6 @@ sub docids_by_postlist ($$) { # consider moving to PublicInbox::Search
        @ids;
 }
 
-sub run_deferred () {
-       my $n;
-       while (defined(my $x = shift(@{$DEFER // []}))) {
-               my $cb = shift @$x;
-               $cb->(@$x);
-               ++$n;
-       }
-       $n;
-}
-
-sub need_reap { # post_loop_do
-       my (undef, $jobs) = @_;
-       return if !$LIVE || $DO_QUIT;
-       scalar(keys(%$LIVE)) > $jobs;
-}
-
-sub cidx_reap ($$) {
-       my ($self, $jobs) = @_;
-       while (run_deferred()) {}
-       local @PublicInbox::DS::post_loop_do = (\&need_reap, $jobs);
-       while (need_reap(undef, $jobs)) {
-               PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-       }
-       while (!$jobs && run_deferred()) {}
-}
-
 sub cidx_await_cb { # awaitpid cb
        my ($pid, $cb, $self, $git, @args) = @_;
        return if !$LIVE || $DO_QUIT;
@@ -371,7 +337,7 @@ sub cidx_await_cb { # awaitpid cb
                $git->{-cidx_err} = 1;
                return warn("@$cmd error: \$?=$?\n");
        }
-       push(@$DEFER, [ $cb, $self, $git, @args ]) if $DEFER;
+       $cb->($self, $git, @args);
 }
 
 sub cidx_await ($$$$$@) {
@@ -385,7 +351,6 @@ sub cidx_await ($$$$$@) {
 sub fp_start ($$$) {
        my ($self, $git, $prep_repo) = @_;
        return if !$LIVE || $DO_QUIT;
-       cidx_reap($self, $LIVE_JOBS);
        open my $refs, '+>', undef or die "open: $!";
        my $cmd = ['git', "--git-dir=$git->{git_dir}",
                qw(show-ref --heads --tags --hash)];
@@ -407,7 +372,6 @@ sub fp_fini { # cidx_await cb
 sub ct_start ($$$) {
        my ($self, $git, $prep_repo) = @_;
        return if !$LIVE || $DO_QUIT;
-       cidx_reap($self, $LIVE_JOBS);
        my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
                qw[for-each-ref --sort=-committerdate
                --format=%(committerdate:raw) --count=1
@@ -426,12 +390,13 @@ sub ct_fini { # cidx_await cb
 # TODO: also index gitweb.owner and the full fingerprint for grokmirror?
 sub prep_repo ($$) {
        my ($self, $git) = @_;
-       return if !$LIVE || $DO_QUIT || $git->{-cidx_err};
+       return if !$LIVE || $DO_QUIT;
+       return index_next($self) if $git->{-cidx_err};
        my $repo = $git->{-repo} // die 'BUG: no {-repo}';
        if (!defined($repo->{ct})) {
                warn "W: $git->{git_dir} has no commits, skipping\n";
                delete $git->{-repo};
-               return;
+               return index_next($self);
        }
        my $n = git_dir_hash($git->{git_dir}) % $self->{nshard};
        my $shard = bless { %$self, shard => $n }, ref($self);
@@ -450,7 +415,7 @@ sub check_existing { # retry_reopen callback
        my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data;
        if ($old_fp eq $git->{-repo}->{fp}) { # no change
                delete $git->{-repo};
-               return;
+               return index_next($self);
        }
        $git->{-repo}->{docid} = $docid;
        if (@docids) {
@@ -510,71 +475,88 @@ sub shard_commit { # via wq_io_do
        send($op_p, "shard_done $self->{shard}", MSG_EOR);
 }
 
-sub consumer_open { # post_loop_do
-       my (undef, $c) = @_; # $c is PublicInbox::PktOp
-       $DO_QUIT ? undef : defined($c->{sock});
+sub index_next ($) {
+       my ($self) = @_;
+       return if $DO_QUIT;
+       if ($IDX_TODO && @$IDX_TODO) {
+               index_repo($self, shift @$IDX_TODO);
+       } elsif ($GIT_TODO && @$GIT_TODO) {
+               my $git = PublicInbox::Git->new(shift @$GIT_TODO);
+               my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
+                                                       $self, $git);
+               fp_start($self, $git, $prep_repo);
+               ct_start($self, $git, $prep_repo);
+       }
+       # else: wait for shards_active (post_loop_do) callback
 }
 
-sub wait_active ($$$$) {
-       my ($self, $git, $active, $c) = @_;
-       local @PublicInbox::DS::post_loop_do = (\&consumer_open, $c);
-       PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
-       my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$active;
-       die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
+sub next_repos { # OnDestroy cb
+       my ($repo_ctx) = @_;
+       progress($repo_ctx->{self}, "$repo_ctx->{repo}->{git_dir}: done");
+       return if $DO_QUIT;
+       if ($REPO_CTX) {
+               $REPO_CTX == $repo_ctx or die "BUG: $REPO_CTX != $repo_ctx";
+               $REPO_CTX = undef;
+               index_next($repo_ctx->{self});
+       }
 }
 
-sub commit_active_shards ($$$) {
-       my ($self, $git, $active) = @_;
-       local $self->{-shard_ok} = {};
+sub commit_shard { # OnDestroy cb
+       my ($repo_ctx) = @_;
+       my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
+
+       my $n = grep { ! $repo_ctx->{shard_ok}->{$_} } keys %$active;
+       die "E: $repo->{git_dir} $n shards failed" if $n && !$DO_QUIT;
+
+       $repo_ctx->{shard_ok} = {};
+       if (!$DO_QUIT) {
+               my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo',
+                                                               $repo);
+               (!defined($id) || $id <= 0) and
+                       die "E: store_repo $repo->{git_dir}: id=$id";
+               $active->{$repo->{shard_n}} = undef;
+       }
+       my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx);
        my ($c, $p) = PublicInbox::PktOp->pair;
-       $c->{ops}->{shard_done} = [ $self ];
+       $c->{ops}->{shard_done} = [ $repo_ctx->{self}, $repo_ctx, $next ];
        for my $n (keys %$active) {
                $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
        }
-       undef $p;
-       wait_active($self, $git, $active, $c);
+       undef $p; # shard_done fires when all shards are committed
 }
 
 sub index_repo { # cidx_await cb
-       my ($self, $git, $roots) = @_;
-       return if $git->{-cidx_err} || $DO_QUIT;
-       my $repo = delete $git->{-repo} or return;
-       seek($roots, 0, SEEK_SET) or die "seek: $!";
-       chomp(my @roots = <$roots>);
-       close($roots) or die "close: $!";
-       @roots or return warn("E: $git->{git_dir} has no root commits\n");
+       my ($self, $git) = @_;
+       return if $DO_QUIT;
+       return index_next($self) if $git->{-cidx_err};
+       return push(@$IDX_TODO, $git) if $REPO_CTX; # busy
+       my $repo = delete $git->{-repo} or return index_next($self);
+       my $roots_fh = delete $repo->{roots_fh} // die 'BUG: no {roots_fh}';
+       seek($roots_fh, 0, SEEK_SET) or die "seek: $!";
+       chomp(my @roots = <$roots_fh>);
+       close($roots_fh) or die "close: $!";
+       if (!@roots) {
+               warn("E: $git->{git_dir} has no root commits\n");
+               return index_next($self);
+       }
        $repo->{roots} = \@roots;
        local $self->{current_info} = $git->{git_dir};
        my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
-       local $self->{-shard_ok} = {};
-       my $active = {};
+       $repo->{git_dir} = $git->{git_dir};
+       my $repo_ctx = $REPO_CTX = { self => $self, repo => $repo };
+       my $commit_shard = PublicInbox::OnDestroy->new($$, \&commit_shard,
+                                                       $repo_ctx);
        my ($c, $p) = PublicInbox::PktOp->pair;
-       $c->{ops}->{shard_done} = [ $self ];
+       $c->{ops}->{shard_done} = [ $self, $repo_ctx, $commit_shard ];
        for my $n (0..$#shard_in) {
                -s $shard_in[$n] or next;
                last if $DO_QUIT;
                $IDX_SHARDS[$n]->wq_io_do('shard_index',
                                        [ $shard_in[$n], $p->{op_p} ],
                                        $git, \@roots);
-               $active->{$n} = undef;
+               $repo_ctx->{active}->{$n} = undef;
        }
-       undef $p;
-       @shard_in = ();
-       wait_active($self, $git, $active, $c);
-       if ($DO_QUIT) {
-               commit_active_shards($self, $git, $active);
-               progress($self, "$git->{git_dir}: done");
-               return;
-       }
-       $repo->{git_dir} = $git->{git_dir};
-       my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
-       if ($id > 0) {
-               $active->{$repo->{shard_n}} = undef;
-               commit_active_shards($self, $git, $active);
-               progress($self, "$git->{git_dir}: done");
-               return run_deferred();
-       }
-       die "E: store_repo $git->{git_dir}: id=$id";
+       # shard_done fires when shard_index is done
 }
 
 sub get_roots ($$) {
@@ -582,11 +564,12 @@ sub get_roots ($$) {
        return if !$LIVE || $DO_QUIT;
        my $refs = $git->{-repo}->{refs} // die 'BUG: no {-repo}->{refs}';
        sysseek($refs, 0, SEEK_SET) or die "seek: $!";
-       open my $roots, '+>', undef or die "open: $!";
+       open my $roots_fh, '+>', undef or die "open: $!";
        my $cmd = [ 'git', "--git-dir=$git->{git_dir}",
                        qw(rev-list --stdin --max-parents=0) ];
-       my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots });
-       cidx_await($pid, $cmd, \&index_repo, $self, $git, $roots);
+       my $pid = spawn($cmd, undef, { 0 => $refs, 1 => $roots_fh });
+       $git->{-repo}->{roots_fh} = $roots_fh;
+       cidx_await($pid, $cmd, \&index_repo, $self, $git);
 }
 
 # for PublicInbox::SearchIdx::patch_id and with_umask
@@ -653,15 +636,8 @@ EOM
 
 sub scan_git_dirs ($) {
        my ($self) = @_;
-       for (@{$self->{git_dirs}}) {
-               my $git = PublicInbox::Git->new($_);
-               my $prep_repo = PublicInbox::OnDestroy->new($$, \&prep_repo,
-                                                       $self, $git);
-               fp_start($self, $git, $prep_repo);
-               ct_start($self, $git, $prep_repo);
-               last if $DO_QUIT;
-       }
-       cidx_reap($self, 0);
+       @$GIT_TODO = @{$self->{git_dirs}};
+       index_next($self) for (1..$LIVE_JOBS);
 }
 
 sub prune_cb { # git->check_async callback
@@ -734,7 +710,15 @@ sub prune_start { # via wq_io_do in IDX_SHARDS
 
 sub shards_active { # post_loop_do
        return if $DO_QUIT;
-       scalar(grep { $_->{-cidx_quit} } (@IDX_SHARDS));
+       return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4;
+       return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
+       return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX;
+       return 1 if keys(%$LIVE);
+       for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
+               $s->{-cidx_quit} = 1;
+               $s->wq_close;
+       }
+       scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
 }
 
 # signal handlers
@@ -792,6 +776,7 @@ sub prep_umask ($) {
 
 sub start_prune ($) {
        my ($self) = @_;
+       return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune};
        init_tmp_git_dir($self);
        my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE);
        my ($c, $p) = PublicInbox::PktOp->pair;
@@ -805,14 +790,15 @@ sub start_prune ($) {
 sub cidx_run { # main entry point
        my ($self) = @_;
        my $restore_umask = prep_umask($self);
-       local $DEFER = [];
        local $SIGSET = PublicInbox::DS::block_signals(
                                        POSIX::SIGTSTP, POSIX::SIGCONT);
        my $restore = PublicInbox::OnDestroy->new($$,
                \&PublicInbox::DS::sig_setmask, $SIGSET);
        local $LIVE = {};
        local $PRUNE_DONE = [];
-       local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE);
+       local $IDX_TODO = [];
+       local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE,
+               $GIT_TODO, $REPO_CTX);
        local $BATCH_BYTES = $self->{-opt}->{batch_size} //
                                $PublicInbox::SearchIdx::BATCH_BYTES;
        local @IDX_SHARDS = cidx_init($self);
@@ -858,14 +844,8 @@ sub cidx_run { # main entry point
        local $LIVE_JOBS = $self->{-opt}->{jobs} ||
                        PublicInbox::IPC::detect_nproc() || 2;
        local @RDONLY_XDB = $self->xdb_shards_flat;
-       start_prune($self) if $self->{-opt}->{prune};
+       start_prune($self);
        scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
-       await_prune if $self->{-opt}->{prune};
-
-       for my $s (@IDX_SHARDS) {
-               $s->{-cidx_quit} = 1;
-               $s->wq_close;
-       }
 
        local @PublicInbox::DS::post_loop_do = (\&shards_active);
        PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
@@ -888,10 +868,11 @@ sub shard_done_wait { # awaitpid cb via ipc_worker_reap
        return if $DO_QUIT || !$LIVE;
        if ($? == 0) { # success
                $quit_req // warn 'BUG: {-cidx_quit} unset';
-               return;
+       } else {
+               warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
+               ++$self->{shard_err} if defined($self->{shard_err});
        }
-       warn "PID:$pid $shard->{shard} exited with \$?=$?\n";
-       ++$self->{shard_err} if defined($self->{shard_err});
+       PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC
 }
 
 sub with_umask { # TODO get rid of this treewide and rely on OnDestroy