use PublicInbox::MultiGit;
use PublicInbox::Spawn ();
use PublicInbox::Search;
-use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob);
+use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob
+ update_checkpoint);
use PublicInbox::OverIdx;
use PublicInbox::MiscIdx;
use PublicInbox::MID qw(mids);
# Account for the message in $req->{new_smsg} and raise the caller's
# checkpoint flag when a checkpoint is due.
# $req is a hashref; the keys read here are {self}, {new_smsg} and
# {need_checkpoint} (a scalar ref which is set to 1, never cleared here).
# In this hunk the '-' lines are the old inline byte accounting and the
# '+' lines delegate that decision to update_checkpoint.
sub check_batch_limit ($) {
my ($req) = @_;
- my $self = $req->{self};
- my $new_smsg = $req->{new_smsg};
- my $n = $self->{transact_bytes} += $new_smsg->{bytes};
-
# set flag for PublicInbox::V2Writable::index_todo:
# (`and' binds looser than the update_checkpoint call and the assignment,
# so the flag is only assigned when update_checkpoint returns true)
- ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes};
+ update_checkpoint $req->{self}, $req->{new_smsg} and
+ ${$req->{need_checkpoint}} = 1;
}
sub bad_ibx_id ($$;$) {
$self->{cfg} or die "E: GC requires ->attach_config\n";
$opt->{-idx_gc} = 1;
my $sync = {
- need_checkpoint => \(my $need_checkpoint = 0),
+ need_checkpoint => \(my $need_checkpoint),
check_intvl => 10,
next_check => now() + 10,
checkpoint_unlocks => 1,
$self->idx_init($opt); # acquire lock via V2Writable::_idx_init
$self->{oidx}->rethread_prepare($opt);
my $sync = {
- need_checkpoint => \(my $need_checkpoint = 0),
+ need_checkpoint => \(my $need_checkpoint),
check_intvl => 10,
next_check => now() + 10,
-opt => $opt,
use parent qw(PublicInbox::Search PublicInbox::Lock PublicInbox::Umask
Exporter);
use PublicInbox::Eml;
+use PublicInbox::DS qw(now);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::InboxWritable;
use PublicInbox::MID qw(mids_for_index mids);
use PublicInbox::Address;
use Config;
our @EXPORT_OK = qw(log2stack is_ancestor check_size prepare_stack
- index_text term_generator add_val is_bad_blob);
+ index_text term_generator add_val is_bad_blob update_checkpoint);
my $X = \%PublicInbox::Search::X;
our ($DB_CREATE_OR_OPEN, $DB_OPEN);
our $DB_NO_SYNC = 0;
our $DB_DANGEROUS = 0;
+our $CHECKPOINT_INTVL = 5; # seconds
our $BATCH_BYTES = $ENV{XAPIAN_FLUSH_THRESHOLD} ? 0x7fffffff :
# assume a typical 64-bit system has 8x more RAM than a
# typical 32-bit system:
$size == 0 ? 1 : 0; # size == 0 means purged
}
+sub update_checkpoint ($$) { # true if caller should checkpoint
+ my ($self, $smsg) = @_; # reads $smsg->{bytes}, updates $self counters
+ ($self->{transact_bytes} += $smsg->{bytes}) >= $self->{batch_bytes} and
+ return 1; # accumulated bytes reached {batch_bytes}
+ my $now = now; # now() is imported from PublicInbox::DS
+ my $next = $self->{next_checkpoint} //= $now + $CHECKPOINT_INTVL; # set once, kept until deleted
+ $now > $next; # return value: true once the stored deadline has passed
+}
+
sub index_both { # git->cat_async callback
my ($bref, $oid, $type, $size, $sync) = @_;
return if is_bad_blob($oid, $type, $size, $sync->{oid});
- my ($nr, $max) = @$sync{qw(nr max)};
- ++$$nr;
- $$max -= $size;
+ ++${$sync->{nr}};
my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
$smsg->set_bytes($$bref, $size);
my $self = $sync->{sidx};
+ ${$sync->{need_checkpoint}} = 1 if update_checkpoint $self, $smsg;
local $self->{current_info} = "$self->{current_info}: $oid";
my $eml = PublicInbox::Eml->new($bref);
$smsg->{num} = index_mm($self, $eml, $oid, $sync) or
sub v1_checkpoint ($$;$) {
my ($self, $sync, $stk) = @_;
$self->{ibx}->git->async_wait_all;
+ ${$sync->{need_checkpoint}} = undef;
# $newest may be undef
my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
$self->{mm}->last_commit($newest);
}
}
- ${$sync->{max}} = $self->{batch_bytes};
$self->{mm}->mm_commit;
my $xdb = $self->{xdb};
if ($newest && $xdb) {
begin_txn_lazy($self);
$self->{mm}->{dbh}->begin_work;
}
+ $self->{transact_bytes} = 0;
+ delete $self->{next_checkpoint};
}
# only for v1
sub process_stack {
my ($self, $sync, $stk) = @_;
my $git = $sync->{ibx}->git;
- my $max = $self->{batch_bytes};
my $nr = 0;
$sync->{nr} = \$nr;
- $sync->{max} = \$max;
$sync->{sidx} = $self;
+ $sync->{need_checkpoint} = \(my $need_ckpt);
$sync->{latest_cmt} = \(my $latest_cmt);
$self->{mm}->{dbh}->begin_work;
} else {
$git->cat_async($oid, \&index_both, $arg);
}
- v1_checkpoint($self, $sync) if $max <= 0;
} elsif ($f eq 'd') {
$git->cat_async($oid, \&unindex_both, $arg);
}
+ ${$sync->{need_checkpoint}} and
+ v1_checkpoint $self, $sync;
}
v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
}
$ibx->git->cat_file($tip);
$ibx->git->check($tip);
}
+ local $self->{transact_bytes} = 0;
my $pr = $opt->{-progress};
my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx };
my $quit = quit_cb($sync);
use PublicInbox::Msgmap;
use PublicInbox::Spawn qw(spawn popen_rd run_die);
use PublicInbox::Search;
-use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob);
+use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob
+ update_checkpoint);
use PublicInbox::DS qw(now);
use IO::Handle; # ->autoflush
use POSIX ();
my $idx = idx_shard($self, $smsg->{num});
$idx->index_eml($eml, $smsg);
}
- my $n = $self->{transact_bytes} += $smsg->{bytes};
- $n >= $self->{batch_bytes};
+ update_checkpoint $self, $smsg;
}
# returns undef on duplicate or spam
my $cmt = $im->add($mime, undef, $smsg); # sets $smsg->{ds|ts|blob}
$cmt = $im->get_mark($cmt);
$self->{last_commit}->[$self->{epoch_max}] = $cmt;
-
- if (do_idx($self, $mime, $smsg)) {
- $self->checkpoint;
- }
+ $self->checkpoint if do_idx $self, $mime, $smsg;
$cmt;
}
$dbh->begin_work;
}
}
+ delete $self->{next_checkpoint};
$self->{total_bytes} += $self->{transact_bytes};
$self->{transact_bytes} = 0;
}
}, 'PublicInbox::Smsg';
$smsg->populate($eml, $arg);
$smsg->set_bytes($$bref, $size);
- if (do_idx($self, $eml, $smsg)) {
- ${$arg->{need_checkpoint}} = 1;
- }
+ ${$arg->{need_checkpoint}} = 1 if do_idx $self, $eml, $smsg;
index_finalize($arg, 1);
}
my $self = delete $smsg->{self};
my $idx = idx_shard($self, $smsg->{num});
$idx->index_eml(PublicInbox::Eml->new($bref), $smsg);
- $self->{transact_bytes} += $smsg->{bytes};
}
sub index_xap_step ($$$;$) {
my $smsg = $ibx->over->get_art($num) or next;
$smsg->{self} = $self;
$ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
- if ($self->{transact_bytes} >= $self->{batch_bytes}) {
+ # n.b. ignore CHECKPOINT_INTVL for Xapian-only, Xapian doesn't
+ # have timeout problems like SQLite
+ my $n = $self->{transact_bytes} += $smsg->{bytes};
+ if ($n >= $self->{batch_bytes}) {
${$sync->{nr}} = $num;
reindex_checkpoint($self, $sync);
}