]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
lei: avoid extra fork for v2 outputs
authorEric Wong <e@80x24.org>
Wed, 15 Nov 2023 09:21:44 +0000 (09:21 +0000)
committerEric Wong <e@80x24.org>
Thu, 16 Nov 2023 10:56:53 +0000 (10:56 +0000)
We've always forced LeiToMail to only have one process for v2
outputs anyways since v2 has its own sharding and IPC.  Thus we
can use the single LeiToMail process directly to avoid extra IPC
overhead.

lib/PublicInbox/LeiConvert.pm
lib/PublicInbox/LeiToMail.pm
lib/PublicInbox/LeiXSearch.pm
lib/PublicInbox/V2Writable.pm

index 4a1f83233ab508c1a402bf536a1a68cb18d35e56..9d2479b005de7e0599f043397abd2e04f2937949 100644 (file)
@@ -35,12 +35,9 @@ sub process_inputs { # via wq_do
        my $lei = $self->{lei};
        delete $lei->{1};
        my $l2m = delete $lei->{l2m};
-       my $nr_w = delete($l2m->{-nr_write}) // 0;
        delete $self->{wcb}; # commit
-       if (my $v2w = delete $lei->{v2w}) {
-               $nr_w = $v2w->wq_do('done'); # may die
-               $v2w->wq_close;
-       }
+       if (my $v2w = delete $lei->{v2w}) { $v2w->done } # may die
+       my $nr_w = delete($l2m->{-nr_write}) // 0;
        my $d = (delete($l2m->{-nr_seen}) // 0) - $nr_w;
        $d = $d ? " ($d duplicates)" : '';
        $lei->qerr("# converted $nr_w messages$d");
index 2d9b7061fa720edcec094a7b6013ff8126762ee7..0d62888d72da91f3b6004955a9db070ed3105033 100644 (file)
@@ -369,12 +369,14 @@ sub _v2_write_cb ($$) {
        my ($self, $lei) = @_;
        my $dedupe = $lei->{dedupe};
        $dedupe->prepare_dedupe if $dedupe;
+       # only call in worker
+       $PublicInbox::Import::DROP_UNIQUE_UNSUB = $lei->{-drop_unique_unsub};
        sub { # for git_to_mail
                my ($bref, $smsg, $eml) = @_;
                $eml //= PublicInbox::Eml->new($bref);
                ++$self->{-nr_seen};
                return if $dedupe && $dedupe->is_dup($eml, $smsg);
-               $lei->{v2w}->wq_do('add', $eml); # V2Writable->add
+               $lei->{v2w}->add($eml) and ++$self->{-nr_write};
        }
 }
 
@@ -647,11 +649,6 @@ sub _do_augment_mbox {
        $dedupe->pause_dedupe if $dedupe;
 }
 
-sub v2w_done_wait { # awaitpid cb
-       my ($pid, $v2w, $lei) = @_;
-       $lei->child_error($?, "error for $v2w->{ibx}->{inboxdir}") if $?;
-}
-
 sub _pre_augment_v2 {
        my ($self, $lei) = @_;
        my $dir = $self->{dst};
@@ -677,11 +674,9 @@ sub _pre_augment_v2 {
                $lei->x_it(shift);
                die "E: can't write v2 inbox with broken config\n";
        });
+       $lei->{-drop_unique_unsub} = $PublicInbox::Import::DROP_UNIQUE_UNSUB;
        $ibx->init_inbox if @creat;
-       my $v2w = $ibx->importer;
-       $v2w->wq_workers_start("lei/v2w $dir", 1, $lei->oldset, {lei => $lei},
-                               \&v2w_done_wait, $lei);
-       $lei->{v2w} = $v2w;
+       $lei->{v2w} = $ibx->importer;
        return if !$lei->{opt}->{shared};
        my $d = "$lei->{ale}->{git}->{git_dir}/objects";
        open my $fh, '+>>', my $f = "$dir/git/0.git/objects/info/alternates";
@@ -806,6 +801,10 @@ sub wq_atexit_child {
        my $lei = $self->{lei};
        $lei->{ale}->git->async_wait_all;
        my ($nr_w, $nr_s) = delete(@$self{qw(-nr_write -nr_seen)});
+       if (my $v2w = delete $lei->{v2w}) {
+               eval { $v2w->done };
+               $lei->child_error($?, "E: $@ ($v2w->{ibx}->{inboxdir})") if $@;
+       }
        delete $self->{wcb};
        (($nr_w //= 0) + ($nr_s //= 0)) or return;
        return if $lei->{early_mua} || !$lei->{-progress} || !$lei->{pkt_op_p};
index 7eda6f9e58fe51ed4bf2b0e66ea6fbd0bcb055c6..5e36c11afe7c23e3025f558b6f72fb9c18c07fd5 100644 (file)
@@ -391,11 +391,6 @@ sub query_done { # EOF callback for main daemon
        ($lei->{opt}->{'mail-sync'} && !$lei->{sto}) and
                warn "BUG: {sto} missing with --mail-sync";
        $lei->sto_done_request;
-       my $nr_w = delete($lei->{-nr_write}) // 0;
-       if (my $v2w = delete $lei->{v2w}) {
-               $nr_w = $v2w->wq_do('done'); # may die
-               $v2w->wq_close;
-       }
        $lei->{ovv}->ovv_end($lei);
        if ($l2m) { # close() calls LeiToMail reap_compress
                if (my $out = delete $lei->{old_1}) {
@@ -413,6 +408,7 @@ Error closing $lei->{ovv}->{dst}: \$!=$! \$?=$?
                        delete $l2m->{mbl}; # drop dotlock
                }
        }
+       my $nr_w = delete($lei->{-nr_write}) // 0;
        my $nr_dup = (delete($lei->{-nr_seen}) // 0) - $nr_w;
        if ($lei->{-progress}) {
                my $tot = $lei->{-mset_total} // 0;
index 231ed516927ab8ed449f3a5caf3215ee4f0d879d..fb2593961c1876c51e9e1a70287017d8e248341c 100644 (file)
@@ -135,7 +135,6 @@ sub add {
        if (do_idx($self, $mime, $smsg)) {
                $self->checkpoint;
        }
-       ++$self->{-nr_add}; # for lei convert
        $cmt;
 }
 
@@ -611,7 +610,6 @@ sub done {
        $self->lock_release(!!$nbytes) if $shards;
        $self->git->cleanup;
        die $err if $err;
-       delete $self->{-nr_add}; # for lei-convert
 }
 
 sub importer {