}
sub _unref_doc ($$$$$;$) {
- my ($sync, $docid, $ibx, $xnum, $oidbin, $eml) = @_;
+ my ($self, $docid, $ibx, $xnum, $oidbin, $eml) = @_;
my $smsg;
if (ref($docid)) {
$smsg = $docid;
my $s = 'DELETE FROM xref3 WHERE oidbin = ?';
$s .= ' AND ibx_id = ?' if defined($ibx);
$s .= ' AND xnum = ?' if defined($xnum);
- my $del = $sync->{self}->{oidx}->dbh->prepare_cached($s);
+ my $del = $self->{oidx}->dbh->prepare_cached($s);
my $col = 0;
$del->bind_param(++$col, $oidbin, SQL_BLOB);
$del->bind_param(++$col, $ibx->{-ibx_id}) if $ibx;
$del->bind_param(++$col, $xnum) if defined($xnum);
$del->execute;
- my $xr3 = $sync->{self}->{oidx}->get_xref3($docid);
+ my $xr3 = $self->{oidx}->get_xref3($docid);
if (scalar(@$xr3) == 0) { # all gone
- remove_doc($sync->{self}, $docid);
+ remove_doc($self, $docid);
} else { # enqueue for reindex of remaining messages
if ($ibx) {
my $ekey = $ibx->{-gc_eidx_key} // $ibx->eidx_key;
- my $idx = $sync->{self}->idx_shard($docid);
+ my $idx = $self->idx_shard($docid);
$idx->ipc_do('remove_eidx_info', $docid, $ekey, $eml);
} # else: we can't remove_eidx_info in reindex-only path
# replace invalidated blob ASAP with something which should be
# readable since we may commit the transaction on checkpoint.
# eidxq processing will re-apply boost
- $smsg //= $sync->{self}->{oidx}->get_art($docid);
+ $smsg //= $self->{oidx}->get_art($docid);
my $hex = unpack('H*', $oidbin);
if ($smsg && $smsg->{blob} eq $hex) {
$xr3->[0] =~ /:([a-f0-9]{40,}+)\z/ or
die "BUG: xref $xr3->[0] has no OID";
- $sync->{self}->{oidx}->update_blob($smsg, $1);
+ $self->{oidx}->update_blob($smsg, $1);
}
# yes, add, we'll need to re-apply boost
- $sync->{self}->{oidx}->eidxq_add($docid);
+ $self->{oidx}->eidxq_add($docid);
}
@$xr3
}
} else { # 'd' no {xnum}
$self->git->async_wait_all;
$oid = pack('H*', $oid);
- _unref_doc($req, $docid, $xibx, undef, $oid, $eml);
+ _unref_doc $self, $docid, $xibx, undef, $oid, $eml;
}
}
# xnum and ibx are unknown, we only call this when an entry from
# /ei*/over.sqlite3 is bad, not on entries from xap*/over.sqlite3
$req->{self}->git->async_wait_all;
- _unref_doc($req, $smsg, undef, undef, $smsg->oidbin);
+ _unref_doc $req->{self}, $smsg, undef, undef, $smsg->oidbin;
}
sub ck_existing { # git->cat_async callback
return "E: $ekey unsupported inbox version (v$v)";
}
for my $unit (@{delete($sync->{todo}) // []}) {
- last if $sync->{quit};
+ last if $self->{quit};
index_todo($self, $sync, $unit);
}
- $self->{midx}->index_ibx($ibx) unless $sync->{quit};
+ $self->{midx}->index_ibx($ibx) unless $self->{quit};
$ibx->git->cleanup; # done with this inbox, now
undef;
}
$x3_doc->execute($ibx_id);
my $ibx = { -ibx_id => $ibx_id, -gc_eidx_key => $eidx_key };
while (my ($docid, $xnum, $oid) = $x3_doc->fetchrow_array) {
- my $r = _unref_doc($sync, $docid, $ibx, $xnum, $oid);
+ my $r = _unref_doc $self, $docid, $ibx, $xnum, $oid;
$oid = unpack('H*', $oid);
$r = $r ? 'unref' : 'remove';
warn "# $r #$docid $eidx_key $oid\n";
for my $x (reverse @$ary) {
warn "removing #$docid xref3 $x->{blob}\n";
my $bin = $x->oidbin;
- my $n = _unref_doc($sync, $docid, undef, undef, $bin);
+ my $n = _unref_doc $self, $docid, undef, undef, $bin;
die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
}
my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
my $docid = $orig_smsg->{num};
if (is_bad_blob($oid, $type, $size, $expect_oid)) {
my $oidbin = pack('H*', $expect_oid);
- my $remain = _unref_doc($sync, $docid, undef, undef, $oidbin);
+ my $remain = _unref_doc $self, $docid, undef, undef, $oidbin;
if ($remain == 0) {
warn "W: #$docid ($oid) gone or corrupt\n";
} elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
$iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
$iter->execute;
while (defined(my $docid = $iter->fetchrow_array)) {
- last if $sync->{quit};
+ last if $self->{quit};
if (my $smsg = $self->{oidx}->get_art($docid)) {
_reindex_smsg($self, $sync, $smsg);
} else {
}
sub _unref_stale_range ($$$) {
- my ($sync, $ibx, $lt_or_gt) = @_;
+ my ($self, $ibx, $lt_or_gt) = @_;
my $r;
my $lim = 10000;
do {
- $r = $sync->{self}->{oidx}->dbh->selectall_arrayref(
+ $r = $self->{oidx}->dbh->selectall_arrayref(
<<EOS, undef, $ibx->{-ibx_id});
SELECT docid,xnum,oidbin FROM xref3
WHERE ibx_id = ? AND $lt_or_gt LIMIT $lim
EOS
- return if $sync->{quit};
+ return if $self->{quit};
for (@$r) { # hopefully rare, not worth optimizing:
my ($docid, $xnum, $oidbin) = @$_;
my $hex = unpack('H*', $oidbin);
warn("# $xnum:$hex (#$docid): stale\n");
- _unref_doc($sync, $docid, $ibx, $xnum, $oidbin);
+ _unref_doc $self, $docid, $ibx, $xnum, $oidbin;
}
} while (scalar(@$r) == $lim);
1;
$max0 = $ibx->mm->num_highwater;
sync_inbox($self, $sync, $ibx) and return; # warned
$max = $ibx->mm->num_highwater;
- return if $sync->{quit};
+ return if $self->{quit};
} while ($max > $max0 &&
warn("# $ekey moved $max0..$max, resyncing..\n"));
$end = $max if $end > $max;
update_checkpoint $self and
reindex_checkpoint($self, $sync); # release lock
($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num});
- $usr //= _unref_stale_range($sync, $ibx, "xnum < $lo");
+ $usr //= _unref_stale_range($self, $ibx, "xnum < $lo");
my $x3a = $self->{oidx}->dbh->selectall_arrayref(
<<"", undef, $ibx_id, $lo, $hi);
SELECT xnum,oidbin,docid FROM xref3 WHERE
$self->{oidx}->eidxq_add($num);
}
}
- return if $sync->{quit};
+ return if $self->{quit};
}
next unless scalar keys %x3m;
$self->git->async_wait_all; # wait for reindex_unseen
my $m = defined($exp) ? "mismatch (!= $exp)" : 'stale';
warn("# $xnum:$hex (#@$docids): $m\n");
for my $i (@$docids) {
- _unref_doc($sync, $i, $ibx, $xnum, $bin);
+ _unref_doc $self, $i, $ibx, $xnum, $bin;
}
- return if $sync->{quit};
+ return if $self->{quit};
}
}
defined($hi) and ($hi < $max) and
- _unref_stale_range($sync, $ibx, "xnum > $hi AND xnum <= $max");
+ _unref_stale_range($self, $ibx, "xnum > $hi AND xnum <= $max");
}
sub _reindex_inbox ($$$) {
}
for my $ibx (@{ibx_sorted($self, 'active')}) {
_reindex_inbox($self, $sync, $ibx);
- last if $sync->{quit};
+ last if $self->{quit};
}
$self->git->async_wait_all; # ensure eidxq gets filled completely
- eidxq_process($self, $sync) unless $sync->{quit};
+ eidxq_process($self, $sync) unless $self->{quit};
}
sub sync_inbox {
$iter->execute($cur_mid, $min_id);
}
while (my ($mid, $id) = $iter->fetchrow_array) {
- last if $sync->{quit};
+ last if $self->{quit};
$self->{current_info} = "dedupe $mid";
$self->{nrec} = $min_id = $id;
my ($prv, @smsg);
self => $self,
};
local $SIG{USR1} = sub { $self->{need_checkpoint} = 1 };
- my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+ my $quit = PublicInbox::SearchIdx::quit_cb $self;
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
local $SIG{TERM} = $quit;
# don't use $_ here, it'll get clobbered by reindex_checkpoint
if ($opt->{scan} // 1) {
for my $ibx (@{ibx_sorted($self, 'active')}) {
- last if $sync->{quit};
+ last if $self->{quit};
sync_inbox($self, $sync, $ibx);
}
}
- $self->{oidx}->rethread_done($opt) unless $sync->{quit};
- eidxq_process($self, $sync) unless $sync->{quit};
+ $self->{oidx}->rethread_done($opt) unless $self->{quit};
+ eidxq_process($self, $sync) unless $self->{quit};
eidxq_release($self);
done($self);
$pr->("performing initial scan ...\n") if $pr;
local $self->{-opt} = $opt;
my $sync = eidx_sync($self, $opt); # initial sync
- return if $sync->{quit};
+ return if $self->{quit};
my $oldset = PublicInbox::DS::block_signals();
local $self->{current_info} = '';
local $SIG{__WARN__} = PublicInbox::Admin::warn_cb $self;
USR1 => sub { eidx_resync_start($self) },
TSTP => sub { kill('STOP', $$) },
};
- my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+ my $quit = PublicInbox::SearchIdx::quit_cb $self;
$sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
- local @PublicInbox::DS::post_loop_do = (sub { !$sync->{quit} });
+ local @PublicInbox::DS::post_loop_do = (sub { !$self->{quit} });
$pr->("initial scan complete, entering event loop\n") if $pr;
# calls InboxIdle->event_step:
PublicInbox::DS::event_loop($sig, $oldset);
my ($self, $opt) = @_;
delete $self->{lock_path} if $opt->{-skip_lock};
$self->with_umask(\&_index_sync, $self, $opt);
- if ($opt->{reindex} && !$opt->{quit} &&
+ if ($opt->{reindex} && !$self->{quit} &&
!grep(defined, @$opt{qw(since until)})) {
my %again = %$opt;
delete @again{qw(rethread reindex)};
index_sync($self, \%again);
- $opt->{quit} = $again{quit}; # propagate to caller
}
}
if (my $pr = $self->{-opt}->{-progress}) {
$pr->("indexed $nrec/$sync->{ntodo}\n") if $nrec;
}
- if (!$stk && !$sync->{quit}) { # more to come
+ if (!$stk && !$self->{quit}) { # more to come
begin_txn_lazy($self);
$self->{mm}->{dbh}->begin_work;
}
if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
for my $oid (@leftovers) {
- last if $sync->{quit};
+ last if $self->{quit};
$oid = unpack('H*', $oid);
$git->cat_async($oid, \&v1_unindex_both, $sync);
}
}
while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
my $arg = { %$sync, cur_cmt => $cur_cmt, oid => $oid };
- last if $sync->{quit};
+ last if $self->{quit};
if ($f eq 'm') {
$arg->{autime} = $at;
$arg->{cotime} = $ct;
}
v1_checkpoint $self, $sync if $self->{need_checkpoint};
}
- v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
+ v1_checkpoint($self, $sync, $self->{quit} ? undef : $stk);
}
sub log2stack ($$$$) {
my $fh = $git->popen(@cmd, $range);
my ($at, $ct, $stk, $cmt, $l);
while (defined($l = <$fh>)) {
- return if $sync->{quit};
+ return if $self->{quit};
if ($l =~ /\A([0-9]+)-([0-9]+)-($OID)$/o) {
($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3);
$stk //= PublicInbox::IdxStack->new($cmt);
}
sub quit_cb ($) {
- my ($sync) = @_;
+ my ($self) = @_;
sub {
- # we set {-opt}->{quit} too, so ->index_sync callers
- # can abort multi-inbox loops this way
- $sync->{quit} = $sync->{self}->{-opt}->{quit} = 1;
- warn "gracefully quitting\n";
+ # we set {-opt}->{quit} for public-inbox-index so it
+ # can abort multi-inbox loops this way (for now...)
+ $self->{quit} = $self->{-opt}->{quit} = 1;
+ warn "# gracefully quitting\n";
}
}
my $pr = $opt->{-progress};
local $self->{-opt} = $opt;
my $sync = { reindex => $opt->{reindex}, ibx => $ibx };
- my $quit = quit_cb($sync);
+ my $quit = quit_cb $self;
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
local $SIG{TERM} = $quit;
my $stk = prepare_stack($self, $sync, $range);
$sync->{ntodo} = $stk ? $stk->num_records : 0;
$pr->("$sync->{ntodo}\n") if $pr; # continue previous line
- v1_process_stack($self, $sync, $stk) if !$sync->{quit};
+ v1_process_stack($self, $sync, $stk) if !$self->{quit};
}
sub DESTROY {
# messages to show up in mirrors, too.
$sync->{D} //= $sync->{reindex} ? {} : undef; # OID_BIN => NR
my $stk = log2stack($self, $sync, $git, $range);
- return 0 if $sync->{quit};
+ return 0 if $self->{quit};
my $nr = $stk ? $stk->num_records : 0;
$pr->("$nr\n") if $pr;
$unit->{stack} = $stk; # may be undef
unshift @{$sync->{todo}}, $unit;
$regen_max += $nr;
}
- return 0 if $sync->{quit};
+ return 0 if $self->{quit};
# XXX this should not happen unless somebody bypasses checks in
# our code and blindly injects "d" file history into git repos
local $self->{current_info} = 'leftover ';
my $unindex_oid = $self->can('unindex_oid');
for my $oid (@leftovers) {
- last if $sync->{quit};
+ last if $self->{quit};
$oid = unpack('H*', $oid);
my $req = { %$sync, oid => $oid };
$self->git->cat_async($oid, $unindex_oid, $req);
}
$self->git->async_wait_all;
}
- return 0 if $sync->{quit};
+ return 0 if $self->{quit};
if (!$regen_max) {
$self->{-regen_fmt} = "%u/?\n";
return 0;
"$beg..$end (% $step)\n");
}
for (my $num = $beg; $num <= $end; $num += $step) {
- last if $sync->{quit};
+ last if $self->{quit};
my $smsg = $ibx->over->get_art($num) or next;
$smsg->{self} = $self;
$ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
sub index_todo ($$$) {
my ($self, $sync, $unit) = @_;
- return if $sync->{quit};
+ return if $self->{quit};
unindex_todo($self, $sync, $unit);
my $stk = delete($unit->{stack}) or return;
my $all = $self->git; # initialize self->{ibx}->{git}
local $self->{latest_cmt};
local $sync->{unit} = $unit;
while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
- if ($sync->{quit}) {
+ if ($self->{quit}) {
warn "waiting to quit...\n";
$all->async_wait_all;
$self->update_last_commit($sync);
if ($seq || !$self->{parallel}) {
my $shard_end = $self->{shards} - 1;
for my $i (0..$shard_end) {
- last if $sync->{quit};
+ last if $self->{quit};
index_xap_step($self, $sync, $art_beg + $i);
if ($i != $shard_end) {
reindex_checkpoint($self, $sync);
ibx => $self->{ibx},
epoch_max => $epoch_max,
};
- my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+ my $quit = PublicInbox::SearchIdx::quit_cb $self;
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
local $SIG{TERM} = $quit;
}
# work forwards through history
index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
- $self->{oidx}->rethread_done($opt) unless $sync->{quit};
+ $self->{oidx}->rethread_done($opt) unless $self->{quit};
$self->done;
if (my $nrec = $self->{nrec}) {
my $quit_warn;
# deal with Xapian shards sequentially
if ($seq && delete($sync->{mm_tmp})) {
- if ($sync->{quit}) {
+ if ($self->{quit}) {
$quit_warn = 1;
} else {
$self->{ibx}->{indexlevel} = $idxlevel;
xapian_only($self, $sync, $art_beg);
- $quit_warn = 1 if $sync->{quit};
+ $quit_warn = 1 if $self->{quit};
}
}
# --reindex on the command-line
- if (!$sync->{quit} && $opt->{reindex} &&
+ if (!$self->{quit} && $opt->{reindex} &&
!ref($opt->{reindex}) && $idxlevel ne 'basic') {
$self->lock_acquire;
my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0);
}
# reindex does not pick up new changes, so we rerun w/o it:
- if ($opt->{reindex} && !$sync->{quit} &&
+ if ($opt->{reindex} && !$self->{quit} &&
!grep(defined, @$opt{qw(since until)})) {
my %again = %$opt;
$sync = undef;
delete @again{qw(rethread reindex -skip_lock)};
index_sync($self, \%again);
- $opt->{quit} = $again{quit}; # propagate to caller
}
warn <<EOF if $quit_warn;
W: interrupted, --xapian-only --reindex required upon restart