}
sub shard_commit { # via wq_io_do
- my ($self, $n) = @_;
+ my ($self) = @_;
my $op_p = delete($self->{0}) // die 'BUG: no {0} op_p';
$self->commit_txn_lazy;
- send($op_p, "shard_done $n", MSG_EOR);
+ send($op_p, "shard_done $self->{shard}", MSG_EOR);
}
-sub consumers_open { # post_loop_do
- my (undef, $consumers) = @_;
- return if $DO_QUIT;
- scalar(grep { $_->{sock} } values %$consumers);
+sub consumer_open { # post_loop_do
+ my (undef, $c) = @_; # $c is PublicInbox::PktOp
+ $DO_QUIT ? undef : defined($c->{sock});
}
-sub wait_consumers ($$$) {
- my ($self, $git, $consumers) = @_;
- local @PublicInbox::DS::post_loop_do = (\&consumers_open, $consumers);
+sub wait_active ($$$$) {
+ my ($self, $git, $active, $c) = @_;
+ local @PublicInbox::DS::post_loop_do = (\&consumer_open, $c);
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
- my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$consumers;
+ my $n = grep { ! $self->{-shard_ok}->{$_} } keys %$active;
die "E: $git->{git_dir} $n shards failed" if $n && !$DO_QUIT;
}
-sub commit_used_shards ($$$) {
- my ($self, $git, $consumers) = @_;
+sub commit_active_shards ($$$) {
+ my ($self, $git, $active) = @_;
local $self->{-shard_ok} = {};
- for my $n (keys %$consumers) {
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $self ];
- $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ], $n);
- $consumers->{$n} = $c;
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{shard_done} = [ $self ];
+ for my $n (keys %$active) {
+ $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
}
- wait_consumers($self, $git, $consumers);
+ undef $p;
+ wait_active($self, $git, $active, $c);
}
sub index_repo { # cidx_await cb
$repo->{roots} = \@roots;
local $self->{current_info} = $git->{git_dir};
my @shard_in = partition_refs($self, $git, delete($repo->{refs}));
- local $self->{-shard_ok} = {}; # [0..$#shard_in] => 1
- my $consumers = {};
+ local $self->{-shard_ok} = {};
+ my $active = {};
+ my ($c, $p) = PublicInbox::PktOp->pair;
+ $c->{ops}->{shard_done} = [ $self ];
for my $n (0..$#shard_in) {
-s $shard_in[$n] or next;
last if $DO_QUIT;
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $self ];
$IDX_SHARDS[$n]->wq_io_do('shard_index',
[ $shard_in[$n], $p->{op_p} ],
$git, \@roots);
- $consumers->{$n} = $c;
+ $active->{$n} = undef;
}
+ undef $p;
@shard_in = ();
- wait_consumers($self, $git, $consumers);
+ wait_active($self, $git, $active, $c);
if ($DO_QUIT) {
- commit_used_shards($self, $git, $consumers);
+ commit_active_shards($self, $git, $active);
progress($self, "$git->{git_dir}: done");
return;
}
$repo->{git_dir} = $git->{git_dir};
my $id = $IDX_SHARDS[$repo->{shard_n}]->wq_do('store_repo', $repo);
if ($id > 0) {
- $consumers->{$repo->{shard_n}} = undef;
- commit_used_shards($self, $git, $consumers);
+ $active->{$repo->{shard_n}} = undef;
+ commit_active_shards($self, $git, $active);
progress($self, "$git->{git_dir}: done");
return run_deferred();
}