From: Eric Wong Date: Wed, 8 Oct 2025 21:24:22 +0000 (+0000) Subject: *index: --split-shards to speeds initial indexing X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=699220cc05ecf0ad06b8f175b94253b708e43fa8;p=thirdparty%2Fpublic-inbox.git *index: --split-shards to speeds initial indexing When indexing millions of messages, Xapian has a tendency to slow down as each shard gets bigger. --split-shards allows Xapian to work on temporary shards (more akin to "epochs") and uses xapian-compact(1) to commit the finalized changes for readers. The result is roughly 2x faster for millions of messages. The downside of this switch is temporary space use increases by 2-3x and incremental changes are not visible to readers until all indexing is complete. It has no useful effect on --reindex, but --reindex is typically faster than initial indexing anyways since space is already allocated. It still takes days to create a new extindex of lore, but fewer days than before. Another beneficial side effect of this switch is it also tends to reduce the effect of fragmentation for --cow users on btrfs. --- diff --git a/Documentation/public-inbox-convert.pod b/Documentation/public-inbox-convert.pod index 86e8265b3..5905675fa 100644 --- a/Documentation/public-inbox-convert.pod +++ b/Documentation/public-inbox-convert.pod @@ -49,6 +49,12 @@ process which distributes work to the Xapian shards. =item --max-size=BYTES +=item --split-shards + +=item --split-at DOCCOUNT + +=item --multipass + =item --wal These options affect indexing. They have no effect if diff --git a/Documentation/public-inbox-extindex.pod b/Documentation/public-inbox-extindex.pod index 136edf6f5..f083678c3 100644 --- a/Documentation/public-inbox-extindex.pod +++ b/Documentation/public-inbox-extindex.pod @@ -34,6 +34,12 @@ along with L and L Perl modules. =item --batch-size SIZE +=item --split-shards + +=item --split-at DOCCOUNT + +=item --multipass + =item --wal =item --commit SECONDS diff --git a/Documentation/public-inbox-index.pod b/Documentation/public-inbox-index.pod index 53ed9e881..81134301f 100644 --- a/Documentation/public-inbox-index.pod +++ b/Documentation/public-inbox-index.pod @@ -209,6 +209,27 @@ Available in public-inbox 2.0.0+ Passed directly to L to limit changes for C<--reindex> +=item --split-shards + +Speed up initial indexing of millions of messages by ~2x at the +cost of 2-3x temporary disk space. Requires L to be +installed. Unlikely useful with L, DO NOT use with +L unless you want new messages to remain unindexed for +days until reindexing is done. + +=item --split-at DOCCOUNT + +Implies and used for tuning C<--split-shards>. The default is +likely fine. Tiny values may overflow system file/command-line +limits while giant values negate the performance benefit. + +Default: 450000 + +=item --multipass + +Passed to L when using C<--split-shards>, may +further improve performance at the expense of even more disk space. + =item --wal Enable WAL (Write-Ahead-Log) on SQLite files. WAL may reduce diff --git a/Documentation/public-inbox-xcpdb.pod b/Documentation/public-inbox-xcpdb.pod index e7c07ed38..fe58bc42b 100644 --- a/Documentation/public-inbox-xcpdb.pod +++ b/Documentation/public-inbox-xcpdb.pod @@ -81,6 +81,10 @@ affects indexing done at the end of a run. =item --max-size=BYTES +=item --split-shards + +=item --split-at DOCCOUNT + See L for a description of these options. These indexing options indexing at the end of a run. diff --git a/lib/PublicInbox/Admin.pm b/lib/PublicInbox/Admin.pm index 3c3d9f08b..590aeebfb 100644 --- a/lib/PublicInbox/Admin.pm +++ b/lib/PublicInbox/Admin.pm @@ -345,9 +345,19 @@ sub parse_unsigned ($) { sub index_prepare ($$) { my ($opt, $cfg) = @_; my $env; - if ($opt->{compact}) { + my $need_compact = $opt->{compact}; + if (defined(my $v = $opt->{'split-at'})) { + parse_unsigned(\$v) // + die "--split-at=$v not parsed as unsigned\n"; + $opt->{'split-at'} = $v; + $opt->{'split-shards'} = 1; + } + $need_compact = 1 if $opt->{'split-shards'}; + if ($need_compact) { require PublicInbox::Xapcmd; PublicInbox::Xapcmd::check_compact(); + } + if ($opt->{compact}) { $opt->{compact_opt} = { -coarse_lock => 1, compact => 1 }; if (defined(my $jobs = $opt->{jobs})) { $opt->{compact_opt}->{jobs} = $jobs; @@ -362,7 +372,7 @@ sub index_prepare ($$) { $opt->{$k} = $v; } - # out-of-the-box builds of Xapian 1.4.x are still limited to 32-bit + # out-of-the-box builds of Xapian 1.4.x are still limited to 32-bit IDs # https://getting-started-with-xapian.readthedocs.io/en/latest/concepts/indexing/limitations.html $opt->{batch_size} and $env = { XAPIAN_FLUSH_THRESHOLD => '4294967295' }; diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 1e48facaa..f83b64652 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -559,7 +559,10 @@ sub eidx_gc { # top-level entry point my ($self, $opt) = @_; $self->{cfg} or die "E: GC requires ->attach_config\n"; $opt->{-idx_gc} = 1; - local $self->{checkpoint_unlocks} = 1; + $opt->{'split-shards'} and warn "W: --split-shards ignored with --gc\n"; + delete local $opt->{'split-shards'}; + delete local $opt->{'split-at'}; + local $self->{ckpt_unlocks} = 1; local $self->{need_checkpoint} = 0; local $self->{nrec}; local $self->{-opt} = $opt; @@ -1133,7 +1136,11 @@ EOS sub eidx_sync ($$) { # main entry point my ($self, $opt) = @_; - + warn <{'split-shards'} && $opt->{reindex}; +W: --split-shards is generally not useful with --reindex +W: --split-shards prevents new messages from being indexed until --reindex +W: is complete (which may take days) +EOM local $self->{current_info} = ''; local $SIG{__WARN__} = PublicInbox::Admin::warn_cb $self; $self->idx_init($opt); # acquire lock via V2Writable::_idx_init @@ -1157,11 +1164,11 @@ sub eidx_sync ($$) { # main entry point } if (my $msgids = delete($opt->{dedupe})) { - local $self->{checkpoint_unlocks} = 1; + local $self->{ckpt_unlocks} = 1 if !$opt->{'split-shards'}; eidx_dedupe $self, $msgids; } if (delete($opt->{reindex})) { - local $self->{checkpoint_unlocks} = 1; + local $self->{ckpt_unlocks} = 1 if !$opt->{'split-shards'}; eidx_reindex $self; } diff --git a/lib/PublicInbox/SearchIdx.pm b/lib/PublicInbox/SearchIdx.pm index 11191765c..a9a0e505d 100644 --- a/lib/PublicInbox/SearchIdx.pm +++ b/lib/PublicInbox/SearchIdx.pm @@ -11,7 +11,7 @@ use strict; use v5.10.1; use parent qw(PublicInbox::Search PublicInbox::Lock PublicInbox::Umask Exporter); -use autodie qw(open); +use autodie qw(closedir opendir rename); use PublicInbox::Eml; use PublicInbox::DS qw(now); use PublicInbox::Search qw(xap_terms); @@ -29,6 +29,8 @@ use PublicInbox::Spawn qw(run_wait popen_rd); use PublicInbox::Git qw(git_unquote); use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp); use PublicInbox::Address; +use File::Glob qw(bsd_glob GLOB_NOSORT); +use File::Path (); use Config; our @EXPORT_OK = qw(log2stack is_ancestor check_size prepare_stack index_text term_generator add_val is_bad_blob update_checkpoint @@ -39,6 +41,7 @@ our $DB_NO_SYNC = 0; our $DB_DANGEROUS = 0; our $CHECKPOINT_INTVL = 15; # seconds our $DEFRAG_NR = 100000; # document count +our $SHARD_SPLIT_AT = 450000; # document count our $BATCH_BYTES = $ENV{XAPIAN_FLUSH_THRESHOLD} ? 0x7fffffff : # assume a typical 64-bit system has 8x more RAM than a # typical 32-bit system: @@ -99,11 +102,84 @@ sub new { sub need_xapian ($) { ($_[0]->{indexlevel} // 'full') =~ $xapianlevels } +sub du_1_level (@) { + my $s = 0; + for my $d (@_) { + $s += (-s $_ // 0) for bsd_glob("$d/*", GLOB_NOSORT); + } + $s; +} + +sub join_splits ($) { + my ($self) = @_; + require PublicInbox::Xapcmd; + my $cmd = PublicInbox::Xapcmd::compact_cmd($self->{-opt}); + my $xpfx = $self->{xpfx}; + opendir(my $dh, $xpfx); + my $shard = $self->{shard} // ''; + my @tmps = grep /\A$shard\.[0-9]+\.tmp\z/, readdir($dh); + closedir $dh; + @tmps or return warn("BUG? no $shard.*.tmps to join in $xpfx\n"); + $_ = ((split /\./, $_)[1] + 0) for @tmps; # Schwartzian transform + @tmps = sort { $a <=> $b } @tmps; + $_ = "$shard.$_.tmp" for @tmps; # undo transform + + my $pr = $self->{-opt}->{-progress}; + my @ftmps = map { "$xpfx/$_" } @tmps; + my ($before_bytes, $t0); + my $xdir = $self->xdir; + + my $rdr = { -C => $xpfx }; + my $wip = File::Temp->newdir("$shard.join-tmp-XXXX", DIR => $xpfx); + my $dst = $wip->dirname . "/$shard"; + push @$cmd, $self->{shard}, @tmps, $dst; + $self->{-opt}->{cow} or PublicInbox::Syscall::nodatacow_dir($dst); + my $restore = $self->with_umask; + if ($pr) { + $pr->("$shard compacting (@$cmd)\n"); + $before_bytes = du_1_level $xdir, @ftmps; + $t0 = now; + } + my $rd = popen_rd $cmd, undef, $rdr; + while (<$rd>) { + $pr or next; + s/\r/\r# $shard /g; + $pr->("# $shard $_"); + } + $rd->close or die "@$cmd failed: \$?=$?\n"; + File::Path::remove_tree(@ftmps); + my $owner = $self->{ibx} // $self->{eidx} // $self; + my $unlk = PublicInbox::Lock->new($owner->open_lock)->lock_for_scope; + if (-e $xdir) { + rename $xdir, "$dst/old"; + } else { + warn "W: $xdir gone ($!), attempting to replace anyways...\n"; + } + rename $dst, $xdir; + undef $unlk; + File::Path::remove_tree("$xdir/old"); + return if !$pr; + my $after_bytes = du_1_level $xdir; + my $diff = now - $t0; + $pr->("$shard compact took ", + sprintf('%0.1fs, %0.1fMB => %0.1fMB', + $diff, $before_bytes >> 20, $after_bytes >> 20), "\n"); +} + sub idx_release { my ($self, $wake) = @_; if (need_xapian($self)) { - my $xdb = delete $self->{xdb} or croak '{xdb} not acquired'; + my $djs = delete $self->{-do_join_splits}; + delete $self->{-doc_max}; + my $xdb = delete $self->{xdb} or croak 'BUG: {xdb} missing'; + if ($djs) { + $xdb->begin_transaction; + $xdb->set_metadata('split-at', ''); + $xdb->commit_transaction; + } $xdb->close; + delete $self->{-xdb_tmp} and croak 'BUG: {-xdb_tmp} exists'; + join_splits($self) if $djs && delete($self->{-splits_dirty}); } $self->lock_release($wake) if $self->{creat}; undef; @@ -143,7 +219,6 @@ sub idx_acquire { } my $owner = $self->{ibx} // $self->{eidx} // $self; if ($self->{creat}) { - require File::Path; $self->lock_acquire; # don't create empty Xapian directories if we don't need Xapian @@ -163,6 +238,30 @@ sub idx_acquire { $flag |= $DB_NO_SYNC if !$self->{-opt}->{fsync}; my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) }; croak "Failed opening $dir: $@" if $@; + $xdb->begin_transaction; + my $cur = $xdb->get_metadata('split-at'); + if ($cur || $self->{-opt}->{'split-shards'}) { + my $new = $self->{-opt}->{'split-at'}; + # respect in-progress value (XXX incomplete, delay until 3.x) + # this doesn't work if new/old public-inbox versions mix, + # so we currently disable {ckpt_unlocks} if using split-at + if ($cur) { + $new && $new != $cur and warn <{-do_join_splits} or warn <{-opt}->{'split-at'} = $cur; + } else { + $cur ||= $new // $SHARD_SPLIT_AT; + $xdb->set_metadata('split-at', $cur); + $self->{-do_join_splits} = 1; + } + $self->{-doc_max} = $xdb->get_lastdocid || $cur; + } + ($self->{-do_join_splits} || $self->{-opt}->{dangerous}) ? + $xdb->commit_transaction : $xdb->cancel_transaction; $self->{xdb} = $xdb; } @@ -523,6 +622,68 @@ sub eml2doc ($$$;$) { $doc; } +sub _xdb_tmp_new ($$$) { + my ($self, $dir, $flags) = @_; + $flags |= $DB_DANGEROUS | $DB_NO_SYNC; + my $xdb_tmp; + eval { + $xdb_tmp = $X->{WritableDatabase}->new($dir, $flags); + $xdb_tmp->begin_transaction; + }; + if (my $err = $@) { # rethrow for stacktrace w/ PERL5OPT=-MCarp=verbose + my @offs = sort { $a <=> $b } keys %{$self->{-xdb_tmp}}; + croak $err, "E: xdb_tmps active: @offs"; + } + $xdb_tmp; +} + +sub xdb_tmp_new ($$$) { + my ($self, $off, $docid) = @_; + my $dir = $self->xdir.".$off.tmp"; + my $restore = $self->with_umask; + if (mkdir $dir) { + my $pr = $self->{-opt}->{-progress}; + $pr->("indexing >= #$docid to ",($self->{shard}//''), + ".$off.tmp...\n") if $pr; + PublicInbox::Syscall::nodatacow_dir($dir); + } + $self->{-xdb_tmp}->{$off} = _xdb_tmp_new $self, $dir, $DB_CREATE_OR_OPEN +} + +sub xdb_tmp_get ($$) { + my ($self, $off) = @_; + my $dir = $self->xdir.".$off.tmp"; + $self->{-xdb_tmp}->{$off} = _xdb_tmp_new $self, $dir, $DB_OPEN; +} + +sub replace_doc ($$$) { + my ($self, $docid, $doc) = @_; + my ($xdb_tmp, $doc_max); + if (($doc_max = $self->{-doc_max}) && $docid > $doc_max) { + my $n = $self->{-opt}->{'split-at'} || $SHARD_SPLIT_AT; + my $off = int($docid / $n); + $xdb_tmp = $self->{-xdb_tmp}->{$off} // + xdb_tmp_new $self, $off, $docid; + } + ($xdb_tmp // $self->{xdb})->replace_document($docid, $doc); +} + +sub _get_doc ($$) { + my ($self, $docid) = @_; + my ($xdb_tmp, $doc_max, $doc); + if (($doc_max = $self->{-doc_max}) && $docid > $doc_max) { + my $n = $self->{-opt}->{'split-at'} || $SHARD_SPLIT_AT; + my $off = int($docid / $n); + $xdb_tmp = $self->{-xdb_tmp}->{$off} // xdb_tmp_get $self, $off; + } + $doc = eval { ($xdb_tmp // $self->{xdb})->get_document($docid) }; + if ($@) { + die $@ if ref($@) !~ /\bDocNotFoundError\b/; + warn "E: #$docid missing in Xapian\n"; + } + $doc; +} + sub add_xapian ($$$$) { my ($self, $eml, $smsg, $mids) = @_; begin_txn_lazy($self); @@ -541,7 +702,7 @@ sub add_xapian ($$$$) { add_bool_term($doc, 'O'.$eidx_key) if $eidx_key ne '.'; index_list_id_raw $self, $doc, @list_ids; } - $self->{xdb}->replace_document($smsg->{num}, $doc); + replace_doc $self, $smsg->{num}, $doc; } sub v1_mm_init ($) { @@ -607,14 +768,6 @@ sub add_message { # v1 + tests only $smsg->{num}; } -sub _get_doc ($$) { - my ($self, $docid) = @_; - $self->get_doc($docid) // do { - warn "E: #$docid missing in Xapian\n"; - undef; - } -} - sub add_eidx_info_raw { my ($self, $docid, $eidx_key, @list_ids) = @_; begin_txn_lazy($self); @@ -625,9 +778,10 @@ sub add_eidx_info_raw { add_bool_term($doc, 'O'.$eidx_key) if $eidx_key ne '.'; index_list_id_raw $self, $doc, @list_ids; - $self->{xdb}->replace_document($docid, $doc); + replace_doc $self, $docid, $doc; } +# for lei/store to access uncommitted terms sub get_terms { my ($self, $pfx, $docid) = @_; begin_txn_lazy($self); @@ -660,7 +814,7 @@ sub remove_eidx_info_raw { # be needed and users using the "l:" prefix are probably # rarer. } - $self->{xdb}->replace_document($docid, $doc); + replace_doc $self, $docid, $doc; } sub set_vmd { @@ -685,7 +839,7 @@ sub set_vmd { return unless scalar(@rm) || scalar(@add); $doc->remove_term($_) for @rm; add_bool_term($doc, $_) for @add; - $self->{xdb}->replace_document($docid, $doc); + replace_doc $self, $docid, $doc; } sub apply_vmd_mod ($$) { @@ -720,7 +874,7 @@ sub add_vmd { $updated += scalar(@$add); } $updated += apply_vmd_mod($doc, $vmd); - $self->{xdb}->replace_document($docid, $doc) if $updated; + replace_doc($self, $docid, $doc) if $updated; } sub remove_vmd { @@ -738,7 +892,7 @@ sub remove_vmd { }; } } - $self->{xdb}->replace_document($docid, $doc) if $replace; + replace_doc($self, $docid, $doc) if $replace; } sub update_vmd { @@ -746,26 +900,44 @@ sub update_vmd { begin_txn_lazy($self); my $doc = _get_doc($self, $docid) or return; my $updated = apply_vmd_mod($doc, $vmd_mod); - $self->{xdb}->replace_document($docid, $doc) if $updated; + replace_doc($self, $docid, $doc) if $updated; $updated; } -sub xdb_remove { +sub _xdb_remove ($@) { my ($self, @docids) = @_; + my @warn; begin_txn_lazy($self); my $xdb = $self->{xdb} // die 'BUG: missing {xdb}'; + if (my $doc_max = $self->{-doc_max}) { + my $n = $self->{-opt}->{'split-at'} || $SHARD_SPLIT_AT; + for my $docid (grep { $_ > $doc_max } @docids) { + my $off = int($docid / $n); + my $xdb_tmp = $self->{-xdb_tmp}->{$off} // + xdb_tmp_get $self, $off; + eval { $xdb_tmp->delete_document($docid) }; + push(@warn, "E: #$docid not in Xapian tmp[$off]? $@\n") + if $@; + } + @docids = grep { $_ <= $doc_max } @docids; + } for my $docid (@docids) { eval { $xdb->delete_document($docid) }; - warn "E: #$docid not in Xapian? $@\n" if $@; + push(@warn, "E: #$docid not in Xapian? $@\n") if $@; } + @warn; +} + +sub xdb_remove { + my ($self, @docids) = @_; + my @warn = _xdb_remove $self, @docids; + warn @warn if @warn; } sub xdb_remove_quiet { - my ($self, $docid) = @_; - begin_txn_lazy($self); - my $xdb = $self->{xdb} // die 'BUG: missing {xdb}'; - eval { $xdb->delete_document($docid) }; - ++$self->{-quiet_rm} unless $@; + my ($self, @docids) = @_; + my @warn = _xdb_remove $self, @docids; + $self->{-quiet_rm} += (scalar(@docids) - scalar(@warn)); } sub nr_quiet_rm { delete($_[0]->{-quiet_rm}) // 0 } @@ -1144,6 +1316,7 @@ sub _index_sync { sub DESTROY { # order matters for unlocking $_[0]->{xdb} = undef; + delete $_[0]->{-xdb_tmp}; $_[0]->{lockfh} = undef; } @@ -1193,7 +1366,13 @@ sub commit_txn_lazy { set_metadata_once($self); $xdb->commit_transaction; } - $self->{oidx}->commit_lazy if $self->{oidx}; + # for memory savings: + for my $xdb_tmp (values %{delete $self->{-xdb_tmp} // {}}) { + $xdb_tmp->commit_transaction; + $xdb_tmp->close; # I wasted a day because I forgot this line :< + $self->{-splits_dirty} = 1; + } + $self->{oidx}->commit_lazy if $self->{oidx}; # v1 only } sub eidx_shard_new { diff --git a/lib/PublicInbox/SearchIdxShard.pm b/lib/PublicInbox/SearchIdxShard.pm index ea8c4ff12..330cb5547 100644 --- a/lib/PublicInbox/SearchIdxShard.pm +++ b/lib/PublicInbox/SearchIdxShard.pm @@ -66,10 +66,4 @@ sub idx_close { $self->idx_release if $self->{xdb}; } -sub shard_close { - my ($self) = @_; - $self->ipc_do('idx_close'); - $self->ipc_worker_stop; -} - 1; diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm index 0cf30c928..4c9b45216 100644 --- a/lib/PublicInbox/V2Writable.pm +++ b/lib/PublicInbox/V2Writable.pm @@ -603,7 +603,7 @@ sub active { !!$_[0]->{im} } # public sub done { my ($self) = @_; - local $PublicInbox::DS::in_loop; # sync awaitpid in shard_close + local $PublicInbox::DS::in_loop; # sync awaitpid in ipc_worker_stop my $err = ''; if (my $im = delete $self->{im}) { eval { $im->done }; # PublicInbox::Import::done @@ -625,8 +625,9 @@ sub done { my $shards = delete $self->{idx_shards}; if ($shards) { + $_->ipc_do('idx_close') for @$shards; for (@$shards) { - eval { $_->shard_close }; + eval { $_->ipc_worker_stop }; $err .= "shard close: $@\n" if $@; } } @@ -734,7 +735,7 @@ sub reindex_checkpoint ($) { die 'BUG: {im} during reindex' if $self->{im}; $t0 = now; my $txn_bytes = $self->{transact_bytes}; - if ($self->{ibx_map} && !$self->{checkpoint_unlocks}) { + if ($self->{ibx_map} && !$self->{ckpt_unlocks}) { checkpoint($self, 1); # no need to release lock on pure index } else { $self->done; # release lock @@ -1003,6 +1004,10 @@ sub sync_prepare ($) { return 0 if $self->{quit}; my $nr = $stk ? $stk->num_records : 0; $pr->("$nr\n") if $pr; + $nr > 10000 && !$self->{-opt}->{'split-shards'} && + !$self->{-split_hinted}++ and warn <{stack} = $stk; # may be undef unshift @{$self->{todo}}, $unit; $regen_max += $nr; diff --git a/lib/PublicInbox/Xapcmd.pm b/lib/PublicInbox/Xapcmd.pm index 87db42386..6d40e32ad 100644 --- a/lib/PublicInbox/Xapcmd.pm +++ b/lib/PublicInbox/Xapcmd.pm @@ -3,7 +3,7 @@ package PublicInbox::Xapcmd; use v5.12; use autodie qw(chmod closedir open opendir rename syswrite); -use PublicInbox::Spawn qw(which popen_rd); +use PublicInbox::Spawn qw(which popen_rd spawn); use PublicInbox::Syscall; use PublicInbox::Lock; use PublicInbox::Admin qw(setup_signals); @@ -13,7 +13,10 @@ use PublicInbox::SearchIdx; use File::Temp 0.19 (); # ->newdir use File::Path qw(remove_tree); use POSIX qw(WNOHANG dup _exit); -use PublicInbox::DS; +use PublicInbox::DS qw(awaitpid); +use PublicInbox::IO qw(try_cat); +use PublicInbox::OnDestroy; +use Carp qw(croak); # support testing with dev versions of Xapian which installs # commands with a version number suffix (e.g. "xapian-compact-1.5") @@ -21,6 +24,39 @@ our $XAPIAN_COMPACT = $ENV{XAPIAN_COMPACT} || 'xapian-compact'; our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F); my %SKIP = map { $_ => 1 } qw(. ..); +my $reap_compact = sub { # awaitpid cb + my ($pid, $ok, $wip, $old, $rename_on_destroy) = @_; + $? and die "E: xapian-compact $wip:\n", try_cat "$wip/err"; + push @$ok, $wip, $old; +}; + +# TODO: use this for old code, too +my $rename_shards = sub { # on_destroy cb + my ($owner, $ok, $expect_nr) = @_; + ($expect_nr * 2) == @$ok or return; # some xapian-compact failed + my (@old_shard, @unlink, @rq); + while (@$ok) { + my ($wip, $old) = splice @$ok, 0, 2; + my $new = $wip->dirname; + if (-e $old) { + my $mode = (stat(_))[2]; + chmod $mode & 07777, $new; + push @old_shard, "$new/old"; + push @rq, $old, $old_shard[-1]; + } else { + warn "W: shard at $old disappeared during compact\n"; + } + push @rq, $new, $old; + push @unlink, "$old/err", "$old/out"; + } + # minimize lock time: (TODO: dedicated lock for shard count changes) + my $lk = $owner ? $owner->lock_for_scope : undef; + rename(shift(@rq), shift(@rq)) while @rq; + undef $lk; + remove_tree(@old_shard); + unlink @unlink; +}; + sub commit_changes ($$$$) { my ($ibx, $im, $tmp, $opt) = @_; my $reshard = $opt->{reshard}; @@ -347,6 +383,19 @@ sub kill_compact { # setup_signals callback kill($sig, $$ioref->attached_pid // return) if defined($$ioref); } +# we rely on --no-renumber to keep docids synced to NNTP +sub compact_cmd ($) { + my ($opt) = @_; + my $cmd = [ $XAPIAN_COMPACT, '--no-renumber' ]; + for my $sw (qw(no-full fuller multipass)) { + push(@$cmd, "--$sw") if $opt->{$sw}; + } + for my $sw (qw(blocksize)) { + push(@$cmd, "--$sw", $opt->{$sw}) if defined $opt->{$sw} + } + $cmd; +} + # xapian-compact wrapper sub compact ($$$) { # cb_spawn callback my ($ibxish, $args, $opt) = @_; @@ -361,15 +410,7 @@ sub compact ($$$) { # cb_spawn callback $rdr->{$fd} = $dfd; } - # we rely on --no-renumber to keep docids synched to NNTP - my $cmd = [ $XAPIAN_COMPACT, '--no-renumber' ]; - for my $sw (qw(no-full fuller)) { - push @$cmd, "--$sw" if $opt->{$sw}; - } - for my $sw (qw(blocksize)) { - defined(my $v = $opt->{$sw}) or next; - push @$cmd, "--$sw", $v; - } + my $cmd = compact_cmd $opt; $pr->("$pfx `".join(' ', @$cmd)."'\n") if $pr; push @$cmd, $src, $dst; local @SIG{keys %SIG} = values %SIG; @@ -604,4 +645,41 @@ sub cpdb ($$$) { # cb_spawn callback compact $ibxish, [ $tmp, $new ], $opt; } +sub join_split_tmps ($$) { + my ($owner, $opt) = @_; + my $xpfx = $owner->{xpfx} // croak "BUG: $owner has no {xfpx}"; + my (@shards, $tmps, @pids); + opendir(my $dh, $xpfx); + while (defined(my $dir = readdir $dh)) { + if ($dir =~ m!\A[0-9]+\z!) { + push @shards, $dir; + } elsif ($dir =~ m!\A([0-9]+)-[0-9]+\.tmp\z!) { + push @{$tmps->[$1]}, $dir; + } + } + closedir $dh; + return @pids if !$tmps || !@shards; + @shards = sort { $a <=> $b } @shards; + ($shards[-1] + 1) == scalar(@shards) or + croak "E: gaps in shards, have: @shards"; + + my $cmd = compact_cmd $opt; + my $ok = []; + my $rod = on_destroy $rename_shards, $owner, $ok, scalar(@shards); + for my $i (@shards) { + $tmps->[$i] // next; # no split tmps for a given shard + my $wip = File::Temp->newdir("$i.join-tmp-XXXX", DIR => $xpfx); + my $dn = $wip->dirname; + $opt->{cow} or PublicInbox::Syscall::nodatacow_dir($dn); + my $rdr = { -C => $xpfx }; + open $rdr->{1}, '>>', "$dn/out"; + open $rdr->{2}, '>>', "$dn/err"; + my @srcs = ($i, @{$tmps->[$i]}); + my $pid = spawn([ @$cmd, @srcs, $dn ], undef, $rdr); + awaitpid($pid, $reap_compact, $ok, $wip, "$xpfx/$i", $rod); + push @pids, $pid; + } + @pids; +} + 1; diff --git a/script/public-inbox-convert b/script/public-inbox-convert index 6f0aa209b..97d71fc7c 100755 --- a/script/public-inbox-convert +++ b/script/public-inbox-convert @@ -38,6 +38,7 @@ GetOptions($opt, qw(jobs|j=i index! help|h C=s@), qw(verbose|v+ rethread compact|c+ fsync|sync! indexlevel|index-level|L=s max_size|max-size=s commit-interval|commit=i batch_size|batch-size=s cow! wal + split-shards split-at=s multipass sequential-shard|seq-shard )) or die $help; if ($opt->{help}) { print $help; exit 0 }; diff --git a/script/public-inbox-extindex b/script/public-inbox-extindex index d12b07c46..682615729 100755 --- a/script/public-inbox-extindex +++ b/script/public-inbox-extindex @@ -32,6 +32,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i commit-interval|commit=i indexlevel|index-level|L=s max_size|max-size=s batch_size|batch-size=s + split-shards split-at=s multipass dedupe:s@ gc watch scan! dry-run|n multi-pack-index! all C=s@ help|h)) or die $help; diff --git a/script/public-inbox-index b/script/public-inbox-index index 67728091b..0ebe96d47 100755 --- a/script/public-inbox-index +++ b/script/public-inbox-index @@ -42,6 +42,7 @@ GetOptions($opt, qw(verbose|v+ reindex rethread compact|c+ jobs|j=i prune since|after=s until|before=s sequential-shard|seq-shard multi-pack-index! + split-shards split-at=s multipass no-update-extindex update-extindex|E=s@ fast-noop|F skip-docdata all C=s@ help|h)) or die $help; diff --git a/script/public-inbox-xcpdb b/script/public-inbox-xcpdb index 3faa63afb..adee64f36 100755 --- a/script/public-inbox-xcpdb +++ b/script/public-inbox-xcpdb @@ -31,6 +31,7 @@ GetOptions($opt, qw( cow! fsync|sync! compact|c reshard|R=i commit-interval|commit=i max_size|max-size=s batch_size|batch-size=s sequential-shard|seq-shard + split-shards split-at=s jobs|j=i quiet|q verbose|v blocksize|b=s no-full|n fuller|F all C=s@ help|h)) or die $help;