]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
(ext)index: use time-based commits to avoid busy timeout
authorEric Wong <e@80x24.org>
Wed, 11 Dec 2024 08:10:44 +0000 (08:10 +0000)
committerEric Wong <e@80x24.org>
Thu, 12 Dec 2024 08:44:13 +0000 (08:44 +0000)
With public-facing read-only daemons typically run from a
different user than writers, we cannot rely on SQLite WAL
(write-ahead-log) for parallelism since all readers need write
permissions on read-write FSes to read from WAL DBs.  Since we
can't force or even encourage WAL use for public-facing inboxes,
we need to ensure long-running --reindex jobs can commit
occasionally to prevent read-only daemons from hitting the
default 30s busy_timeout set by DBD::SQLite (not SQLite itself).

This mainly affects --reindex users, but can also affect
newly-cloned inboxes which are being served by read-only
daemons while they're being indexed.

This change only benefits read-only processes, and is likely to
penalize writer performance and storage efficiency due to
increased write frequency.  We still maintain and respect
--batch-size for memory sized-based commits in addition to
time-based commits, but the new time-based commit interval is
necessary in case the batch size is too large or the system
is too slow to index a large batch.

While Xapian doesn't need time-based commits for read
parallelism, we commit to Xapian anyways since we want to
minimize consistency problems on interrupted indexing jobs.

Followup-to: 807abf67e14d (lei/store: auto-commit for long-running imports, 2024-11-15)
lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/V2Writable.pm

index d8db7d4b03ea226c00feddbc15e319b1b763dc20..fe2f5d2e47d4f6e6269c8b9ed186076057ba04d1 100644 (file)
@@ -26,7 +26,8 @@ use PublicInbox::Isearch;
 use PublicInbox::MultiGit;
 use PublicInbox::Spawn ();
 use PublicInbox::Search;
-use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob);
+use PublicInbox::SearchIdx qw(prepare_stack is_ancestor is_bad_blob
+       update_checkpoint);
 use PublicInbox::OverIdx;
 use PublicInbox::MiscIdx;
 use PublicInbox::MID qw(mids);
@@ -123,12 +124,9 @@ sub attach_config {
 
 sub check_batch_limit ($) {
        my ($req) = @_;
-       my $self = $req->{self};
-       my $new_smsg = $req->{new_smsg};
-       my $n = $self->{transact_bytes} += $new_smsg->{bytes};
-
        # set flag for PublicInbox::V2Writable::index_todo:
-       ${$req->{need_checkpoint}} = 1 if $n >= $self->{batch_bytes};
+       update_checkpoint $req->{self}, $req->{new_smsg} and
+               ${$req->{need_checkpoint}} = 1;
 }
 
 sub bad_ibx_id ($$;$) {
@@ -535,7 +533,7 @@ sub eidx_gc {
        $self->{cfg} or die "E: GC requires ->attach_config\n";
        $opt->{-idx_gc} = 1;
        my $sync = {
-               need_checkpoint => \(my $need_checkpoint = 0),
+               need_checkpoint => \(my $need_checkpoint),
                check_intvl => 10,
                next_check => now() + 10,
                checkpoint_unlocks => 1,
@@ -1136,7 +1134,7 @@ sub eidx_sync { # main entry point
        $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
        $self->{oidx}->rethread_prepare($opt);
        my $sync = {
-               need_checkpoint => \(my $need_checkpoint = 0),
+               need_checkpoint => \(my $need_checkpoint),
                check_intvl => 10,
                next_check => now() + 10,
                -opt => $opt,
index 48ba806a68dc1458aff3d2242e5df20e33c18e37..3a85f55210e94f0307a1176148e4de364e5c399b 100644 (file)
@@ -12,6 +12,7 @@ use v5.10.1;
 use parent qw(PublicInbox::Search PublicInbox::Lock PublicInbox::Umask
        Exporter);
 use PublicInbox::Eml;
+use PublicInbox::DS qw(now);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::InboxWritable;
 use PublicInbox::MID qw(mids_for_index mids);
@@ -28,11 +29,12 @@ use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
 use PublicInbox::Address;
 use Config;
 our @EXPORT_OK = qw(log2stack is_ancestor check_size prepare_stack
-       index_text term_generator add_val is_bad_blob);
+       index_text term_generator add_val is_bad_blob update_checkpoint);
 my $X = \%PublicInbox::Search::X;
 our ($DB_CREATE_OR_OPEN, $DB_OPEN);
 our $DB_NO_SYNC = 0;
 our $DB_DANGEROUS = 0;
+our $CHECKPOINT_INTVL = 5; # seconds
 our $BATCH_BYTES = $ENV{XAPIAN_FLUSH_THRESHOLD} ? 0x7fffffff :
        # assume a typical 64-bit system has 8x more RAM than a
        # typical 32-bit system:
@@ -785,15 +787,23 @@ sub is_bad_blob ($$$$) {
        $size == 0 ? 1 : 0; # size == 0 means purged
 }
 
+sub update_checkpoint ($$) {
+       my ($self, $smsg) = @_;
+       ($self->{transact_bytes} += $smsg->{bytes}) >= $self->{batch_bytes} and
+               return 1;
+       my $now = now;
+       my $next = $self->{next_checkpoint} //= $now + $CHECKPOINT_INTVL;
+       $now > $next;
+}
+
 sub index_both { # git->cat_async callback
        my ($bref, $oid, $type, $size, $sync) = @_;
        return if is_bad_blob($oid, $type, $size, $sync->{oid});
-       my ($nr, $max) = @$sync{qw(nr max)};
-       ++$$nr;
-       $$max -= $size;
+       ++${$sync->{nr}};
        my $smsg = bless { blob => $oid }, 'PublicInbox::Smsg';
        $smsg->set_bytes($$bref, $size);
        my $self = $sync->{sidx};
+       ${$sync->{need_checkpoint}} = 1 if update_checkpoint $self, $smsg;
        local $self->{current_info} = "$self->{current_info}: $oid";
        my $eml = PublicInbox::Eml->new($bref);
        $smsg->{num} = index_mm($self, $eml, $oid, $sync) or
@@ -850,6 +860,7 @@ sub check_size { # check_async cb for -index --max-size=...
 sub v1_checkpoint ($$;$) {
        my ($self, $sync, $stk) = @_;
        $self->{ibx}->git->async_wait_all;
+       ${$sync->{need_checkpoint}} = undef;
 
        # $newest may be undef
        my $newest = $stk ? $stk->{latest_cmt} : ${$sync->{latest_cmt}};
@@ -859,7 +870,6 @@ sub v1_checkpoint ($$;$) {
                        $self->{mm}->last_commit($newest);
                }
        }
-       ${$sync->{max}} = $self->{batch_bytes};
        $self->{mm}->mm_commit;
        my $xdb = $self->{xdb};
        if ($newest && $xdb) {
@@ -889,17 +899,18 @@ sub v1_checkpoint ($$;$) {
                begin_txn_lazy($self);
                $self->{mm}->{dbh}->begin_work;
        }
+       $self->{transact_bytes} = 0;
+       delete $self->{next_checkpoint};
 }
 
 # only for v1
 sub process_stack {
        my ($self, $sync, $stk) = @_;
        my $git = $sync->{ibx}->git;
-       my $max = $self->{batch_bytes};
        my $nr = 0;
        $sync->{nr} = \$nr;
-       $sync->{max} = \$max;
        $sync->{sidx} = $self;
+       $sync->{need_checkpoint} = \(my $need_ckpt);
        $sync->{latest_cmt} = \(my $latest_cmt);
 
        $self->{mm}->{dbh}->begin_work;
@@ -926,10 +937,11 @@ sub process_stack {
                        } else {
                                $git->cat_async($oid, \&index_both, $arg);
                        }
-                       v1_checkpoint($self, $sync) if $max <= 0;
                } elsif ($f eq 'd') {
                        $git->cat_async($oid, \&unindex_both, $arg);
                }
+               ${$sync->{need_checkpoint}} and
+                       v1_checkpoint $self, $sync;
        }
        v1_checkpoint($self, $sync, $sync->{quit} ? undef : $stk);
 }
@@ -1072,6 +1084,7 @@ sub _index_sync {
                $ibx->git->cat_file($tip);
                $ibx->git->check($tip);
        }
+       local $self->{transact_bytes} = 0;
        my $pr = $opt->{-progress};
        my $sync = { reindex => $opt->{reindex}, -opt => $opt, ibx => $ibx };
        my $quit = quit_cb($sync);
index af9aaef32e17cb00d5d6da0c8a8be9351e380e09..5f3bfde54b5cf96347dae95fa22fad1c4be405f3 100644 (file)
@@ -20,7 +20,8 @@ use PublicInbox::OverIdx;
 use PublicInbox::Msgmap;
 use PublicInbox::Spawn qw(spawn popen_rd run_die);
 use PublicInbox::Search;
-use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob);
+use PublicInbox::SearchIdx qw(log2stack is_ancestor check_size is_bad_blob
+       update_checkpoint);
 use PublicInbox::DS qw(now);
 use IO::Handle; # ->autoflush
 use POSIX ();
@@ -106,8 +107,7 @@ sub do_idx ($$$) {
                my $idx = idx_shard($self, $smsg->{num});
                $idx->index_eml($eml, $smsg);
        }
-       my $n = $self->{transact_bytes} += $smsg->{bytes};
-       $n >= $self->{batch_bytes};
+       update_checkpoint $self, $smsg;
 }
 
 # returns undef on duplicate or spam
@@ -136,10 +136,7 @@ sub add {
        my $cmt = $im->add($mime, undef, $smsg); # sets $smsg->{ds|ts|blob}
        $cmt = $im->get_mark($cmt);
        $self->{last_commit}->[$self->{epoch_max}] = $cmt;
-
-       if (do_idx($self, $mime, $smsg)) {
-               $self->checkpoint;
-       }
+       $self->checkpoint if do_idx $self, $mime, $smsg;
        $cmt;
 }
 
@@ -564,6 +561,7 @@ shard[$i] bad echo:$echo != $i waiting for txn commit
                        $dbh->begin_work;
                }
        }
+       delete $self->{next_checkpoint};
        $self->{total_bytes} += $self->{transact_bytes};
        $self->{transact_bytes} = 0;
 }
@@ -819,9 +817,7 @@ sub index_oid { # cat_async callback
        }, 'PublicInbox::Smsg';
        $smsg->populate($eml, $arg);
        $smsg->set_bytes($$bref, $size);
-       if (do_idx($self, $eml, $smsg)) {
-               ${$arg->{need_checkpoint}} = 1;
-       }
+       ${$arg->{need_checkpoint}} = 1 if do_idx $self, $eml, $smsg;
        index_finalize($arg, 1);
 }
 
@@ -1103,7 +1099,6 @@ sub index_xap_only { # git->cat_async callback
        my $self = delete $smsg->{self};
        my $idx = idx_shard($self, $smsg->{num});
        $idx->index_eml(PublicInbox::Eml->new($bref), $smsg);
-       $self->{transact_bytes} += $smsg->{bytes};
 }
 
 sub index_xap_step ($$$;$) {
@@ -1122,7 +1117,10 @@ sub index_xap_step ($$$;$) {
                my $smsg = $ibx->over->get_art($num) or next;
                $smsg->{self} = $self;
                $ibx->git->cat_async($smsg->{blob}, \&index_xap_only, $smsg);
-               if ($self->{transact_bytes} >= $self->{batch_bytes}) {
+               # n.b. ignore CHECKPOINT_INTVL for Xapian-only, Xapian doesn't
+               # have timeout problems like SQLite
+               my $n = $self->{transact_bytes} += $smsg->{bytes};
+               if ($n >= $self->{batch_bytes}) {
                        ${$sync->{nr}} = $num;
                        reindex_checkpoint($self, $sync);
                }