]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
initial git async work
authorEric Wong <e@80x24.org>
Sat, 31 Dec 2016 11:16:47 +0000 (11:16 +0000)
committerEric Wong <e@80x24.org>
Sat, 7 Jan 2017 23:38:29 +0000 (23:38 +0000)
This will allow us to handle network operations while waiting
on "git cat-file" to seek and unpack things.

MANIFEST
lib/PublicInbox/Git.pm
lib/PublicInbox/GitAsyncRd.pm [new file with mode: 0644]
lib/PublicInbox/GitAsyncWr.pm [new file with mode: 0644]
t/git_async.t [new file with mode: 0644]

index 59d44bcf4db51097157395d67850c7aef6c6da49..cc4882a12a4a5c14f9424908a9909496d29edd7a 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -55,6 +55,8 @@ lib/PublicInbox/Filter/Mirror.pm
 lib/PublicInbox/Filter/Vger.pm
 lib/PublicInbox/GetlineBody.pm
 lib/PublicInbox/Git.pm
+lib/PublicInbox/GitAsyncRd.pm
+lib/PublicInbox/GitAsyncWr.pm
 lib/PublicInbox/GitHTTPBackend.pm
 lib/PublicInbox/HTTP.pm
 lib/PublicInbox/HTTPD.pm
@@ -151,6 +153,7 @@ t/git-http-backend.psgi
 t/git-http-backend.t
 t/git.fast-import-data
 t/git.t
+t/git_async.t
 t/html_index.t
 t/httpd-corner.psgi
 t/httpd-corner.t
index 4dfc4099367e6b1f84b77b4ac6c38f0828b43ab1..0b482c7e29052a0b436753120e6b558853fc05ab 100644 (file)
@@ -13,6 +13,10 @@ use POSIX qw(dup2);
 require IO::Handle;
 use PublicInbox::Spawn qw(spawn popen_rd);
 use Fcntl qw(:seek);
+my $have_async = eval {
+       require PublicInbox::EvCleanup;
+       require PublicInbox::GitAsyncRd;
+};
 
 # Documentation/SubmittingPatches recommends 12 (Linux v4.4)
 my $abbrev = `git config core.abbrev` || 12;
@@ -64,6 +68,7 @@ sub _bidi_pipe {
        $self->{$in} = $in_r;
 }
 
+# legacy synchronous API
 sub cat_file_begin {
        my ($self, $obj) = @_;
        $self->_bidi_pipe(qw(--batch in out pid));
@@ -79,6 +84,7 @@ sub cat_file_begin {
        ($in, $1, $2, $3);
 }
 
+# legacy synchronous API
 sub cat_file_finish {
        my ($self, $left) = @_;
        my $max = 8192;
@@ -96,6 +102,7 @@ sub cat_file_finish {
        fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n");
 }
 
+# legacy synchronous API
 sub cat_file {
        my ($self, $obj, $ref) = @_;
 
@@ -131,6 +138,7 @@ sub cat_file {
 
 sub batch_prepare ($) { _bidi_pipe($_[0], qw(--batch in out pid)) }
 
+# legacy synchronous API
 sub check {
        my ($self, $obj) = @_;
        $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c));
@@ -185,10 +193,91 @@ sub cleanup {
        my ($self) = @_;
        _destroy($self, qw(in out pid));
        _destroy($self, qw(in_c out_c pid_c));
+
+       if ($have_async) {
+               my %h = %$self; # yup, copy ourselves
+               %$self = ();
+               my $ds_closed;
+
+               # schedule closing with Danga::Socket::close:
+               foreach (qw(async async_c)) {
+                       my $ds = delete $h{$_} or next;
+                       $ds->close;
+                       $ds_closed = 1;
+               }
+
+               # can't do waitpid in _destroy() until next tick,
+               # since D::S defers closing until end of current event loop
+               $ds_closed and PublicInbox::EvCleanup::next_tick(sub {
+                       _destroy(\%h, qw(in_a out_a pid_a));
+                       _destroy(\%h, qw(in_ac out_ac pid_ac));
+               });
+       }
 }
 
 sub DESTROY { cleanup(@_) }
 
+# modern async API
+sub check_async_ds ($$$) {
+       my ($self, $obj, $cb) = @_;
+       ($self->{async_c} ||= do {
+               _bidi_pipe($self, qw(--batch-check in_ac out_ac pid_ac));
+               PublicInbox::GitAsyncRd->new($self->{in_ac}, $self->{out_ac}, 1)
+       })->cat_file_async($obj, $cb);
+}
+
+sub cat_async_ds ($$$) {
+       my ($self, $obj, $cb) = @_;
+       ($self->{async} ||= do {
+               _bidi_pipe($self, qw(--batch in_a out_a pid_a));
+               PublicInbox::GitAsyncRd->new($self->{in_a}, $self->{out_a});
+       })->cat_file_async($obj, $cb);
+}
+
+sub async_info_compat ($) {
+       local $/ = "\n";
+       chomp(my $line = $_[0]->getline);
+       [ split(/ /, $line) ];
+}
+
+sub check_async_compat ($$$) {
+       my ($self, $obj, $cb) = @_;
+       $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c));
+       $self->{out_c}->print($obj."\n") or fail($self, "write error: $!");
+       my $info = async_info_compat($self->{in_c});
+       $cb->($info);
+}
+
+sub cat_async_compat ($$$) {
+       my ($self, $obj, $cb) = @_;
+       $self->_bidi_pipe(qw(--batch in out pid));
+       $self->{out}->print($obj."\n") or fail($self, "write error: $!");
+       my $in = $self->{in};
+       my $info = async_info_compat($in);
+       $cb->($info);
+       return if scalar(@$info) != 3; # missing
+       my $max = 8192;
+       my $left = $info->[2];
+       my ($buf, $r);
+       while ($left > 0) {
+               $r = read($in, $buf, $left > $max ? $max : $left);
+               return $cb->($r) unless $r; # undef or 0
+               $left -= $r;
+               $cb->(\$buf);
+       }
+       $r = read($in, $buf, 1);
+       defined($r) or fail($self, "read failed: $!");
+       fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n");
+}
+
+if ($have_async) {
+       *check_async = *check_async_ds;
+       *cat_async = *cat_async_ds;
+} else {
+       *check_async = *check_async_compat;
+       *cat_async = *cat_async_compat;
+}
+
 1;
 __END__
 =pod
diff --git a/lib/PublicInbox/GitAsyncRd.pm b/lib/PublicInbox/GitAsyncRd.pm
new file mode 100644 (file)
index 0000000..a56dc39
--- /dev/null
@@ -0,0 +1,133 @@
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# internal class used by PublicInbox::Git + Danga::Socket
+# This parses the output pipe of "git cat-file --batch/--batch-check"
+package PublicInbox::GitAsyncRd;
+use strict;
+use warnings;
+use base qw(Danga::Socket);
+use fields qw(jobq rbuf wr check);
+use PublicInbox::GitAsyncWr;
+our $MAX = 65536; # Import may bump this in the future
+
+sub new {
+       my ($class, $rd, $wr, $check) = @_;
+       my $self = fields::new($class);
+       IO::Handle::blocking($rd, 0);
+       $self->SUPER::new($rd);
+       $self->{jobq} = []; # [ [ $obj, $cb, $state ], ... ]
+       my $buf = '';
+       $self->{rbuf} = \$buf;
+       $self->{wr} = PublicInbox::GitAsyncWr->new($wr);
+       $self->{check} = $check;
+       $self->watch_read(1);
+       $self;
+}
+
+sub cat_file_async {
+       my ($self, $obj, $cb) = @_;
+       # order matters
+       push @{$self->{jobq}}, [ $obj, $cb ];
+       $self->{wr}->write($obj."\n");
+}
+
+# Returns: an array ref of the info line for --batch-check and --batch,
+# which may be: [ $obj, 'missing']
+# Returns undef on error
+sub read_info ($) {
+       my ($self) = @_;
+       my $rbuf = $self->{rbuf};
+       my $rd = $self->{sock};
+
+       while (1) {
+               $$rbuf =~ s/\A([^\n]+)\n//s and return [ split(/ /, $1) ];
+
+               my $r = sysread($rd, $$rbuf, 110, length($$rbuf));
+               next if $r;
+               return $r;
+       }
+}
+
+sub event_read {
+       my ($self) = @_;
+       my $jobq = $self->{jobq};
+       my ($cur, $obj, $cb, $info, $left);
+       my $check = $self->{check};
+       my ($rbuf, $rlen, $need, $buf);
+take_job:
+       $cur = shift @$jobq or die 'BUG: empty job queue in '.__PACKAGE__;
+       ($obj, $cb, $info, $left) = @$cur;
+       if (!$info) {
+               $info = read_info($self);
+               if (!defined $info && ($!{EAGAIN} || $!{EINTR})) {
+                       return unshift(@$jobq, $cur)
+               }
+               $cb->($info); # $info may 0 (EOF, or undef, $cb will see $!)
+               return $self->close unless $info;
+               if ($check || (scalar(@$info) != 3)) {
+                       # do not monopolize the event loop if we're drained:
+                       return if ${$self->{rbuf}} eq '';
+                       goto take_job;
+               }
+               $cur->[2] = $info;
+               my $len = $info->[2];
+               $left = \$len;
+               $cur->[3] = $left; # onto reading body...
+       }
+       ref($left) or die 'BUG: $left not ref in '.__PACKAGE__;
+
+       $rbuf = $self->{rbuf};
+       $rlen = length($$rbuf);
+       $need = $$left + 1; # +1 for trailing LF
+       $buf = '';
+
+       if ($rlen == $need) {
+final_hunk:
+               $self->{rbuf} = \$buf;
+               $$left = undef;
+               my $lf = chop $$rbuf;
+               $lf eq "\n" or die "BUG: missing LF (got $lf)";
+               $cb->($rbuf);
+
+               return if $buf eq '';
+               goto take_job;
+       } elsif ($rlen < $need) {
+               my $all = $need - $rlen;
+               my $n = $all > $MAX ? $MAX : $all;
+               my $r = sysread($self->{sock}, $$rbuf, $n, $rlen);
+               if ($r) {
+                       goto final_hunk if $r == $all;
+
+                       # more to read later...
+                       $$left -= $r;
+                       $self->{rbuf} = \$buf;
+                       $cb->($rbuf);
+
+                       # don't monopolize the event loop
+                       return unshift(@$jobq, $cur);
+               } elsif (!defined $r) {
+                       return unshift(@$jobq, $cur) if $!{EAGAIN} || $!{EINTR};
+               }
+               $cb->($r); # $cb should handle 0 and undef (and see $!)
+               $self->close; # FAIL...
+       } else { # too much data in rbuf
+               $buf = substr($$rbuf, $need, $rlen - $need);
+               $$rbuf = substr($$rbuf, 0, $need);
+               goto final_hunk;
+       }
+}
+
+sub close {
+       my $self = shift;
+       my $jobq = $self->{jobq};
+       $self->{jobq} = [];
+       $_->[1]->(0) for @$jobq;
+       $self->{wr}->close;
+       $self->SUPER::close(@_);
+}
+
+sub event_hup { $_[0]->close }
+sub event_err { $_[0]->close }
+
+1;
diff --git a/lib/PublicInbox/GitAsyncWr.pm b/lib/PublicInbox/GitAsyncWr.pm
new file mode 100644 (file)
index 0000000..c22f2fc
--- /dev/null
@@ -0,0 +1,23 @@
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# internal class used by PublicInbox::Git + Danga::Socket
+# This writes to the input pipe of "git cat-file --batch/--batch-check"
+package PublicInbox::GitAsyncWr;
+use strict;
+use warnings;
+use base qw(Danga::Socket);
+
+sub new {
+       my ($class, $io) = @_;
+       my $self = fields::new($class);
+       IO::Handle::blocking($io, 0);
+       $self->SUPER::new($io);
+}
+
+# we only care about write + event_write
+
+sub event_hup { $_[0]->close }
+sub event_err { $_[0]->close }
+
+1;
diff --git a/t/git_async.t b/t/git_async.t
new file mode 100644 (file)
index 0000000..c20d48e
--- /dev/null
@@ -0,0 +1,138 @@
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+$SIG{PIPE} = 'IGNORE';
+foreach my $mod (qw(Danga::Socket)) {
+       eval "require $mod";
+       plan skip_all => "$mod missing for git_async.t" if $@;
+}
+use File::Temp qw/tempdir/;
+use Cwd qw/getcwd/;
+my $tmpdir = tempdir('git_async-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+use_ok 'PublicInbox::Git';
+my $dir = "$tmpdir/git.git";
+{
+       is(system(qw(git init -q --bare), $dir), 0, 'created git directory');
+       my @cmd = ('git', "--git-dir=$dir", 'fast-import', '--quiet');
+       my $fi_data = getcwd().'/t/git.fast-import-data';
+       ok(-r $fi_data, "fast-import data readable (or run test at top level)");
+       my $pid = fork;
+       defined $pid or die "fork failed: $!\n";
+       if ($pid == 0) {
+               open STDIN, '<', $fi_data or die "open $fi_data: $!\n";
+               exec @cmd;
+               die "failed exec: ",join(' ', @cmd),": $!\n";
+       }
+       waitpid $pid, 0;
+       is($?, 0, 'fast-import succeeded');
+}
+
+{
+       my $f = 'HEAD:foo.txt';
+       my @args;
+       my $n = 0;
+       my $git = PublicInbox::Git->new($dir);
+       Danga::Socket->SetPostLoopCallback(sub {
+               my ($fdmap) = @_;
+               foreach (values %$fdmap) {
+                       return 1 if ref($_) =~ /::GitAsync/;
+               }
+               0
+       });
+       $git->check_async($f, sub {
+               $n++;
+               @args = @_;
+               $git = undef;
+       });
+       Danga::Socket->EventLoop;
+       my @exp = PublicInbox::Git->new($dir)->check($f);
+       my $exp = [ \@exp ];
+       is_deeply(\@args, $exp, 'matches regular check');
+       is($n, 1, 'callback only called once');
+       $git = PublicInbox::Git->new($dir);
+       $n = 0;
+       my $max = 100;
+       my $missing = 'm';
+       my $m = 0;
+       for my $i (0..$max) {
+               my $k = "HEAD:m$i";
+               $git->check_async($k, sub {
+                       my ($info) = @_;
+                       ++$n;
+                       ++$m if $info->[1] eq 'missing' && $info->[0] eq $k;
+               });
+               if ($git->{async_c}->{wr}->{write_buf_size}) {
+                       diag("async_check capped at $i");
+                       $max = $i;
+                       last;
+               }
+       }
+       is($m, $n, 'everything expected missing is missing');
+       $git->check_async($f, sub { $git = undef });
+       Danga::Socket->EventLoop;
+
+       $git = PublicInbox::Git->new($dir);
+       my $info;
+       my $str = '';
+       my @missing;
+       $git->cat_async('HEAD:miss', sub {
+               my ($miss) = @_;
+               push @missing, $miss;
+       });
+       $git->cat_async($f, sub {
+               my $res = $_[0];
+               if (ref($res) eq 'ARRAY') {
+                       is($info, undef, 'info unset, setting..');
+                       $info = $res;
+               } elsif (ref($res) eq 'SCALAR') {
+                       $str .= $$res;
+                       if (length($str) >= $info->[2]) {
+                               is($info->[2], length($str), 'length match');
+                               $git = undef
+                       }
+               }
+       });
+       Danga::Socket->EventLoop;
+       is_deeply(\@missing, [['HEAD:miss', 'missing']], 'missing cat OK');
+       is($git, undef, 'git undefined');
+       $git = PublicInbox::Git->new($dir);
+       my $sref = $git->cat_file($f);
+       is($str, $$sref, 'matches synchronous version');
+       $git = undef;
+       Danga::Socket->RunTimers;
+}
+
+{
+       my $git = PublicInbox::Git->new($dir);
+       foreach my $s (qw(check_async_compat cat_async_compat)) {
+               my @missing;
+               $git->check_async_compat('HED:miss1ng', sub {
+                       my ($miss) = @_;
+                       push @missing, $miss;
+               });
+               is_deeply(\@missing, [['HED:miss1ng', 'missing']],
+                       "missing $s OK");
+       }
+       my @info;
+       my $str = '';
+       $git->cat_async_compat('HEAD:foo.txt', sub {
+               my $ref = $_[0];
+               my $t = ref $ref;
+               if ($t eq 'ARRAY') {
+                       push @info, $ref;
+               } elsif ($t eq 'SCALAR') {
+                       $str .= $$ref;
+               } else {
+                       fail "fail type: $t";
+               }
+       });
+       is_deeply(\@info, [ [ 'bf4f17855632367a160bef055fc8ba4675d10e6b',
+                              'blob', 18 ]], 'info matches compat');
+       is($str, "-----\nhello\nworld\n", 'data matches compat');
+}
+
+done_testing();
+
+1;