]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
import: switch to Unix stream socket for fast-import
authorEric Wong <e@80x24.org>
Wed, 11 Oct 2023 07:20:54 +0000 (07:20 +0000)
committerEric Wong <e@80x24.org>
Wed, 11 Oct 2023 22:10:49 +0000 (22:10 +0000)
We use fewer file descriptors and fewer lines of code this way.
I'm not aware of any place we rely on POSIX pipe semantics with
`git fast-import', and sockets have bigger buffers by default
in most cases (even if Linux allows larger pipe buffers).

lib/PublicInbox/Import.pm
script/public-inbox-convert

index cd03da05d7fe80278844b2747d8e2cbc9f535a4f..894ba81804fc363d7da3ab7fb595ebaa9c2a3067 100644 (file)
@@ -8,7 +8,7 @@
 package PublicInbox::Import;
 use v5.12;
 use parent qw(PublicInbox::Lock);
-use PublicInbox::Spawn qw(run_die popen_rd);
+use PublicInbox::Spawn qw(run_die popen_rd spawn);
 use PublicInbox::MID qw(mids mid2path);
 use PublicInbox::Address;
 use PublicInbox::Smsg;
@@ -16,9 +16,11 @@ use PublicInbox::MsgTime qw(msg_datestamp);
 use PublicInbox::ContentHash qw(content_digest);
 use PublicInbox::MDA;
 use PublicInbox::Eml;
+use PublicInbox::ProcessIO;
 use POSIX qw(strftime);
-use autodie qw(read close);
+use autodie qw(read close socketpair);
 use Carp qw(croak);
+use Socket qw(AF_UNIX SOCK_STREAM);
 
 sub default_branch () {
        state $default_branch = do {
@@ -56,11 +58,10 @@ sub new {
 # idempotent start function
 sub gfi_start {
        my ($self) = @_;
-
-       return ($self->{in}, $self->{out}) if $self->{in};
-
-       my ($in_r, $out_r, $out_w);
-       pipe($out_r, $out_w) or die "pipe failed: $!";
+       my $io = $self->{io};
+       return $io if $io;
+       socketpair($io, my $s2, AF_UNIX, SOCK_STREAM, 0);
+       $io->autoflush(1);
 
        $self->lock_acquire;
        eval {
@@ -73,18 +74,17 @@ sub gfi_start {
                        die "fatal: ls-tree -r -z --name-only $ref: \$?=$?" if $?;
                        $self->{-tree} = { map { $_ => 1 } split(/\0/, $t) };
                }
-               $in_r = $self->{in} = $git->popen(qw(fast-import
-                                       --quiet --done --date-format=raw),
-                                       undef, { 0 => $out_r });
-               $out_w->autoflush(1);
-               $self->{out} = $out_w;
+               my $gfi = [ 'git', "--git-dir=$git->{git_dir}", qw(fast-import
+                               --quiet --done --date-format=raw) ];
+               my $pid = spawn($gfi, undef, { 0 => $s2, 1 => $s2 });
                $self->{nchg} = 0;
+               $self->{io} = PublicInbox::ProcessIO->maybe_new($pid, $io);
        };
        if ($@) {
                $self->lock_release;
                die $@;
        }
-       ($in_r, $out_w);
+       $self->{io};
 }
 
 sub wfail () { die "write to fast-import failed: $!" }
@@ -99,22 +99,22 @@ sub norm_body ($) {
 }
 
 # only used for v1 (ssoma) inboxes
-sub _check_path ($$$$) {
-       my ($r, $w, $tip, $path) = @_;
+sub _check_path ($$$) {
+       my ($io, $tip, $path) = @_;
        return if $tip eq '';
-       print $w "ls $tip $path\n" or wfail;
+       print $io "ls $tip $path\n" or wfail;
        local $/ = "\n";
-       my $info = <$r> // die "EOF from fast-import: $!";
+       my $info = <$io> // die "EOF from fast-import: $!";
        $info =~ /\Amissing / ? undef : $info;
 }
 
-sub _cat_blob ($$$) {
-       my ($r, $w, $oid) = @_;
-       print $w "cat-blob $oid\n" or wfail;
+sub _cat_blob ($$) {
+       my ($io, $oid) = @_;
+       print $io "cat-blob $oid\n" or wfail;
        local $/ = "\n";
-       my $info = <$r> // die "EOF from fast-import / cat-blob: $!";
+       my $info = <$io> // die "EOF from fast-import / cat-blob: $!";
        $info =~ /\A[a-f0-9]{40,} blob ([0-9]+)\n\z/ or return;
-       my $n = read($r, my $buf, my $len = $1 + 1);
+       my $n = read($io, my $buf, my $len = $1 + 1);
        $n == $len or croak "cat-blob: short read: $n < $len";
        my $lf = chop $buf;
        croak "bad read on final byte: <$lf>" if $lf ne "\n";
@@ -123,17 +123,16 @@ sub _cat_blob ($$$) {
 
 sub cat_blob {
        my ($self, $oid) = @_;
-       my ($r, $w) = $self->gfi_start;
-       _cat_blob($r, $w, $oid);
+       _cat_blob(gfi_start($self), $oid);
 }
 
 sub check_remove_v1 {
-       my ($r, $w, $tip, $path, $mime) = @_;
+       my ($io, $tip, $path, $mime) = @_;
 
-       my $info = _check_path($r, $w, $tip, $path) or return ('MISSING',undef);
+       my $info = _check_path($io, $tip, $path) or return ('MISSING',undef);
        $info =~ m!\A100644 blob ([a-f0-9]{40,})\t!s or die "not blob: $info";
        my $oid = $1;
-       my $bref = _cat_blob($r, $w, $oid) or die "BUG: cat-blob $1 failed";
+       my $bref = _cat_blob($io, $oid) or die "BUG: cat-blob $1 failed";
        PublicInbox::Eml::strip_from($$bref);
        my $cur = PublicInbox::Eml->new($bref);
        my $cur_s = $cur->header('Subject') // '';
@@ -146,16 +145,15 @@ sub check_remove_v1 {
 
 sub checkpoint {
        my ($self) = @_;
-       return unless $self->{in};
-       print { $self->{out} } "checkpoint\n" or wfail;
+       print { $self->{io} // return } "checkpoint\n" or wfail;
        undef;
 }
 
 sub progress {
        my ($self, $msg) = @_;
-       return unless $self->{in};
-       print { $self->{out} } "progress $msg\n" or wfail;
-       readline($self->{in}) eq "progress $msg\n" or die
+       my $io = $self->{io} or return;
+       print $io "progress $msg\n" or wfail;
+       readline($io) eq "progress $msg\n" or die
                "progress $msg not received\n";
        undef;
 }
@@ -205,10 +203,9 @@ sub barrier {
 # used for v2
 sub get_mark {
        my ($self, $mark) = @_;
-       die "not active\n" unless $self->{in};
-       my ($r, $w) = $self->gfi_start;
-       print $w "get-mark $mark\n" or wfail;
-       my $oid = <$r> // die "get-mark failed, need git 2.6.0+\n";
+       my $io = $self->{io} or croak "not active\n";
+       print $io "get-mark $mark\n" or wfail;
+       my $oid = <$io> // die "get-mark failed, need git 2.6.0+\n";
        chomp($oid);
        $oid;
 }
@@ -225,11 +222,11 @@ sub remove {
        my $path_type = $self->{path_type};
        my ($path, $err, $cur, $blob);
 
-       my ($r, $w) = $self->gfi_start;
+       my $io = gfi_start($self);
        my $tip = $self->{tip};
        if ($path_type eq '2/38') {
                $path = mid2path(v1_mid0($mime));
-               ($err, $cur) = check_remove_v1($r, $w, $tip, $path, $mime);
+               ($err, $cur) = check_remove_v1($io, $tip, $path, $mime);
                return ($err, $cur) if $err;
        } else {
                my $sref;
@@ -241,7 +238,7 @@ sub remove {
                }
                my $len = length($$sref);
                $blob = $self->{mark}++;
-               print $w "blob\nmark :$blob\ndata $len\n",
+               print $io "blob\nmark :$blob\ndata $len\n",
                        $$sref, "\n" or wfail;
        }
 
@@ -249,22 +246,22 @@ sub remove {
        my $commit = $self->{mark}++;
        my $parent = $tip =~ /\A:/ ? $tip : undef;
        unless ($parent) {
-               print $w "reset $ref\n" or wfail;
+               print $io "reset $ref\n" or wfail;
        }
        my $ident = $self->{ident};
        my $now = now_raw();
        $msg //= 'rm';
        my $len = length($msg) + 1;
-       print $w "commit $ref\nmark :$commit\n",
+       print $io "commit $ref\nmark :$commit\n",
                "author $ident $now\n",
                "committer $ident $now\n",
                "data $len\n$msg\n\n",
                'from ', ($parent ? $parent : $tip), "\n" or wfail;
        if (defined $path) {
-               print $w "D $path\n\n" or wfail;
+               print $io "D $path\n\n" or wfail;
        } else {
-               clean_tree_v2($self, $w, 'd');
-               print $w "M 100644 :$blob d\n\n" or wfail;
+               clean_tree_v2($self, $io, 'd');
+               print $io "M 100644 :$blob d\n\n" or wfail;
        }
        $self->{nchg}++;
        (($self->{tip} = ":$commit"), $cur);
@@ -354,11 +351,11 @@ sub v1_mid0 ($) {
        $mids->[0];
 }
 sub clean_tree_v2 ($$$) {
-       my ($self, $w, $keep) = @_;
+       my ($self, $io, $keep) = @_;
        my $tree = $self->{-tree} or return; #v2 only
        delete $tree->{$keep};
        foreach (keys %$tree) {
-               print $w "D $_\n" or wfail;
+               print $io "D $_\n" or wfail;
        }
        %$tree = ($keep => 1);
 }
@@ -377,10 +374,10 @@ sub add {
                $path = 'm';
        }
 
-       my ($r, $w) = $self->gfi_start;
+       my $io = gfi_start($self);
        my $tip = $self->{tip};
        if ($path_type eq '2/38') {
-               _check_path($r, $w, $tip, $path) and return;
+               _check_path($io, $tip, $path) and return;
        }
 
        drop_unwanted_headers($mime);
@@ -394,8 +391,7 @@ sub add {
        my $raw_email = $mime->{-public_inbox_raw} // $mime->as_string;
        my $n = length($raw_email);
        $self->{bytes_added} += $n;
-       print $w "blob\nmark :$blob\ndata ", $n, "\n" or wfail;
-       print $w $raw_email, "\n" or wfail;
+       print $io "blob\nmark :$blob\ndata $n\n", $raw_email, "\n" or wfail;
 
        # v2: we need this for Xapian
        if ($smsg) {
@@ -422,19 +418,19 @@ sub add {
        my $parent = $tip =~ /\A:/ ? $tip : undef;
 
        unless ($parent) {
-               print $w "reset $ref\n" or wfail;
+               print $io "reset $ref\n" or wfail;
        }
 
-       print $w "commit $ref\nmark :$commit\n",
+       print $io "commit $ref\nmark :$commit\n",
                "author $author $at\n",
-               "committer $self->{ident} $ct\n" or wfail;
-       print $w "data ", (length($subject) + 1), "\n",
+               "committer $self->{ident} $ct\n",
+               "data ", (length($subject) + 1), "\n",
                $subject, "\n\n" or wfail;
        if ($tip ne '') {
-               print $w 'from ', ($parent ? $parent : $tip), "\n" or wfail;
+               print $io 'from ', ($parent ? $parent : $tip), "\n" or wfail;
        }
-       clean_tree_v2($self, $w, $path);
-       print $w "M 100644 :$blob $path\n\n" or wfail;
+       clean_tree_v2($self, $io, $path);
+       print $io "M 100644 :$blob $path\n\n" or wfail;
        $self->{nchg}++;
        $self->{tip} = ":$commit";
 }
@@ -475,15 +471,14 @@ EOM
 }
 
 # true if locked and active
-sub active { !!$_[0]->{out} }
+sub active { !!$_[0]->{io} }
 
 sub done {
        my ($self) = @_;
-       my $w = delete $self->{out} or return;
+       my $io = delete $self->{io} or return;
        eval {
-               my $r = delete $self->{in} or die 'BUG: missing {in} when done';
-               print $w "done\n" or wfail;
-               close $r;
+               print $io "done\n" or wfail;
+               close $io; # reaps and dies on error
        };
        my $wait_err = $@;
        my $nchg = delete $self->{nchg};
@@ -496,10 +491,7 @@ sub done {
        die $wait_err if $wait_err;
 }
 
-sub atfork_child {
-       my ($self) = @_;
-       close($_) for (grep defined, delete(@$self{qw(in out)}));
-}
+sub atfork_child { close(delete($_[0]->{io}) // return) }
 
 sub digest2mid ($$;$) {
        my ($dig, $hdr, $fallback_time) = @_;
@@ -552,7 +544,7 @@ sub replace_oids {
        my $git = $self->{git};
        my @export = (qw(fast-export --no-data --use-done-feature), $old);
        my $rd = $git->popen(@export);
-       my ($r, $w) = $self->gfi_start;
+       my $io = gfi_start($self);
        my @buf;
        my $nreplace = 0;
        my @oids;
@@ -563,7 +555,7 @@ sub replace_oids {
                        push @buf, "reset $tmp\n";
                } elsif (/^commit (?:.+)/) {
                        if (@buf) {
-                               print $w @buf or wfail;
+                               print $io @buf or wfail;
                                @buf = ();
                        }
                        push @buf, "commit $tmp\n";
@@ -599,7 +591,7 @@ sub replace_oids {
                                rewrite_commit($self, \@oids, \@buf, $mime);
                                $nreplace++;
                        }
-                       print $w @buf, "\n" or wfail;
+                       print $io @buf, "\n" or wfail;
                        @buf = ();
                } elsif ($_ eq "done\n") {
                        $done = 1;
@@ -612,7 +604,7 @@ sub replace_oids {
        }
        close $rd;
        if (@buf) {
-               print $w @buf or wfail;
+               print $io @buf or wfail;
        }
        die 'done\n not seen from fast-export' unless $done;
        chomp(my $cmt = $self->get_mark(":$mark")) if $nreplace;
index 780f7194150b15a840504c2dd940bf795ce93bd8..0cc52777641eebd03e23ef29a7a4a2e7e65aecde 100755 (executable)
@@ -120,7 +120,7 @@ my $head = $old->{ref_head} || 'HEAD';
 my $rd = $old->git->popen(qw(fast-export --use-done-feature), $head);
 $v2w->idx_init($opt);
 my $im = $v2w->importer;
-my ($r, $w) = $im->gfi_start;
+my $io = $im->gfi_start;
 my $h = '[0-9a-f]';
 my %D;
 my $last;
@@ -131,12 +131,12 @@ while (<$rd>) {
                $state = 'commit';
        } elsif (/^data ([0-9]+)/) {
                my $len = $1;
-               print $w $_ or $im->wfail;
+               print $io $_ or $im->wfail;
                while ($len) {
                        my $n = read($rd, my $tmp, $len) or die "read: $!";
                        warn "$n != $len\n" if $n != $len;
                        $len -= $n;
-                       print $w $tmp or $im->wfail;
+                       print $io $tmp or $im->wfail;
                }
                next;
        } elsif ($state eq 'commit') {
@@ -144,9 +144,9 @@ while (<$rd>) {
                        my ($mark, $path) = ($1, $2);
                        $D{$path} = $mark;
                        if ($last && $last ne 'm') {
-                               print $w "D $last\n" or $im->wfail;
+                               print $io "D $last\n" or $im->wfail;
                        }
-                       print $w "M 100644 :$mark m\n" or $im->wfail;
+                       print $io "M 100644 :$mark m\n" or $im->wfail;
                        $last = 'm';
                        next;
                }
@@ -154,18 +154,18 @@ while (<$rd>) {
                        my $mark = delete $D{$1};
                        defined $mark or die "undeleted path: $1\n";
                        if ($last && $last ne 'd') {
-                               print $w "D $last\n" or $im->wfail;
+                               print $io "D $last\n" or $im->wfail;
                        }
-                       print $w "M 100644 :$mark d\n" or $im->wfail;
+                       print $io "M 100644 :$mark d\n" or $im->wfail;
                        $last = 'd';
                        next;
                }
        }
        last if $_ eq "done\n";
-       print $w $_ or $im->wfail;
+       print $io $_ or $im->wfail;
 }
 close $rd or die "fast-export: \$?=$? \$!=$!\n";
-$r = $w = undef; # v2w->done does the actual close and error checking
+$io = undef;
 $v2w->done;
 if (my $old_mm = $old->mm) {
        $old->cleanup;