From: Eric Wong Date: Fri, 10 Jan 2025 23:21:00 +0000 (+0000) Subject: (ext)index: eliminate $sync entirely X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=db80b65f24b7864f91dce3d94c918bc18d566171;p=thirdparty%2Fpublic-inbox.git (ext)index: eliminate $sync entirely Finally, the elimination of this variable simplifies subroutine calls hopefully makes object lifetimes easier to reason about. --- diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 946f5a4b3..d1a16c84f 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -373,7 +373,7 @@ sub unindex_oid { # git->cat_async callback for 'd' # overrides V2Writable::last_commits, called by sync_ranges via sync_prepare sub last_commits { - my ($self, $sync) = @_; + my ($self) = @_; my $heads = []; my $ekey = $self->{ibx}->eidx_key; my $uv = $self->{ibx}->uidvalidity; @@ -391,8 +391,8 @@ sub _ibx_index_reject ($) { undef; } -sub _sync_inbox ($$$) { - my ($self, $sync, $ibx) = @_; +sub _sync_inbox ($$) { + my ($self, $ibx) = @_; my $ekey = $ibx->eidx_key; if (defined(my $err = _ibx_index_reject($ibx))) { return "W: skipping $ekey ($err)"; @@ -406,7 +406,7 @@ sub _sync_inbox ($$$) { local $self->{unindexed}; if ($v == 2) { $self->{epoch_max} = $ibx->max_git_epoch // return; - sync_prepare($self, $sync); # or return # TODO: once MiscIdx is stable + sync_prepare($self); # or return # TODO: once MiscIdx is stable } elsif ($v == 1) { my $uv = $ibx->uidvalidity; my $lc = $self->{oidx}->eidx_meta("lc-v1:$ekey//$uv"); @@ -418,14 +418,14 @@ sub _sync_inbox ($$$) { } else { return "E: $ekey unsupported inbox version (v$v)"; } - PublicInbox::V2Writable::process_todo $self, $sync; + PublicInbox::V2Writable::process_todo $self; $self->{midx}->index_ibx($ibx) unless $self->{quit}; $ibx->git->cleanup; # done with this inbox, now undef; } -sub eidx_gc_scan_inboxes ($$) { - my ($self, $sync) = @_; +sub eidx_gc_scan_inboxes ($) { + my ($self) = @_; my ($x3_doc, $ibx_ck); restart: $x3_doc = $self->{oidx}->dbh->prepare(<{oidx}->dbh->do(<<''); DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) warn "# eliminated $nr stale xref3 entries\n" if $nr != 0; - reindex_checkpoint($self, $sync) if update_checkpoint $self; + reindex_checkpoint($self) if update_checkpoint $self; # fixup from old bugs: $nr = $self->{oidx}->dbh->do(<<''); DELETE FROM over WHERE num > 0 AND num NOT IN (SELECT docid FROM xref3) warn "# eliminated $nr stale over entries\n" if $nr != 0; - reindex_checkpoint($self, $sync) if update_checkpoint $self; + reindex_checkpoint($self) if update_checkpoint $self; $nr = $self->{oidx}->dbh->do(<<''); DELETE FROM eidxq WHERE docid NOT IN (SELECT num FROM over) warn "# eliminated $nr stale reindex queue entries\n" if $nr != 0; - reindex_checkpoint($self, $sync) if update_checkpoint $self; + reindex_checkpoint($self) if update_checkpoint $self; my ($cur) = $self->{oidx}->dbh->selectrow_array(< 0 @@ -518,7 +518,7 @@ SELECT num FROM over WHERE num >= ? ORDER BY num ASC LIMIT 10000 $nr += $idx->ipc_do('nr_quiet_rm') } %active_shards = (); - reindex_checkpoint($self, $sync); + reindex_checkpoint($self); } } warn "# eliminated $nr stale Xapian documents\n" if $nr != 0; @@ -533,15 +533,14 @@ sub eidx_gc { # top-level entry point local $self->{nrec}; local $self->{-opt} = $opt; local $self->{latest_cmt}; - my $sync = { self => $self }; $self->idx_init($opt); # acquire lock via V2Writable::_idx_init - eidx_gc_scan_inboxes($self, $sync); - eidx_gc_scan_shards($self, $sync); + eidx_gc_scan_inboxes $self; + eidx_gc_scan_shards $self; done($self); } -sub _ibx_for ($$$) { - my ($self, $sync, $smsg) = @_; +sub _ibx_for ($$) { + my ($self, $smsg) = @_; my $ibx_id = delete($smsg->{ibx_id}) // die 'BUG: {ibx_id} unset'; my $pos = $self->{id2pos}->{$ibx_id} // bad_ibx_id($self, $ibx_id, \&croak); @@ -572,8 +571,7 @@ EOF sub _reindex_finalize ($$$) { my ($req, $smsg, $eml) = @_; - my $sync = $req->{sync}; - my $self = $sync->{self}; + my $self = $req->{self}; my $by_chash = delete $req->{by_chash} or die 'BUG: no {by_chash}'; my $nr = scalar(keys(%$by_chash)) or die 'BUG: no content hashes'; my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}'; @@ -586,11 +584,11 @@ sub _reindex_finalize ($$$) { my $idx = $self->idx_shard($docid); my $top_smsg = pop @$stable; $top_smsg == $smsg or die 'BUG: top_smsg != smsg'; - my $ibx = _ibx_for($self, $sync, $smsg); + my $ibx = _ibx_for $self, $smsg; $smsg->{eidx_key} = $ibx->eidx_key; $idx->index_eml($eml, $smsg); for my $x (reverse @$stable) { - $ibx = _ibx_for($self, $sync, $x); + $ibx = _ibx_for $self, $x; my $hdr = delete $x->{hdr} // die 'BUG: no {hdr}'; $idx->ipc_do('add_eidx_info', $docid, $ibx->eidx_key, $hdr); } @@ -608,7 +606,7 @@ sub _reindex_finalize ($$$) { } my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty"; $x->{num} = delete($x->{xnum}) // die '{xnum} unset'; - $ibx = _ibx_for($self, $sync, $x); + $ibx = _ibx_for $self, $x; if (my $over = $ibx->over) { my $e = $over->get_art($x->{num}); $e->{blob} eq $x->{blob} or die <cat_async callback my ($bref, $oid, $type, $size, $req) = @_; - my $sync = $req->{sync}; - my $self = $sync->{self}; + my $self = $req->{self}; my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}'; my $expect_oid = $req->{xr3r}->[$req->{ix}]->[2]; my $docid = $orig_smsg->{num}; @@ -666,8 +663,8 @@ sub _reindex_oid { # git->cat_async callback } } -sub _reindex_smsg ($$$) { - my ($self, $sync, $smsg) = @_; +sub _reindex_smsg ($$) { + my ($self, $smsg) = @_; my $docid = $smsg->{num}; my $xr3 = $self->{oidx}->get_xref3($docid, 1); if (scalar(@$xr3) == 0) { # _reindex_check_stale should've covered this @@ -689,7 +686,7 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex $b->[1] <=> $a->[1] # break ties with {xnum} } @$xr3; @$xr3 = map { [ $_->[0], $_->[1], unpack('H*', $_->[2]) ] } @$xr3; - my $req = { orig_smsg => $smsg, sync => $sync, xr3r => $xr3, ix => 0 }; + my $req = { orig_smsg => $smsg, self => $self, xr3r => $xr3, ix => 0 }; $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req); } @@ -709,7 +706,7 @@ sub host_ident () { }; } -sub eidxq_release { +sub eidxq_release ($) { my ($self) = @_; my $expect = delete($self->{-eidxq_locked}) or return; my ($owner_pid, undef) = split(/-/, $expect); @@ -730,7 +727,7 @@ sub eidxq_release { sub DESTROY { my ($self) = @_; - eidxq_release($self) and $self->{oidx}->commit_lazy; + eidxq_release $self and $self->{oidx}->commit_lazy; } sub _eidxq_take ($) { @@ -789,8 +786,8 @@ sub prep_id2pos ($) { \%id2pos; } -sub eidxq_process ($$) { # for reindexing - my ($self, $sync) = @_; +sub eidxq_process ($) { # for reindexing + my ($self) = @_; local $self->{current_info} = 'eidxq process'; return unless ($self->{cfg} && eidxq_lock_acquire($self)); my $dbh = $self->{oidx}->dbh; @@ -812,7 +809,7 @@ restart: while (defined(my $docid = $iter->fetchrow_array)) { last if $self->{quit}; if (my $smsg = $self->{oidx}->get_art($docid)) { - _reindex_smsg($self, $sync, $smsg); + _reindex_smsg($self, $smsg); } else { warn "E: #$docid does not exist in over\n"; } @@ -821,7 +818,7 @@ restart: if (update_checkpoint $self) { $dbh = $del = $iter = undef; - reindex_checkpoint($self, $sync); # release lock + reindex_checkpoint($self); # release lock $dbh = $self->{oidx}->dbh; goto restart; } @@ -845,10 +842,10 @@ sub _reindex_unseen { # git->cat_async callback } # --reindex may catch totally unseen messages, this handles them -sub reindex_unseen ($$$$) { - my ($self, $sync, $ibx, $xsmsg) = @_; +sub reindex_unseen ($$$) { + my ($self, $ibx, $xsmsg) = @_; my $req = { - %$sync, # has {self} + self => $self, autime => $xsmsg->{ds}, cotime => $xsmsg->{ts}, oid => $xsmsg->{blob}, @@ -881,8 +878,8 @@ EOS 1; } -sub _reindex_check_ibx ($$$) { - my ($self, $sync, $ibx) = @_; +sub _reindex_check_ibx ($$) { + my ($self, $ibx) = @_; my $ibx_id = $ibx->{-ibx_id}; my $slice = 10000; my $opt = { limit => $slice }; @@ -891,7 +888,7 @@ sub _reindex_check_ibx ($$$) { my ($max, $max0); do { $max0 = $ibx->mm->num_highwater; - sync_inbox($self, $sync, $ibx) and return; # warned + sync_inbox($self, $ibx) and return; # warned $max = $ibx->mm->num_highwater; return if $self->{quit}; } while ($max > $max0 && @@ -912,7 +909,7 @@ sub _reindex_check_ibx ($$$) { $end = $beg + $slice; $end = $max if $end > $max; update_checkpoint $self and - reindex_checkpoint($self, $sync); # release lock + reindex_checkpoint($self); # release lock ($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num}); $usr //= _unref_stale_range($self, $ibx, "xnum < $lo"); my $x3a = $self->{oidx}->dbh->selectall_arrayref( @@ -930,7 +927,7 @@ ibx_id = ? AND xnum >= ? AND xnum <= ? my $k = pack('JH*', $xsmsg->{num}, $xsmsg->{blob}); my $docids = delete($x3m{$k}); if (!defined($docids)) { - reindex_unseen($self, $sync, $ibx, $xsmsg); + reindex_unseen($self, $ibx, $xsmsg); } elsif (!$fast) { for my $num (@$docids) { $self->{oidx}->eidxq_add($num); @@ -969,20 +966,20 @@ BUG: (non-fatal) $ekey #$xnum $smsg->{blob} still matches (old exp: $exp) _unref_stale_range($self, $ibx, "xnum > $hi AND xnum <= $max"); } -sub _reindex_inbox ($$$) { - my ($self, $sync, $ibx) = @_; +sub _reindex_inbox ($$) { + my ($self, $ibx) = @_; my $ekey = $ibx->eidx_key; local $self->{current_info} = $ekey; if (defined(my $err = _ibx_index_reject($ibx))) { warn "W: cannot reindex $ekey ($err)\n"; } else { - _reindex_check_ibx($self, $sync, $ibx); + _reindex_check_ibx($self, $ibx); } delete @$ibx{qw(over mm search git)}; # won't need these for a bit } -sub eidx_reindex { - my ($self, $sync) = @_; +sub eidx_reindex ($) { + my ($self) = @_; return unless $self->{cfg}; # acquire eidxq_lock early because full reindex takes forever @@ -992,16 +989,16 @@ sub eidx_reindex { return; } for my $ibx (@{ibx_sorted($self, 'active')}) { - _reindex_inbox($self, $sync, $ibx); + _reindex_inbox($self, $ibx); last if $self->{quit}; } $self->git->async_wait_all; # ensure eidxq gets filled completely - eidxq_process($self, $sync) unless $self->{quit}; + eidxq_process $self unless $self->{quit}; } -sub sync_inbox { - my ($self, $sync, $ibx) = @_; - my $err = _sync_inbox($self, $sync, $ibx); +sub sync_inbox ($$) { + my ($self, $ibx) = @_; + my $err = _sync_inbox $self, $ibx; delete @$ibx{qw(mm over)}; warn $err, "\n" if defined($err); $err; @@ -1036,8 +1033,8 @@ sub dd_smsg { # git->cat_async callback } } -sub eidx_dedupe ($$$) { - my ($self, $sync, $msgids) = @_; +sub eidx_dedupe ($$) { + my ($self, $msgids) = @_; local $self->{dedupe_cull} = 0; my $candidates = 0; my $nr_mid = 0; @@ -1090,7 +1087,7 @@ EOS if (update_checkpoint $self) { undef $iter; - reindex_checkpoint($self, $sync); + reindex_checkpoint($self); goto dedupe_restart; } } @@ -1103,7 +1100,7 @@ EOS $self->{nrec} = 0; } -sub eidx_sync { # main entry point +sub eidx_sync ($$) { # main entry point my ($self, $opt) = @_; local $self->{current_info} = ''; @@ -1115,12 +1112,6 @@ sub eidx_sync { # main entry point local $self->{-opt} = $opt; local $self->{-regen_fmt} = "%u/?\n"; local $self->{latest_cmt}; - my $sync = { - # DO NOT SET {reindex} here, it's incompatible with reused - # V2Writable code, reindex is totally different here - # compared to v1/v2 inboxes because we have multiple histories - self => $self, - }; local $SIG{USR1} = sub { $self->{need_checkpoint} = 1 }; my $quit = PublicInbox::SearchIdx::quit_cb $self; local $SIG{QUIT} = $quit; @@ -1136,30 +1127,29 @@ sub eidx_sync { # main entry point if (my $msgids = delete($opt->{dedupe})) { local $self->{checkpoint_unlocks} = 1; - eidx_dedupe($self, $sync, $msgids); + eidx_dedupe $self, $msgids; } if (delete($opt->{reindex})) { local $self->{checkpoint_unlocks} = 1; - eidx_reindex($self, $sync); + eidx_reindex $self; } # don't use $_ here, it'll get clobbered by reindex_checkpoint if ($opt->{scan} // 1) { for my $ibx (@{ibx_sorted($self, 'active')}) { last if $self->{quit}; - sync_inbox($self, $sync, $ibx); + sync_inbox $self, $ibx; } } $self->{oidx}->rethread_done($opt) unless $self->{quit}; - eidxq_process($self, $sync) unless $self->{quit}; + eidxq_process $self unless $self->{quit}; - eidxq_release($self); + eidxq_release $self; done($self); - $sync; # for eidx_watch } sub update_last_commit { # overrides V2Writable - my ($self, $sync, $stk) = @_; + my ($self, $stk) = @_; my $unit = $self->{unit} // return; my $latest_cmt = $stk ? $stk->{latest_cmt} : $self->{latest_cmt}; defined($latest_cmt) or return; @@ -1291,10 +1281,10 @@ sub idx_init { # similar to V2Writable sub _watch_commit { # PublicInbox::DS::add_timer callback my ($self) = @_; delete $self->{-commit_timer}; - eidxq_process($self, $self->{-watch_sync}); - eidxq_release($self); + eidxq_process $self; + eidxq_release $self; my $fmt = delete $self->{-regen_fmt}; - reindex_checkpoint($self, $self->{-watch_sync}); + reindex_checkpoint($self); $self->{-regen_fmt} = $fmt; # call event_step => done unless commit_timer is armed @@ -1309,7 +1299,7 @@ sub on_inbox_unlock { # called by PublicInbox::InboxIdle local $0 = "sync $ekey"; $pr->("indexing $ekey\n") if $pr; $self->idx_init($opt); - sync_inbox($self, $self->{-watch_sync}, $ibx); + sync_inbox $self, $ibx; $self->{-commit_timer} //= add_timer($opt->{'commit-interval'} // 10, \&_watch_commit, $self); } @@ -1378,7 +1368,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop my $pr = $opt->{-progress}; $pr->("performing initial scan ...\n") if $pr; local $self->{-opt} = $opt; - my $sync = eidx_sync($self, $opt); # initial sync + eidx_sync $self, $opt; # initial sync return if $self->{quit}; my $oldset = PublicInbox::DS::block_signals(); local $self->{current_info} = ''; @@ -1390,7 +1380,6 @@ sub eidx_watch { # public-inbox-extindex --watch main loop }; my $quit = PublicInbox::SearchIdx::quit_cb $self; $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit; - local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock local @PublicInbox::DS::post_loop_do = (sub { !$self->{quit} }); $pr->("initial scan complete, entering event loop\n") if $pr; # calls InboxIdle->event_step: diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 80ee346d0..3f05d5a65 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -695,11 +695,11 @@ sub atfork_child { die "BUG: unexpected mm" if $self->{mm}; } -sub reindex_checkpoint ($$) { - my ($self, $sync) = @_; +sub reindex_checkpoint ($) { + my ($self) = @_; $self->git->async_wait_all; - $self->update_last_commit($sync); + $self->update_last_commit; $self->{need_checkpoint} = 0; my $mm_tmp = $self->{mm_tmp}; $mm_tmp->atfork_prepare if $mm_tmp; @@ -815,7 +815,7 @@ sub index_oid { # cat_async callback # only update last_commit for $i on reindex iff newer than current sub update_last_commit { - my ($self, $sync, $stk) = @_; + my ($self, $stk) = @_; my $unit = $self->{unit} // return; my $latest_cmt = $stk ? $stk->{latest_cmt} : $self->{latest_cmt}; defined($latest_cmt) or return; @@ -834,7 +834,7 @@ sub update_last_commit { } sub last_commits { - my ($self, $sync) = @_; + my ($self) = @_; my $heads = []; for (my $i = $self->{epoch_max}; $i >= 0; $i--) { $heads->[$i] = last_epoch_commit($self, $i); @@ -844,8 +844,7 @@ sub last_commits { # returns a revision range for git-log(1) sub log_range ($$$) { - my ($sync, $unit, $tip) = @_; - my $self = $sync->{self}; + my ($self, $unit, $tip) = @_; my $opt = $self->{-opt}; my $pr = $opt->{-progress} if (($opt->{verbose} || 0) > 1); my $i = $unit->{epoch}; @@ -863,7 +862,7 @@ sub log_range ($$$) { my $range = "$cur..$tip"; $pr->("$i.git checking contiguity... ") if $pr; my $git = $unit->{git}; - if (is_ancestor($sync->{self}->git, $cur, $tip)) { # common case + if (is_ancestor($self->git, $cur, $tip)) { # common case $pr->("OK\n") if $pr; my $n = $git->qx(qw(rev-list --count), $range); chomp($n); @@ -906,9 +905,9 @@ starting at $range # overridden by ExtSearchIdx sub artnum_max { $_[0]->{mm}->num_highwater } -sub sync_prepare ($$) { - my ($self, $sync) = @_; - $self->{ranges} = sync_ranges($self, $sync); +sub sync_prepare ($) { + my ($self) = @_; + $self->{ranges} = sync_ranges($self); my $pr = $self->{-opt}->{-progress}; my $regen_max = 0; my $head = $self->{ibx}->{ref_head} || 'HEAD'; @@ -933,7 +932,7 @@ sub sync_prepare ($$) { } elsif ($self->{reindex}) { # V2 inbox # reindex stops at the current heads and we later # rerun index_sync without {reindex} - $reindex_heads = $self->last_commits($sync); + $reindex_heads = $self->last_commits; } $self->{max_size} = $self->{-opt}->{max_size} and $self->{index_oid} = $self->can('index_oid'); @@ -951,7 +950,7 @@ sub sync_prepare ($$) { next if $?; # new repo chomp $tip; } - my $range = log_range($sync, $unit, $tip) or next; + my $range = log_range($self, $unit, $tip) or next; # can't use 'rev-list --count' if we use --diff-filter $pr->("$pfx $i.git counting $range ... ") if $pr; # Don't bump num_highwater on --reindex by using {D}. @@ -978,7 +977,7 @@ sub sync_prepare ($$) { for my $oid (@leftovers) { last if $self->{quit}; $oid = unpack('H*', $oid); - my $req = { %$sync, oid => $oid }; + my $req = { self => $self, oid => $oid }; $self->git->cat_async($oid, $unindex_oid, $req); } $self->git->async_wait_all; @@ -1048,8 +1047,8 @@ sub git { $_[0]->{ibx}->git } # this is rare, it only happens when we get discontiguous history in # a mirror because the source used -purge or -edit -sub unindex_todo ($$$) { - my ($self, $sync, $unit) = @_; +sub unindex_todo ($$) { + my ($self, $unit) = @_; my $unindex_range = delete($unit->{unindex_range}) // return; my $unindexed = $self->{unindexed} //= {}; # $oidbin => [$num, $mid0] my $before = scalar keys %$unindexed; @@ -1060,7 +1059,8 @@ sub unindex_todo ($$$) { my $unindex_oid = $self->can('unindex_oid'); while (<$fh>) { /\A:\d{6} 100644 $OID ($OID) [AM]\tm$/o or next; - $self->git->cat_async($1, $unindex_oid, { %$sync, oid => $1 }); + $self->git->cat_async($1, $unindex_oid, + { self => $self, oid => $1 }); } $fh->close or die "git log failed: \$?=$?"; $self->git->async_wait_all; @@ -1074,10 +1074,10 @@ sub unindex_todo ($$$) { --prune=all --quiet))); } -sub sync_ranges ($$) { - my ($self, $sync) = @_; +sub sync_ranges ($) { + my ($self) = @_; my $reindex = $self->{reindex}; - return $self->last_commits($sync) unless $reindex; + return $self->last_commits unless $reindex; return [] if ref($reindex) ne 'HASH'; my $ranges = $reindex->{from}; # arrayref; @@ -1094,8 +1094,8 @@ sub index_xap_only { # git->cat_async callback $idx->index_eml(PublicInbox::Eml->new($bref), $smsg); } -sub index_xap_step ($$$$;$) { - my ($self, $sync, $beg, $end, $step) = @_; +sub index_xap_step ($$$;$) { + my ($self, $beg, $end, $step) = @_; return if $beg > $end; # nothing to do $step //= $self->{shards}; @@ -1114,14 +1114,14 @@ sub index_xap_step ($$$$;$) { my $n = $self->{transact_bytes} += $smsg->{bytes}; if ($n >= $self->{batch_bytes}) { $self->{nrec} = $num; - reindex_checkpoint($self, $sync); + reindex_checkpoint $self; } } } -sub index_todo ($$$) { - my ($self, $sync, $unit) = @_; - unindex_todo($self, $sync, $unit); +sub index_todo ($$) { + my ($self, $unit) = @_; + unindex_todo($self, $unit); my $stk = delete($unit->{stack}) or return; my $all = $self->git; # initialize self->{ibx}->{git} my $index_oid = $self->can('index_oid'); @@ -1140,11 +1140,11 @@ sub index_todo ($$$) { if ($self->{quit}) { warn "waiting to quit...\n"; $all->async_wait_all; - $self->update_last_commit($sync); + $self->update_last_commit; return; } my $req = { - %$sync, + self => $self, autime => $at, cotime => $ct, oid => $oid, @@ -1159,33 +1159,31 @@ sub index_todo ($$$) { } elsif ($f eq 'd') { $all->cat_async($oid, $unindex_oid, $req); } - reindex_checkpoint($self, $sync) if $self->{need_checkpoint}; + reindex_checkpoint $self if $self->{need_checkpoint}; } $all->async_wait_all; - $self->update_last_commit($sync, $stk); + $self->update_last_commit($stk); } -sub xapian_only ($;$$) { - my ($self, $sync, $art_beg) = @_; +sub xapian_only ($;$) { + my ($self, $art_beg) = @_; my $seq = $self->{-opt}->{'sequential-shard'}; $art_beg //= 0; local $self->{parallel} = 0 if $seq; $self->idx_init($self->{-opt}); # acquire lock if (my $art_end = $self->{ibx}->mm->max) { $self->{-regen_fmt} //= "%u/?\n"; - $sync //= { self => $self }; if ($seq || !$self->{parallel}) { my $shard_end = $self->{shards} - 1; for my $i (0..$shard_end) { last if $self->{quit}; - index_xap_step $self, $sync, $art_beg + $i, - $art_end; + index_xap_step $self, $art_beg + $i, $art_end; if ($i != $shard_end) { - reindex_checkpoint($self, $sync); + reindex_checkpoint $self; } } } else { # parallel (maybe) - index_xap_step $self, $sync, $art_beg, $art_end, 1; + index_xap_step $self, $art_beg, $art_end, 1; } } $self->git->async_wait_all; @@ -1193,11 +1191,11 @@ sub xapian_only ($;$$) { $self->done; } -sub process_todo ($$) { - my ($self, $sync) = @_; +sub process_todo ($) { + my ($self) = @_; for my $unit (@{delete($self->{todo}) // []}) { last if $self->{quit}; - $self->index_todo($sync, $unit); # may be ExtSearchIdx + $self->index_todo($unit); # may be ExtSearchIdx } } @@ -1237,13 +1235,12 @@ sub index_sync { local $self->{todo}; # sync_prepare local $self->{ranges}; local $self->{unindexed}; - my $sync = { self => $self }; my $quit = PublicInbox::SearchIdx::quit_cb $self; local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; local $SIG{TERM} = $quit; - if (sync_prepare($self, $sync)) { + if (sync_prepare($self)) { # tmp_clone seems to fail if inside a transaction, so # we rollback here (because we opened {mm} for reading) # Note: we do NOT rely on DBI transactions for atomicity; @@ -1260,7 +1257,7 @@ sub index_sync { } } # work forwards through history - process_todo $self, $sync; + process_todo $self; $self->{oidx}->rethread_done($opt) unless $self->{quit}; $self->done; @@ -1276,7 +1273,7 @@ sub index_sync { $quit_warn = 1; } else { $self->{ibx}->{indexlevel} = $idxlevel; - xapian_only($self, $sync, $art_beg); + xapian_only $self, $art_beg; $quit_warn = 1 if $self->{quit}; } } @@ -1298,7 +1295,6 @@ sub index_sync { if ($opt->{reindex} && !$self->{quit} && !grep(defined, @$opt{qw(since until)})) { my %again = %$opt; - $sync = undef; delete @again{qw(rethread reindex -skip_lock)}; index_sync($self, \%again); }