]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
*index: --split-shards to speeds initial indexing
authorEric Wong <e@80x24.org>
Wed, 8 Oct 2025 21:24:22 +0000 (21:24 +0000)
committerEric Wong <e@80x24.org>
Fri, 10 Oct 2025 01:12:58 +0000 (01:12 +0000)
When indexing millions of messages, Xapian has a tendency to
slow down as each shard gets bigger.  --split-shards allows
Xapian to work on temporary shards (more akin to "epochs")
and uses xapian-compact(1) to commit the finalized changes
for readers.  The result is roughly 2x faster for millions
of messages.

The downside of this switch is temporary space use increases by
2-3x and incremental changes are not visible to readers until
all indexing is complete.   It has no useful effect on
--reindex, but --reindex is typically faster than initial
indexing anyways since space is already allocated.

It still takes days to create a new extindex of lore, but
fewer days than before.

Another beneficial side effect of this switch is it also
tends to reduce the effect of fragmentation for --cow users
on btrfs.

14 files changed:
Documentation/public-inbox-convert.pod
Documentation/public-inbox-extindex.pod
Documentation/public-inbox-index.pod
Documentation/public-inbox-xcpdb.pod
lib/PublicInbox/Admin.pm
lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/SearchIdx.pm
lib/PublicInbox/SearchIdxShard.pm
lib/PublicInbox/V2Writable.pm
lib/PublicInbox/Xapcmd.pm
script/public-inbox-convert
script/public-inbox-extindex
script/public-inbox-index
script/public-inbox-xcpdb

index 86e8265b353b953021a0526d9d96a25bc9a60d77..5905675fae974677a7224bd60efb5ae545679a87 100644 (file)
@@ -49,6 +49,12 @@ process which distributes work to the Xapian shards.
 
 =item --max-size=BYTES
 
+=item --split-shards
+
+=item --split-at DOCCOUNT
+
+=item --multipass
+
 =item --wal
 
 These options affect indexing.  They have no effect if
index 136edf6f537563c2ac5f89c087ddab7bb86eb80f..f083678c339927bc97d0f52d8bc711a7ff604740 100644 (file)
@@ -34,6 +34,12 @@ along with L<DBD::SQLite> and L<DBI> Perl modules.
 
 =item --batch-size SIZE
 
+=item --split-shards
+
+=item --split-at DOCCOUNT
+
+=item --multipass
+
 =item --wal
 
 =item --commit SECONDS
index 53ed9e8812ba00a2d7bcc3901509b379a7139bc6..81134301fe947b8855793ea82b678ecd4b0489fd 100644 (file)
@@ -209,6 +209,27 @@ Available in public-inbox 2.0.0+
 
 Passed directly to L<git-log(1)> to limit changes for C<--reindex>
 
+=item --split-shards
+
+Speed up initial indexing of millions of messages by ~2x at the
+cost of 2-3x temporary disk space.  Requires L<xapian-compact(1)> to be
+installed.  Unlikely useful with L</--reindex>, DO NOT use with
+L</--reindex> unless you want new messages to remain unindexed for
+days until reindexing is done.
+
+=item --split-at DOCCOUNT
+
+Implies and used for tuning C<--split-shards>.  The default is
+likely fine.  Tiny values may overflow system file/command-line
+limits while giant values negate the performance benefit.
+
+Default: 450000
+
+=item --multipass
+
+Passed to L<xapian-compact(1)> when using C<--split-shards>, may
+further improve performance at the expense of even more disk space.
+
 =item --wal
 
 Enable WAL (Write-Ahead-Log) on SQLite files.  WAL may reduce
index e7c07ed38d4085c9c5b6bbdc7954f50aa8b2a68a..fe58bc42bb45ccfe96df713f3121c3209914bccb 100644 (file)
@@ -81,6 +81,10 @@ affects indexing done at the end of a run.
 
 =item --max-size=BYTES
 
+=item --split-shards
+
+=item --split-at DOCCOUNT
+
 See L<public-inbox-index(1)> for a description of these options.
 
 These indexing options indexing at the end of a run.
index 3c3d9f08b445b4d46be3cf477bef127fed2d1039..590aeebfb9fbe80a683c20f1a9c5715351966a98 100644 (file)
@@ -345,9 +345,19 @@ sub parse_unsigned ($) {
 sub index_prepare ($$) {
        my ($opt, $cfg) = @_;
        my $env;
-       if ($opt->{compact}) {
+       my $need_compact = $opt->{compact};
+       if (defined(my $v = $opt->{'split-at'})) {
+               parse_unsigned(\$v) //
+                               die "--split-at=$v not parsed as unsigned\n";
+               $opt->{'split-at'} = $v;
+               $opt->{'split-shards'} = 1;
+       }
+       $need_compact = 1 if $opt->{'split-shards'};
+       if ($need_compact) {
                require PublicInbox::Xapcmd;
                PublicInbox::Xapcmd::check_compact();
+       }
+       if ($opt->{compact}) {
                $opt->{compact_opt} = { -coarse_lock => 1, compact => 1 };
                if (defined(my $jobs = $opt->{jobs})) {
                        $opt->{compact_opt}->{jobs} = $jobs;
@@ -362,7 +372,7 @@ sub index_prepare ($$) {
                $opt->{$k} = $v;
        }
 
-       # out-of-the-box builds of Xapian 1.4.x are still limited to 32-bit
+       # out-of-the-box builds of Xapian 1.4.x are still limited to 32-bit IDs
        # https://getting-started-with-xapian.readthedocs.io/en/latest/concepts/indexing/limitations.html
        $opt->{batch_size} and
                $env = { XAPIAN_FLUSH_THRESHOLD => '4294967295' };
index 1e48facaac81b97c0b86dbb1404f8d0c802f0469..f83b64652aafffa70cc0d660a61e43ba8614dd4e 100644 (file)
@@ -559,7 +559,10 @@ sub eidx_gc { # top-level entry point
        my ($self, $opt) = @_;
        $self->{cfg} or die "E: GC requires ->attach_config\n";
        $opt->{-idx_gc} = 1;
-       local $self->{checkpoint_unlocks} = 1;
+       $opt->{'split-shards'} and warn "W: --split-shards ignored with --gc\n";
+       delete local $opt->{'split-shards'};
+       delete local $opt->{'split-at'};
+       local $self->{ckpt_unlocks} = 1;
        local $self->{need_checkpoint} = 0;
        local $self->{nrec};
        local $self->{-opt} = $opt;
@@ -1133,7 +1136,11 @@ EOS
 
 sub eidx_sync ($$) { # main entry point
        my ($self, $opt) = @_;
-
+       warn <<EOM if $opt->{'split-shards'} && $opt->{reindex};
+W: --split-shards is generally not useful with --reindex
+W: --split-shards prevents new messages from being indexed until --reindex
+W: is complete (which may take days)
+EOM
        local $self->{current_info} = '';
        local $SIG{__WARN__} = PublicInbox::Admin::warn_cb $self;
        $self->idx_init($opt); # acquire lock via V2Writable::_idx_init
@@ -1157,11 +1164,11 @@ sub eidx_sync ($$) { # main entry point
        }
 
        if (my $msgids = delete($opt->{dedupe})) {
-               local $self->{checkpoint_unlocks} = 1;
+               local $self->{ckpt_unlocks} = 1 if !$opt->{'split-shards'};
                eidx_dedupe $self, $msgids;
        }
        if (delete($opt->{reindex})) {
-               local $self->{checkpoint_unlocks} = 1;
+               local $self->{ckpt_unlocks} = 1 if !$opt->{'split-shards'};
                eidx_reindex $self;
        }
 
index 11191765c1aeda94338bdf7b044003798b00264c..a9a0e505dffc60a92da0aa1f8190c0ca96a98e1b 100644 (file)
@@ -11,7 +11,7 @@ use strict;
 use v5.10.1;
 use parent qw(PublicInbox::Search PublicInbox::Lock PublicInbox::Umask
        Exporter);
-use autodie qw(open);
+use autodie qw(closedir opendir rename);
 use PublicInbox::Eml;
 use PublicInbox::DS qw(now);
 use PublicInbox::Search qw(xap_terms);
@@ -29,6 +29,8 @@ use PublicInbox::Spawn qw(run_wait popen_rd);
 use PublicInbox::Git qw(git_unquote);
 use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
 use PublicInbox::Address;
+use File::Glob qw(bsd_glob GLOB_NOSORT);
+use File::Path ();
 use Config;
 our @EXPORT_OK = qw(log2stack is_ancestor check_size prepare_stack
        index_text term_generator add_val is_bad_blob update_checkpoint
@@ -39,6 +41,7 @@ our $DB_NO_SYNC = 0;
 our $DB_DANGEROUS = 0;
 our $CHECKPOINT_INTVL = 15; # seconds
 our $DEFRAG_NR = 100000; # document count
+our $SHARD_SPLIT_AT = 450000; # document count
 our $BATCH_BYTES = $ENV{XAPIAN_FLUSH_THRESHOLD} ? 0x7fffffff :
        # assume a typical 64-bit system has 8x more RAM than a
        # typical 32-bit system:
@@ -99,11 +102,84 @@ sub new {
 
 sub need_xapian ($) { ($_[0]->{indexlevel} // 'full') =~ $xapianlevels }
 
+sub du_1_level (@) {
+       my $s = 0;
+       for my $d (@_) {
+               $s += (-s $_ // 0) for bsd_glob("$d/*", GLOB_NOSORT);
+       }
+       $s;
+}
+
+sub join_splits ($) {
+       my ($self) = @_;
+       require PublicInbox::Xapcmd;
+       my $cmd = PublicInbox::Xapcmd::compact_cmd($self->{-opt});
+       my $xpfx = $self->{xpfx};
+       opendir(my $dh, $xpfx);
+       my $shard = $self->{shard} // '';
+       my @tmps = grep /\A$shard\.[0-9]+\.tmp\z/, readdir($dh);
+       closedir $dh;
+       @tmps or return warn("BUG? no $shard.*.tmps to join in $xpfx\n");
+       $_ = ((split /\./, $_)[1] + 0) for @tmps; # Schwartzian transform
+       @tmps = sort { $a <=> $b } @tmps;
+       $_ = "$shard.$_.tmp" for @tmps; # undo transform
+
+       my $pr = $self->{-opt}->{-progress};
+       my @ftmps = map { "$xpfx/$_" } @tmps;
+       my ($before_bytes, $t0);
+       my $xdir = $self->xdir;
+
+       my $rdr = { -C => $xpfx };
+       my $wip = File::Temp->newdir("$shard.join-tmp-XXXX", DIR => $xpfx);
+       my $dst = $wip->dirname . "/$shard";
+       push @$cmd, $self->{shard}, @tmps, $dst;
+       $self->{-opt}->{cow} or PublicInbox::Syscall::nodatacow_dir($dst);
+       my $restore = $self->with_umask;
+       if ($pr) {
+               $pr->("$shard compacting (@$cmd)\n");
+               $before_bytes = du_1_level $xdir, @ftmps;
+               $t0 = now;
+       }
+       my $rd = popen_rd $cmd, undef, $rdr;
+       while (<$rd>) {
+               $pr or next;
+               s/\r/\r# $shard /g;
+               $pr->("# $shard $_");
+       }
+       $rd->close or die "@$cmd failed: \$?=$?\n";
+       File::Path::remove_tree(@ftmps);
+       my $owner = $self->{ibx} // $self->{eidx} // $self;
+       my $unlk = PublicInbox::Lock->new($owner->open_lock)->lock_for_scope;
+       if (-e $xdir) {
+               rename $xdir, "$dst/old";
+       } else {
+               warn "W: $xdir gone ($!), attempting to replace anyways...\n";
+       }
+       rename $dst, $xdir;
+       undef $unlk;
+       File::Path::remove_tree("$xdir/old");
+       return if !$pr;
+       my $after_bytes = du_1_level $xdir;
+       my $diff = now - $t0;
+       $pr->("$shard compact took ",
+               sprintf('%0.1fs, %0.1fMB => %0.1fMB',
+                       $diff, $before_bytes >> 20, $after_bytes >> 20), "\n");
+}
+
 sub idx_release {
        my ($self, $wake) = @_;
        if (need_xapian($self)) {
-               my $xdb = delete $self->{xdb} or croak '{xdb} not acquired';
+               my $djs = delete $self->{-do_join_splits};
+               delete $self->{-doc_max};
+               my $xdb = delete $self->{xdb} or croak 'BUG: {xdb} missing';
+               if ($djs) {
+                       $xdb->begin_transaction;
+                       $xdb->set_metadata('split-at', '');
+                       $xdb->commit_transaction;
+               }
                $xdb->close;
+               delete $self->{-xdb_tmp} and croak 'BUG: {-xdb_tmp} exists';
+               join_splits($self) if $djs && delete($self->{-splits_dirty});
        }
        $self->lock_release($wake) if $self->{creat};
        undef;
@@ -143,7 +219,6 @@ sub idx_acquire {
        }
        my $owner = $self->{ibx} // $self->{eidx} // $self;
        if ($self->{creat}) {
-               require File::Path;
                $self->lock_acquire;
 
                # don't create empty Xapian directories if we don't need Xapian
@@ -163,6 +238,30 @@ sub idx_acquire {
        $flag |= $DB_NO_SYNC if !$self->{-opt}->{fsync};
        my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) };
        croak "Failed opening $dir: $@" if $@;
+       $xdb->begin_transaction;
+       my $cur = $xdb->get_metadata('split-at');
+       if ($cur || $self->{-opt}->{'split-shards'}) {
+               my $new = $self->{-opt}->{'split-at'};
+               # respect in-progress value (XXX incomplete, delay until 3.x)
+               # this doesn't work if new/old public-inbox versions mix,
+               # so we currently disable {ckpt_unlocks} if using split-at
+               if ($cur) {
+                       $new && $new != $cur and warn <<EOM;
+W: using existing --split-at=$cur
+EOM
+                       $self->{-do_join_splits} or warn <<EOM;
+W: PID:$$ will not join split shards
+EOM
+                       $self->{-opt}->{'split-at'} = $cur;
+               } else {
+                       $cur ||= $new // $SHARD_SPLIT_AT;
+                       $xdb->set_metadata('split-at', $cur);
+                       $self->{-do_join_splits} = 1;
+               }
+               $self->{-doc_max} = $xdb->get_lastdocid || $cur;
+       }
+       ($self->{-do_join_splits} || $self->{-opt}->{dangerous}) ?
+                       $xdb->commit_transaction : $xdb->cancel_transaction;
        $self->{xdb} = $xdb;
 }
 
@@ -523,6 +622,68 @@ sub eml2doc ($$$;$) {
        $doc;
 }
 
+sub _xdb_tmp_new ($$$) {
+       my ($self, $dir, $flags) = @_;
+       $flags |= $DB_DANGEROUS | $DB_NO_SYNC;
+       my $xdb_tmp;
+       eval {
+               $xdb_tmp = $X->{WritableDatabase}->new($dir, $flags);
+               $xdb_tmp->begin_transaction;
+       };
+       if (my $err = $@) { # rethrow for stacktrace w/ PERL5OPT=-MCarp=verbose
+               my @offs = sort { $a <=> $b } keys %{$self->{-xdb_tmp}};
+               croak $err, "E: xdb_tmps active: @offs";
+       }
+       $xdb_tmp;
+}
+
+sub xdb_tmp_new ($$$) {
+       my ($self, $off, $docid) = @_;
+       my $dir = $self->xdir.".$off.tmp";
+       my $restore = $self->with_umask;
+       if (mkdir $dir) {
+               my $pr = $self->{-opt}->{-progress};
+               $pr->("indexing >= #$docid to ",($self->{shard}//''),
+                       ".$off.tmp...\n") if $pr;
+               PublicInbox::Syscall::nodatacow_dir($dir);
+       }
+       $self->{-xdb_tmp}->{$off} = _xdb_tmp_new $self, $dir, $DB_CREATE_OR_OPEN
+}
+
+sub xdb_tmp_get ($$) {
+       my ($self, $off) = @_;
+       my $dir = $self->xdir.".$off.tmp";
+       $self->{-xdb_tmp}->{$off} = _xdb_tmp_new $self, $dir, $DB_OPEN;
+}
+
+sub replace_doc ($$$) {
+       my ($self, $docid, $doc) = @_;
+       my ($xdb_tmp, $doc_max);
+       if (($doc_max = $self->{-doc_max}) && $docid > $doc_max) {
+               my $n = $self->{-opt}->{'split-at'} || $SHARD_SPLIT_AT;
+               my $off = int($docid / $n);
+               $xdb_tmp = $self->{-xdb_tmp}->{$off} //
+                                       xdb_tmp_new $self, $off, $docid;
+       }
+       ($xdb_tmp // $self->{xdb})->replace_document($docid, $doc);
+}
+
+sub _get_doc ($$) {
+       my ($self, $docid) = @_;
+       my ($xdb_tmp, $doc_max, $doc);
+       if (($doc_max = $self->{-doc_max}) && $docid > $doc_max) {
+               my $n = $self->{-opt}->{'split-at'} || $SHARD_SPLIT_AT;
+               my $off = int($docid / $n);
+               $xdb_tmp = $self->{-xdb_tmp}->{$off} // xdb_tmp_get $self, $off;
+       }
+       $doc = eval { ($xdb_tmp // $self->{xdb})->get_document($docid) };
+       if ($@) {
+               die $@ if ref($@) !~ /\bDocNotFoundError\b/;
+               warn "E: #$docid missing in Xapian\n";
+       }
+       $doc;
+}
+
 sub add_xapian ($$$$) {
        my ($self, $eml, $smsg, $mids) = @_;
        begin_txn_lazy($self);
@@ -541,7 +702,7 @@ sub add_xapian ($$$$) {
                add_bool_term($doc, 'O'.$eidx_key) if $eidx_key ne '.';
                index_list_id_raw $self, $doc, @list_ids;
        }
-       $self->{xdb}->replace_document($smsg->{num}, $doc);
+       replace_doc $self, $smsg->{num}, $doc;
 }
 
 sub v1_mm_init ($) {
@@ -607,14 +768,6 @@ sub add_message { # v1 + tests only
        $smsg->{num};
 }
 
-sub _get_doc ($$) {
-       my ($self, $docid) = @_;
-       $self->get_doc($docid) // do {
-               warn "E: #$docid missing in Xapian\n";
-               undef;
-       }
-}
-
 sub add_eidx_info_raw {
        my ($self, $docid, $eidx_key, @list_ids) = @_;
        begin_txn_lazy($self);
@@ -625,9 +778,10 @@ sub add_eidx_info_raw {
        add_bool_term($doc, 'O'.$eidx_key) if $eidx_key ne '.';
 
        index_list_id_raw $self, $doc, @list_ids;
-       $self->{xdb}->replace_document($docid, $doc);
+       replace_doc $self, $docid, $doc;
 }
 
+# for lei/store to access uncommitted terms
 sub get_terms {
        my ($self, $pfx, $docid) = @_;
        begin_txn_lazy($self);
@@ -660,7 +814,7 @@ sub remove_eidx_info_raw {
                # be needed and users using the "l:" prefix are probably
                # rarer.
        }
-       $self->{xdb}->replace_document($docid, $doc);
+       replace_doc $self, $docid, $doc;
 }
 
 sub set_vmd {
@@ -685,7 +839,7 @@ sub set_vmd {
        return unless scalar(@rm) || scalar(@add);
        $doc->remove_term($_) for @rm;
        add_bool_term($doc, $_) for @add;
-       $self->{xdb}->replace_document($docid, $doc);
+       replace_doc $self, $docid, $doc;
 }
 
 sub apply_vmd_mod ($$) {
@@ -720,7 +874,7 @@ sub add_vmd {
                $updated += scalar(@$add);
        }
        $updated += apply_vmd_mod($doc, $vmd);
-       $self->{xdb}->replace_document($docid, $doc) if $updated;
+       replace_doc($self, $docid, $doc) if $updated;
 }
 
 sub remove_vmd {
@@ -738,7 +892,7 @@ sub remove_vmd {
                        };
                }
        }
-       $self->{xdb}->replace_document($docid, $doc) if $replace;
+       replace_doc($self, $docid, $doc) if $replace;
 }
 
 sub update_vmd {
@@ -746,26 +900,44 @@ sub update_vmd {
        begin_txn_lazy($self);
        my $doc = _get_doc($self, $docid) or return;
        my $updated = apply_vmd_mod($doc, $vmd_mod);
-       $self->{xdb}->replace_document($docid, $doc) if $updated;
+       replace_doc($self, $docid, $doc) if $updated;
        $updated;
 }
 
-sub xdb_remove {
+sub _xdb_remove ($@) {
        my ($self, @docids) = @_;
+       my @warn;
        begin_txn_lazy($self);
        my $xdb = $self->{xdb} // die 'BUG: missing {xdb}';
+       if (my $doc_max = $self->{-doc_max}) {
+               my $n = $self->{-opt}->{'split-at'} || $SHARD_SPLIT_AT;
+               for my $docid (grep { $_ > $doc_max } @docids) {
+                       my $off = int($docid / $n);
+                       my $xdb_tmp = $self->{-xdb_tmp}->{$off} //
+                                                       xdb_tmp_get $self, $off;
+                       eval { $xdb_tmp->delete_document($docid) };
+                       push(@warn, "E: #$docid not in Xapian tmp[$off]? $@\n")
+                               if $@;
+               }
+               @docids = grep { $_ <= $doc_max } @docids;
+       }
        for my $docid (@docids) {
                eval { $xdb->delete_document($docid) };
-               warn "E: #$docid not in Xapian? $@\n" if $@;
+               push(@warn, "E: #$docid not in Xapian? $@\n") if $@;
        }
+       @warn;
+}
+
+sub xdb_remove {
+       my ($self, @docids) = @_;
+       my @warn = _xdb_remove $self, @docids;
+       warn @warn if @warn;
 }
 
 sub xdb_remove_quiet {
-       my ($self, $docid) = @_;
-       begin_txn_lazy($self);
-       my $xdb = $self->{xdb} // die 'BUG: missing {xdb}';
-       eval { $xdb->delete_document($docid) };
-       ++$self->{-quiet_rm} unless $@;
+       my ($self, @docids) = @_;
+       my @warn = _xdb_remove $self, @docids;
+       $self->{-quiet_rm} += (scalar(@docids) - scalar(@warn));
 }
 
 sub nr_quiet_rm { delete($_[0]->{-quiet_rm}) // 0 }
@@ -1144,6 +1316,7 @@ sub _index_sync {
 sub DESTROY {
        # order matters for unlocking
        $_[0]->{xdb} = undef;
+       delete $_[0]->{-xdb_tmp};
        $_[0]->{lockfh} = undef;
 }
 
@@ -1193,7 +1366,13 @@ sub commit_txn_lazy {
                set_metadata_once($self);
                $xdb->commit_transaction;
        }
-       $self->{oidx}->commit_lazy if $self->{oidx};
+       # for memory savings:
+       for my $xdb_tmp (values %{delete $self->{-xdb_tmp} // {}}) {
+               $xdb_tmp->commit_transaction;
+               $xdb_tmp->close; # I wasted a day because I forgot this line :<
+               $self->{-splits_dirty} = 1;
+       }
+       $self->{oidx}->commit_lazy if $self->{oidx}; # v1 only
 }
 
 sub eidx_shard_new {
index ea8c4ff125dabcffb3bead4bf1f35cc76a6adaab..330cb55471df3c3b0a3886fafab5351595be754d 100644 (file)
@@ -66,10 +66,4 @@ sub idx_close {
        $self->idx_release if $self->{xdb};
 }
 
-sub shard_close {
-       my ($self) = @_;
-       $self->ipc_do('idx_close');
-       $self->ipc_worker_stop;
-}
-
 1;
index 0cf30c928884f47d3a17d2d5a4c4694f1986c6b9..4c9b45216cfc8b0d099a4ed5b0ceae301e06212d 100644 (file)
@@ -603,7 +603,7 @@ sub active { !!$_[0]->{im} }
 # public
 sub done {
        my ($self) = @_;
-       local $PublicInbox::DS::in_loop; # sync awaitpid in shard_close
+       local $PublicInbox::DS::in_loop; # sync awaitpid in ipc_worker_stop
        my $err = '';
        if (my $im = delete $self->{im}) {
                eval { $im->done }; # PublicInbox::Import::done
@@ -625,8 +625,9 @@ sub done {
 
        my $shards = delete $self->{idx_shards};
        if ($shards) {
+               $_->ipc_do('idx_close') for @$shards;
                for (@$shards) {
-                       eval { $_->shard_close };
+                       eval { $_->ipc_worker_stop };
                        $err .= "shard close: $@\n" if $@;
                }
        }
@@ -734,7 +735,7 @@ sub reindex_checkpoint ($) {
        die 'BUG: {im} during reindex' if $self->{im};
        $t0 = now;
        my $txn_bytes = $self->{transact_bytes};
-       if ($self->{ibx_map} && !$self->{checkpoint_unlocks}) {
+       if ($self->{ibx_map} && !$self->{ckpt_unlocks}) {
                checkpoint($self, 1); # no need to release lock on pure index
        } else {
                $self->done; # release lock
@@ -1003,6 +1004,10 @@ sub sync_prepare ($) {
                return 0 if $self->{quit};
                my $nr = $stk ? $stk->num_records : 0;
                $pr->("$nr\n") if $pr;
+               $nr > 10000 && !$self->{-opt}->{'split-shards'} &&
+                       !$self->{-split_hinted}++ and warn <<EOM;
+# hint: --split-shards recommended if you have many messages to index
+EOM
                $unit->{stack} = $stk; # may be undef
                unshift @{$self->{todo}}, $unit;
                $regen_max += $nr;
index 87db423860f0059916b1d0d5670187b401c15017..6d40e32addc5e19210e745376ba965c13cc6afc4 100644 (file)
@@ -3,7 +3,7 @@
 package PublicInbox::Xapcmd;
 use v5.12;
 use autodie qw(chmod closedir open opendir rename syswrite);
-use PublicInbox::Spawn qw(which popen_rd);
+use PublicInbox::Spawn qw(which popen_rd spawn);
 use PublicInbox::Syscall;
 use PublicInbox::Lock;
 use PublicInbox::Admin qw(setup_signals);
@@ -13,7 +13,10 @@ use PublicInbox::SearchIdx;
 use File::Temp 0.19 (); # ->newdir
 use File::Path qw(remove_tree);
 use POSIX qw(WNOHANG dup _exit);
-use PublicInbox::DS;
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::IO qw(try_cat);
+use PublicInbox::OnDestroy;
+use Carp qw(croak);
 
 # support testing with dev versions of Xapian which installs
 # commands with a version number suffix (e.g. "xapian-compact-1.5")
@@ -21,6 +24,39 @@ our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact';
 our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F);
 my %SKIP = map { $_ => 1 } qw(. ..);
 
+my $reap_compact = sub { # awaitpid cb
+       my ($pid, $ok, $wip, $old, $rename_on_destroy) = @_;
+       $? and die "E: xapian-compact $wip:\n", try_cat "$wip/err";
+       push @$ok, $wip, $old;
+};
+
+# TODO: use this for old code, too
+my $rename_shards = sub { # on_destroy cb
+       my ($owner, $ok, $expect_nr) = @_;
+       ($expect_nr * 2) == @$ok or return; # some xapian-compact failed
+       my (@old_shard, @unlink, @rq);
+       while (@$ok) {
+               my ($wip, $old) = splice @$ok, 0, 2;
+               my $new = $wip->dirname;
+               if (-e $old) {
+                       my $mode = (stat(_))[2];
+                       chmod $mode & 07777, $new;
+                       push @old_shard, "$new/old";
+                       push @rq, $old, $old_shard[-1];
+               } else {
+                       warn "W: shard at $old disappeared during compact\n";
+               }
+               push @rq, $new, $old;
+               push @unlink, "$old/err", "$old/out";
+       }
+       # minimize lock time: (TODO: dedicated lock for shard count changes)
+       my $lk = $owner ? $owner->lock_for_scope : undef;
+       rename(shift(@rq), shift(@rq)) while @rq;
+       undef $lk;
+       remove_tree(@old_shard);
+       unlink @unlink;
+};
+
 sub commit_changes ($$$$) {
        my ($ibx, $im, $tmp, $opt) = @_;
        my $reshard = $opt->{reshard};
@@ -347,6 +383,19 @@ sub kill_compact { # setup_signals callback
        kill($sig, $$ioref->attached_pid // return) if defined($$ioref);
 }
 
+# we rely on --no-renumber to keep docids synced to NNTP
+sub compact_cmd ($) {
+       my ($opt) = @_;
+       my $cmd = [ $XAPIAN_COMPACT, '--no-renumber' ];
+       for my $sw (qw(no-full fuller multipass)) {
+               push(@$cmd, "--$sw") if $opt->{$sw};
+       }
+       for my $sw (qw(blocksize)) {
+               push(@$cmd, "--$sw", $opt->{$sw}) if defined $opt->{$sw}
+       }
+       $cmd;
+}
+
 # xapian-compact wrapper
 sub compact ($$$) { # cb_spawn callback
        my ($ibxish, $args, $opt) = @_;
@@ -361,15 +410,7 @@ sub compact ($$$) { # cb_spawn callback
                $rdr->{$fd} = $dfd;
        }
 
-       # we rely on --no-renumber to keep docids synched to NNTP
-       my $cmd = [ $XAPIAN_COMPACT, '--no-renumber' ];
-       for my $sw (qw(no-full fuller)) {
-               push @$cmd, "--$sw" if $opt->{$sw};
-       }
-       for my $sw (qw(blocksize)) {
-               defined(my $v = $opt->{$sw}) or next;
-               push @$cmd, "--$sw", $v;
-       }
+       my $cmd = compact_cmd $opt;
        $pr->("$pfx `".join(' ', @$cmd)."'\n") if $pr;
        push @$cmd, $src, $dst;
        local @SIG{keys %SIG} = values %SIG;
@@ -604,4 +645,41 @@ sub cpdb ($$$) { # cb_spawn callback
        compact $ibxish, [ $tmp, $new ], $opt;
 }
 
+sub join_split_tmps ($$) {
+       my ($owner, $opt) = @_;
+       my $xpfx = $owner->{xpfx} // croak "BUG: $owner has no {xfpx}";
+       my (@shards, $tmps, @pids);
+       opendir(my $dh, $xpfx);
+       while (defined(my $dir = readdir $dh)) {
+               if ($dir =~ m!\A[0-9]+\z!) {
+                       push @shards, $dir;
+               } elsif ($dir =~ m!\A([0-9]+)-[0-9]+\.tmp\z!) {
+                       push @{$tmps->[$1]}, $dir;
+               }
+       }
+       closedir $dh;
+       return @pids if !$tmps || !@shards;
+       @shards = sort { $a <=> $b } @shards;
+       ($shards[-1] + 1) == scalar(@shards) or
+               croak "E: gaps in shards, have: @shards";
+
+       my $cmd = compact_cmd $opt;
+       my $ok = [];
+       my $rod = on_destroy $rename_shards, $owner, $ok, scalar(@shards);
+       for my $i (@shards) {
+               $tmps->[$i] // next; # no split tmps for a given shard
+               my $wip = File::Temp->newdir("$i.join-tmp-XXXX", DIR => $xpfx);
+               my $dn = $wip->dirname;
+               $opt->{cow} or PublicInbox::Syscall::nodatacow_dir($dn);
+               my $rdr = { -C => $xpfx };
+               open $rdr->{1}, '>>', "$dn/out";
+               open $rdr->{2}, '>>', "$dn/err";
+               my @srcs = ($i, @{$tmps->[$i]});
+               my $pid = spawn([ @$cmd, @srcs, $dn ], undef, $rdr);
+               awaitpid($pid, $reap_compact, $ok, $wip, "$xpfx/$i", $rod);
+               push @pids, $pid;
+       }
+       @pids;
+}
+
 1;
index 6f0aa209b5eb502ef172a026a3a5e4e0088a531d..97d71fc7ceccad4026bd49e04b9f81437141045c 100755 (executable)
@@ -38,6 +38,7 @@ GetOptions($opt, qw(jobs|j=i index! help|h C=s@),
                qw(verbose|v+ rethread compact|c+ fsync|sync!
                indexlevel|index-level|L=s max_size|max-size=s
                commit-interval|commit=i batch_size|batch-size=s cow! wal
+               split-shards split-at=s multipass
                sequential-shard|seq-shard
                )) or die $help;
 if ($opt->{help}) { print $help; exit 0 };
index d12b07c4612e4ce7c68dfb5efb712801d0659dac..6826157290ed306afacc4ee7967154535e3f9457 100755 (executable)
@@ -32,6 +32,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i
                commit-interval|commit=i
                indexlevel|index-level|L=s max_size|max-size=s
                batch_size|batch-size=s
+               split-shards split-at=s multipass
                dedupe:s@ gc watch scan! dry-run|n
                multi-pack-index! all C=s@ help|h))
        or die $help;
index 67728091b6c611180117d6c65d190a669d2d7a31..0ebe96d4763c5170aac3c768007b21792c12a383 100755 (executable)
@@ -42,6 +42,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i prune
                since|after=s until|before=s
                sequential-shard|seq-shard
                multi-pack-index!
+               split-shards split-at=s multipass
                no-update-extindex update-extindex|E=s@
                fast-noop|F skip-docdata all C=s@ help|h))
        or die $help;
index 3faa63afb598c39c5fae55d19ebed2beef89c68e..adee64f36bab75cd3e025ea34b4443d55b076f60 100755 (executable)
@@ -31,6 +31,7 @@ GetOptions($opt, qw(
        cow! fsync|sync! compact|c reshard|R=i
        commit-interval|commit=i max_size|max-size=s batch_size|batch-size=s
        sequential-shard|seq-shard
+       split-shards split-at=s
        jobs|j=i quiet|q verbose|v
        blocksize|b=s no-full|n fuller|F
        all C=s@ help|h)) or die $help;