$MY_SIG, # like %SIG
$SIGSET,
$TXN_BYTES, # number of bytes in current shard transaction
+ $BATCH_BYTES,
$DO_QUIT, # signal number
- @RDONLY_SHARDS, # Xapian::Database
+ @RDONLY_XDB, # Xapian::Database
@IDX_SHARDS, # clones of self
$MAX_SIZE,
$TMP_GIT, # PublicInbox::Git object for --prune
$REINDEX, # PublicInbox::SharedKV
+ @GIT_DIR_GONE, # [ git_dir1, git_dir2 ]
+ %TO_PRUNE, # (docid => docid) mapping (hash in case of retry_reopen)
+ $PRUNE_CUR, # per-shard document ID
+ $PRUNE_MAX, # per-shard document ID to stop at
+ $PRUNE_OP_P, # prune_done() notification socket
+ $PRUNE_NR, # total number pruned
+ @PRUNE_DONE, # marks off prune completions
+ $NCHANGE, # current number of changes
+ %ACTIVE_GIT_DIR, # GIT_DIR => undef mapping for prune
);
# stop walking history if we see >$SEEN_MAX existing commits, this assumes
my $xdb = $self->{xdb};
for (@{$repo->{to_delete}}) { $xdb->delete_document($_) } # XXX needed?
if (defined $repo->{docid}) {
- my $doc = $xdb->get_document($repo->{docid}) //
+ my $doc = $self->get_doc($repo->{docid}) //
die "$repo->{git_dir} doc #$repo->{docid} gone";
add_val($doc, PublicInbox::CodeSearch::CT, $repo->{ct});
my %new = map { $_ => undef } @{$repo->{roots}};
}
}
-sub cidx_ckpoint ($$) {
+sub cidx_ckpoint ($;$) {
my ($self, $msg) = @_;
- progress($self, $msg);
+ progress($self, $msg) if defined($msg);
+ $TXN_BYTES = $BATCH_BYTES; # reset
+ if (my @to_prune = values(%TO_PRUNE)) {
+ %TO_PRUNE = ();
+ $PRUNE_NR += scalar(@to_prune);
+ progress($self,
+ "prune [$self->{shard}] $PRUNE_NR ($PRUNE_CUR/$PRUNE_MAX)");
+ $self->begin_txn_lazy;
+ $self->{xdb}->delete_document($_) for @to_prune;
+ }
return if $PublicInbox::Search::X{CLOEXEC_UNSET};
- $self->{xdb}->commit_transaction;
- $self->{xdb}->begin_transaction;
+ $self->commit_txn_lazy;
+ $self->begin_txn_lazy;
}
sub truncate_cmt ($$) {
}
# sharded reader for `git log --pretty=format: --stdin'
-sub shard_index { # via wq_io_do
+sub shard_index { # via wq_io_do in IDX_SHARDS
my ($self, $git, $n, $roots) = @_;
local $self->{current_info} = "$git->{git_dir} [$n]";
local $self->{roots} = $roots;
my $in = delete($self->{0}) // die 'BUG: no {0} input';
my $op_p = delete($self->{1}) // die 'BUG: no {1} op_p';
- my $batch_bytes = $self->{-opt}->{batch_size} //
- $PublicInbox::SearchIdx::BATCH_BYTES;
local $MAX_SIZE = $self->{-opt}->{max_size};
# local-ized in parent before fork
- $TXN_BYTES = $batch_bytes;
+ $TXN_BYTES = $BATCH_BYTES;
local $self->{git} = $git; # for patchid
return if $DO_QUIT;
my $rd = $git->popen(@LOG_STDIN, undef, { 0 => $in });
} else {
@$cmt{@FMT} = split(/\n/, $buf, scalar(@FMT));
}
- $TXN_BYTES -= $len;
- if ($TXN_BYTES <= 0) {
+ if (($TXN_BYTES -= $len) <= 0) {
cidx_ckpoint($self, "[$n] $nr");
- $TXN_BYTES = $batch_bytes - $len;
+ $TXN_BYTES -= $len; # len may be huge, >TXN_BYTES;
}
update_commit($self, $cmt);
++$nr;
- if ($TXN_BYTES <= 0) {
- cidx_ckpoint($self, "[$n] $nr");
- $TXN_BYTES = $batch_bytes;
- }
+ cidx_ckpoint($self, "[$n] $nr") if $TXN_BYTES <= 0;
$/ = $FS;
}
close($rd);
$self->{-shard_ok}->{$n} = 1 if defined($self->{-shard_ok});
}
+sub prune_done { # called via PktOp->event_step completion
+ my ($shard) = @_;
+ $PRUNE_DONE[$shard->{shard}] = 1;
+}
+
+sub prune_busy {
+ return if $DO_QUIT;
+ grep(defined, @PRUNE_DONE) != @IDX_SHARDS;
+}
+
+sub await_prune () {
+ local @PublicInbox::DS::post_loop_do = (\&prune_busy);
+ PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if prune_busy();
+}
+
sub seen ($$) {
my ($xdb, $q) = @_; # $q = "Q$COMMIT_HASH"
for (1..100) {
my $shard = bless { %$self, shard => $n }, ref($self);
$repo->{shard_n} = $n;
delete @$shard{qw(lockfh lock_path)};
- local $shard->{xdb} = $RDONLY_SHARDS[$n] // die "BUG: shard[$n] undef";
+ local $shard->{xdb} = $RDONLY_XDB[$n] // die "BUG: shard[$n] undef";
$shard->retry_reopen(\&check_existing, $self, $git);
}
my ($shard, $self, $git) = @_;
my @docids = docids_by_postlist($shard, 'P'.$git->{git_dir});
my $docid = shift(@docids) // return get_roots($self, $git);
- my $doc = $shard->{xdb}->get_document($docid) //
+ my $doc = $shard->get_doc($docid) //
die "BUG: no #$docid ($git->{git_dir})";
my $old_fp = $REINDEX ? "\0invalid" : $doc->get_data;
if ($old_fp eq $git->{-repo}->{fp}) { # no change
sysseek($refs, 0, SEEK_SET) or die "seek: $!"; # for rev-list --stdin
my $rfh = $git->popen(qw(rev-list --stdin), undef, { 0 => $refs });
close $refs or die "close: $!";
- my ($seen, $nchange) = (0, 0);
+ my $seen = 0;
my @shard_in = map {
$_->reopen;
open my $fh, '+>', undef or die "open: $!";
$fh;
- } @RDONLY_SHARDS;
+ } @RDONLY_XDB;
while (defined(my $cmt = <$rfh>)) {
chomp $cmt;
- my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_SHARDS);
+ my $n = hex(substr($cmt, 0, 8)) % scalar(@RDONLY_XDB);
if ($REINDEX && $REINDEX->set_maybe(pack('H*', $cmt), '')) {
say { $shard_in[$n] } $cmt or die "say: $!";
- ++$nchange;
- } elsif (seen($RDONLY_SHARDS[$n], 'Q'.$cmt)) {
+ ++$NCHANGE;
+ } elsif (seen($RDONLY_XDB[$n], 'Q'.$cmt)) {
last if ++$seen > $SEEN_MAX;
} else {
say { $shard_in[$n] } $cmt or die "say: $!";
- ++$nchange;
+ ++$NCHANGE;
$seen = 0;
}
if ($DO_QUIT) {
close($rfh);
return () if $DO_QUIT;
if (!$? || (($? & 127) == POSIX::SIGPIPE && $seen > $SEEN_MAX)) {
- $self->{nchange} += $nchange;
- progress($self, "$git->{git_dir}: $nchange commits");
+ progress($self, "$git->{git_dir}: $NCHANGE commits");
for my $fh (@shard_in) {
$fh->flush or die "flush: $!";
sysseek($fh, 0, SEEK_SET) or die "seek: $!";
sub load_existing ($) { # for -u/--update
my ($self) = @_;
- my $dirs = $self->{git_dirs} // [];
+ my $dirs = $self->{git_dirs} //= [];
if ($self->{-opt}->{update} || $self->{-opt}->{prune}) {
local $self->{xdb};
$self->xdb or
die "E: $self->{cidx_dir} non-existent for --update\n";
- my @missing;
my @cur = grep {
if (-e $_) {
1;
} else {
- push @missing, $_;
+ push @GIT_DIR_GONE, $_;
undef;
}
} $self->all_terms('P');
- @missing = () if $self->{-opt}->{prune};
- @missing and warn "W: the following repos no longer exist:\n",
- (map { "W:\t$_\n" } @missing),
+ if (@GIT_DIR_GONE && !$self->{-opt}->{prune}) {
+ warn "W: the following repos no longer exist:\n",
+ (map { "W:\t$_\n" } @GIT_DIR_GONE),
"W: use --prune to remove them from ",
$self->{cidx_dir}, "\n";
+ }
push @$dirs, @cur;
}
my %uniq; # List::Util::uniq requires Perl 5.26+
}
$self->lock_acquire;
my @shards;
- local $TXN_BYTES;
for my $n (0..($self->{nshard} - 1)) {
my $shard = bless { %$self, shard => $n }, ref($self);
delete @$shard{qw(lockfh lock_path)};
$shard->idx_acquire;
$shard->idx_release;
- $shard->wq_workers_start("shard[$n]", 1, $SIGSET, {
+ $shard->wq_workers_start("cidx shard[$n]", 1, $SIGSET, {
siblings => \@shards, # for ipc_atfork_child
}, \&shard_done_wait, $self);
push @shards, $shard;
sub prune_cb { # git->check_async callback
my ($hex, $type, undef, $self_id) = @_;
- return if $type eq 'commit';
my ($self, $id) = @$self_id;
+ return if $type eq 'commit';
+ progress($self, "$hex $type #$id") if ($self->{-opt}->{verbose}//0) > 1;
my $len = $self->{xdb}->get_doclength($id);
- progress($self, "$hex $type (doclength=$len)");
- ++$self->{pruned};
- $self->{xdb}->delete_document($id);
+ $TO_PRUNE{$id} = $id;
- # all math around batch_bytes calculation is pretty fuzzy,
+ # all math around TXN_BYTES calculation is pretty fuzzy,
# but need a way to regularly flush output to avoid OOM,
# so assume the average term + position overhead is the
# answer to everything: 42
- return if ($self->{batch_bytes} -= ($len * 42)) > 0;
- cidx_ckpoint($self, "[$self->{shard}] $self->{pruned}");
- $self->{batch_bytes} = $self->{-opt}->{batch_size} //
- $PublicInbox::SearchIdx::BATCH_BYTES;
-}
-
-sub shard_prune { # via wq_io_do
- my ($self, $n, $git_dir) = @_;
- my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
- my $git = PublicInbox::Git->new($git_dir); # TMP_GIT copy
- $self->begin_txn_lazy;
- my $xdb = $self->{xdb};
- my $cur = $xdb->postlist_begin('Tc');
- my $end = $xdb->postlist_end('Tc');
- my ($id, @cmt, $oid);
- local $self->{batch_bytes} = $self->{-opt}->{batch_size} //
- $PublicInbox::SearchIdx::BATCH_BYTES;
- local $self->{pruned} = 0;
- for (; $cur != $end && !$DO_QUIT; $cur++) {
- @cmt = xap_terms('Q', $xdb, $id = $cur->get_docid);
- scalar(@cmt) == 1 or
- warn "BUG? shard[$n] #$id has multiple commits: @cmt";
- for $oid (@cmt) {
- $git->check_async($oid, \&prune_cb, [ $self, $id ]);
- }
+ cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0;
+}
+
+sub prune_git_dir ($$$) {
+ my ($self, $id, $doc) = @_;
+ my @P = xap_terms('P', $doc);
+ scalar(@P) == 1 or warn
+"BUG? shard[$self->{shard}] #$id has zero or multiple paths: @P";
+ for my $P (@P) {
+ next if exists($ACTIVE_GIT_DIR{$P}) && -d $P;
+ $TO_PRUNE{$id} = $id;
+ progress($self, "$P gone #$id");
+ my $len = $self->{xdb}->get_doclength($id);
+ cidx_ckpoint($self) if ($TXN_BYTES -= ($len * 42)) <= 0;
}
- $git->async_wait_all;
- for my $d ($self->all_terms('P')) { # GIT_DIR paths
- last if $DO_QUIT;
- next if -d $d;
- for $id (docids_by_postlist($self, 'P'.$d)) {
- progress($self, "$d gone #$id");
- $xdb->delete_document($id);
- }
- }
- $self->commit_txn_lazy;
- $self->{pruned} and
- progress($self, "[$n] pruned $self->{pruned} commits");
- send($op_p, "shard_done $n", MSG_EOR);
}
-sub do_prune ($) {
+sub event_step { # may be requeued via DS
my ($self) = @_;
- my $consumers = {};
- my $git_dir = $TMP_GIT->{git_dir};
- my $n = 0;
- local $self->{-shard_ok} = {};
- for my $s (@IDX_SHARDS) {
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $self ];
- $s->wq_io_do('shard_prune', [ $p->{op_p} ], $n, $git_dir);
- $consumers->{$n++} = $c;
+ my $PRUNE_BATCH = 1000;
+ $TXN_BYTES = $BATCH_BYTES;
+ for (; --$PRUNE_BATCH && !$DO_QUIT && $PRUNE_CUR <= $PRUNE_MAX;
+ $PRUNE_CUR++) {
+ my $doc = $self->get_doc($PRUNE_CUR) // next;
+ my @cmt = xap_terms('Q', $doc);
+ if (scalar(@cmt) == 0) {
+ prune_git_dir($self, $PRUNE_CUR, $doc);
+ } else {
+ scalar(@cmt) == 1 or warn
+"BUG? shard[$self->{shard}] #$PRUNE_CUR has multiple commits: @cmt";
+ for my $o (@cmt) {
+ $TMP_GIT->check_async($o, \&prune_cb,
+ [$self, $PRUNE_CUR])
+ }
+ }
}
- wait_consumers($self, $TMP_GIT, $consumers);
+ $TMP_GIT->async_wait_all;
+ cidx_ckpoint($self);
+ return PublicInbox::DS::requeue($self) if $PRUNE_CUR <= $PRUNE_MAX;
+ send($PRUNE_OP_P, 'prune_done', MSG_EOR);
+ $TMP_GIT->cleanup;
+ $TMP_GIT = $PRUNE_OP_P = $PRUNE_CUR = $PRUNE_MAX = undef;
+ %ACTIVE_GIT_DIR = ();
+}
+
+sub prune_start { # via wq_io_do in IDX_SHARDS
+ my ($self, $git_dir, @active_git_dir) = @_;
+ $PRUNE_CUR = 1;
+ $PRUNE_OP_P = delete $self->{0} // die 'BUG: no {0} op_p';
+ %ACTIVE_GIT_DIR = map { $_ => undef } @active_git_dir;
+ $TMP_GIT = PublicInbox::Git->new($git_dir); # TMP_GIT copy
+ $self->begin_txn_lazy;
+ $PRUNE_MAX = $self->{xdb}->get_lastdocid // 1;
+ event_step($self);
}
sub shards_active { # post_loop_do
return if $DO_QUIT;
- scalar(grep { $_->{-cidx_quit} } @IDX_SHARDS);
+ scalar(grep { $_->{-cidx_quit} } (@IDX_SHARDS));
}
# signal handlers
-sub kill_shards { $_->wq_kill(@_) for @IDX_SHARDS }
+sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
sub parent_quit {
$DO_QUIT = POSIX->can("SIG$_[0]")->();
sub init_tmp_git_dir ($) {
my ($self) = @_;
- return unless $self->{-opt}->{prune};
require File::Temp;
require PublicInbox::Import;
my $tmp = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
undef;
}
+sub start_prune ($) {
+ my ($self) = @_;
+ init_tmp_git_dir($self);
+ my @active_git_dir = (@{$self->{git_dirs}}, @GIT_DIR_GONE);
+ for my $s (@IDX_SHARDS) {
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{prune_done} = [ $s ];
+ $s->wq_io_do('prune_start', [ $p->{op_p} ],
+ $TMP_GIT->{git_dir}, @active_git_dir)
+ }
+}
+
sub cidx_run { # main entry point
my ($self) = @_;
my $restore_umask = prep_umask($self);
my $restore = PublicInbox::OnDestroy->new($$,
\&PublicInbox::DS::sig_setmask, $SIGSET);
local $LIVE = {};
- local ($DO_QUIT, $TMP_GIT, $REINDEX);
+ local ($DO_QUIT, $TMP_GIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE,
+ @PRUNE_DONE);
+ local $BATCH_BYTES = $self->{-opt}->{batch_size} //
+ $PublicInbox::SearchIdx::BATCH_BYTES;
local @IDX_SHARDS = cidx_init($self);
local $self->{current_info} = '';
local $MY_SIG = {
$_ =~ /$re/ ? (warn("# excluding $_\n"), 0) : 1;
} @{$self->{git_dirs}};
}
- local $self->{nchange} = 0;
+ local $NCHANGE = 0;
local $LIVE_JOBS = $self->{-opt}->{jobs} ||
PublicInbox::IPC::detect_nproc() || 2;
- local @RDONLY_SHARDS = $self->xdb_shards_flat;
- init_tmp_git_dir($self);
- do_prune($self) if $self->{-opt}->{prune};
+ local @RDONLY_XDB = $self->xdb_shards_flat;
+ start_prune($self) if $self->{-opt}->{prune};
scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+ await_prune if $self->{-opt}->{prune};
for my $s (@IDX_SHARDS) {
$s->{-cidx_quit} = 1;
local @PublicInbox::DS::post_loop_do = (\&shards_active);
PublicInbox::DS::event_loop($MY_SIG, $SIGSET) if shards_active();
- $self->lock_release(!!$self->{nchange});
+ $self->lock_release(!!$NCHANGE);
}
-sub ipc_atfork_child {
+sub ipc_atfork_child { # @IDX_SHARDS
my ($self) = @_;
$self->SUPER::ipc_atfork_child;
$SIG{USR1} = \&shard_usr1;