progress($self, "scanning $n code repositories...");
}
-sub prune_do { # via wq_io_do in IDX_SHARDS
+sub prune_init { # via wq_io_do in IDX_SHARDS
my ($self) = @_;
- my $gone = delete $self->{0} // die 'BUG: no {0} gone input';
- my $prune_op_p = delete $self->{1} // die 'BUG: no {1} op_p';
+ $self->{nr_prune} = 0;
$TXN_BYTES = $BATCH_BYTES;
$self->begin_txn_lazy;
- my $xdb = $self->{xdb};
- my $nr = 0;
- local $/ = "\0";
- while (my $p = <$gone>) { # Q$cmt or P$git_dir
- chomp $p;
- my @docids = docids_by_postlist($self, $p);
- for (@docids) {
- $TXN_BYTES -= $xdb->get_doclength($_) * 42;
- $xdb->delete_document($_);
- }
- ++$nr;
- $TXN_BYTES < 0 and
- cidx_ckpoint($self, "prune [$self->{shard}] $nr");
+}
+
+sub prune_one { # via wq_io_do in IDX_SHARDS
+ my ($self, $term) = @_;
+ my @docids = docids_by_postlist($self, $term);
+ for (@docids) {
+ $TXN_BYTES -= $self->{xdb}->get_doclength($_) * 42;
+ $self->{xdb}->delete_document($_);
}
- send($prune_op_p, "prune_done $self->{shard}", MSG_EOR);
+ ++$self->{nr_prune};
+ $TXN_BYTES < 0 and
+ cidx_ckpoint($self, "prune [$self->{shard}] $self->{nr_prune}");
+}
+
+sub prune_commit { # via wq_io_do in IDX_SHARDS
+ my ($self) = @_;
+ my $prune_op_p = delete $self->{0} // die 'BUG: no {0} op_p';
+ my $nr = delete $self->{nr_prune} // die 'BUG: nr_prune undef';
cidx_ckpoint($self, "prune [$self->{shard}] $nr done") if $nr;
+ send($prune_op_p, "prune_done $self->{shard}", MSG_EOR);
}
sub shards_active { # post_loop_do
sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
my ($self, $comm_rd) = @_;
return if $DO_QUIT;
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{prune_done} = [ $self ];
- my @gone;
- for my $n (0..$#IDX_SHARDS) {
- pipe(my ($r, $w)) or die "pipe: $!";
- push @gone, $w;
- $IDX_SHARDS[$n]->wq_io_do('prune_do', [$r, $p->{op_p}]);
- }
+ $_->wq_do('prune_init') for @IDX_SHARDS;
while (defined(my $cmt = <$comm_rd>)) {
- chomp $cmt;
- my $n = hex(substr($cmt, 0, 8)) % scalar(@gone);
- print { $gone[$n] } 'Q', $cmt, "\0" or die "print: $!";
+ chop($cmt) eq "\n" or die "BUG: no LF in comm output ($cmt)";
+ my $n = hex(substr($cmt, 0, 8)) % scalar(@IDX_SHARDS);
+ $IDX_SHARDS[$n]->wq_do('prune_one', 'Q'.$cmt);
last if $DO_QUIT;
}
for my $git_dir (@GIT_DIR_GONE) {
- my $n = git_dir_hash($git_dir) % scalar(@gone);
- print { $gone[$n] } 'P', $git_dir, "\0" or die "print: $!";
+ my $n = git_dir_hash($git_dir) % scalar(@IDX_SHARDS);
+ $IDX_SHARDS[$n]->wq_do('prune_one', 'P'.$git_dir);
}
- for (@gone) { close $_ or die "close: $!" };
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{prune_done} = [ $self ];
+ $_->wq_io_do('prune_commit', [ $p->{op_p} ]) for @IDX_SHARDS;
}
sub init_associate_prefork ($) {