]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
lei: use ->barrier to commit to lei/store
authorEric Wong <e@80x24.org>
Tue, 16 Apr 2024 20:56:27 +0000 (20:56 +0000)
committerEric Wong <e@80x24.org>
Wed, 17 Apr 2024 09:22:52 +0000 (09:22 +0000)
barrier (synchronous checkpoint) is better than ->done with
parallel lei commands being issued (via '&' or different
terminals), since repeatedly stopping and restarting processes
doesn't play nicely with expensive tasks like `lei reindex'.

This introduces a slight regression in maintaining more
processes (and thus resource use) when lei is idle, but that'll
be fixed in the next commit.

lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/LEI.pm
lib/PublicInbox/LeiInput.pm
lib/PublicInbox/LeiRefreshMailSync.pm
lib/PublicInbox/LeiRemote.pm
lib/PublicInbox/LeiStore.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
t/lei-store-fail.t

index ebbffffc02e0aa235b5ea9c111b17ab83469d687..763a124c1f4e005fbc72af2057ca0750b96a77c9 100644 (file)
@@ -1424,5 +1424,6 @@ no warnings 'once';
 *idx_shard = \&PublicInbox::V2Writable::idx_shard;
 *reindex_checkpoint = \&PublicInbox::V2Writable::reindex_checkpoint;
 *checkpoint = \&PublicInbox::V2Writable::checkpoint;
+*barrier = \&PublicInbox::V2Writable::barrier;
 
 1;
index 5b46686a14fdddf75672ab6daabbab2807d46465..e9a0de6c8d6dc1bbd1af5beb1fdeaf7ac060bd78 100644 (file)
@@ -1443,7 +1443,7 @@ sub wq_eof { # EOF callback for main daemon
        my ($lei, $wq_fld) = @_;
        local $current_lei = $lei;
        my $wq = delete $lei->{$wq_fld // 'wq1'};
-       $lei->sto_done_request($wq);
+       $lei->sto_barrier_request($wq);
        $wq // $lei->fail; # already failed
 }
 
@@ -1548,7 +1548,7 @@ sub lms {
        (-f $f || $creat) ? PublicInbox::LeiMailSync->new($f) : undef;
 }
 
-sub sto_done_request {
+sub sto_barrier_request {
        my ($lei, $wq) = @_;
        return unless $lei->{sto} && $lei->{sto}->{-wq_s1};
        local $current_lei = $lei;
@@ -1558,7 +1558,7 @@ sub sto_done_request {
                my $s = ($wq ? $wq->{lei_sock} : undef) // $lei->{sock};
                my $errfh = $lei->{2} // *STDERR{GLOB};
                my @io = $s ? ($errfh, $s) : ($errfh);
-               eval { $lei->{sto}->wq_io_do('done', \@io) };
+               eval { $lei->{sto}->wq_io_do('barrier', \@io, 1) };
        }
        warn($@) if $@;
 }
index d003d983614e6ac130708c8612899b5ddc2e3087..c388f7dc73fb3f29b2b28240564369fcb2c31f23 100644 (file)
@@ -499,7 +499,7 @@ sub process_inputs {
        }
        # always commit first, even on error partial work is acceptable for
        # lei <import|tag|convert>
-       $self->{lei}->sto_done_request;
+       $self->{lei}->sto_barrier_request;
        $self->{lei}->fail($err) if $err;
 }
 
index a60a9a5e9761ca2ca89320adfd121d291b070e8b..dde232748ec3e5252f0c2dc240e9e0a8d31cf8a6 100644 (file)
@@ -60,7 +60,7 @@ sub input_path_url { # overrides PublicInbox::LeiInput::input_path_url
                        $self->folder_missing($$uri);
                }
        } else { die "BUG: $input not supported" }
-       $self->{lei}->sto_done_request;
+       $self->{lei}->sto_barrier_request;
 }
 
 sub lei_refresh_mail_sync {
index ddcaf2c9ae4faaa15a04ccc75e09a7ef16a5c46d..d6fc40a438bbf7361b0c2f07fd95838e0abfa85b 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (C) 2021 all contributors <meta@public-inbox.org>
+# Copyright (C) all contributors <meta@public-inbox.org>
 # License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
 
 # Make remote externals HTTP(S) inboxes behave like
@@ -51,7 +51,7 @@ sub mset {
        $fh = IO::Uncompress::Gunzip->new($fh, MultiStream=>1, AutoClose=>1);
        eval { PublicInbox::MboxReader->mboxrd($fh, \&each_mboxrd_eml, $self) };
        my $err = $@ ? ": $@" : '';
-       my $wait = $self->{lei}->{sto}->wq_do('done');
+       my $wait = $self->{lei}->{sto}->wq_do('barrier');
        $lei->child_error($?, "@$cmd failed$err") if $err || $?;
        $self; # we are the mset (and $ibx, and $self)
 }
index 0df2352c23b7ae9b437950785aa2ebc1324e567b..162c915fd6b8c6cae2cadf7262d3a5eaca85f0d5 100644 (file)
@@ -81,7 +81,7 @@ sub importer {
                delete $self->{im};
                $im->done;
                undef $im;
-               $self->checkpoint;
+               $self->barrier;
                $max = $self->{priv_eidx}->{mg}->git_epochs + 1;
        }
        my (undef, $tl) = eidx_init($self); # acquire lock
@@ -118,7 +118,7 @@ sub cat_blob {
 
 sub schedule_commit {
        my ($self, $sec) = @_;
-       add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&done, $self);
+       add_uniq_timer($self->{priv_eidx}->{topdir}, $sec, \&barrier, $self);
 }
 
 # follows the stderr file
@@ -391,7 +391,7 @@ sub reindex_done {
        my ($self) = @_;
        my ($eidx, $tl) = eidx_init($self);
        $eidx->git->async_wait_all;
-       # ->done to be called via sto_done_request
+       # ->done to be called via sto_barrier_request
 }
 
 sub add_eml {
@@ -571,11 +571,21 @@ sub set_xvmd {
        sto_export_kw($self, $smsg->{num}, $vmd);
 }
 
-sub checkpoint {
-       my ($self, $wait) = @_;
-       $self->{im}->barrier if $self->{im};
+sub barrier {
+       my ($self) = @_;
+       my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_barrier_request
+       my @err;
+       if ($self->{im}) {
+               eval { $self->{im}->barrier };
+               push(@err, "E: import barrier: $@\n") if $@;
+       }
        delete $self->{lms};
-       $self->{priv_eidx}->checkpoint($wait);
+       eval { $self->{priv_eidx}->barrier };
+       push(@err, "E: priv_eidx barrier: $@\n") if $@;
+       print { $errfh // \*STDERR } @err;
+       send($lei_sock, 'child_error 256', 0) if @err && $lei_sock;
+       xchg_stderr($self);
+       die @err if @err;
 }
 
 sub xchg_stderr {
@@ -594,7 +604,7 @@ sub xchg_stderr {
 
 sub done {
        my ($self) = @_;
-       my ($errfh, $lei_sock) = @$self{0, 1}; # via sto_done_request
+       my ($errfh, $lei_sock) = @$self{0, 1};
        my @err;
        if (my $im = delete($self->{im})) {
                eval { $im->done };
index dfae29e95f1a5fc811c252033434c7c1e2146849..593547f67b1e50cee0564eed3c788e9de04c7024 100644 (file)
@@ -724,8 +724,9 @@ sub post_augment {
        my ($self, $lei, @args) = @_;
        $self->{-au_noted}++ and $lei->qerr("# writing to $self->{dst} ...");
 
+       # FIXME: this synchronous wait can be slow w/ parallel callers
        my $wait = $lei->{opt}->{'import-before'} ?
-                       $lei->{sto}->wq_do('checkpoint', 1) : 0;
+                       $lei->{sto}->wq_do('barrier') : 0;
        # _post_augment_mbox
        my $m = $self->can("_post_augment_$self->{base_type}") or return;
        $m->($self, $lei, @args);
index d4f34733a07b6bae8552c0d1d5b16148beb93355..5a5a1adc0f96a54d442771bdc0ac028e332cce6d 100644 (file)
@@ -363,7 +363,7 @@ print STDERR $_;
                                                $self, $lei, $each_smsg);
                };
                my ($exc, $code) = ($@, $?);
-               $lei->sto_done_request if delete($self->{-sto_imported});
+               $lei->sto_barrier_request if delete($self->{-sto_imported});
                die "E: $exc" if $exc && !$code;
                my $nr = delete $lei->{-nr_remote_eml} // 0;
                if (!$code) { # don't update if no results, maybe MTA is down
@@ -399,7 +399,7 @@ sub query_done { # EOF callback for main daemon
        delete $lei->{lxs};
        ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
                warn "BUG: {sto} missing with --mail-sync";
-       $lei->sto_done_request;
+       $lei->sto_barrier_request;
        $lei->{ovv}->ovv_end($lei);
        if ($l2m) { # close() calls LeiToMail reap_compress
                $l2m->finish_output($lei);
index c2f03148878056669f5ca3aed29e9b087127f6a9..1e83e3834dfe7f2c1b215628974c025e0c0f36f3 100644 (file)
@@ -39,7 +39,7 @@ EOM
        lei_ok qw(q m:testmessage@example.com);
        is($lei_out, "[null]\n", 'delayed commit is unindexed');
 
-       # make immediate ->sto_done_request fail from mboxrd import:
+       # make immediate ->sto_barrier_request fail from mboxrd import:
        remove_tree("$ENV{HOME}/.local/share/lei/store");
        # subsequent lei commands are undefined behavior,
        # but we need to make sure the current lei command fails: