]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
(ext)index: move {quit} from $sync to $self
authorEric Wong <e@80x24.org>
Fri, 10 Jan 2025 23:18:15 +0000 (23:18 +0000)
committerEric Wong <e@80x24.org>
Tue, 14 Jan 2025 04:45:31 +0000 (04:45 +0000)
No need to localize it, either, since we don't expect to
use the $self instance after {quit} is set.

lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/V2Writable.pm

index 441bb7b0da5e22a44c8de0580e5b389b61639e8d..e63d917f1490660dff73a48985b58e1e9a56260c 100644 (file)
@@ -168,7 +168,7 @@ sub remove_doc ($$) {
 }
 
 sub _unref_doc ($$$$$;$) {
-       my ($sync, $docid, $ibx, $xnum, $oidbin, $eml) = @_;
+       my ($self, $docid, $ibx, $xnum, $oidbin, $eml) = @_;
        my $smsg;
        if (ref($docid)) {
                $smsg = $docid;
@@ -185,34 +185,34 @@ sub _unref_doc ($$$$$;$) {
        my $s = 'DELETE FROM xref3 WHERE oidbin = ?';
        $s .= ' AND ibx_id = ?' if defined($ibx);
        $s .= ' AND xnum = ?' if defined($xnum);
-       my $del = $sync->{self}->{oidx}->dbh->prepare_cached($s);
+       my $del = $self->{oidx}->dbh->prepare_cached($s);
        my $col = 0;
        $del->bind_param(++$col, $oidbin, SQL_BLOB);
        $del->bind_param(++$col, $ibx->{-ibx_id}) if $ibx;
        $del->bind_param(++$col, $xnum) if defined($xnum);
        $del->execute;
-       my $xr3 = $sync->{self}->{oidx}->get_xref3($docid);
+       my $xr3 = $self->{oidx}->get_xref3($docid);
        if (scalar(@$xr3) == 0) { # all gone
-               remove_doc($sync->{self}, $docid);
+               remove_doc($self, $docid);
        } else { # enqueue for reindex of remaining messages
                if ($ibx) {
                        my $ekey = $ibx->{-gc_eidx_key} // $ibx->eidx_key;
-                       my $idx = $sync->{self}->idx_shard($docid);
+                       my $idx = $self->idx_shard($docid);
                        $idx->ipc_do('remove_eidx_info', $docid, $ekey, $eml);
                } # else: we can't remove_eidx_info in reindex-only path
 
                # replace invalidated blob ASAP with something which should be
                # readable since we may commit the transaction on checkpoint.
                # eidxq processing will re-apply boost
-               $smsg //= $sync->{self}->{oidx}->get_art($docid);
+               $smsg //= $self->{oidx}->get_art($docid);
                my $hex = unpack('H*', $oidbin);
                if ($smsg && $smsg->{blob} eq $hex) {
                        $xr3->[0] =~ /:([a-f0-9]{40,}+)\z/ or
                                die "BUG: xref $xr3->[0] has no OID";
-                       $sync->{self}->{oidx}->update_blob($smsg, $1);
+                       $self->{oidx}->update_blob($smsg, $1);
                }
                # yes, add, we'll need to re-apply boost
-               $sync->{self}->{oidx}->eidxq_add($docid);
+               $self->{oidx}->eidxq_add($docid);
        }
        @$xr3
 }
@@ -234,7 +234,7 @@ sub do_xpost ($$) {
        } else { # 'd' no {xnum}
                $self->git->async_wait_all;
                $oid = pack('H*', $oid);
-               _unref_doc($req, $docid, $xibx, undef, $oid, $eml);
+               _unref_doc $self, $docid, $xibx, undef, $oid, $eml;
        }
 }
 
@@ -302,7 +302,7 @@ sub _blob_missing ($$) { # called when a known $smsg->{blob} is gone
        # xnum and ibx are unknown, we only call this when an entry from
        # /ei*/over.sqlite3 is bad, not on entries from xap*/over.sqlite3
        $req->{self}->git->async_wait_all;
-       _unref_doc($req, $smsg, undef, undef, $smsg->oidbin);
+       _unref_doc $req->{self}, $smsg, undef, undef, $smsg->oidbin;
 }
 
 sub ck_existing { # git->cat_async callback
@@ -415,10 +415,10 @@ sub _sync_inbox ($$$) {
                return "E: $ekey unsupported inbox version (v$v)";
        }
        for my $unit (@{delete($sync->{todo}) // []}) {
-               last if $sync->{quit};
+               last if $self->{quit};
                index_todo($self, $sync, $unit);
        }
-       $self->{midx}->index_ibx($ibx) unless $sync->{quit};
+       $self->{midx}->index_ibx($ibx) unless $self->{quit};
        $ibx->git->cleanup; # done with this inbox, now
        undef;
 }
@@ -441,7 +441,7 @@ EOM
                $x3_doc->execute($ibx_id);
                my $ibx = { -ibx_id => $ibx_id, -gc_eidx_key => $eidx_key };
                while (my ($docid, $xnum, $oid) = $x3_doc->fetchrow_array) {
-                       my $r = _unref_doc($sync, $docid, $ibx, $xnum, $oid);
+                       my $r = _unref_doc $self, $docid, $ibx, $xnum, $oid;
                        $oid = unpack('H*', $oid);
                        $r = $r ? 'unref' : 'remove';
                        warn "# $r #$docid $eidx_key $oid\n";
@@ -602,7 +602,7 @@ sub _reindex_finalize ($$$) {
                for my $x (reverse @$ary) {
                        warn "removing #$docid xref3 $x->{blob}\n";
                        my $bin = $x->oidbin;
-                       my $n = _unref_doc($sync, $docid, undef, undef, $bin);
+                       my $n = _unref_doc $self, $docid, undef, undef, $bin;
                        die "BUG: $x->{blob} invalidated #$docid" if $n == 0;
                }
                my $x = pop(@$ary) // die "BUG: #$docid {by_chash} empty";
@@ -634,7 +634,7 @@ sub _reindex_oid { # git->cat_async callback
        my $docid = $orig_smsg->{num};
        if (is_bad_blob($oid, $type, $size, $expect_oid)) {
                my $oidbin = pack('H*', $expect_oid);
-               my $remain = _unref_doc($sync, $docid, undef, undef, $oidbin);
+               my $remain = _unref_doc $self, $docid, undef, undef, $oidbin;
                if ($remain == 0) {
                        warn "W: #$docid ($oid) gone or corrupt\n";
                } elsif (my $next_oid = $req->{xr3r}->[++$req->{ix}]->[2]) {
@@ -809,7 +809,7 @@ restart:
        $iter = $dbh->prepare('SELECT docid FROM eidxq ORDER BY docid ASC');
        $iter->execute;
        while (defined(my $docid = $iter->fetchrow_array)) {
-               last if $sync->{quit};
+               last if $self->{quit};
                if (my $smsg = $self->{oidx}->get_art($docid)) {
                        _reindex_smsg($self, $sync, $smsg);
                } else {
@@ -860,21 +860,21 @@ sub reindex_unseen ($$$$) {
 }
 
 sub _unref_stale_range ($$$) {
-       my ($sync, $ibx, $lt_or_gt) = @_;
+       my ($self, $ibx, $lt_or_gt) = @_;
        my $r;
        my $lim = 10000;
        do {
-               $r = $sync->{self}->{oidx}->dbh->selectall_arrayref(
+               $r = $self->{oidx}->dbh->selectall_arrayref(
                        <<EOS, undef, $ibx->{-ibx_id});
 SELECT docid,xnum,oidbin FROM xref3
 WHERE ibx_id = ? AND $lt_or_gt LIMIT $lim
 EOS
-               return if $sync->{quit};
+               return if $self->{quit};
                for (@$r) { # hopefully rare, not worth optimizing:
                        my ($docid, $xnum, $oidbin) = @$_;
                        my $hex = unpack('H*', $oidbin);
                        warn("# $xnum:$hex (#$docid): stale\n");
-                       _unref_doc($sync, $docid, $ibx, $xnum, $oidbin);
+                       _unref_doc $self, $docid, $ibx, $xnum, $oidbin;
                }
        } while (scalar(@$r) == $lim);
        1;
@@ -892,7 +892,7 @@ sub _reindex_check_ibx ($$$) {
                $max0 = $ibx->mm->num_highwater;
                sync_inbox($self, $sync, $ibx) and return; # warned
                $max = $ibx->mm->num_highwater;
-               return if $sync->{quit};
+               return if $self->{quit};
        } while ($max > $max0 &&
                warn("# $ekey moved $max0..$max, resyncing..\n"));
        $end = $max if $end > $max;
@@ -913,7 +913,7 @@ sub _reindex_check_ibx ($$$) {
                update_checkpoint $self and
                        reindex_checkpoint($self, $sync); # release lock
                ($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num});
-               $usr //= _unref_stale_range($sync, $ibx, "xnum < $lo");
+               $usr //= _unref_stale_range($self, $ibx, "xnum < $lo");
                my $x3a = $self->{oidx}->dbh->selectall_arrayref(
                        <<"", undef, $ibx_id, $lo, $hi);
 SELECT xnum,oidbin,docid FROM xref3 WHERE
@@ -935,7 +935,7 @@ ibx_id = ? AND xnum >= ? AND xnum <= ?
                                        $self->{oidx}->eidxq_add($num);
                                }
                        }
-                       return if $sync->{quit};
+                       return if $self->{quit};
                }
                next unless scalar keys %x3m;
                $self->git->async_wait_all; # wait for reindex_unseen
@@ -959,13 +959,13 @@ BUG: (non-fatal) $ekey #$xnum $smsg->{blob} still matches (old exp: $exp)
                        my $m = defined($exp) ? "mismatch (!= $exp)" : 'stale';
                        warn("# $xnum:$hex (#@$docids): $m\n");
                        for my $i (@$docids) {
-                               _unref_doc($sync, $i, $ibx, $xnum, $bin);
+                               _unref_doc $self, $i, $ibx, $xnum, $bin;
                        }
-                       return if $sync->{quit};
+                       return if $self->{quit};
                }
        }
        defined($hi) and ($hi < $max) and
-               _unref_stale_range($sync, $ibx, "xnum > $hi AND xnum <= $max");
+               _unref_stale_range($self, $ibx, "xnum > $hi AND xnum <= $max");
 }
 
 sub _reindex_inbox ($$$) {
@@ -992,10 +992,10 @@ sub eidx_reindex {
        }
        for my $ibx (@{ibx_sorted($self, 'active')}) {
                _reindex_inbox($self, $sync, $ibx);
-               last if $sync->{quit};
+               last if $self->{quit};
        }
        $self->git->async_wait_all; # ensure eidxq gets filled completely
-       eidxq_process($self, $sync) unless $sync->{quit};
+       eidxq_process($self, $sync) unless $self->{quit};
 }
 
 sub sync_inbox {
@@ -1066,7 +1066,7 @@ EOS
                $iter->execute($cur_mid, $min_id);
        }
        while (my ($mid, $id) = $iter->fetchrow_array) {
-               last if $sync->{quit};
+               last if $self->{quit};
                $self->{current_info} = "dedupe $mid";
                $self->{nrec} = $min_id = $id;
                my ($prv, @smsg);
@@ -1128,7 +1128,7 @@ sub eidx_sync { # main entry point
                self => $self,
        };
        local $SIG{USR1} = sub { $self->{need_checkpoint} = 1 };
-       my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+       my $quit = PublicInbox::SearchIdx::quit_cb $self;
        local $SIG{QUIT} = $quit;
        local $SIG{INT} = $quit;
        local $SIG{TERM} = $quit;
@@ -1153,12 +1153,12 @@ sub eidx_sync { # main entry point
        # don't use $_ here, it'll get clobbered by reindex_checkpoint
        if ($opt->{scan} // 1) {
                for my $ibx (@{ibx_sorted($self, 'active')}) {
-                       last if $sync->{quit};
+                       last if $self->{quit};
                        sync_inbox($self, $sync, $ibx);
                }
        }
-       $self->{oidx}->rethread_done($opt) unless $sync->{quit};
-       eidxq_process($self, $sync) unless $sync->{quit};
+       $self->{oidx}->rethread_done($opt) unless $self->{quit};
+       eidxq_process($self, $sync) unless $self->{quit};
 
        eidxq_release($self);
        done($self);
@@ -1385,7 +1385,7 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
        $pr->("performing initial scan ...\n") if $pr;
        local $self->{-opt} = $opt;
        my $sync = eidx_sync($self, $opt); # initial sync
-       return if $sync->{quit};
+       return if $self->{quit};
        my $oldset = PublicInbox::DS::block_signals();
        local $self->{current_info} = '';
        local $SIG{__WARN__} = PublicInbox::Admin::warn_cb $self;
@@ -1394,10 +1394,10 @@ sub eidx_watch { # public-inbox-extindex --watch main loop
                USR1 => sub { eidx_resync_start($self) },
                TSTP => sub { kill('STOP', $$) },
        };
-       my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+       my $quit = PublicInbox::SearchIdx::quit_cb $self;
        $sig->{QUIT} = $sig->{INT} = $sig->{TERM} = $quit;
        local $self->{-watch_sync} = $sync; # for ->on_inbox_unlock
-       local @PublicInbox::DS::post_loop_do = (sub { !$sync->{quit} });
+       local @PublicInbox::DS::post_loop_do = (sub { !$self->{quit} });
        $pr->("initial scan complete, entering event loop\n") if $pr;
        # calls InboxIdle->event_step:
        PublicInbox::DS::event_loop($sig, $oldset);
index 77416e61a22e3ed19e8f86516fa1b31f29dc708f..dbbd4323a799d6459d8e915557eb7fc97e209dfa 100644 (file)
@@ -839,12 +839,11 @@ sub index_sync {
        my ($self, $opt) = @_;
        delete $self->{lock_path} if $opt->{-skip_lock};
        $self->with_umask(\&_index_sync, $self, $opt);
-       if ($opt->{reindex} && !$opt->{quit} &&
+       if ($opt->{reindex} && !$self->{quit} &&
                        !grep(defined, @$opt{qw(since until)})) {
                my %again = %$opt;
                delete @again{qw(rethread reindex)};
                index_sync($self, \%again);
-               $opt->{quit} = $again{quit}; # propagate to caller
        }
 }
 
@@ -897,7 +896,7 @@ sub v1_checkpoint ($$;$) {
        if (my $pr = $self->{-opt}->{-progress}) {
                $pr->("indexed $nrec/$sync->{ntodo}\n") if $nrec;
        }
-       if (!$stk && !$sync->{quit}) { # more to come
+       if (!$stk && !$self->{quit}) { # more to come
                begin_txn_lazy($self);
                $self->{mm}->{dbh}->begin_work;
        }
@@ -917,7 +916,7 @@ sub v1_process_stack ($$$) {
        if (my @leftovers = keys %{delete($sync->{D}) // {}}) {
                warn('W: unindexing '.scalar(@leftovers)." leftovers\n");
                for my $oid (@leftovers) {
-                       last if $sync->{quit};
+                       last if $self->{quit};
                        $oid = unpack('H*', $oid);
                        $git->cat_async($oid, \&v1_unindex_both, $sync);
                }
@@ -927,7 +926,7 @@ sub v1_process_stack ($$$) {
        }
        while (my ($f, $at, $ct, $oid, $cur_cmt) = $stk->pop_rec) {
                my $arg = { %$sync, cur_cmt => $cur_cmt, oid => $oid };
-               last if $sync->{quit};
+               last if $self->{quit};
                if ($f eq 'm') {
                        $arg->{autime} = $at;
                        $arg->{cotime} = $ct;
@@ -941,7 +940,7 @@ sub v1_process_stack ($$$) {
                }
                v1_checkpoint $self, $sync if $self->{need_checkpoint};
        }
-       v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
+       v1_checkpoint($self, $sync, $self->{quit} ? undef : $stk);
 }
 
 sub log2stack ($$$$) {
@@ -969,7 +968,7 @@ sub log2stack ($$$$) {
        my $fh = $git->popen(@cmd, $range);
        my ($at, $ct, $stk, $cmt, $l);
        while (defined($l = <$fh>)) {
-               return if $sync->{quit};
+               return if $self->{quit};
                if ($l =~ /\A([0-9]+)-([0-9]+)-($OID)$/o) {
                        ($at, $ct, $cmt) = ($1 + 0, $2 + 0, $3);
                        $stk //= PublicInbox::IdxStack->new($cmt);
@@ -1061,12 +1060,12 @@ sub v1_reindex_from ($$) {
 }
 
 sub quit_cb ($) {
-       my ($sync) = @_;
+       my ($self) = @_;
        sub {
-               # we set {-opt}->{quit} too, so ->index_sync callers
-               # can abort multi-inbox loops this way
-               $sync->{quit} = $sync->{self}->{-opt}->{quit} = 1;
-               warn "gracefully quitting\n";
+               # we set {-opt}->{quit} for public-inbox-index so
+               # can abort multi-inbox loops this way (for now...)
+               $self->{quit} = $self->{-opt}->{quit} = 1;
+               warn "gracefully quitting\n";
        }
 }
 
@@ -1086,7 +1085,7 @@ sub _index_sync {
        my $pr = $opt->{-progress};
        local $self->{-opt} = $opt;
        my $sync = { reindex => $opt->{reindex}, ibx => $ibx };
-       my $quit = quit_cb($sync);
+       my $quit = quit_cb $self;
        local $SIG{QUIT} = $quit;
        local $SIG{INT} = $quit;
        local $SIG{TERM} = $quit;
@@ -1110,7 +1109,7 @@ sub _index_sync {
        my $stk = prepare_stack($self, $sync, $range);
        $sync->{ntodo} = $stk ? $stk->num_records : 0;
        $pr->("$sync->{ntodo}\n") if $pr; # continue previous line
-       v1_process_stack($self, $sync, $stk) if !$sync->{quit};
+       v1_process_stack($self, $sync, $stk) if !$self->{quit};
 }
 
 sub DESTROY {
index 27be8c39ed6073b2767cb66fb456eeeba95e3377..886f59b1135dcc551f86b8d27deff482a5f3ad9a 100644 (file)
@@ -961,14 +961,14 @@ sub sync_prepare ($$) {
                # messages to show up in mirrors, too.
                $sync->{D} //= $sync->{reindex} ? {} : undef; # OID_BIN => NR
                my $stk = log2stack($self, $sync, $git, $range);
-               return 0 if $sync->{quit};
+               return 0 if $self->{quit};
                my $nr = $stk ? $stk->num_records : 0;
                $pr->("$nr\n") if $pr;
                $unit->{stack} = $stk; # may be undef
                unshift @{$sync->{todo}}, $unit;
                $regen_max += $nr;
        }
-       return 0 if $sync->{quit};
+       return 0 if $self->{quit};
 
        # XXX this should not happen unless somebody bypasses checks in
        # our code and blindly injects "d" file history into git repos
@@ -977,14 +977,14 @@ sub sync_prepare ($$) {
                local $self->{current_info} = 'leftover ';
                my $unindex_oid = $self->can('unindex_oid');
                for my $oid (@leftovers) {
-                       last if $sync->{quit};
+                       last if $self->{quit};
                        $oid = unpack('H*', $oid);
                        my $req = { %$sync, oid => $oid };
                        $self->git->cat_async($oid, $unindex_oid, $req);
                }
                $self->git->async_wait_all;
        }
-       return 0 if $sync->{quit};
+       return 0 if $self->{quit};
        if (!$regen_max) {
                $self->{-regen_fmt} = "%u/?\n";
                return 0;
@@ -1107,7 +1107,7 @@ sub index_xap_step ($$$;$) {
                        "$beg..$end (% $step)\n");
        }
        for (my $num = $beg; $num <= $end; $num += $step) {
-               last if $sync->{quit};
+               last if $self->{quit};
                my $smsg = $ibx->over->get_art($num) or next;
                $smsg->{self} = $self;
                $ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
@@ -1123,7 +1123,7 @@ sub index_xap_step ($$$;$) {
 
 sub index_todo ($$$) {
        my ($self, $sync, $unit) = @_;
-       return if $sync->{quit};
+       return if $self->{quit};
        unindex_todo($self, $sync, $unit);
        my $stk = delete($unit->{stack}) or return;
        my $all = $self->git; # initialize self->{ibx}->{git}
@@ -1140,7 +1140,7 @@ sub index_todo ($$$) {
        local $self->{latest_cmt};
        local $sync->{unit} = $unit;
        while (my ($f, $at, $ct, $oid, $cmt) = $stk->pop_rec) {
-               if ($sync->{quit}) {
+               if ($self->{quit}) {
                        warn "waiting to quit...\n";
                        $all->async_wait_all;
                        $self->update_last_commit($sync);
@@ -1181,7 +1181,7 @@ sub xapian_only ($;$$) {
                if ($seq || !$self->{parallel}) {
                        my $shard_end = $self->{shards} - 1;
                        for my $i (0..$shard_end) {
-                               last if $sync->{quit};
+                               last if $self->{quit};
                                index_xap_step($self, $sync, $art_beg + $i);
                                if ($i != $shard_end) {
                                        reindex_checkpoint($self, $sync);
@@ -1233,7 +1233,7 @@ sub index_sync {
                ibx => $self->{ibx},
                epoch_max => $epoch_max,
        };
-       my $quit = PublicInbox::SearchIdx::quit_cb($sync);
+       my $quit = PublicInbox::SearchIdx::quit_cb $self;
        local $SIG{QUIT} = $quit;
        local $SIG{INT} = $quit;
        local $SIG{TERM} = $quit;
@@ -1256,7 +1256,7 @@ sub index_sync {
        }
        # work forwards through history
        index_todo($self, $sync, $_) for @{delete($sync->{todo}) // []};
-       $self->{oidx}->rethread_done($opt) unless $sync->{quit};
+       $self->{oidx}->rethread_done($opt) unless $self->{quit};
        $self->done;
 
        if (my $nrec = $self->{nrec}) {
@@ -1267,17 +1267,17 @@ sub index_sync {
        my $quit_warn;
        # deal with Xapian shards sequentially
        if ($seq && delete($sync->{mm_tmp})) {
-               if ($sync->{quit}) {
+               if ($self->{quit}) {
                        $quit_warn = 1;
                } else {
                        $self->{ibx}->{indexlevel} = $idxlevel;
                        xapian_only($self, $sync, $art_beg);
-                       $quit_warn = 1 if $sync->{quit};
+                       $quit_warn = 1 if $self->{quit};
                }
        }
 
        # --reindex on the command-line
-       if (!$sync->{quit} && $opt->{reindex} &&
+       if (!$self->{quit} && $opt->{reindex} &&
                        !ref($opt->{reindex}) && $idxlevel ne 'basic') {
                $self->lock_acquire;
                my $s0 = PublicInbox::SearchIdx->new($self->{ibx}, 0, 0);
@@ -1290,13 +1290,12 @@ sub index_sync {
        }
 
        # reindex does not pick up new changes, so we rerun w/o it:
-       if ($opt->{reindex} && !$sync->{quit} &&
+       if ($opt->{reindex} && !$self->{quit} &&
                        !grep(defined, @$opt{qw(since until)})) {
                my %again = %$opt;
                $sync = undef;
                delete @again{qw(rethread reindex -skip_lock)};
                index_sync($self, \%again);
-               $opt->{quit} = $again{quit}; # propagate to caller
        }
        warn <<EOF if $quit_warn;
 W: interrupted, --xapian-only --reindex required upon restart