From: Eric Wong Date: Fri, 10 Jan 2025 23:18:15 +0000 (+0000) Subject: (ext)index: move {quit} from $sync to $self X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2bd64cd544fb63998b8ad6f5ce1a7aa9c8545951;p=thirdparty%2Fpublic-inbox.git (ext)index: move {quit} from $sync to $self No need to localize it, either, since we don't expect to use the $self instance after {quit} is set. --- diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 441bb7b0d..e63d917f1 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -168,7 +168,7 @@ sub remove_doc ($$) { } sub _unref_doc ($$$$$;$) { - my ($sync, $docid, $ibx, $xnum, $oidbin, $eml) = @_; + my ($self, $docid, $ibx, $xnum, $oidbin, $eml) = @_; my $smsg; if (ref($docid)) { $smsg = $docid; @@ -185,34 +185,34 @@ sub _unref_doc ($$$$$;$) { my $s = 'DELETE FROM xref3 WHERE oidbin = ?'; $s .= ' AND ibx_id = ?' if defined($ibx); $s .= ' AND xnum = ?' if defined($xnum); - my $del = $sync->{self}->{oidx}->dbh->prepare_cached($s); + my $del = $self->{oidx}->dbh->prepare_cached($s); my $col = 0; $del->bind_param(++$col, $oidbin, SQL_BLOB); $del->bind_param(++$col, $ibx->{-ibx_id}) if $ibx; $del->bind_param(++$col, $xnum) if defined($xnum); $del->execute; - my $xr3 = $sync->{self}->{oidx}->get_xref3($docid); + my $xr3 = $self->{oidx}->get_xref3($docid); if (scalar(@$xr3) == 0) { # all gone - remove_doc($sync->{self}, $docid); + remove_doc($self, $docid); } else { # enqueue for reindex of remaining messages if ($ibx) { my $ekey = $ibx->{-gc_eidx_key} // $ibx->eidx_key; - my $idx = $sync->{self}->idx_shard($docid); + my $idx = $self->idx_shard($docid); $idx->ipc_do('remove_eidx_info', $docid, $ekey, $eml); } # else: we can't remove_eidx_info in reindex-only path # replace invalidated blob ASAP with something which should be # readable since we may commit the transaction on checkpoint. # eidxq processing will re-apply boost - $smsg //= $sync->{self}->{oidx}->get_art($docid); + $smsg //= $self->{oidx}->get_art($docid); my $hex = unpack('H*', $oidbin); if ($smsg && $smsg->{blob} eq $hex) { $xr3->[0] =~ /:([a-f0-9]{40,}+)\z/ or die "BUG: xref $xr3->[0] has no OID"; - $sync->{self}->{oidx}->update_blob($smsg, $1); + $self->{oidx}->update_blob($smsg, $1); } # yes, add, we'll need to re-apply boost - $sync->{self}->{oidx}->eidxq_add($docid); + $self->{oidx}->eidxq_add($docid); } @$xr3 } @@ -234,7 +234,7 @@ sub do_xpost ($$) { } else { # 'd' no {xnum} $self->git->async_wait_all; $oid = pack('H*', $oid); - _unref_doc($req, $docid, $xibx, undef, $oid, $eml); + _unref_doc $self, $docid, $xibx, undef, $oid, $eml; } } @@ -302,7 +302,7 @@ sub _blob_missing ($$) { # called when a known $smsg->{blob} is gone # xnum and ibx are unknown, we only call this when an entry from # /ei*/over.sqlite3 is bad, not on entries from xap*/over.sqlite3 $req->{self}->git->async_wait_all; - _unref_doc($req, $smsg, undef, undef, $smsg->oidbin); + _unref_doc $req->{self}, $smsg, undef, undef, $smsg->oidbin; } sub ck_existing { # git->cat_async callback @@ -415,10 +415,10 @@ sub _sync_inbox ($$$) { return "E: $ekey unsupported inbox version (v$v)"; } for my $unit (@{delete($sync->{todo}) // []}) { - last if $sync->{quit}; + last if $self->{quit}; index_todo($self, $sync, $unit); } - $self->{midx}->index_ibx($ibx) unless $sync->{quit}; + $self->{midx}->index_ibx($ibx) unless $self->{quit}; $ibx->git->cleanup; # done with this inbox, now undef; } @@ -441,7 +441,7 @@ EOM $x3_doc->execute($ibx_id); my $ibx = { -ibx_id => $ibx_id, -gc_eidx_key => $eidx_key }; while (my ($docid, $xnum, $oid) = $x3_doc->fetchrow_array) { - my $r = _unref_doc($sync, $docid, $ibx, $xnum, $oid); + my $r = _unref_doc $self, $docid, $ibx, $xnum, $oid; $oid = unpack('H*', $oid); $r = $r ? 'unref' : 'remove'; warn "# $r #$docid $eidx_key $oid\n"; @@ -602,7 +602,7 @@ sub _reindex_finalize ($$$) { for my $x (reverse @$ary) { warn "removing #$docid xref3 $x->{blob}\n"; my $bin = $x->oidbin; - my $n = _unref_doc($sync, $docid, undef, undef, $bin); + my $n = _unref_doc $self, $docid, undef, undef, $bin; die "BUG: $x->{blob} invalidated #$docid" if $n == 0; } my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty"; @@ -634,7 +634,7 @@ sub _reindex_oid { # git->cat_async callback my $docid = $orig_smsg->{num}; if (is_bad_blob($oid, $type, $size, $expect_oid)) { my $oidbin = pack('H*', $expect_oid); - my $remain = _unref_doc($sync, $docid, undef, undef, $oidbin); + my $remain = _unref_doc $self, $docid, undef, undef, $oidbin; if ($remain == 0) { warn "W: #$docid ($oid) gone or corrupt\n"; } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) { @@ -809,7 +809,7 @@ restart: $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC'); $iter->execute; while (defined(my $docid = $iter->fetchrow_array)) { - last if $sync->{quit}; + last if $self->{quit}; if (my $smsg = $self->{oidx}->get_art($docid)) { _reindex_smsg($self, $sync, $smsg); } else { @@ -860,21 +860,21 @@ sub reindex_unseen ($$$$) { } sub _unref_stale_range ($$$) { - my ($sync, $ibx, $lt_or_gt) = @_; + my ($self, $ibx, $lt_or_gt) = @_; my $r; my $lim = 10000; do { - $r = $sync->{self}->{oidx}->dbh->selectall_arrayref( + $r = $self->{oidx}->dbh->selectall_arrayref( <{-ibx_id}); SELECT docid,xnum,oidbin FROM xref3 WHERE ibx_id = ? AND $lt_or_gt LIMIT $lim EOS - return if $sync->{quit}; + return if $self->{quit}; for (@$r) { # hopefully rare, not worth optimizing: my ($docid, $xnum, $oidbin) = @$_; my $hex = unpack('H*', $oidbin); warn("# $xnum:$hex (#$docid): stale\n"); - _unref_doc($sync, $docid, $ibx, $xnum, $oidbin); + _unref_doc $self, $docid, $ibx, $xnum, $oidbin; } } while (scalar(@$r) == $lim); 1; @@ -892,7 +892,7 @@ sub _reindex_check_ibx ($$$) { $max0 = $ibx->mm->num_highwater; sync_inbox($self, $sync, $ibx) and return; # warned $max = $ibx->mm->num_highwater; - return if $sync->{quit}; + return if $self->{quit}; } while ($max > $max0 && warn("# $ekey moved $max0..$max, resyncing..\n")); $end = $max if $end > $max; @@ -913,7 +913,7 @@ sub _reindex_check_ibx ($$$) { update_checkpoint $self and reindex_checkpoint($self, $sync); # release lock ($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num}); - $usr //= _unref_stale_range($sync, $ibx, "xnum < $lo"); + $usr //= _unref_stale_range($self, $ibx, "xnum < $lo"); my $x3a = $self->{oidx}->dbh->selectall_arrayref( <<"", undef, $ibx_id, $lo, $hi); SELECT xnum,oidbin,docid FROM xref3 WHERE @@ -935,7 +935,7 @@ ibx_id = ? AND xnum >= ? AND xnum <= ? $self->{oidx}->eidxq_add($num); } } - return if $sync->{quit}; + return if $self->{quit}; } next unless scalar keys %x3m; $self->git->async_wait_all; # wait for reindex_unseen @@ -959,13 +959,13 @@ BUG: (non-fatal) $ekey #$xnum $smsg->{blob} still matches (old exp: $exp) my $m = defined($exp) ? "mismatch (!= $exp)" : 'stale'; warn("# $xnum:$hex (#@$docids): $m\n"); for my $i (@$docids) { - _unref_doc($sync, $i, $ibx, $xnum, $bin); + _unref_doc $self, $i, $ibx, $xnum, $bin; } - return if $sync->{quit}; + return if $self->{quit}; } } defined($hi) and ($hi < $max) and - _unref_stale_range($sync, $ibx, "xnum > $hi AND xnum <= $max"); + _unref_stale_range($self, $ibx, "xnum > $hi AND xnum <= $max"); } sub _reindex_inbox ($$$) { @@ -992,10 +992,10 @@ sub eidx_reindex { } for my $ibx (@{ibx_sorted($self, 'active')}) { _reindex_inbox($self, $sync, $ibx); - last if $sync->{quit}; + last if $self->{quit}; } $self->git->async_wait_all; # ensure eidxq gets filled completely - eidxq_process($self, $sync) unless $sync->{quit}; + eidxq_process($self, $sync) unless $self->{quit}; } sub sync_inbox { @@ -1066,7 +1066,7 @@ EOS $iter->execute($cur_mid, $min_id); } while (my ($mid, $id) = $iter->fetchrow_array) { - last if $sync->{quit}; + last if $self->{quit}; $self->{current_info} = "dedupe $mid"; $self->{nrec} = $min_id = $id; my ($prv, @smsg); @@ -1128,7 +1128,7 @@ sub eidx_sync { # main entry point self => $self, }; local $SIG{USR1} = sub { $self->{need_checkpoint} = 1 }; - my $quit = PublicInbox::SearchIdx::quit_cb($sync); + my $quit = PublicInbox::SearchIdx::quit_cb $self; local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; local $SIG{TERM} = $quit; @@ -1153,12 +1153,12 @@ sub eidx_sync { # main entry point # don't use $_ here, it'll get clobbered by reindex_checkpoint if ($opt->{scan} // 1) { for my $ibx (@{ibx_sorted($self, 'active')}) { - last if $sync->{quit}; + last if $self->{quit}; sync_inbox($self, $sync, $ibx); } } - $self->{oidx}->rethread_done($opt) unless $sync->{quit}; - eidxq_process($self, $sync) unless $sync->{quit}; + $self->{oidx}->rethread_done($opt) unless $self->{quit}; + eidxq_process($self, $sync) unless $self->{quit}; eidxq_release($self); done($self); @@ -1385,7 +1385,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop $pr->("performing initial scan ...\n") if $pr; local $self->{-opt} = $opt; my $sync = eidx_sync($self, $opt); # initial sync - return if $sync->{quit}; + return if $self->{quit}; my $oldset = PublicInbox::DS::block_signals(); local $self->{current_info} = ''; local $SIG{__WARN__} = PublicInbox::Admin::warn_cb $self; @@ -1394,10 +1394,10 @@ sub eidx_watch { # public-inbox-extindex --watch main loop USR1 => sub { eidx_resync_start($self) }, TSTP => sub { kill('STOP', $$) }, }; - my $quit = PublicInbox::SearchIdx::quit_cb($sync); + my $quit = PublicInbox::SearchIdx::quit_cb $self; $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit; local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock - local @PublicInbox::DS::post_loop_do = (sub { !$sync->{quit} }); + local @PublicInbox::DS::post_loop_do = (sub { !$self->{quit} }); $pr->("initial scan complete, entering event loop\n") if $pr; # calls InboxIdle->event_step: PublicInbox::DS::event_loop($sig, $oldset); diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 77416e61a..dbbd4323a 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -839,12 +839,11 @@ sub index_sync { my ($self, $opt) = @_; delete $self->{lock_path} if $opt->{-skip_lock}; $self->with_umask(\&_index_sync, $self, $opt); - if ($opt->{reindex} && !$opt->{quit} && + if ($opt->{reindex} && !$self->{quit} && !grep(defined, @$opt{qw(since until)})) { my %again = %$opt; delete @again{qw(rethread reindex)}; index_sync($self, \%again); - $opt->{quit} = $again{quit}; # propagate to caller } } @@ -897,7 +896,7 @@ sub v1_checkpoint ($$;$) { if (my $pr = $self->{-opt}->{-progress}) { $pr->("indexed $nrec/$sync->{ntodo}\n") if $nrec; } - if (!$stk && !$sync->{quit}) { # more to come + if (!$stk && !$self->{quit}) { # more to come begin_txn_lazy($self); $self->{mm}->{dbh}->begin_work; } @@ -917,7 +916,7 @@ sub v1_process_stack ($$$) { if (my @leftovers = keys %{delete($sync->{D}) // {}}) { warn('W: unindexing '.scalar(@leftovers)." leftovers\n"); for my $oid (@leftovers) { - last if $sync->{quit}; + last if $self->{quit}; $oid = unpack('H*', $oid); $git->cat_async($oid, \&v1_unindex_both, $sync); } @@ -927,7 +926,7 @@ sub v1_process_stack ($$$) { } while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) { my $arg = { %$sync, cur_cmt => $cur_cmt, oid => $oid }; - last if $sync->{quit}; + last if $self->{quit}; if ($f eq 'm') { $arg->{autime} = $at; $arg->{cotime} = $ct; @@ -941,7 +940,7 @@ sub v1_process_stack ($$$) { } v1_checkpoint $self, $sync if $self->{need_checkpoint}; } - v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk); + v1_checkpoint($self, $sync, $self->{quit} ? undef : $stk); } sub log2stack ($$$$) { @@ -969,7 +968,7 @@ sub log2stack ($$$$) { my $fh = $git->popen(@cmd, $range); my ($at, $ct, $stk, $cmt, $l); while (defined($l = <$fh>)) { - return if $sync->{quit}; + return if $self->{quit}; if ($l =~ /\A([0-9]+)-([0-9]+)-($OID)$/o) { ($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3); $stk //= PublicInbox::IdxStack->new($cmt); @@ -1061,12 +1060,12 @@ sub v1_reindex_from ($$) { } sub quit_cb ($) { - my ($sync) = @_; + my ($self) = @_; sub { - # we set {-opt}->{quit} too, so ->index_sync callers - # can abort multi-inbox loops this way - $sync->{quit} = $sync->{self}->{-opt}->{quit} = 1; - warn "gracefully quitting\n"; + # we set {-opt}->{quit} for public-inbox-index so + # can abort multi-inbox loops this way (for now...) + $self->{quit} = $self->{-opt}->{quit} = 1; + warn "# gracefully quitting\n"; } } @@ -1086,7 +1085,7 @@ sub _index_sync { my $pr = $opt->{-progress}; local $self->{-opt} = $opt; my $sync = { reindex => $opt->{reindex}, ibx => $ibx }; - my $quit = quit_cb($sync); + my $quit = quit_cb $self; local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; local $SIG{TERM} = $quit; @@ -1110,7 +1109,7 @@ sub _index_sync { my $stk = prepare_stack($self, $sync, $range); $sync->{ntodo} = $stk ? $stk->num_records : 0; $pr->("$sync->{ntodo}\n") if $pr; # continue previous line - v1_process_stack($self, $sync, $stk) if !$sync->{quit}; + v1_process_stack($self, $sync, $stk) if !$self->{quit}; } sub DESTROY { diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 27be8c39e..886f59b11 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -961,14 +961,14 @@ sub sync_prepare ($$) { # messages to show up in mirrors, too. $sync->{D} //= $sync->{reindex} ? {} : undef; # OID_BIN => NR my $stk = log2stack($self, $sync, $git, $range); - return 0 if $sync->{quit}; + return 0 if $self->{quit}; my $nr = $stk ? $stk->num_records : 0; $pr->("$nr\n") if $pr; $unit->{stack} = $stk; # may be undef unshift @{$sync->{todo}}, $unit; $regen_max += $nr; } - return 0 if $sync->{quit}; + return 0 if $self->{quit}; # XXX this should not happen unless somebody bypasses checks in # our code and blindly injects "d" file history into git repos @@ -977,14 +977,14 @@ sub sync_prepare ($$) { local $self->{current_info} = 'leftover '; my $unindex_oid = $self->can('unindex_oid'); for my $oid (@leftovers) { - last if $sync->{quit}; + last if $self->{quit}; $oid = unpack('H*', $oid); my $req = { %$sync, oid => $oid }; $self->git->cat_async($oid, $unindex_oid, $req); } $self->git->async_wait_all; } - return 0 if $sync->{quit}; + return 0 if $self->{quit}; if (!$regen_max) { $self->{-regen_fmt} = "%u/?\n"; return 0; @@ -1107,7 +1107,7 @@ sub index_xap_step ($$$;$) { "$beg..$end (% $step)\n"); } for (my $num = $beg; $num <= $end; $num += $step) { - last if $sync->{quit}; + last if $self->{quit}; my $smsg = $ibx->over->get_art($num) or next; $smsg->{self} = $self; $ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg); @@ -1123,7 +1123,7 @@ sub index_xap_step ($$$;$) { sub index_todo ($$$) { my ($self, $sync, $unit) = @_; - return if $sync->{quit}; + return if $self->{quit}; unindex_todo($self, $sync, $unit); my $stk = delete($unit->{stack}) or return; my $all = $self->git; # initialize self->{ibx}->{git} @@ -1140,7 +1140,7 @@ sub index_todo ($$$) { local $self->{latest_cmt}; local $sync->{unit} = $unit; while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) { - if ($sync->{quit}) { + if ($self->{quit}) { warn "waiting to quit...\n"; $all->async_wait_all; $self->update_last_commit($sync); @@ -1181,7 +1181,7 @@ sub xapian_only ($;$$) { if ($seq || !$self->{parallel}) { my $shard_end = $self->{shards} - 1; for my $i (0..$shard_end) { - last if $sync->{quit}; + last if $self->{quit}; index_xap_step($self, $sync, $art_beg + $i); if ($i != $shard_end) { reindex_checkpoint($self, $sync); @@ -1233,7 +1233,7 @@ sub index_sync { ibx => $self->{ibx}, epoch_max => $epoch_max, }; - my $quit = PublicInbox::SearchIdx::quit_cb($sync); + my $quit = PublicInbox::SearchIdx::quit_cb $self; local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; local $SIG{TERM} = $quit; @@ -1256,7 +1256,7 @@ sub index_sync { } # work forwards through history index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []}; - $self->{oidx}->rethread_done($opt) unless $sync->{quit}; + $self->{oidx}->rethread_done($opt) unless $self->{quit}; $self->done; if (my $nrec = $self->{nrec}) { @@ -1267,17 +1267,17 @@ sub index_sync { my $quit_warn; # deal with Xapian shards sequentially if ($seq && delete($sync->{mm_tmp})) { - if ($sync->{quit}) { + if ($self->{quit}) { $quit_warn = 1; } else { $self->{ibx}->{indexlevel} = $idxlevel; xapian_only($self, $sync, $art_beg); - $quit_warn = 1 if $sync->{quit}; + $quit_warn = 1 if $self->{quit}; } } # --reindex on the command-line - if (!$sync->{quit} && $opt->{reindex} && + if (!$self->{quit} && $opt->{reindex} && !ref($opt->{reindex}) && $idxlevel ne 'basic') { $self->lock_acquire; my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0); @@ -1290,13 +1290,12 @@ sub index_sync { } # reindex does not pick up new changes, so we rerun w/o it: - if ($opt->{reindex} && !$sync->{quit} && + if ($opt->{reindex} && !$self->{quit} && !grep(defined, @$opt{qw(since until)})) { my %again = %$opt; $sync = undef; delete @again{qw(rethread reindex -skip_lock)}; index_sync($self, \%again); - $opt->{quit} = $again{quit}; # propagate to caller } warn <