$buf;
}
+sub gcf_drain { # awaitpid cb
+ my ($pid, $inflight, $bc) = @_;
+ while (@$inflight) {
+ my ($req, $cb, $arg) = splice(@$inflight, 0, 3);
+ $req = $$req if ref($req);
+ $bc and $req =~ s/\A(?:contents|info) //;
+ $req =~ s/ .*//; # drop git_dir for Gcf2Client
+ eval { $cb->(undef, $req, undef, undef, $arg) };
+ warn "E: (in abort) $req: $@" if $@;
+ }
+}
+
sub _sock_cmd {
my ($self, $batch, $err_c) = @_;
$self->{sock} and Carp::confess('BUG: {sock} exists');
$self->{err_c} = $opt->{2} = tmpfile($id, undef, 1) or
$self->fail("tmpfile($id): $!");
}
+ my $inflight = []; # TODO consider moving this into the IO object
my $pid = spawn(\@cmd, undef, $opt);
- $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid);
+ $self->{sock} = PublicInbox::IO::attach_pid($s1, $pid,
+ \&gcf_drain, $inflight, $self->{-bc});
+ $self->{inflight} = $inflight;
}
sub cat_async_retry ($$) {
# {inflight} may be non-existent, but if it isn't we delete it
# here to prevent cleanup() from waiting:
- delete $self->{inflight};
- cleanup($self);
+ my ($sock, $epwatch) = delete @$self{qw(sock epwatch inflight)};
+ $self->SUPER::close if $epwatch;
my $new_inflight = batch_prepare($self);
while (my ($oid, $cb, $arg) = splice(@$old_inflight, 0, 3)) {
$oid = \$oid if !@$new_inflight; # to indicate oid retried
push @$new_inflight, $oid, $cb, $arg;
}
+ $sock->close if $sock; # only safe once old_inflight is empty
cat_async_step($self, $new_inflight); # take one step
}
+sub gcf_inflight ($) {
+ my ($self) = @_;
+ if ($self->{sock}) {
+ return $self->{inflight} if $self->{sock}->owner_pid == $$;
+ delete @$self{qw(sock inflight)};
+ } else {
+ $self->close;
+ }
+ undef;
+}
+
# returns true if prefetch is successful
sub async_prefetch {
my ($self, $oid, $cb, $arg) = @_;
- my $inflight = $self->{inflight} or return;
+ my $inflight = gcf_inflight($self) or return;
return if @$inflight;
substr($oid, 0, 0) = 'contents ' if $self->{-bc};
write_all($self, "$oid\n", \&cat_async_step, $inflight);
sub cat_async_step ($$) {
my ($self, $inflight) = @_;
- die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+ croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
my ($req, $cb, $arg) = @$inflight[0, 1, 2];
my ($bref, $oid, $type, $size);
my $head = $self->{sock}->my_readline;
sub cat_async_wait ($) {
my ($self) = @_;
- return $self->close if !$self->{sock};
- my $inflight = $self->{inflight} or return;
- while (scalar(@$inflight)) {
- cat_async_step($self, $inflight);
- }
+ my $inflight = gcf_inflight($self) or return;
+ cat_async_step($self, $inflight) while (scalar(@$inflight));
}
sub batch_prepare ($) {
} else {
_sock_cmd($self, 'batch');
}
- $self->{inflight} = [];
}
sub _cat_file_cb {
sub check_async_step ($$) {
my ($ck, $inflight) = @_;
- die 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
+ croak 'BUG: inflight empty or odd' if scalar(@$inflight) < 3;
my ($req, $cb, $arg) = @$inflight[0, 1, 2];
chomp(my $line = $ck->{sock}->my_readline);
my ($hex, $type, $size) = split(/ /, $line);
my ($self) = @_;
return cat_async_wait($self) if $self->{-bc};
my $ck = $self->{ck} or return;
- return $ck->close if !$ck->{sock};
- my $inflight = $ck->{inflight} or return;
+ my $inflight = gcf_inflight($ck) or return;
check_async_step($ck, $inflight) while (scalar(@$inflight));
}
} else {
_sock_cmd($self = ck($self), 'batch-check', 1);
}
- $self->{inflight} = [];
}
sub write_all {
my $inflight;
if ($self->{-bc}) { # likely as time goes on
batch_command:
- $inflight = $self->{inflight} // cat_async_begin($self);
+ $inflight = gcf_inflight($self) // cat_async_begin($self);
substr($oid, 0, 0) = 'info ';
write_all($self, "$oid\n", \&cat_async_step, $inflight);
} else { # accounts for git upgrades while we're running:
my $ck = $self->{ck}; # undef OK, maybe set in check_async_begin
- $inflight = $ck->{inflight} // check_async_begin($self);
+ $inflight = ($ck ? gcf_inflight($ck) : undef)
+ // check_async_begin($self);
goto batch_command if $self->{-bc};
write_all($self->{ck}, "$oid\n", \&check_async_step, $inflight);
}
}
sub _active ($) {
- scalar(@{$_[0]->{inflight} // []}) ||
- ($_[0]->{ck} && scalar(@{$_[0]->{ck}->{inflight} // []}))
+ scalar(@{gcf_inflight($_[0]) // []}) ||
+ ($_[0]->{ck} && scalar(@{gcf_inflight($_[0]->{ck}) // []}))
}
# check_async and cat_async may trigger the other, so ensure they're
sub cat_async_begin {
my ($self) = @_;
cleanup($self) if $self->alternates_changed;
- die 'BUG: already in async' if $self->{inflight};
+ die 'BUG: already in async' if gcf_inflight($self);
batch_prepare($self);
}
sub cat_async ($$$;$) {
my ($self, $oid, $cb, $arg) = @_;
- my $inflight = $self->{inflight} // cat_async_begin($self);
+ my $inflight = gcf_inflight($self) // cat_async_begin($self);
substr($oid, 0, 0) = 'contents ' if $self->{-bc};
write_all($self, $oid."\n", \&cat_async_step, $inflight);
push(@$inflight, $oid, $cb, $arg);
sub event_step {
my ($self) = @_;
- $self->close if !$self->{sock}; # process died while requeued
- my $inflight = $self->{inflight};
+ my $inflight = gcf_inflight($self);
if ($inflight && @$inflight) {
$self->cat_async_step($inflight);
return $self->close unless $self->{sock};
sub close {
my ($self) = @_;
- if (my $q = $self->{inflight}) { # abort inflight requests
- while (@$q) {
- my ($req, $cb, $arg) = splice(@$q, 0, 3);
- $req = $$req if ref($req);
- $self->{-bc} and $req =~ s/\A(?:contents|info) //;
- $req =~ s/ .*//; # drop git_dir for Gcf2Client
- eval { $cb->(undef, $req, undef, undef, $arg) };
- warn "E: (in abort) $req: $@" if $@;
- }
- }
+ my $sock = $self->{sock};
delete @$self{qw(-bc err_c inflight)};
delete($self->{epwatch}) ? $self->SUPER::close : delete($self->{sock});
+ $sock->close if $sock; # calls gcf_drain via awaitpid
}
package PublicInbox::GitCheck; # only for git <2.36