use v5.10.1;
use parent qw(PublicInbox::Search PublicInbox::Lock PublicInbox::Umask
Exporter);
-use autodie qw(open);
+use autodie qw(closedir opendir rename);
use PublicInbox::Eml;
use PublicInbox::DS qw(now);
use PublicInbox::Search qw(xap_terms);
use PublicInbox::Git qw(git_unquote);
use PublicInbox::MsgTime qw(msg_timestamp msg_datestamp);
use PublicInbox::Address;
+use File::Glob qw(bsd_glob GLOB_NOSORT);
+use File::Path ();
use Config;
our @EXPORT_OK = qw(log2stack is_ancestor check_size prepare_stack
index_text term_generator add_val is_bad_blob update_checkpoint
our $DB_DANGEROUS = 0;
our $CHECKPOINT_INTVL = 15; # seconds
our $DEFRAG_NR = 100000; # document count
+our $SHARD_SPLIT_AT = 450000; # document count
our $BATCH_BYTES = $ENV{XAPIAN_FLUSH_THRESHOLD} ? 0x7fffffff :
# assume a typical 64-bit system has 8x more RAM than a
# typical 32-bit system:
sub need_xapian ($) { ($_[0]->{indexlevel} // 'full') =~ $xapianlevels }
+sub du_1_level (@) { # total size in bytes of entries directly inside each dir
+	my $total = 0;
+	for my $dir (@_) {
+		for my $ent (bsd_glob("$dir/*", GLOB_NOSORT)) {
+			# -s gives undef when the entry cannot be stat-ed
+			$total += (-s $ent) // 0;
+		}
+	}
+	$total;
+}
+
+# Join the "$shard.N.tmp" split databases found in {xpfx} with the main
+# shard by running xapian-compact into a temporary directory, then
+# rename the result over $self->xdir while holding the owner's open_lock.
+# Progress (if {-opt}->{-progress} is set) includes elapsed time and
+# before/after sizes.  Dies if the compact command fails; opendir,
+# closedir and rename die on failure via autodie.
+sub join_splits ($) {
+	my ($self) = @_;
+	require PublicInbox::Xapcmd;
+	my $cmd = PublicInbox::Xapcmd::compact_cmd($self->{-opt});
+	my $xpfx = $self->{xpfx};
+	opendir(my $dh, $xpfx);
+	my $shard = $self->{shard} // '';
+	my @tmps = grep /\A$shard\.[0-9]+\.tmp\z/, readdir($dh);
+	closedir $dh;
+	@tmps or return warn("BUG? no $shard.*.tmps to join in $xpfx\n");
+	# sort numerically on the offset component of "$shard.$off.tmp"
+	$_ = ((split /\./, $_)[1] + 0) for @tmps; # reduce to numeric offset
+	@tmps = sort { $a <=> $b } @tmps;
+	$_ = "$shard.$_.tmp" for @tmps; # rebuild directory names
+
+	my $pr = $self->{-opt}->{-progress};
+	my @ftmps = map { "$xpfx/$_" } @tmps;
+	my ($before_bytes, $t0);
+	my $xdir = $self->xdir;
+
+	# the command runs with $xpfx as its working directory
+	my $rdr = { -C => $xpfx };
+	my $wip = File::Temp->newdir("$shard.join-tmp-XXXX", DIR => $xpfx);
+	my $dst = $wip->dirname . "/$shard";
+	push @$cmd, $self->{shard}, @tmps, $dst;
+	$self->{-opt}->{cow} or PublicInbox::Syscall::nodatacow_dir($dst);
+	my $restore = $self->with_umask;
+	if ($pr) {
+		$pr->("$shard compacting (@$cmd)\n");
+		$before_bytes = du_1_level $xdir, @ftmps;
+		$t0 = now;
+	}
+	my $rd = popen_rd $cmd, undef, $rdr;
+	# lexical line variable: a bare `while (<$rd>)' would overwrite
+	# the caller's global $_
+	while (defined(my $ln = <$rd>)) {
+		$pr or next;
+		$ln =~ s/\r/\r# $shard /g;
+		$pr->("# $shard $ln");
+	}
+	$rd->close or die "@$cmd failed: \$?=$?\n";
+	File::Path::remove_tree(@ftmps);
+	my $owner = $self->{ibx} // $self->{eidx} // $self;
+	# hold the lock only around the directory swap
+	my $unlk = PublicInbox::Lock->new($owner->open_lock)->lock_for_scope;
+	if (-e $xdir) {
+		# park the old shard inside the new one so a single
+		# rename below puts the new shard in place
+		rename $xdir, "$dst/old";
+	} else {
+		warn "W: $xdir gone ($!), attempting to replace anyways...\n";
+	}
+	rename $dst, $xdir;
+	undef $unlk;
+	File::Path::remove_tree("$xdir/old");
+	return if !$pr;
+	my $after_bytes = du_1_level $xdir;
+	my $diff = now - $t0;
+	# divide instead of `>> 20' so the %0.1f fields keep their fraction
+	my $mb = 1024 * 1024;
+	$pr->("$shard compact took ",
+		sprintf('%0.1fs, %0.1fMB => %0.1fMB',
+			$diff, $before_bytes / $mb, $after_bytes / $mb), "\n");
+}
+
sub idx_release {
my ($self, $wake) = @_;
if (need_xapian($self)) {
- my $xdb = delete $self->{xdb} or croak '{xdb} not acquired';
+ my $djs = delete $self->{-do_join_splits}; # true if this process must join split tmps
+ delete $self->{-doc_max}; # stop routing docids to split tmps
+ my $xdb = delete $self->{xdb} or croak 'BUG: {xdb} missing';
+ if ($djs) { # clear the split-at metadata before closing
+ $xdb->begin_transaction;
+ $xdb->set_metadata('split-at', '');
+ $xdb->commit_transaction;
+ }
$xdb->close;
+ delete $self->{-xdb_tmp} and croak 'BUG: {-xdb_tmp} exists'; # tmp DBs must be gone by now
+ join_splits($self) if $djs && delete($self->{-splits_dirty}); # join only if split tmps were committed
}
$self->lock_release($wake) if $self->{creat};
undef;
}
my $owner = $self->{ibx} // $self->{eidx} // $self;
if ($self->{creat}) {
- require File::Path;
$self->lock_acquire;
# don't create empty Xapian directories if we don't need Xapian
$flag |= $DB_NO_SYNC if !$self->{-opt}->{fsync};
my $xdb = eval { ($X->{WritableDatabase})->new($dir, $flag) };
croak "Failed opening $dir: $@" if $@;
+ $xdb->begin_transaction;
+ my $cur = $xdb->get_metadata('split-at');
+ if ($cur || $self->{-opt}->{'split-shards'}) {
+ my $new = $self->{-opt}->{'split-at'};
+ # respect in-progress value (XXX incomplete, delay until 3.x)
+ # this doesn't work if new/old public-inbox versions mix,
+ # so we currently disable {ckpt_unlocks} if using split-at
+ if ($cur) {
+ $new && $new != $cur and warn <<EOM;
+W: using existing --split-at=$cur
+EOM
+ $self->{-do_join_splits} or warn <<EOM;
+W: PID:$$ will not join split shards
+EOM
+ $self->{-opt}->{'split-at'} = $cur;
+ } else {
+ $cur ||= $new // $SHARD_SPLIT_AT;
+ $xdb->set_metadata('split-at', $cur);
+ $self->{-do_join_splits} = 1;
+ }
+ $self->{-doc_max} = $xdb->get_lastdocid || $cur;
+ }
+ ($self->{-do_join_splits} || $self->{-opt}->{dangerous}) ?
+ $xdb->commit_transaction : $xdb->cancel_transaction;
$self->{xdb} = $xdb;
}
$doc;
}
+# open a writable Xapian DB at $dir with $flags plus the dangerous and
+# no-sync flags, and start a transaction on it; croaks on failure
+sub _xdb_tmp_new ($$$) {
+	my ($self, $dir, $flags) = @_;
+	my $db;
+	eval {
+		$db = $X->{WritableDatabase}->new($dir,
+				$flags | $DB_DANGEROUS | $DB_NO_SYNC);
+		$db->begin_transaction;
+	};
+	my $err = $@ or return $db;
+	# rethrow for stacktrace w/ PERL5OPT=-MCarp=verbose
+	my @active = sort { $a <=> $b } keys %{$self->{-xdb_tmp}};
+	croak $err, "E: xdb_tmps active: @active";
+}
+
+# create (or reopen) the split tmp DB for offset $off next to the main
+# shard directory and remember it in {-xdb_tmp}
+sub xdb_tmp_new ($$$) {
+	my ($self, $off, $docid) = @_;
+	my $dir = $self->xdir.".$off.tmp";
+	my $restore = $self->with_umask;
+	if (mkdir $dir) { # only announce and tune freshly created dirs
+		if (my $pr = $self->{-opt}->{-progress}) {
+			$pr->("indexing >= #$docid to ",($self->{shard}//''),
+				".$off.tmp...\n");
+		}
+		PublicInbox::Syscall::nodatacow_dir($dir);
+	}
+	my $xdb_tmp = _xdb_tmp_new($self, $dir, $DB_CREATE_OR_OPEN);
+	$self->{-xdb_tmp}->{$off} = $xdb_tmp;
+}
+
+# open the split tmp DB for offset $off with $DB_OPEN and remember it
+# in {-xdb_tmp}
+sub xdb_tmp_get ($$) {
+	my ($self, $off) = @_;
+	my $xdb_tmp = _xdb_tmp_new($self, $self->xdir.".$off.tmp", $DB_OPEN);
+	$self->{-xdb_tmp}->{$off} = $xdb_tmp;
+}
+
+# store $doc as $docid; docids above {-doc_max} go to a split tmp DB
+# (created on demand), everything else to the main {xdb}
+sub replace_doc ($$$) {
+	my ($self, $docid, $doc) = @_;
+	my $dst;
+	my $doc_max = $self->{-doc_max};
+	if ($doc_max && $docid > $doc_max) {
+		my $per_split = $self->{-opt}->{'split-at'} || $SHARD_SPLIT_AT;
+		my $off = int($docid / $per_split);
+		$dst = $self->{-xdb_tmp}->{$off} //
+			xdb_tmp_new($self, $off, $docid);
+	}
+	$dst //= $self->{xdb};
+	$dst->replace_document($docid, $doc);
+}
+
+# fetch document $docid from the split tmp DB (docids above {-doc_max})
+# or the main {xdb}; warns and returns undef if the document is missing,
+# rethrows any other error
+sub _get_doc ($$) {
+	my ($self, $docid) = @_;
+	my $src;
+	my $doc_max = $self->{-doc_max};
+	if ($doc_max && $docid > $doc_max) {
+		my $per_split = $self->{-opt}->{'split-at'} || $SHARD_SPLIT_AT;
+		my $off = int($docid / $per_split);
+		$src = $self->{-xdb_tmp}->{$off} // xdb_tmp_get($self, $off);
+	}
+	$src //= $self->{xdb};
+	my $doc = eval { $src->get_document($docid) };
+	if (my $err = $@) {
+		ref($err) =~ /\bDocNotFoundError\b/ or die $err;
+		warn "E: #$docid missing in Xapian\n";
+	}
+	$doc;
+}
+
sub add_xapian ($$$$) {
my ($self, $eml, $smsg, $mids) = @_;
begin_txn_lazy($self);
add_bool_term($doc, 'O'.$eidx_key) if $eidx_key ne '.';
index_list_id_raw $self, $doc, @list_ids;
}
- $self->{xdb}->replace_document($smsg->{num}, $doc);
+ replace_doc $self, $smsg->{num}, $doc;
}
sub v1_mm_init ($) {
$smsg->{num};
}
-sub _get_doc ($$) {
- my ($self, $docid) = @_;
- $self->get_doc($docid) // do {
- warn "E: #$docid missing in Xapian\n";
- undef;
- }
-}
-
sub add_eidx_info_raw {
my ($self, $docid, $eidx_key, @list_ids) = @_;
begin_txn_lazy($self);
add_bool_term($doc, 'O'.$eidx_key) if $eidx_key ne '.';
index_list_id_raw $self, $doc, @list_ids;
- $self->{xdb}->replace_document($docid, $doc);
+ replace_doc $self, $docid, $doc;
}
+# for lei/store to access uncommitted terms
sub get_terms {
my ($self, $pfx, $docid) = @_;
begin_txn_lazy($self);
# be needed and users using the "l:" prefix are probably
# rarer.
}
- $self->{xdb}->replace_document($docid, $doc);
+ replace_doc $self, $docid, $doc;
}
sub set_vmd {
return unless scalar(@rm) || scalar(@add);
$doc->remove_term($_) for @rm;
add_bool_term($doc, $_) for @add;
- $self->{xdb}->replace_document($docid, $doc);
+ replace_doc $self, $docid, $doc;
}
sub apply_vmd_mod ($$) {
$updated += scalar(@$add);
}
$updated += apply_vmd_mod($doc, $vmd);
- $self->{xdb}->replace_document($docid, $doc) if $updated;
+ replace_doc($self, $docid, $doc) if $updated;
}
sub remove_vmd {
};
}
}
- $self->{xdb}->replace_document($docid, $doc) if $replace;
+ replace_doc($self, $docid, $doc) if $replace;
}
sub update_vmd {
begin_txn_lazy($self);
my $doc = _get_doc($self, $docid) or return;
my $updated = apply_vmd_mod($doc, $vmd_mod);
- $self->{xdb}->replace_document($docid, $doc) if $updated;
+ replace_doc($self, $docid, $doc) if $updated;
$updated;
}
-sub xdb_remove {
+sub _xdb_remove ($@) { # delete @docids, returns one message per failed delete
my ($self, @docids) = @_;
+ my @warn; # collected error messages, returned to the caller
begin_txn_lazy($self);
my $xdb = $self->{xdb} // die 'BUG: missing {xdb}';
+ if (my $doc_max = $self->{-doc_max}) { # docids above doc_max are in split tmp DBs
+ my $n = $self->{-opt}->{'split-at'} || $SHARD_SPLIT_AT;
+ for my $docid (grep { $_ > $doc_max } @docids) {
+ my $off = int($docid / $n);
+ my $xdb_tmp = $self->{-xdb_tmp}->{$off} //
+ xdb_tmp_get $self, $off; # NOTE(review): outside the eval, an open failure propagates - confirm intended
+ eval { $xdb_tmp->delete_document($docid) };
+ push(@warn, "E: #$docid not in Xapian tmp[$off]? $@\n")
+ if $@;
+ }
+ @docids = grep { $_ <= $doc_max } @docids; # the rest go to the main DB below
+ }
for my $docid (@docids) {
eval { $xdb->delete_document($docid) };
- warn "E: #$docid not in Xapian? $@\n" if $@;
+ push(@warn, "E: #$docid not in Xapian? $@\n") if $@;
}
+ @warn;
+}
+
+sub xdb_remove { # delete @docids, warning about each one which failed
+ my ($self, @docids) = @_;
+ my @warn = _xdb_remove $self, @docids;
+ warn @warn if @warn;
}
sub xdb_remove_quiet {
- my ($self, $docid) = @_;
- begin_txn_lazy($self);
- my $xdb = $self->{xdb} // die 'BUG: missing {xdb}';
- eval { $xdb->delete_document($docid) };
- ++$self->{-quiet_rm} unless $@;
+ my ($self, @docids) = @_;
+ my @warn = _xdb_remove $self, @docids; # failure messages are discarded, only counted
+ $self->{-quiet_rm} += (scalar(@docids) - scalar(@warn)); # add the number of successful deletes
}
sub nr_quiet_rm { delete($_[0]->{-quiet_rm}) // 0 }
sub DESTROY {
# order matters for unlocking
$_[0]->{xdb} = undef;
+ delete $_[0]->{-xdb_tmp}; # also drop any split tmp DB handles before the lock handle
$_[0]->{lockfh} = undef;
}
set_metadata_once($self);
$xdb->commit_transaction;
}
- $self->{oidx}->commit_lazy if $self->{oidx};
+ # for memory savings:
+ for my $xdb_tmp (values %{delete $self->{-xdb_tmp} // {}}) {
+ $xdb_tmp->commit_transaction;
+ $xdb_tmp->close; # I wasted a day because I forgot this line :<
+ $self->{-splits_dirty} = 1;
+ }
+ $self->{oidx}->commit_lazy if $self->{oidx}; # v1 only
}
sub eidx_shard_new {
package PublicInbox::Xapcmd;
use v5.12;
use autodie qw(chmod closedir open opendir rename syswrite);
-use PublicInbox::Spawn qw(which popen_rd);
+use PublicInbox::Spawn qw(which popen_rd spawn);
use PublicInbox::Syscall;
use PublicInbox::Lock;
use PublicInbox::Admin qw(setup_signals);
use File::Temp 0.19 (); # ->newdir
use File::Path qw(remove_tree);
use POSIX qw(WNOHANG dup _exit);
-use PublicInbox::DS;
+use PublicInbox::DS qw(awaitpid);
+use PublicInbox::IO qw(try_cat);
+use PublicInbox::OnDestroy;
+use Carp qw(croak);
# support testing with dev versions of Xapian which installs
# commands with a version number suffix (e.g. "xapian-compact-1.5")
our @COMPACT_OPT = qw(jobs|j=i quiet|q blocksize|b=s no-full|n fuller|F);
my %SKIP = map { $_ => 1 } qw(. ..);
+my $reap_compact = sub { # awaitpid cb
+	# $rename_on_destroy is not used in the body; presumably it is
+	# passed so the on_destroy object stays referenced until reaping
+	my ($pid, $ok, $wip, $old, $rename_on_destroy) = @_;
+	if ($?) {
+		die "E: xapian-compact $wip:\n", try_cat("$wip/err");
+	}
+	# record the (wip, old) pair of a successful compaction
+	push @$ok, $wip, $old;
+};
+
+# TODO: use this for old code, too
+my $rename_shards = sub { # on_destroy cb: move compacted shards into place
+ my ($owner, $ok, $expect_nr) = @_; # $ok: flat list of (wip, old) pairs, $expect_nr: pairs required
+ ($expect_nr * 2) == @$ok or return; # some xapian-compact failed
+ my (@old_shard, @unlink, @rq); # @rq: flat queue of (from, to) renames
+ while (@$ok) {
+ my ($wip, $old) = splice @$ok, 0, 2;
+ my $new = $wip->dirname;
+ if (-e $old) {
+ my $mode = (stat(_))[2]; # _ reuses the stat done by -e
+ chmod $mode & 07777, $new; # carry over permission bits of the old shard
+ push @old_shard, "$new/old";
+ push @rq, $old, $old_shard[-1]; # queue: park old shard inside the new dir
+ } else {
+ warn "W: shard at $old disappeared during compact\n";
+ }
+ push @rq, $new, $old; # queue: new dir takes over the old path
+ push @unlink, "$old/err", "$old/out"; # paths under $old once the new dir is renamed there
+ }
+ # minimize lock time: (TODO: dedicated lock for shard count changes)
+ my $lk = $owner ? $owner->lock_for_scope : undef;
+ rename(shift(@rq), shift(@rq)) while @rq; # apply queued pairs in order
+ undef $lk;
+ remove_tree(@old_shard);
+ unlink @unlink;
+};
+
sub commit_changes ($$$$) {
my ($ibx, $im, $tmp, $opt) = @_;
my $reshard = $opt->{reshard};
kill($sig, $$ioref->attached_pid // return) if defined($$ioref);
}
+# we rely on --no-renumber to keep docids synced to NNTP
+# build the base xapian-compact command (as an arrayref) from $opt;
+# callers append source and destination paths
+sub compact_cmd ($) {
+	my ($opt) = @_;
+	my @cmd = ($XAPIAN_COMPACT, '--no-renumber');
+	# boolean switches
+	push @cmd, map { "--$_" } grep { $opt->{$_} }
+			qw(no-full fuller multipass);
+	# switches taking a value
+	for my $sw (qw(blocksize)) {
+		my $val = $opt->{$sw};
+		push(@cmd, "--$sw", $val) if defined $val;
+	}
+	\@cmd;
+}
+
# xapian-compact wrapper
sub compact ($$$) { # cb_spawn callback
my ($ibxish, $args, $opt) = @_;
$rdr->{$fd} = $dfd;
}
- # we rely on --no-renumber to keep docids synched to NNTP
- my $cmd = [ $XAPIAN_COMPACT, '--no-renumber' ];
- for my $sw (qw(no-full fuller)) {
- push @$cmd, "--$sw" if $opt->{$sw};
- }
- for my $sw (qw(blocksize)) {
- defined(my $v = $opt->{$sw}) or next;
- push @$cmd, "--$sw", $v;
- }
+ my $cmd = compact_cmd $opt;
$pr->("$pfx `".join(' ', @$cmd)."'\n") if $pr;
push @$cmd, $src, $dst;
local @SIG{keys %SIG} = values %SIG;
compact $ibxish, [ $tmp, $new ], $opt;
}
+# Spawn one xapian-compact per shard under $owner->{xpfx} which has
+# split tmp directories, joining the shard and its tmps into a fresh
+# temporary directory.  $rename_shards runs via on_destroy and moves the
+# results into place once every spawned compaction was reaped
+# successfully.  stdout/stderr of each command are appended to "out" and
+# "err" inside its temporary directory.
+# Returns the list of spawned PIDs (empty if there is nothing to join).
+# Croaks if {xpfx} is unset or if shard numbering has gaps.
+sub join_split_tmps ($$) {
+	my ($owner, $opt) = @_;
+	my $xpfx = $owner->{xpfx} // croak "BUG: $owner has no {xpfx}";
+	my (@shards, $tmps, @pids);
+	opendir(my $dh, $xpfx);
+	while (defined(my $dir = readdir $dh)) {
+		if ($dir =~ m!\A[0-9]+\z!) {
+			push @shards, $dir;
+		} elsif ($dir =~ m!\A([0-9]+)-[0-9]+\.tmp\z!) {
+			# NOTE(review): this expects "N-M.tmp", while the
+			# indexer appends ".$off.tmp" to the shard dir
+			# ("N.M.tmp") - confirm which naming is intended
+			push @{$tmps->[$1]}, $dir;
+		}
+	}
+	closedir $dh;
+	return @pids if !$tmps || !@shards;
+	@shards = sort { $a <=> $b } @shards;
+	($shards[-1] + 1) == scalar(@shards) or
+		croak "E: gaps in shards, have: @shards";
+
+	# Only shards with split tmps get compacted, so only those may
+	# count towards the successes $rename_shards waits for; counting
+	# every shard would make it give up whenever one shard has no tmps
+	my @todo = grep { defined $tmps->[$_] } @shards;
+	@todo or return @pids;
+
+	my $cmd = compact_cmd $opt;
+	my $ok = [];
+	my $rod = on_destroy $rename_shards, $owner, $ok, scalar(@todo);
+	for my $i (@todo) {
+		my $wip = File::Temp->newdir("$i.join-tmp-XXXX", DIR => $xpfx);
+		my $dn = $wip->dirname;
+		$opt->{cow} or PublicInbox::Syscall::nodatacow_dir($dn);
+		my $rdr = { -C => $xpfx }; # run the command inside $xpfx
+		open $rdr->{1}, '>>', "$dn/out";
+		open $rdr->{2}, '>>', "$dn/err";
+		my @srcs = ($i, @{$tmps->[$i]});
+		my $pid = spawn([ @$cmd, @srcs, $dn ], undef, $rdr);
+		awaitpid($pid, $reap_compact, $ok, $wip, "$xpfx/$i", $rod);
+		push @pids, $pid;
+	}
+	@pids;
+}
+
1;