--- /dev/null
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# Waits for initial `git log -p' output for PublicInbox::CodeSearchIdx.
+# The initial output from `git log -p' can take a while to generate,
+# CodeSearchIdx can process prune work while it's happening. Once
+# `git log -p' starts generating output, it should be able to keep
+# up with Xapian indexing, so we still rely on blocking reads to simplify
+# cidx_read_log_p
+package PublicInbox::CidxLogP;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLONESHOT);
+
+sub new {
+ my ($cls, $rd, $cidx, $git, $roots) = @_;
+ my $self = bless { cidx => $cidx, git => $git, roots => $roots }, $cls;
+ fcntl($rd, 1031, 1048576) if $^O eq 'linux'; # fatter pipes
+ $self->SUPER::new($rd, EPOLLIN|EPOLLONESHOT);
+}
+
+sub event_step {
+ my ($self) = @_;
+ my $rd = $self->{sock} // return warn('BUG?: no {sock}');
+ $self->close; # PublicInbox::DS::close, deferred, so $sock is usable
+ delete($self->{cidx})->cidx_read_log_p($self, $rd);
+}
+
+1;
use PublicInbox::Config qw(glob2re);
use PublicInbox::Spawn qw(spawn popen_rd);
use PublicInbox::OnDestroy;
+use PublicInbox::CidxLogP;
use Socket qw(MSG_EOR);
use Carp ();
our (
$len;
}
-# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_reap_log { # awaitpid cb
+ my ($pid, $self, $op_p) = @_;
+ if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
+ ($? & 127) == POSIX::SIGPIPE))) {
+ send($op_p, "shard_done $self->{shard}", MSG_EOR);
+ } else {
+ warn "E: git @LOG_STDIN: \$?=$?\n";
+ $self->{xdb}->cancel_transaction;
+ }
+}
+
sub shard_index { # via wq_io_do in IDX_SHARDS
- my ($self, $git, $n, $roots) = @_;
- local $self->{current_info} = "$git->{git_dir} [$n]";
- local $self->{roots} = $roots;
+ my ($self, $git, $roots) = @_;
+
my $in = delete($self->{0}) // die 'BUG: no {0} input';
my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
+ my ($rd, $pid) = $git->popen(@LOG_STDIN, undef, { 0 => $in });
+ close $in or die "close: $!";
+ awaitpid($pid, \&cidx_reap_log, $self, $op_p);
+ PublicInbox::CidxLogP->new($rd, $self, $git, $roots);
+ # CidxLogP->event_step will call cidx_read_log_p once there's input
+}
+
+# sharded reader for `git log --pretty=format: --stdin'
+sub cidx_read_log_p {
+ my ($self, $log_p, $rd) = @_;
+ my $git = delete $log_p->{git} // die 'BUG: no {git}';
+ local $self->{current_info} = "$git->{git_dir} [$self->{shard}]";
+ local $self->{roots} = delete $log_p->{roots} // die 'BUG: no {roots}';
+
local $MAX_SIZE = $self->{-opt}->{max_size};
# local-ized in parent before fork
$TXN_BYTES = $BATCH_BYTES;
local $self->{git} = $git; # for patchid
return if $DO_QUIT;
- my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
- close $in or die "close: $!";
my $nr = 0;
# a patch may have \0, see c4201214cbf10636e2c1ab9131573f735b42c8d4
my $len;
my $cmt = {};
local $/ = $FS;
- my $buf = <$rd> // return close($rd); # leading $FS
+ my $buf = <$rd> // return; # leading $FS
$buf eq $FS or die "BUG: not LF-NUL: $buf\n";
$self->begin_txn_lazy;
while (!$DO_QUIT && defined($buf = <$rd>)) {
@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
}
if (($TXN_BYTES -= $len) <= 0) {
- cidx_ckpoint($self, "[$n] $nr");
+ cidx_ckpoint($self, "[$self->{shard}] $nr");
$TXN_BYTES -= $len; # len may be huge, >TXN_BYTES;
}
update_commit($self, $cmt);
++$nr;
- cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0;
+ cidx_ckpoint($self, "[$self->{shard}] $nr") if $TXN_BYTES <= 0;
$/ = $FS;
}
- close($rd);
- if (!$? || ($DO_QUIT && (($? & 127) == $DO_QUIT ||
- ($? & 127) == POSIX::SIGPIPE))) {
- send($op_p, "shard_done $n", MSG_EOR);
- } else {
- warn "E: git @LOG_STDIN: \$?=$?\n";
- $self->{xdb}->cancel_transaction;
- }
+ # return and wait for cidx_reap_log
}
sub shard_done { # called via PktOp on shard_index completion
$c->{ops}->{shard_done} = [ $self ];
$IDX_SHARDS[$n]->wq_io_do('shard_index',
[ $shard_in[$n], $p->{op_p} ],
- $git, $n, \@roots);
+ $git, \@roots);
$consumers->{$n} = $c;
}
@shard_in = ();