Another step towards eradicating the $sync structure.
This intermediate step does introduce the $self arg into
log2stack() and prepare_stack() but $self will eventually
eliminate $sync entirely.
my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv");
my $head = $ibx->mm->last_commit //
return "E: $ibx->{inboxdir} is not indexed";
- my $stk = prepare_stack($sync, $lc ? "$lc..$head" : $head);
+ my $stk = prepare_stack($self, $sync,
+ $lc ? "$lc..$head" : $head);
my $unit = { stack => $stk, git => $ibx->git };
push @{$sync->{todo}}, $unit;
} else {
local $self->{checkpoint_unlocks} = 1;
local $self->{need_checkpoint} = 0;
local $self->{nrec};
- my $sync = { -opt => $opt, self => $self };
+ local $self->{-opt} = $opt;
+ my $sync = { self => $self };
$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
eidx_gc_scan_inboxes($self, $sync);
eidx_gc_scan_shards($self, $sync);
my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return;
$self->{nrec} = 0;
local $sync->{-regen_fmt} = "%u/$tot\n";
- my $pr = $sync->{-opt}->{-progress};
+ my $pr = $self->{-opt}->{-progress};
if ($pr) {
my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq');
my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq');
# first, check if we missed any messages in target $ibx
my $msgs;
- my $pr = $sync->{-opt}->{-progress};
+ my $pr = $self->{-opt}->{-progress};
local $sync->{-regen_fmt} = "$ekey checking %u/$max\n";
$self->{nrec} = 0;
- my $fast = $sync->{-opt}->{fast};
+ my $fast = $self->{-opt}->{fast};
my $usr; # _unref_stale_range (< $lo) called
my ($lo, $hi);
while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end, $opt)})) {
print STDERR
"# <$keep->{mid}> keeping #$keep->{num}, dropping ",
join(', ', map { "#$_->{num}" } @$ary),"\n";
- next if $per_mid->{sync}->{-opt}->{'dry-run'};
+ next if $self->{-opt}->{'dry-run'};
my $oidx = $self->{oidx};
for my $smsg (@$ary) {
my $gone = $smsg->{num};
goto dedupe_restart if defined($msgids->[++$idx]);
my $n = delete $sync->{dedupe_cull};
- if (my $pr = $sync->{-opt}->{-progress}) {
+ if (my $pr = $self->{-opt}->{-progress}) {
$pr->("culled $n/$candidates candidates ($nr_mid msgids)\n");
}
$self->{nrec} = 0;
$self->{oidx}->rethread_prepare($opt);
local $self->{need_checkpoint} = 0;
local $self->{nrec} = 0;
+ local $self->{-opt} = $opt;
my $sync = {
- -opt => $opt,
# DO NOT SET {reindex} here, it's incompatible with reused
# V2Writable code, reindex is totally different here
# compared to v1/v2 inboxes because we have multiple histories
sub on_inbox_unlock { # called by PublicInbox::InboxIdle
my ($self, $ibx) = @_;
- my $opt = $self->{-watch_sync}->{-opt};
+ my $opt = $self->{-opt};
my $pr = $opt->{-progress};
my $ekey = $ibx->eidx_key;
local $0 = "sync $ekey";
sub eidx_reload { # -extindex --watch SIGHUP handler
my ($self, $idler) = @_;
if ($self->{cfg}) {
- my $pr = $self->{-watch_sync}->{-opt}->{-progress};
+ my $pr = $self->{-opt}->{-progress};
$pr->('reloading ...') if $pr;
delete $self->{-resync_queue};
delete $self->{-ibx_ary_known};
}
}
+# FIXME: totally untested and undocumented
sub eidx_watch { # public-inbox-extindex --watch main loop
my ($self, $opt) = @_;
local @SIG{keys %SIG} = values %SIG;
}
my $pr = $opt->{-progress};
$pr->("performing initial scan ...\n") if $pr;
+ local $self->{-opt} = $opt;
my $sync = eidx_sync($self, $opt); # initial sync
return if $sync->{quit};
my $oldset = PublicInbox::DS::block_signals();
my $sync = { D => {}, ibx => $ibx }; # D => {} filters out deletes
my ($f, $at, $ct, $oid, $cmt);
for my $git (grep defined, @g) {
- my $s = PublicInbox::SearchIdx::log2stack($sync, $git, 'HEAD');
+ my $s = PublicInbox::SearchIdx::log2stack($sync, $sync,
+ $git, 'HEAD');
while (($f, $at, $ct, $oid, $cmt) = $s->pop_rec) {
$git->cat_async($oid, \&oid2eml, $self) if $f eq 'm';
}
my $n = $xdb->get_metadata('has_threadid');
$xdb->set_metadata('has_threadid', '1') if $n ne '1';
}
- $self->{oidx}->rethread_done($sync->{-opt}); # all done
+ $self->{oidx}->rethread_done($self->{-opt}); # all done
}
commit_txn_lazy($self);
$sync->{ibx}->git->cleanup;
my $nrec = $self->{nrec};
idx_release($self, $nrec);
# let another process do some work...
- if (my $pr = $sync->{-opt}->{-progress}) {
+ if (my $pr = $self->{-opt}->{-progress}) {
$pr->("indexed $nrec/$sync->{ntodo}\n") if $nrec;
}
if (!$stk && !$sync->{quit}) { # more to come
$git->cat_async($oid, \&unindex_both, $sync);
}
}
- if ($sync->{max_size} = $sync->{-opt}->{max_size}) {
+ if ($sync->{max_size} = $self->{-opt}->{max_size}) {
$sync->{index_oid} = \&index_both;
}
while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
}
-sub log2stack ($$$) {
- my ($sync, $git, $range) = @_;
+sub log2stack ($$$$) {
+ my ($self, $sync, $git, $range) = @_;
my $D = $sync->{D}; # OID_BIN => NR (if reindexing, undef otherwise)
my ($add, $del);
if ($sync->{ibx}->version == 1) {
my @cmd = qw(log --raw -r --pretty=tformat:%at-%ct-%H
--no-notes --no-color --no-renames --no-abbrev);
for my $k (qw(since until)) {
- my $v = $sync->{-opt}->{$k} // next;
- next if !$sync->{-opt}->{reindex};
+ my $v = $self->{-opt}->{$k} // next;
+ next if !$self->{-opt}->{reindex};
push @cmd, "--$k=$v";
}
my $fh = $git->popen(@cmd, $range);
$stk->read_prepare;
}
-sub prepare_stack ($$) {
- my ($sync, $range) = @_;
+sub prepare_stack ($$$) {
+ my ($self, $sync, $range) = @_;
my $git = $sync->{ibx}->git;
if (index($range, '..') < 0) {
return PublicInbox::IdxStack->new->read_prepare if $?;
}
$sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR
- log2stack($sync, $git, $range);
+ log2stack($self, $sync, $git, $range);
}
# --is-ancestor requires git 1.8.0+
# don't rewind if --{since,until,before,after} are in use
return if $cur ne '' &&
- grep(defined, @{$sync->{-opt}}{qw(since until)}) &&
+ grep(defined, @{$self->{-opt}}{qw(since until)}) &&
is_ancestor($git, $new, $cur);
return 1 if $cur ne '' && !is_ancestor($git, $cur, $new);
sub {
# we set {-opt}->{quit} too, so ->index_sync callers
# can abort multi-inbox loops this way
- $sync->{quit} = $sync->{-opt}->{quit} = 1;
+ $sync->{quit} = $sync->{self}->{-opt}->{quit} = 1;
warn "gracefully quitting\n";
}
}
}
local $self->{transact_bytes} = 0;
my $pr = $opt->{-progress};
- my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx };
+ local $self->{-opt} = $opt;
+ my $sync = { reindex => $opt->{reindex}, ibx => $ibx };
my $quit = quit_cb($sync);
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
my $lx = reindex_from($sync->{reindex}, $last_commit);
my $range = $lx eq '' ? $tip : "$lx..$tip";
$pr->("counting changes\n\t$range ... ") if $pr;
- my $stk = prepare_stack($sync, $range);
+ my $stk = prepare_stack($self, $sync, $range);
$sync->{ntodo} = $stk ? $stk->num_records : 0;
$pr->("$sync->{ntodo}\n") if $pr; # continue previous line
process_stack($self, $sync, $stk) if !$sync->{quit};
$self->done; # release lock
}
- if (my $pr = $sync->{-regen_fmt} ? $sync->{-opt}->{-progress} : undef) {
+ if (my $pr = $sync->{-regen_fmt} ? $self->{-opt}->{-progress} : undef) {
$pr->(sprintf $sync->{-regen_fmt}, $self->{nrec});
}
# allow -watch or -mda to write...
- $self->idx_init($sync->{-opt}); # reacquire lock
+ $self->idx_init($self->{-opt}); # reacquire lock
$mm_tmp->atfork_parent if $mm_tmp;
}
}
# don't rewind if --{since,until,before,after} are in use
return if (defined($last) &&
- grep(defined, @{$sync->{-opt}}{qw(since until)}) &&
+ grep(defined, @{$self->{-opt}}{qw(since until)}) &&
is_ancestor($self->git, $latest_cmt, $last));
last_epoch_commit($self, $unit->{epoch}, $latest_cmt);
# returns a revision range for git-log(1)
sub log_range ($$$) {
my ($sync, $unit, $tip) = @_;
- my $opt = $sync->{-opt};
+ my $opt = $sync->{self}->{-opt};
my $pr = $opt->{-progress} if (($opt->{verbose} || 0) > 1);
my $i = $unit->{epoch};
my $cur = $sync->{ranges}->[$i] or do {
sub sync_prepare ($$) {
my ($self, $sync) = @_;
$sync->{ranges} = sync_ranges($self, $sync);
- my $pr = $sync->{-opt}->{-progress};
+ my $pr = $self->{-opt}->{-progress};
my $regen_max = 0;
my $head = $sync->{ibx}->{ref_head} || 'HEAD';
my $pfx;
# rerun index_sync without {reindex}
$reindex_heads = $self->last_commits($sync);
}
- if ($sync->{max_size} = $sync->{-opt}->{max_size}) {
+ if ($sync->{max_size} = $self->{-opt}->{max_size}) {
$sync->{index_oid} = $self->can('index_oid');
}
my $git_pfx = "$sync->{ibx}->{inboxdir}/git";
# because we want NNTP article number gaps from unindexed
# messages to show up in mirrors, too.
$sync->{D} //= $sync->{reindex} ? {} : undef; # OID_BIN => NR
- my $stk = log2stack($sync, $git, $range);
+ my $stk = log2stack($self, $sync, $git, $range);
return 0 if $sync->{quit};
my $nr = $stk ? $stk->num_records : 0;
$pr->("$nr\n") if $pr;
$fh->close or die "git log failed: \$?=$?";
$self->git->async_wait_all;
- return unless $sync->{-opt}->{prune};
+ return unless $self->{-opt}->{prune};
my $after = scalar keys %$unindexed;
return if $before == $after;
$step //= $self->{shards};
my $ibx = $self->{ibx};
- if (my $pr = $sync->{-opt}->{-progress}) {
+ if (my $pr = $self->{-opt}->{-progress}) {
$pr->("Xapian indexlevel=$ibx->{indexlevel} ".
"$beg..$end (% $step)\n");
}
$self->update_last_commit($sync, $stk);
}
-sub xapian_only {
- my ($self, $opt, $sync, $art_beg) = @_;
- my $seq = $opt->{'sequential-shard'};
+sub xapian_only ($;$$) {
+ my ($self, $sync, $art_beg) = @_;
+ my $seq = $self->{-opt}->{'sequential-shard'};
$art_beg //= 0;
local $self->{parallel} = 0 if $seq;
- $self->idx_init($opt); # acquire lock
+ $self->idx_init($self->{-opt}); # acquire lock
if (my $art_end = $self->{ibx}->mm->max) {
$sync //= {
- -opt => $opt,
self => $self,
-regen_fmt => "%u/?\n",
};
$opt //= {};
local $self->{need_checkpoint} = 0;
local $self->{nrec} = 0;
- return xapian_only($self, $opt) if $opt->{xapian_only};
+ local $self->{-opt} = $opt;
+ return xapian_only($self) if $opt->{xapian_only};
my $epoch_max = $self->{ibx}->max_git_epoch // return;
my $latest = $self->{mg}->epoch_dir."/$epoch_max.git";
$self->{oidx}->rethread_prepare($opt);
my $sync = {
reindex => $opt->{reindex},
- -opt => $opt,
self => $self,
ibx => $self->{ibx},
epoch_max => $epoch_max,
$self->done;
if (my $nrec = $self->{nrec}) {
- my $pr = $sync->{-opt}->{-progress};
+ my $pr = $self->{-opt}->{-progress};
$pr->('all.git '.sprintf($sync->{-regen_fmt}, $nrec)) if $pr;
}
$quit_warn = 1;
} else {
$self->{ibx}->{indexlevel} = $idxlevel;
- xapian_only($self, $opt, $sync, $art_beg);
+ xapian_only($self, $sync, $art_beg);
$quit_warn = 1 if $sync->{quit};
}
}