lib/PublicInbox/Filter/Vger.pm
lib/PublicInbox/GetlineBody.pm
lib/PublicInbox/Git.pm
+lib/PublicInbox/GitAsyncRd.pm
+lib/PublicInbox/GitAsyncWr.pm
lib/PublicInbox/GitHTTPBackend.pm
lib/PublicInbox/HTTP.pm
lib/PublicInbox/HTTPD.pm
t/git-http-backend.t
t/git.fast-import-data
t/git.t
+t/git_async.t
t/html_index.t
t/httpd-corner.psgi
t/httpd-corner.t
require IO::Handle;
use PublicInbox::Spawn qw(spawn popen_rd);
use Fcntl qw(:seek);
+my $have_async = eval {
+ require PublicInbox::EvCleanup;
+ require PublicInbox::GitAsyncRd;
+};
# Documentation/SubmittingPatches recommends 12 (Linux v4.4)
my $abbrev = `git config core.abbrev` || 12;
$self->{$in} = $in_r;
}
+# legacy synchronous API
sub cat_file_begin {
my ($self, $obj) = @_;
$self->_bidi_pipe(qw(--batch in out pid));
($in, $1, $2, $3);
}
+# legacy synchronous API
sub cat_file_finish {
my ($self, $left) = @_;
my $max = 8192;
fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n");
}
+# legacy synchronous API
sub cat_file {
my ($self, $obj, $ref) = @_;
sub batch_prepare ($) { _bidi_pipe($_[0], qw(--batch in out pid)) }
+# legacy synchronous API
sub check {
my ($self, $obj) = @_;
$self->_bidi_pipe(qw(--batch-check in_c out_c pid_c));
my ($self) = @_;
_destroy($self, qw(in out pid));
_destroy($self, qw(in_c out_c pid_c));
+
+ if ($have_async) {
+ my %h = %$self; # yup, copy ourselves
+ %$self = ();
+ my $ds_closed;
+
+ # schedule closing with Danga::Socket::close:
+ foreach (qw(async async_c)) {
+ my $ds = delete $h{$_} or next;
+ $ds->close;
+ $ds_closed = 1;
+ }
+
+ # can't do waitpid in _destroy() until next tick,
+ # since D::S defers closing until end of current event loop
+ $ds_closed and PublicInbox::EvCleanup::next_tick(sub {
+ _destroy(\%h, qw(in_a out_a pid_a));
+ _destroy(\%h, qw(in_ac out_ac pid_ac));
+ });
+ }
}
sub DESTROY { cleanup(@_) }
+# modern async API
+sub check_async_ds ($$$) {
+ my ($self, $obj, $cb) = @_;
+ ($self->{async_c} ||= do {
+ _bidi_pipe($self, qw(--batch-check in_ac out_ac pid_ac));
+ PublicInbox::GitAsyncRd->new($self->{in_ac}, $self->{out_ac}, 1)
+ })->cat_file_async($obj, $cb);
+}
+
+sub cat_async_ds ($$$) {
+ my ($self, $obj, $cb) = @_;
+ ($self->{async} ||= do {
+ _bidi_pipe($self, qw(--batch in_a out_a pid_a));
+ PublicInbox::GitAsyncRd->new($self->{in_a}, $self->{out_a});
+ })->cat_file_async($obj, $cb);
+}
+
+sub async_info_compat ($) {
+ local $/ = "\n";
+ chomp(my $line = $_[0]->getline);
+ [ split(/ /, $line) ];
+}
+
+sub check_async_compat ($$$) {
+ my ($self, $obj, $cb) = @_;
+ $self->_bidi_pipe(qw(--batch-check in_c out_c pid_c));
+ $self->{out_c}->print($obj."\n") or fail($self, "write error: $!");
+ my $info = async_info_compat($self->{in_c});
+ $cb->($info);
+}
+
+sub cat_async_compat ($$$) {
+ my ($self, $obj, $cb) = @_;
+ $self->_bidi_pipe(qw(--batch in out pid));
+ $self->{out}->print($obj."\n") or fail($self, "write error: $!");
+ my $in = $self->{in};
+ my $info = async_info_compat($in);
+ $cb->($info);
+ return if scalar(@$info) != 3; # missing
+ my $max = 8192;
+ my $left = $info->[2];
+ my ($buf, $r);
+ while ($left > 0) {
+ $r = read($in, $buf, $left > $max ? $max : $left);
+ return $cb->($r) unless $r; # undef or 0
+ $left -= $r;
+ $cb->(\$buf);
+ }
+ $r = read($in, $buf, 1);
+ defined($r) or fail($self, "read failed: $!");
+ fail($self, 'newline missing after blob') if ($r != 1 || $buf ne "\n");
+}
+
+if ($have_async) {
+ *check_async = *check_async_ds;
+ *cat_async = *cat_async_ds;
+} else {
+ *check_async = *check_async_compat;
+ *cat_async = *cat_async_compat;
+}
+
1;
__END__
=pod
--- /dev/null
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# internal class used by PublicInbox::Git + Danga::Socket
+# This parses the output pipe of "git cat-file --batch/--batch-check"
+package PublicInbox::GitAsyncRd;
+use strict;
+use warnings;
+use base qw(Danga::Socket);
+use fields qw(jobq rbuf wr check);
+use PublicInbox::GitAsyncWr;
+our $MAX = 65536; # Import may bump this in the future
+
+sub new {
+ my ($class, $rd, $wr, $check) = @_;
+ my $self = fields::new($class);
+ IO::Handle::blocking($rd, 0);
+ $self->SUPER::new($rd);
+ $self->{jobq} = []; # [ [ $obj, $cb, $state ], ... ]
+ my $buf = '';
+ $self->{rbuf} = \$buf;
+ $self->{wr} = PublicInbox::GitAsyncWr->new($wr);
+ $self->{check} = $check;
+ $self->watch_read(1);
+ $self;
+}
+
+sub cat_file_async {
+ my ($self, $obj, $cb) = @_;
+ # order matters
+ push @{$self->{jobq}}, [ $obj, $cb ];
+ $self->{wr}->write($obj."\n");
+}
+
+# Returns: an array ref of the info line for --batch-check and --batch,
+# which may be: [ $obj, 'missing']
+# Returns undef on error
+sub read_info ($) {
+ my ($self) = @_;
+ my $rbuf = $self->{rbuf};
+ my $rd = $self->{sock};
+
+ while (1) {
+ $$rbuf =~ s/\A([^\n]+)\n//s and return [ split(/ /, $1) ];
+
+ my $r = sysread($rd, $$rbuf, 110, length($$rbuf));
+ next if $r;
+ return $r;
+ }
+}
+
+sub event_read {
+ my ($self) = @_;
+ my $jobq = $self->{jobq};
+ my ($cur, $obj, $cb, $info, $left);
+ my $check = $self->{check};
+ my ($rbuf, $rlen, $need, $buf);
+take_job:
+ $cur = shift @$jobq or die 'BUG: empty job queue in '.__PACKAGE__;
+ ($obj, $cb, $info, $left) = @$cur;
+ if (!$info) {
+ $info = read_info($self);
+ if (!defined $info && ($!{EAGAIN} || $!{EINTR})) {
+ return unshift(@$jobq, $cur)
+ }
+ $cb->($info); # $info may 0 (EOF, or undef, $cb will see $!)
+ return $self->close unless $info;
+ if ($check || (scalar(@$info) != 3)) {
+ # do not monopolize the event loop if we're drained:
+ return if ${$self->{rbuf}} eq '';
+ goto take_job;
+ }
+ $cur->[2] = $info;
+ my $len = $info->[2];
+ $left = \$len;
+ $cur->[3] = $left; # onto reading body...
+ }
+ ref($left) or die 'BUG: $left not ref in '.__PACKAGE__;
+
+ $rbuf = $self->{rbuf};
+ $rlen = length($$rbuf);
+ $need = $$left + 1; # +1 for trailing LF
+ $buf = '';
+
+ if ($rlen == $need) {
+final_hunk:
+ $self->{rbuf} = \$buf;
+ $$left = undef;
+ my $lf = chop $$rbuf;
+ $lf eq "\n" or die "BUG: missing LF (got $lf)";
+ $cb->($rbuf);
+
+ return if $buf eq '';
+ goto take_job;
+ } elsif ($rlen < $need) {
+ my $all = $need - $rlen;
+ my $n = $all > $MAX ? $MAX : $all;
+ my $r = sysread($self->{sock}, $$rbuf, $n, $rlen);
+ if ($r) {
+ goto final_hunk if $r == $all;
+
+ # more to read later...
+ $$left -= $r;
+ $self->{rbuf} = \$buf;
+ $cb->($rbuf);
+
+ # don't monopolize the event loop
+ return unshift(@$jobq, $cur);
+ } elsif (!defined $r) {
+ return unshift(@$jobq, $cur) if $!{EAGAIN} || $!{EINTR};
+ }
+ $cb->($r); # $cb should handle 0 and undef (and see $!)
+ $self->close; # FAIL...
+ } else { # too much data in rbuf
+ $buf = substr($$rbuf, $need, $rlen - $need);
+ $$rbuf = substr($$rbuf, 0, $need);
+ goto final_hunk;
+ }
+}
+
+sub close {
+ my $self = shift;
+ my $jobq = $self->{jobq};
+ $self->{jobq} = [];
+ $_->[1]->(0) for @$jobq;
+ $self->{wr}->close;
+ $self->SUPER::close(@_);
+}
+
+sub event_hup { $_[0]->close }
+sub event_err { $_[0]->close }
+
+1;
--- /dev/null
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# internal class used by PublicInbox::Git + Danga::Socket
+# This writes to the input pipe of "git cat-file --batch/--batch-check"
+package PublicInbox::GitAsyncWr;
+use strict;
+use warnings;
+use base qw(Danga::Socket);
+
+sub new {
+ my ($class, $io) = @_;
+ my $self = fields::new($class);
+ IO::Handle::blocking($io, 0);
+ $self->SUPER::new($io);
+}
+
+# we only care about write + event_write
+
+sub event_hup { $_[0]->close }
+sub event_err { $_[0]->close }
+
+1;
--- /dev/null
+# Copyright (C) 2016 all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use strict;
+use warnings;
+use Test::More;
+$SIG{PIPE} = 'IGNORE';
+foreach my $mod (qw(Danga::Socket)) {
+ eval "require $mod";
+ plan skip_all => "$mod missing for git_async.t" if $@;
+}
+use File::Temp qw/tempdir/;
+use Cwd qw/getcwd/;
+my $tmpdir = tempdir('git_async-XXXXXX', TMPDIR => 1, CLEANUP => 1);
+use_ok 'PublicInbox::Git';
+my $dir = "$tmpdir/git.git";
+{
+ is(system(qw(git init -q --bare), $dir), 0, 'created git directory');
+ my @cmd = ('git', "--git-dir=$dir", 'fast-import', '--quiet');
+ my $fi_data = getcwd().'/t/git.fast-import-data';
+ ok(-r $fi_data, "fast-import data readable (or run test at top level)");
+ my $pid = fork;
+ defined $pid or die "fork failed: $!\n";
+ if ($pid == 0) {
+ open STDIN, '<', $fi_data or die "open $fi_data: $!\n";
+ exec @cmd;
+ die "failed exec: ",join(' ', @cmd),": $!\n";
+ }
+ waitpid $pid, 0;
+ is($?, 0, 'fast-import succeeded');
+}
+
+{
+ my $f = 'HEAD:foo.txt';
+ my @args;
+ my $n = 0;
+ my $git = PublicInbox::Git->new($dir);
+ Danga::Socket->SetPostLoopCallback(sub {
+ my ($fdmap) = @_;
+ foreach (values %$fdmap) {
+ return 1 if ref($_) =~ /::GitAsync/;
+ }
+ 0
+ });
+ $git->check_async($f, sub {
+ $n++;
+ @args = @_;
+ $git = undef;
+ });
+ Danga::Socket->EventLoop;
+ my @exp = PublicInbox::Git->new($dir)->check($f);
+ my $exp = [ \@exp ];
+ is_deeply(\@args, $exp, 'matches regular check');
+ is($n, 1, 'callback only called once');
+ $git = PublicInbox::Git->new($dir);
+ $n = 0;
+ my $max = 100;
+ my $missing = 'm';
+ my $m = 0;
+ for my $i (0..$max) {
+ my $k = "HEAD:m$i";
+ $git->check_async($k, sub {
+ my ($info) = @_;
+ ++$n;
+ ++$m if $info->[1] eq 'missing' && $info->[0] eq $k;
+ });
+ if ($git->{async_c}->{wr}->{write_buf_size}) {
+ diag("async_check capped at $i");
+ $max = $i;
+ last;
+ }
+ }
+ is($m, $n, 'everything expected missing is missing');
+ $git->check_async($f, sub { $git = undef });
+ Danga::Socket->EventLoop;
+
+ $git = PublicInbox::Git->new($dir);
+ my $info;
+ my $str = '';
+ my @missing;
+ $git->cat_async('HEAD:miss', sub {
+ my ($miss) = @_;
+ push @missing, $miss;
+ });
+ $git->cat_async($f, sub {
+ my $res = $_[0];
+ if (ref($res) eq 'ARRAY') {
+ is($info, undef, 'info unset, setting..');
+ $info = $res;
+ } elsif (ref($res) eq 'SCALAR') {
+ $str .= $$res;
+ if (length($str) >= $info->[2]) {
+ is($info->[2], length($str), 'length match');
+ $git = undef
+ }
+ }
+ });
+ Danga::Socket->EventLoop;
+ is_deeply(\@missing, [['HEAD:miss', 'missing']], 'missing cat OK');
+ is($git, undef, 'git undefined');
+ $git = PublicInbox::Git->new($dir);
+ my $sref = $git->cat_file($f);
+ is($str, $$sref, 'matches synchronous version');
+ $git = undef;
+ Danga::Socket->RunTimers;
+}
+
+{
+ my $git = PublicInbox::Git->new($dir);
+ foreach my $s (qw(check_async_compat cat_async_compat)) {
+ my @missing;
+ $git->check_async_compat('HED:miss1ng', sub {
+ my ($miss) = @_;
+ push @missing, $miss;
+ });
+ is_deeply(\@missing, [['HED:miss1ng', 'missing']],
+ "missing $s OK");
+ }
+ my @info;
+ my $str = '';
+ $git->cat_async_compat('HEAD:foo.txt', sub {
+ my $ref = $_[0];
+ my $t = ref $ref;
+ if ($t eq 'ARRAY') {
+ push @info, $ref;
+ } elsif ($t eq 'SCALAR') {
+ $str .= $$ref;
+ } else {
+ fail "fail type: $t";
+ }
+ });
+ is_deeply(\@info, [ [ 'bf4f17855632367a160bef055fc8ba4675d10e6b',
+ 'blob', 18 ]], 'info matches compat');
+ is($str, "-----\nhello\nworld\n", 'data matches compat');
+}
+
+done_testing();
+
+1;