$cfg->each_inbox(\&_ibx_attach, $self, $types);
}
-sub check_batch_limit ($) {
- my ($req) = @_;
- # set flag for PublicInbox::V2Writable::index_todo:
- update_checkpoint $req->{self}, $req->{new_smsg}->{bytes} and
- ${$req->{need_checkpoint}} = 1;
-}
-
sub bad_ibx_id ($$;$) {
my ($self, $ibx_id, $cb) = @_;
my $msg = "E: bad/stale ibx_id=#$ibx_id encountered";
$self->{oidx}->add_xref3($docid, $req->{xnum}, $oid, $ibx->eidx_key);
$new_smsg->{eidx_key} = $ibx->eidx_key;
$idx->index_eml($eml, $new_smsg);
- check_batch_limit($req);
+ update_checkpoint $self, $new_smsg->{bytes};
}
sub do_finalize ($) {
$oid = unpack('H*', $oid);
$r = $r ? 'unref' : 'remove';
warn "# $r #$docid $eidx_key $oid\n";
- if (checkpoint_due($sync)) {
+ if (update_checkpoint $self) {
$x3_doc = $ibx_ck = undef;
reindex_checkpoint($self, $sync);
goto restart;
DELETE FROM xref3 WHERE docid NOT IN (SELECT num FROM over)
warn "# eliminated $nr stale xref3 entries\n" if $nr != 0;
- reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+ reindex_checkpoint($self, $sync) if update_checkpoint $self;
# fixup from old bugs:
$nr = $self->{oidx}->dbh->do(<<'');
DELETE FROM over WHERE num > 0 AND num NOT IN (SELECT docid FROM xref3)
warn "# eliminated $nr stale over entries\n" if $nr != 0;
- reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+ reindex_checkpoint($self, $sync) if update_checkpoint $self;
$nr = $self->{oidx}->dbh->do(<<'');
DELETE FROM eidxq WHERE docid NOT IN (SELECT num FROM over)
warn "# eliminated $nr stale reindex queue entries\n" if $nr != 0;
- reindex_checkpoint($self, $sync) if checkpoint_due($sync);
+ reindex_checkpoint($self, $sync) if update_checkpoint $self;
my ($cur) = $self->{oidx}->dbh->selectrow_array(<<EOM);
SELECT MIN(num) FROM over WHERE num > 0
}
$cur = $n + 1;
}
- if (checkpoint_due($sync)) {
+ if (update_checkpoint $self) {
for my $idx (values %active_shards) {
$nr += $idx->ipc_do('nr_quiet_rm')
}
$self->{cfg} or die "E: GC requires ->attach_config\n";
$opt->{-idx_gc} = 1;
local $self->{checkpoint_unlocks} = 1;
+ local $self->{need_checkpoint} = 0;
my $sync = {
- need_checkpoint => \(my $need_checkpoint),
- check_intvl => 10,
- next_check => now() + 10,
-opt => $opt,
self => $self,
};
my $orig_smsg = $req->{orig_smsg} // die 'BUG: no {orig_smsg}';
my $docid = $smsg->{num} = $orig_smsg->{num};
$self->{oidx}->add_overview($eml, $smsg); # may rethread
- check_batch_limit({ %$sync, new_smsg => $smsg });
+ update_checkpoint $self, $smsg->{bytes};
my $chash0 = $smsg->{chash} // die "BUG: $smsg->{blob} no {chash}";
my $stable = delete($by_chash->{$chash0}) //
die "BUG: $smsg->{blob} chash missing";
$self->git->cat_async($xr3->[$req->{ix}]->[2], \&_reindex_oid, $req);
}
-sub checkpoint_due ($) {
- my ($sync) = @_;
- ${$sync->{need_checkpoint}} || (now() > $sync->{next_check});
-}
-
sub host_ident () {
# I've copied FS images and only changed the hostname before,
# so prepend hostname. Use `state' since these a BOFH can change
$del->execute($docid);
++${$sync->{nr}};
- if (checkpoint_due($sync)) {
+ if (update_checkpoint $self) {
$dbh = $del = $iter = undef;
reindex_checkpoint($self, $sync); # release lock
$dbh = $self->{oidx}->dbh;
$beg = $msgs->[-1]->{num} + 1;
$end = $beg + $slice;
$end = $max if $end > $max;
- if (checkpoint_due($sync)) {
+ update_checkpoint $self and
reindex_checkpoint($self, $sync); # release lock
- }
($lo, $hi) = ($msgs->[0]->{num}, $msgs->[-1]->{num});
$usr //= _unref_stale_range($sync, $ibx, "xnum < $lo");
my $x3a = $self->{oidx}->dbh->selectall_arrayref(
# Message-IDs.
$self->git->async_wait_all;
- if (checkpoint_due($sync)) {
+ if (update_checkpoint $self) {
undef $iter;
reindex_checkpoint($self, $sync);
goto dedupe_restart;
};
$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
$self->{oidx}->rethread_prepare($opt);
+ local $self->{need_checkpoint} = 0;
my $sync = {
- need_checkpoint => \(my $need_checkpoint),
- check_intvl => 10,
- next_check => now() + 10,
-opt => $opt,
# DO NOT SET {reindex} here, it's incompatible with reused
# V2Writable code, reindex is totally different here
self => $self,
-regen_fmt => "%u/?\n",
};
- local $SIG{USR1} = sub { $need_checkpoint = 1 };
+ local $SIG{USR1} = sub { $self->{need_checkpoint} = 1 };
my $quit = PublicInbox::SearchIdx::quit_cb($sync);
local $SIG{QUIT} = $quit;
local $SIG{INT} = $quit;
$size == 0 ? 1 : 0; # size == 0 means purged
}
-sub update_checkpoint ($$) {
+# returns true if checkpoint is needed
+sub update_checkpoint ($;$) {
my ($self, $bytes) = @_;
- ($self->{transact_bytes} += $bytes) >= $self->{batch_bytes} and
- return 1;
+ my $nr = $self->{transact_bytes} += $bytes // 0;
+ $self->{need_checkpoint} // return; # must be defined via local
+ return ++$self->{need_checkpoint} if $nr >= $self->{batch_bytes};
my $now = now;
my $next = $self->{next_checkpoint} //= $now + $CHECKPOINT_INTVL;
- $now > $next;
+ $self->{need_checkpoint} += ($now > $next ? 1 : 0);
}
sub index_both { # git->cat_async callback
my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
$smsg->set_bytes($$bref, $size);
my $self = $sync->{sidx};
- ${$sync->{need_checkpoint}} = 1 if update_checkpoint $self, $smsg->{bytes};
+ update_checkpoint $self, $smsg->{bytes};
local $self->{current_info} = "$self->{current_info}: $oid";
my $eml = PublicInbox::Eml->new($bref);
$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
sub v1_checkpoint ($$;$) {
my ($self, $sync, $stk) = @_;
$self->{ibx}->git->async_wait_all;
- ${$sync->{need_checkpoint}} = undef;
+ $self->{need_checkpoint} = 0;
# $newest may be undef
my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
my $nr = 0;
$sync->{nr} = \$nr;
$sync->{sidx} = $self;
- $sync->{need_checkpoint} = \(my $need_ckpt);
+ local $self->{need_checkpoint} = 0;
$sync->{latest_cmt} = \(my $latest_cmt);
$self->{mm}->{dbh}->begin_work;
} elsif ($f eq 'd') {
$git->cat_async($oid, \&unindex_both, $arg);
}
- ${$sync->{need_checkpoint}} and
- v1_checkpoint $self, $sync;
+ v1_checkpoint $self, $sync if $self->{need_checkpoint};
}
v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
}
use PublicInbox::Search;
use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob
update_checkpoint);
-use PublicInbox::DS qw(now);
+use PublicInbox::DS;
use IO::Handle; # ->autoflush
use POSIX ();
use Carp qw(confess);
$self->git->async_wait_all;
$self->update_last_commit($sync);
- ${$sync->{need_checkpoint}} = 0;
+ $self->{need_checkpoint} = 0;
my $mm_tmp = $sync->{mm_tmp};
$mm_tmp->atfork_prepare if $mm_tmp;
die 'BUG: {im} during reindex' if $self->{im};
# allow -watch or -mda to write...
$self->idx_init($sync->{-opt}); # reacquire lock
- if (my $intvl = $sync->{check_intvl}) { # eidx
- $sync->{next_check} = now + $intvl;
- }
$mm_tmp->atfork_parent if $mm_tmp;
}
}, 'PublicInbox::Smsg';
$smsg->populate($eml, $arg);
$smsg->set_bytes($$bref, $size);
- ${$arg->{need_checkpoint}} = 1 if do_idx $self, $eml, $smsg;
+ do_idx $self, $eml, $smsg;
index_finalize($arg, 1);
}
} elsif ($f eq 'd') {
$all->cat_async($oid, $unindex_oid, $req);
}
- if (${$sync->{need_checkpoint}}) {
- reindex_checkpoint($self, $sync);
- }
+ reindex_checkpoint($self, $sync) if $self->{need_checkpoint};
}
$all->async_wait_all;
$self->update_last_commit($sync, $stk);
$self->idx_init($opt); # acquire lock
if (my $art_end = $self->{ibx}->mm->max) {
$sync //= {
- need_checkpoint => \(my $bool = 0),
-opt => $opt,
self => $self,
nr => \(my $nr = 0),
sub index_sync {
my ($self, $opt) = @_;
$opt //= {};
+ local $self->{need_checkpoint} = 0;
return xapian_only($self, $opt) if $opt->{xapian_only};
my $epoch_max;
$self->{mg}->fill_alternates;
$self->{oidx}->rethread_prepare($opt);
my $sync = {
- need_checkpoint => \(my $bool = 0),
reindex => $opt->{reindex},
-opt => $opt,
self => $self,