From: Eric Wong Date: Thu, 12 Dec 2024 10:10:40 +0000 (+0000) Subject: searchidx: consolidate checkpoint accounting X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=64a9dc19c9a5deea0721882cf290cdcf23c569ab;p=thirdparty%2Fpublic-inbox.git searchidx: consolidate checkpoint accounting We can eliminate check_batch_limit() and checkpoint_due() from extsearchidx in favor of update_checkpoint() in searchidx. We can also get rid of the awkward scalar deref for setting the {need_checkpoint} field. The only behavioral difference is the checkpoint interval is standardized to 5s and -extindex no longer uses 10s for its checkpoints. In retrospect, 5s should work more nicely for public-facing indices since they spend less time waiting on writers, but it has the downside of potentially hurting writer performance. This is another step in the gradual shift away from the $sync arg in favor of `local $self->{...}'. --- diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index a172ba624..52d7c3b15 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -122,13 +122,6 @@ sub attach_config { $cfg->each_inbox(\&_ibx_attach, $self, $types); } -sub check_batch_limit ($) { - my ($req) = @_; - # set flag for PublicInbox::V2Writable::index_todo: - update_checkpoint $req->{self}, $req->{new_smsg}->{bytes} and - ${$req->{need_checkpoint}} = 1; -} - sub bad_ibx_id ($$;$) { my ($self, $ibx_id, $cb) = @_; my $msg = "E: bad/stale ibx_id=#$ibx_id encountered"; @@ -262,7 +255,7 @@ sub index_unseen ($) { $self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key); $new_smsg->{eidx_key} = $ibx->eidx_key; $idx->index_eml($eml, $new_smsg); - check_batch_limit($req); + update_checkpoint $self, $new_smsg->{bytes}; } sub do_finalize ($) { @@ -450,7 +443,7 @@ EOM $oid = unpack('H*', $oid); $r = $r ? 'unref' : 'remove'; warn "# $r #$docid $eidx_key $oid\n"; - if (checkpoint_due($sync)) { + if (update_checkpoint $self) { $x3_doc = $ibx_ck = undef; reindex_checkpoint($self, $sync); goto restart; @@ -483,20 +476,20 @@ sub eidx_gc_scan_shards ($$) { # TODO: use for lei/store DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over) warn "# eliminated $nr stale xref3 entries\n" if $nr != 0; - reindex_checkpoint($self, $sync) if checkpoint_due($sync); + reindex_checkpoint($self, $sync) if update_checkpoint $self; # fixup from old bugs: $nr = $self->{oidx}->dbh->do(<<''); DELETE FROM over WHERE num > 0 AND num NOT IN (SELECT docid FROM xref3) warn "# eliminated $nr stale over entries\n" if $nr != 0; - reindex_checkpoint($self, $sync) if checkpoint_due($sync); + reindex_checkpoint($self, $sync) if update_checkpoint $self; $nr = $self->{oidx}->dbh->do(<<''); DELETE FROM eidxq WHERE docid NOT IN (SELECT num FROM over) warn "# eliminated $nr stale reindex queue entries\n" if $nr != 0; - reindex_checkpoint($self, $sync) if checkpoint_due($sync); + reindex_checkpoint($self, $sync) if update_checkpoint $self; my ($cur) = $self->{oidx}->dbh->selectrow_array(< 0 @@ -517,7 +510,7 @@ SELECT num FROM over WHERE num >= ? ORDER BY num ASC LIMIT 10000 } $cur = $n + 1; } - if (checkpoint_due($sync)) { + if (update_checkpoint $self) { for my $idx (values %active_shards) { $nr += $idx->ipc_do('nr_quiet_rm') } @@ -533,10 +526,8 @@ sub eidx_gc { # top-level entry point $self->{cfg} or die "E: GC requires ->attach_config\n"; $opt->{-idx_gc} = 1; local $self->{checkpoint_unlocks} = 1; + local $self->{need_checkpoint} = 0; my $sync = { - need_checkpoint => \(my $need_checkpoint), - check_intvl => 10, - next_check => now() + 10, -opt => $opt, self => $self, }; @@ -585,7 +576,7 @@ sub _reindex_finalize ($$$) { my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}'; my $docid = $smsg->{num} = $orig_smsg->{num}; $self->{oidx}->add_overview($eml, $smsg); # may rethread - check_batch_limit({ %$sync, new_smsg => $smsg }); + update_checkpoint $self, $smsg->{bytes}; my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}"; my $stable = delete($by_chash->{$chash0}) // die "BUG: $smsg->{blob} chash missing"; @@ -699,11 +690,6 @@ BUG? #$docid $smsg->{blob} is not referenced by inboxes during reindex $self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req); } -sub checkpoint_due ($) { - my ($sync) = @_; - ${$sync->{need_checkpoint}} || (now() > $sync->{next_check}); -} - sub host_ident () { # I've copied FS images and only changed the hostname before, # so prepend hostname. Use `state' since these a BOFH can change @@ -830,7 +816,7 @@ restart: $del->execute($docid); ++${$sync->{nr}}; - if (checkpoint_due($sync)) { + if (update_checkpoint $self) { $dbh = $del = $iter = undef; reindex_checkpoint($self, $sync); # release lock $dbh = $self->{oidx}->dbh; @@ -922,9 +908,8 @@ sub _reindex_check_ibx ($$$) { $beg = $msgs->[-1]->{num} + 1; $end = $beg + $slice; $end = $max if $end > $max; - if (checkpoint_due($sync)) { + update_checkpoint $self and reindex_checkpoint($self, $sync); # release lock - } ($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num}); $usr //= _unref_stale_range($sync, $ibx, "xnum < $lo"); my $x3a = $self->{oidx}->dbh->selectall_arrayref( @@ -1107,7 +1092,7 @@ EOS # Message-IDs. $self->git->async_wait_all; - if (checkpoint_due($sync)) { + if (update_checkpoint $self) { undef $iter; reindex_checkpoint($self, $sync); goto dedupe_restart; @@ -1133,10 +1118,8 @@ sub eidx_sync { # main entry point }; $self->idx_init($opt); # acquire lock via V2Writable::_idx_init $self->{oidx}->rethread_prepare($opt); + local $self->{need_checkpoint} = 0; my $sync = { - need_checkpoint => \(my $need_checkpoint), - check_intvl => 10, - next_check => now() + 10, -opt => $opt, # DO NOT SET {reindex} here, it's incompatible with reused # V2Writable code, reindex is totally different here @@ -1144,7 +1127,7 @@ sub eidx_sync { # main entry point self => $self, -regen_fmt => "%u/?\n", }; - local $SIG{USR1} = sub { $need_checkpoint = 1 }; + local $SIG{USR1} = sub { $self->{need_checkpoint} = 1 }; my $quit = PublicInbox::SearchIdx::quit_cb($sync); local $SIG{QUIT} = $quit; local $SIG{INT} = $quit; diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 0d9acd20b..8ac8cac31 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -787,13 +787,15 @@ sub is_bad_blob ($$$$) { $size == 0 ? 1 : 0; # size == 0 means purged } -sub update_checkpoint ($$) { +# returns true if checkpoint is needed +sub update_checkpoint ($;$) { my ($self, $bytes) = @_; - ($self->{transact_bytes} += $bytes) >= $self->{batch_bytes} and - return 1; + my $nr = $self->{transact_bytes} += $bytes // 0; + $self->{need_checkpoint} // return; # must be defined via local + return ++$self->{need_checkpoint} if $nr >= $self->{batch_bytes}; my $now = now; my $next = $self->{next_checkpoint} //= $now + $CHECKPOINT_INTVL; - $now > $next; + $self->{need_checkpoint} += ($now > $next ? 1 : 0); } sub index_both { # git->cat_async callback @@ -803,7 +805,7 @@ sub index_both { # git->cat_async callback my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg'; $smsg->set_bytes($$bref, $size); my $self = $sync->{sidx}; - ${$sync->{need_checkpoint}} = 1 if update_checkpoint $self, $smsg->{bytes}; + update_checkpoint $self, $smsg->{bytes}; local $self->{current_info} = "$self->{current_info}: $oid"; my $eml = PublicInbox::Eml->new($bref); $smsg->{num} = index_mm($self, $eml, $oid, $sync) or @@ -860,7 +862,7 @@ sub check_size { # check_async cb for -index --max-size=... sub v1_checkpoint ($$;$) { my ($self, $sync, $stk) = @_; $self->{ibx}->git->async_wait_all; - ${$sync->{need_checkpoint}} = undef; + $self->{need_checkpoint} = 0; # $newest may be undef my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}}; @@ -910,7 +912,7 @@ sub process_stack { my $nr = 0; $sync->{nr} = \$nr; $sync->{sidx} = $self; - $sync->{need_checkpoint} = \(my $need_ckpt); + local $self->{need_checkpoint} = 0; $sync->{latest_cmt} = \(my $latest_cmt); $self->{mm}->{dbh}->begin_work; @@ -940,8 +942,7 @@ sub process_stack { } elsif ($f eq 'd') { $git->cat_async($oid, \&unindex_both, $arg); } - ${$sync->{need_checkpoint}} and - v1_checkpoint $self, $sync; + v1_checkpoint $self, $sync if $self->{need_checkpoint}; } v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk); } diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 87118ec46..15945b355 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -22,7 +22,7 @@ use PublicInbox::Spawn qw(spawn popen_rd run_die); use PublicInbox::Search; use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob update_checkpoint); -use PublicInbox::DS qw(now); +use PublicInbox::DS; use IO::Handle; # ->autoflush use POSIX (); use Carp qw(confess); @@ -703,7 +703,7 @@ sub reindex_checkpoint ($$) { $self->git->async_wait_all; $self->update_last_commit($sync); - ${$sync->{need_checkpoint}} = 0; + $self->{need_checkpoint} = 0; my $mm_tmp = $sync->{mm_tmp}; $mm_tmp->atfork_prepare if $mm_tmp; die 'BUG: {im} during reindex' if $self->{im}; @@ -719,9 +719,6 @@ sub reindex_checkpoint ($$) { # allow -watch or -mda to write... $self->idx_init($sync->{-opt}); # reacquire lock - if (my $intvl = $sync->{check_intvl}) { # eidx - $sync->{next_check} = now + $intvl; - } $mm_tmp->atfork_parent if $mm_tmp; } @@ -817,7 +814,7 @@ sub index_oid { # cat_async callback }, 'PublicInbox::Smsg'; $smsg->populate($eml, $arg); $smsg->set_bytes($$bref, $size); - ${$arg->{need_checkpoint}} = 1 if do_idx $self, $eml, $smsg; + do_idx $self, $eml, $smsg; index_finalize($arg, 1); } @@ -1169,9 +1166,7 @@ sub index_todo ($$$) { } elsif ($f eq 'd') { $all->cat_async($oid, $unindex_oid, $req); } - if (${$sync->{need_checkpoint}}) { - reindex_checkpoint($self, $sync); - } + reindex_checkpoint($self, $sync) if $self->{need_checkpoint}; } $all->async_wait_all; $self->update_last_commit($sync, $stk); @@ -1185,7 +1180,6 @@ sub xapian_only { $self->idx_init($opt); # acquire lock if (my $art_end = $self->{ibx}->mm->max) { $sync //= { - need_checkpoint => \(my $bool = 0), -opt => $opt, self => $self, nr => \(my $nr = 0), @@ -1214,6 +1208,7 @@ sub xapian_only { sub index_sync { my ($self, $opt) = @_; $opt //= {}; + local $self->{need_checkpoint} = 0; return xapian_only($self, $opt) if $opt->{xapian_only}; my $epoch_max; @@ -1238,7 +1233,6 @@ sub index_sync { $self->{mg}->fill_alternates; $self->{oidx}->rethread_prepare($opt); my $sync = { - need_checkpoint => \(my $bool = 0), reindex => $opt->{reindex}, -opt => $opt, self => $self,