From: Eric Wong Date: Fri, 10 Jan 2025 23:18:09 +0000 (+0000) Subject: (ext)index: move {-opt} from $sync to $self X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9a22ce092b4c676e4d658c87de880ea84a5e8bf9;p=thirdparty%2Fpublic-inbox.git (ext)index: move {-opt} from $sync to $self Another step towards eradicating the $sync structure. This intermediate step does introduce the $self arg into log2stack() and prepare_stack() but $self will eventually eliminate $sync entirely. --- diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index ebbb2af13..25f2d8e72 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -407,7 +407,8 @@ sub _sync_inbox ($$$) { my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv"); my $head = $ibx->mm->last_commit // return "E: $ibx->{inboxdir} is not indexed"; - my $stk = prepare_stack($sync, $lc ? "$lc..$head" : $head); + my $stk = prepare_stack($self, $sync, + $lc ? "$lc..$head" : $head); my $unit = { stack => $stk, git => $ibx->git }; push @{$sync->{todo}}, $unit; } else { @@ -529,7 +530,8 @@ sub eidx_gc { # top-level entry point local $self->{checkpoint_unlocks} = 1; local $self->{need_checkpoint} = 0; local $self->{nrec}; - my $sync = { -opt => $opt, self => $self }; + local $self->{-opt} = $opt; + my $sync = { self => $self }; $self->idx_init($opt); # acquire lock via V2Writable::_idx_init eidx_gc_scan_inboxes($self, $sync); eidx_gc_scan_shards($self, $sync); @@ -793,7 +795,7 @@ sub eidxq_process ($$) { # for reindexing my $tot = $dbh->selectrow_array('SELECT COUNT(*) FROM eidxq') or return; $self->{nrec} = 0; local $sync->{-regen_fmt} = "%u/$tot\n"; - my $pr = $sync->{-opt}->{-progress}; + my $pr = $self->{-opt}->{-progress}; if ($pr) { my $min = $dbh->selectrow_array('SELECT MIN(docid) FROM eidxq'); my $max = $dbh->selectrow_array('SELECT MAX(docid) FROM eidxq'); @@ -896,10 +898,10 @@ sub _reindex_check_ibx ($$$) { # first, check if we missed any messages in target $ibx my $msgs; - my $pr = $sync->{-opt}->{-progress}; + my $pr = $self->{-opt}->{-progress}; local $sync->{-regen_fmt} = "$ekey checking %u/$max\n"; $self->{nrec} = 0; - my $fast = $sync->{-opt}->{fast}; + my $fast = $self->{-opt}->{fast}; my $usr; # _unref_stale_range (< $lo) called my ($lo, $hi); while (scalar(@{$msgs = $ibx->over->query_xover($beg, $end, $opt)})) { @@ -1023,7 +1025,7 @@ sub dd_smsg { # git->cat_async callback print STDERR "# <$keep->{mid}> keeping #$keep->{num}, dropping ", join(', ', map { "#$_->{num}" } @$ary),"\n"; - next if $per_mid->{sync}->{-opt}->{'dry-run'}; + next if $self->{-opt}->{'dry-run'}; my $oidx = $self->{oidx}; for my $smsg (@$ary) { my $gone = $smsg->{num}; @@ -1100,7 +1102,7 @@ EOS goto dedupe_restart if defined($msgids->[++$idx]); my $n = delete $sync->{dedupe_cull}; - if (my $pr = $sync->{-opt}->{-progress}) { + if (my $pr = $self->{-opt}->{-progress}) { $pr->("culled $n/$candidates candidates ($nr_mid msgids)\n"); } $self->{nrec} = 0; @@ -1115,8 +1117,8 @@ sub eidx_sync { # main entry point $self->{oidx}->rethread_prepare($opt); local $self->{need_checkpoint} = 0; local $self->{nrec} = 0; + local $self->{-opt} = $opt; my $sync = { - -opt => $opt, # DO NOT SET {reindex} here, it's incompatible with reused # V2Writable code, reindex is totally different here # compared to v1/v2 inboxes because we have multiple histories @@ -1306,7 +1308,7 @@ sub _watch_commit { # PublicInbox::DS::add_timer callback sub on_inbox_unlock { # called by PublicInbox::InboxIdle my ($self, $ibx) = @_; - my $opt = $self->{-watch_sync}->{-opt}; + my $opt = $self->{-opt}; my $pr = $opt->{-progress}; my $ekey = $ibx->eidx_key; local $0 = "sync $ekey"; @@ -1320,7 +1322,7 @@ sub on_inbox_unlock { # called by PublicInbox::InboxIdle sub eidx_reload { # -extindex --watch SIGHUP handler my ($self, $idler) = @_; if ($self->{cfg}) { - my $pr = $self->{-watch_sync}->{-opt}->{-progress}; + my $pr = $self->{-opt}->{-progress}; $pr->('reloading ...') if $pr; delete $self->{-resync_queue}; delete $self->{-ibx_ary_known}; @@ -1359,6 +1361,7 @@ sub event_step { # PublicInbox::DS::requeue callback } } +# FIXME: totally untested and undocumented sub eidx_watch { # public-inbox-extindex --watch main loop my ($self, $opt) = @_; local @SIG{keys %SIG} = values %SIG; @@ -1378,6 +1381,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop } my $pr = $opt->{-progress}; $pr->("performing initial scan ...\n") if $pr; + local $self->{-opt} = $opt; my $sync = eidx_sync($self, $opt); # initial sync return if $sync->{quit}; my $oldset = PublicInbox::DS::block_signals(); diff --git a/lib/PublicInbox/LeiInput.pm b/lib/PublicInbox/LeiInput.pm index 0a6aba82e..618829ef3 100644 --- a/lib/PublicInbox/LeiInput.pm +++ b/lib/PublicInbox/LeiInput.pm @@ -146,7 +146,8 @@ EOM my $sync = { D => {}, ibx => $ibx }; # D => {} filters out deletes my ($f, $at, $ct, $oid, $cmt); for my $git (grep defined, @g) { - my $s = PublicInbox::SearchIdx::log2stack($sync, $git, 'HEAD'); + my $s = PublicInbox::SearchIdx::log2stack($sync, $sync, + $git, 'HEAD'); while (($f, $at, $ct, $oid, $cmt) = $s->pop_rec) { $git->cat_async($oid, \&oid2eml, $self) if $f eq 'm'; } diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 34df5c90c..628a1469c 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -887,14 +887,14 @@ sub v1_checkpoint ($$;$) { my $n = $xdb->get_metadata('has_threadid'); $xdb->set_metadata('has_threadid', '1') if $n ne '1'; } - $self->{oidx}->rethread_done($sync->{-opt}); # all done + $self->{oidx}->rethread_done($self->{-opt}); # all done } commit_txn_lazy($self); $sync->{ibx}->git->cleanup; my $nrec = $self->{nrec}; idx_release($self, $nrec); # let another process do some work... - if (my $pr = $sync->{-opt}->{-progress}) { + if (my $pr = $self->{-opt}->{-progress}) { $pr->("indexed $nrec/$sync->{ntodo}\n") if $nrec; } if (!$stk && !$sync->{quit}) { # more to come @@ -923,7 +923,7 @@ sub process_stack { $git->cat_async($oid, \&unindex_both, $sync); } } - if ($sync->{max_size} = $sync->{-opt}->{max_size}) { + if ($sync->{max_size} = $self->{-opt}->{max_size}) { $sync->{index_oid} = \&index_both; } while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) { @@ -946,8 +946,8 @@ sub process_stack { v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk); } -sub log2stack ($$$) { - my ($sync, $git, $range) = @_; +sub log2stack ($$$$) { + my ($self, $sync, $git, $range) = @_; my $D = $sync->{D}; # OID_BIN => NR (if reindexing, undef otherwise) my ($add, $del); if ($sync->{ibx}->version == 1) { @@ -964,8 +964,8 @@ sub log2stack ($$$) { my @cmd = qw(log --raw -r --pretty=tformat:%at-%ct-%H --no-notes --no-color --no-renames --no-abbrev); for my $k (qw(since until)) { - my $v = $sync->{-opt}->{$k} // next; - next if !$sync->{-opt}->{reindex}; + my $v = $self->{-opt}->{$k} // next; + next if !$self->{-opt}->{reindex}; push @cmd, "--$k=$v"; } my $fh = $git->popen(@cmd, $range); @@ -999,8 +999,8 @@ sub log2stack ($$$) { $stk->read_prepare; } -sub prepare_stack ($$) { - my ($sync, $range) = @_; +sub prepare_stack ($$$) { + my ($self, $sync, $range) = @_; my $git = $sync->{ibx}->git; if (index($range, '..') < 0) { @@ -1010,7 +1010,7 @@ sub prepare_stack ($$) { return PublicInbox::IdxStack->new->read_prepare if $?; } $sync->{D} = $sync->{reindex} ? {} : undef; # OID_BIN => NR - log2stack($sync, $git, $range); + log2stack($self, $sync, $git, $range); } # --is-ancestor requires git 1.8.0+ @@ -1028,7 +1028,7 @@ sub need_update ($$$$) { # don't rewind if --{since,until,before,after} are in use return if $cur ne '' && - grep(defined, @{$sync->{-opt}}{qw(since until)}) && + grep(defined, @{$self->{-opt}}{qw(since until)}) && is_ancestor($git, $new, $cur); return 1 if $cur ne '' && !is_ancestor($git, $cur, $new); @@ -1067,7 +1067,7 @@ sub quit_cb ($) { sub { # we set {-opt}->{quit} too, so ->index_sync callers # can abort multi-inbox loops this way - $sync->{quit} = $sync->{-opt}->{quit} = 1; + $sync->{quit} = $sync->{self}->{-opt}->{quit} = 1; warn "gracefully quitting\n"; } } @@ -1086,7 +1086,8 @@ sub _index_sync { } local $self->{transact_bytes} = 0; my $pr = $opt->{-progress}; - my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx }; + local $self->{-opt} = $opt; + my $sync = { reindex => $opt->{reindex}, ibx => $ibx }; my $quit = quit_cb($sync); local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; @@ -1108,7 +1109,7 @@ sub _index_sync { my $lx = reindex_from($sync->{reindex}, $last_commit); my $range = $lx eq '' ? $tip : "$lx..$tip"; $pr->("counting changes\n\t$range ... ") if $pr; - my $stk = prepare_stack($sync, $range); + my $stk = prepare_stack($self, $sync, $range); $sync->{ntodo} = $stk ? $stk->num_records : 0; $pr->("$sync->{ntodo}\n") if $pr; # continue previous line process_stack($self, $sync, $stk) if !$sync->{quit}; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index dd3258f39..74281fed4 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -710,12 +710,12 @@ sub reindex_checkpoint ($$) { $self->done; # release lock } - if (my $pr = $sync->{-regen_fmt} ? $sync->{-opt}->{-progress} : undef) { + if (my $pr = $sync->{-regen_fmt} ? $self->{-opt}->{-progress} : undef) { $pr->(sprintf $sync->{-regen_fmt}, $self->{nrec}); } # allow -watch or -mda to write... - $self->idx_init($sync->{-opt}); # reacquire lock + $self->idx_init($self->{-opt}); # reacquire lock $mm_tmp->atfork_parent if $mm_tmp; } @@ -829,7 +829,7 @@ sub update_last_commit { } # don't rewind if --{since,until,before,after} are in use return if (defined($last) && - grep(defined, @{$sync->{-opt}}{qw(since until)}) && + grep(defined, @{$self->{-opt}}{qw(since until)}) && is_ancestor($self->git, $latest_cmt, $last)); last_epoch_commit($self, $unit->{epoch}, $latest_cmt); @@ -847,7 +847,7 @@ sub last_commits { # returns a revision range for git-log(1) sub log_range ($$$) { my ($sync, $unit, $tip) = @_; - my $opt = $sync->{-opt}; + my $opt = $sync->{self}->{-opt}; my $pr = $opt->{-progress} if (($opt->{verbose} || 0) > 1); my $i = $unit->{epoch}; my $cur = $sync->{ranges}->[$i] or do { @@ -910,7 +910,7 @@ sub artnum_max { $_[0]->{mm}->num_highwater } sub sync_prepare ($$) { my ($self, $sync) = @_; $sync->{ranges} = sync_ranges($self, $sync); - my $pr = $sync->{-opt}->{-progress}; + my $pr = $self->{-opt}->{-progress}; my $regen_max = 0; my $head = $sync->{ibx}->{ref_head} || 'HEAD'; my $pfx; @@ -935,7 +935,7 @@ sub sync_prepare ($$) { # rerun index_sync without {reindex} $reindex_heads = $self->last_commits($sync); } - if ($sync->{max_size} = $sync->{-opt}->{max_size}) { + if ($sync->{max_size} = $self->{-opt}->{max_size}) { $sync->{index_oid} = $self->can('index_oid'); } my $git_pfx = "$sync->{ibx}->{inboxdir}/git"; @@ -960,7 +960,7 @@ sub sync_prepare ($$) { # because we want NNTP article number gaps from unindexed # messages to show up in mirrors, too. $sync->{D} //= $sync->{reindex} ? {} : undef; # OID_BIN => NR - my $stk = log2stack($sync, $git, $range); + my $stk = log2stack($self, $sync, $git, $range); return 0 if $sync->{quit}; my $nr = $stk ? $stk->num_records : 0; $pr->("$nr\n") if $pr; @@ -1066,7 +1066,7 @@ sub unindex_todo ($$$) { $fh->close or die "git log failed: \$?=$?"; $self->git->async_wait_all; - return unless $sync->{-opt}->{prune}; + return unless $self->{-opt}->{prune}; my $after = scalar keys %$unindexed; return if $before == $after; @@ -1102,7 +1102,7 @@ sub index_xap_step ($$$;$) { $step //= $self->{shards}; my $ibx = $self->{ibx}; - if (my $pr = $sync->{-opt}->{-progress}) { + if (my $pr = $self->{-opt}->{-progress}) { $pr->("Xapian indexlevel=$ibx->{indexlevel} ". "$beg..$end (% $step)\n"); } @@ -1169,15 +1169,14 @@ sub index_todo ($$$) { $self->update_last_commit($sync, $stk); } -sub xapian_only { - my ($self, $opt, $sync, $art_beg) = @_; - my $seq = $opt->{'sequential-shard'}; +sub xapian_only ($;$$) { + my ($self, $sync, $art_beg) = @_; + my $seq = $self->{-opt}->{'sequential-shard'}; $art_beg //= 0; local $self->{parallel} = 0 if $seq; - $self->idx_init($opt); # acquire lock + $self->idx_init($self->{-opt}); # acquire lock if (my $art_end = $self->{ibx}->mm->max) { $sync //= { - -opt => $opt, self => $self, -regen_fmt => "%u/?\n", }; @@ -1206,7 +1205,8 @@ sub index_sync { $opt //= {}; local $self->{need_checkpoint} = 0; local $self->{nrec} = 0; - return xapian_only($self, $opt) if $opt->{xapian_only}; + local $self->{-opt} = $opt; + return xapian_only($self) if $opt->{xapian_only}; my $epoch_max = $self->{ibx}->max_git_epoch // return; my $latest = $self->{mg}->epoch_dir."/$epoch_max.git"; @@ -1231,7 +1231,6 @@ sub index_sync { $self->{oidx}->rethread_prepare($opt); my $sync = { reindex => $opt->{reindex}, - -opt => $opt, self => $self, ibx => $self->{ibx}, epoch_max => $epoch_max, @@ -1263,7 +1262,7 @@ sub index_sync { $self->done; if (my $nrec = $self->{nrec}) { - my $pr = $sync->{-opt}->{-progress}; + my $pr = $self->{-opt}->{-progress}; $pr->('all.git '.sprintf($sync->{-regen_fmt}, $nrec)) if $pr; } @@ -1274,7 +1273,7 @@ sub index_sync { $quit_warn = 1; } else { $self->{ibx}->{indexlevel} = $idxlevel; - xapian_only($self, $opt, $sync, $art_beg); + xapian_only($self, $sync, $art_beg); $quit_warn = 1 if $sync->{quit}; } }