# indexer for git coderepos, just commits and repo paths for now
# this stores normalized absolute paths of indexed GIT_DIR inside
# the DB itself and is designed to handle forks by designating roots
+# At minimum, it needs to have the pathnames of all git repos in
+# memory at runtime. --join also requires all inbox pathnames to
+# be in memory (as it happens when loaded from ~/.public-inbox/config).
#
# Unlike mail search, docid isn't tied to NNTP artnum or IMAP UID,
# there's no serial number dependency at all.
#
# We shard repos using the first 32-bits of sha256($ABS_GIT_DIR)
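+#
+# e.g. a minimal sketch of that mapping (assuming Digest::SHA and a
+# hypothetical $NSHARD shard count; the real helper lives elsewhere):
+#
+#	use Digest::SHA qw(sha256);
+#	my $shard = unpack('N', sha256($ABS_GIT_DIR)) % $NSHARD;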
#
-# --associate joins root commits of coderepos to inboxes based on prefixes.
+# --join associates root commits of coderepos to inboxes based on prefixes.
#
-# Internally, each inbox is assigned a non-negative integer index ($IBX_ID),
+# Internally, each inbox is assigned a non-negative integer index ($IBX_OFF),
# and each root commit object ID (SHA-1/SHA-256 hex) is also assigned
# a non-negative integer index ($ROOT_COMMIT_OID_OFF).
#
-# associate dumps to 2 intermediate files in $TMPDIR:
+# join dumps to 2 intermediate files in $TMPDIR:
#
-# * to_root_id - each line is of the format:
+# * to_root_off - each line is of the format:
#
-# $PFX $ROOT_COMMIT_OID_IDS
+# $PFX @ROOT_COMMIT_OID_OFFS
#
-# * to_ibx_id - each line is of the format:
+# * to_ibx_off - each line is of the format:
#
-# $PFX $IBX_IDS
+# $PFX @IBX_OFFS
#
-# $IBX_IDS is a comma-delimited list of integers ($IBX_ID)
-# $ROOT_COMMIT_OID_IDS is space-delimited
+# $IBX_OFFS is a comma-delimited list of integers ($IBX_OFF)
+# The $IBX_OFF here is ephemeral (per-join_data) and NOT related to
+# the `ibx_off' column of `over.sqlite3' for extindex.
+# @ROOT_COMMIT_OID_OFFS is space-delimited
# In both cases, $PFX is typically the value of the patchid (XDFID) but it
# can be configured to use any combination of patchid, dfpre, dfpost or
# dfblob.
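+#
+# e.g. with a hypothetical (shortened) patchid value as $PFX:
+#
+#	to_root_off:	deadbeef0123 0 3
+#	to_ibx_off:	deadbeef0123 0,9
+#
+# join(1) later pairs the two files on $PFX (see do_join below)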
#
+# WARNING: this is vulnerable to arbitrary memory usage attacks if we
+# attempt to index or join against malicious coderepos with
+# thousands/millions of root commits. Most coderepos have only one
+# root commit, some have several: git.git currently has 7,
+# torvalds/linux.git has 4.
+# --max-size= is required to keep memory usage reasonable for gigantic
+# commits.
+#
# See PublicInbox::CodeSearch (read-only API) for more
package PublicInbox::CodeSearchIdx;
use v5.12;
use PublicInbox::DS qw(awaitpid);
use PublicInbox::PktOp;
use PublicInbox::IPC qw(nproc_shards);
-use POSIX qw(WNOHANG SEEK_SET);
+use POSIX qw(WNOHANG SEEK_SET strftime);
use File::Path ();
use File::Spec ();
use List::Util qw(max);
use PublicInbox::Git qw(%OFMT2HEXLEN);
use PublicInbox::Compat qw(uniqstr);
use PublicInbox::Aspawn qw(run_await);
+use Compress::Zlib qw(compress);
use Carp ();
+use Time::Local qw(timegm);
use autodie qw(close pipe open sysread seek sysseek send);
our $DO_QUIT = 15; # signal number
our (
$TMPDIR, # File::Temp->newdir object for prune
@PRUNE_QUEUE, # GIT_DIRs to prepare for pruning
%TODO, @IBXQ, @IBX,
- @JOIN, # join(1) command for associate
+ @JOIN, # join(1) command for --join
$CMD_ENV, # env for awk(1), comm(1), sort(1) commands during prune
@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
- @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
+ %JOIN, # CLI --join= suboptions
+ @JOIN_PFX, # any combination of XDFID, XDFPRE, XDFPOST
+ @JOIN_DT, # YYYYmmddHHMMSS for dt:
$QRY_STR, # common query string for both code and inbox associations
$DUMP_IBX_WPIPE, # goes to sort(1)
- @ID2ROOT,
+ @OFF2ROOT,
);
# stop walking history if we see >$SEEN_MAX existing commits, this assumes
# history beyond that many already-seen commits is already indexed
our $SEEN_MAX = 100000;
# window for commits/emails to determine an inbox <-> coderepo association
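+# (passed as the `-m' limit of dump_roots requests; a non-positive
+# --join=window: value disables the limit entirely)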
-my $ASSOC_WINDOW = 50000;
+my $JOIN_WINDOW = 50000;
our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check);
}
sub repo_stored {
- my ($self, $repo_ctx, $did) = @_;
+ my ($self, $repo_ctx, $drs, $did) = @_;
$did > 0 or die "BUG: $repo_ctx->{repo}->{git_dir}: docid=$did";
- my $next = PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx);
my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{shard_done} = [ $self, $repo_ctx, $next ];
+ $c->{ops}->{shard_done} = [ $self, $repo_ctx,
+ PublicInbox::OnDestroy->new($$, \&next_repos, $repo_ctx, $drs)];
# shard_done fires when all shards are committed
my @active = keys %{$repo_ctx->{active}};
$IDX_SHARDS[$_]->wq_io_do('shard_commit', [ $p->{op_p} ]) for @active;
}
sub prune_done { # called via prune_do completion
- my ($self, $n) = @_;
+ my ($self, $drs, $n) = @_;
return if $DO_QUIT || !$PRUNE_DONE;
die "BUG: \$PRUNE_DONE->[$n] already defined" if $PRUNE_DONE->[$n];
$PRUNE_DONE->[$n] = 1;
send($op_p, "shard_done $self->{shard}", 0);
}
-sub assoc_window_args ($) {
- my ($self) = @_;
- my $n = $self->{-opt}->{'associate-window'} // $ASSOC_WINDOW;
- $n <= 0 ? () : ('-m', $n);
-}
-
sub start_xhc () {
my ($xhc, $pid) = PublicInbox::XapClient::start_helper("-j$NPROC");
	awaitpid($pid, \&cmd_done, ['xap_helper', "-j$NPROC"]);
	$xhc; # return the client handle for `$XHC //= start_xhc' callers
}
sub dump_roots_start {
- my ($self, $associate) = @_;
+ my ($self, $do_join) = @_;
$XHC //= start_xhc;
- $associate // die 'BUG: no $associate';
- $TODO{associating} = 1; # keep shards_active() happy
+ $do_join // die 'BUG: no $do_join';
progress($self, 'dumping IDs from coderepos');
local $self->{xdb};
- @ID2ROOT = $self->all_terms('G');
+ @OFF2ROOT = $self->all_terms('G');
my $root2id = "$TMPDIR/root2id";
open my $fh, '>', $root2id;
my $nr = -1;
- for (@ID2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly
+ for (@OFF2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly
close $fh;
- # dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
+ # dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_off
my ($sort_opt, $fold_opt);
pipe(local $sort_opt->{0}, my $sort_w);
pipe(local $fold_opt->{0}, local $sort_opt->{1});
my @sort = (@SORT, '-k1,1');
- my $dst = "$TMPDIR/to_root_id";
+ my $dst = "$TMPDIR/to_root_off";
open $fold_opt->{1}, '>', $dst;
my $fold_env = { %$CMD_ENV, OFS => ' ' };
- run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $associate);
- run_await(\@UNIQ_FOLD, $fold_env, $fold_opt, \&cmd_done, $associate);
- my @arg = ((map { ('-A', $_) } @ASSOC_PFX), '-c',
- assoc_window_args($self), $root2id, $QRY_STR);
+ run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $do_join);
+ run_await(\@UNIQ_FOLD, $fold_env, $fold_opt, \&cmd_done, $do_join);
+ my $window = $JOIN{window} // $JOIN_WINDOW;
+ my @m = $window <= 0 ? () : ('-m', $window);
+ my @arg = ((map { ('-A', $_) } @JOIN_PFX), '-c',
+ @m, $root2id, $QRY_STR);
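+	# each shard request is then e.g. (w/ hypothetical shard dir):
+	# dump_roots -d $SHARD_DIR -A XDFID -c -m 50000 $TMPDIR/root2id dt:...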
for my $d ($self->shard_dirs) {
pipe(my $err_r, my $err_w);
$XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg);
my $desc = "dump_roots $d";
- $self->{PENDING}->{$desc} = $associate;
+ $self->{PENDING}->{$desc} = $do_join;
PublicInbox::CidxXapHelperAux->new($err_r, $self, $desc);
}
progress($self, 'waiting on dump_roots sort');
}
sub dump_ibx { # sends to xap_helper.h
- my ($self, $ibx_id) = @_;
- my $ibx = $IBX[$ibx_id] // die "BUG: no IBX[$ibx_id]";
+ my ($self, $ibx_off) = @_;
+ my $ibx = $IBX[$ibx_off] // die "BUG: no IBX[$ibx_off]";
my $ekey = $ibx->eidx_key;
my $srch = $ibx->isrch or return warn <<EOM;
W: $ekey not indexed for search
EOM
- # note: we don't send associate_max_args to dump_ibx since we
- # have to post-filter non-patch messages
+ # note: we don't send `-m MAX' to dump_ibx since we have to
+ # post-filter non-patch messages for now...
my @cmd = ('dump_ibx', $srch->xh_args,
- (map { ('-A', $_) } @ASSOC_PFX), $ibx_id, $QRY_STR);
+ (map { ('-A', $_) } @JOIN_PFX), $ibx_off, $QRY_STR);
pipe(my $r, my $w);
$XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
- $self->{PENDING}->{$ekey} = $TODO{associate};
+ $self->{PENDING}->{$ekey} = $TODO{do_join};
PublicInbox::CidxXapHelperAux->new($r, $self, $ekey);
}
sub dump_ibx_start {
- my ($self, $associate) = @_;
+ my ($self, $do_join) = @_;
$XHC //= start_xhc;
my ($sort_opt, $fold_opt);
pipe(local $sort_opt->{0}, $DUMP_IBX_WPIPE);
pipe(local $fold_opt->{0}, local $sort_opt->{1});
- my @sort = (@SORT, '-k1,1'); # sort only on ASSOC_PFX
- # pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_id
- open $fold_opt->{1}, '>', "$TMPDIR/to_ibx_id";
- run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $associate);
- run_await(\@UNIQ_FOLD, $CMD_ENV, $fold_opt, \&cmd_done, $associate);
+ my @sort = (@SORT, '-k1,1'); # sort only on JOIN_PFX
+ # pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_off
+ open $fold_opt->{1}, '>', "$TMPDIR/to_ibx_off";
+ run_await(\@sort, $CMD_ENV, $sort_opt, \&cmd_done, $do_join);
+ run_await(\@UNIQ_FOLD, $CMD_ENV, $fold_opt, \&cmd_done, $do_join);
}
sub index_next ($) {
	my ($self) = @_;
	delete $TODO{dump_ibx_start}; # runs OnDestroy once
return dump_ibx($self, shift @IBXQ) if @IBXQ;
undef $DUMP_IBX_WPIPE; # done dumping inboxes
- delete $TODO{associate};
+ delete $TODO{do_join};
}
# else: wait for shards_active (post_loop_do) callback
}
sub next_repos { # OnDestroy cb
- my ($repo_ctx) = @_;
+ my ($repo_ctx, $drs) = @_;
my ($self, $repo, $active) = @$repo_ctx{qw(self repo active)};
progress($self, "$repo->{git_dir}: done");
return if $DO_QUIT || !$REPO_CTX;
	my $n = $repo->{shard_n};
$active->{$n} = undef;
my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{repo_stored} = [ $self, $repo_ctx ];
- $c->{-cidx_dump_roots_start} = $drs if $drs;
+ $c->{ops}->{repo_stored} = [ $self, $repo_ctx, $drs ];
$IDX_SHARDS[$n]->wq_io_do('store_repo', [ $p->{op_p} ], $repo);
# repo_stored will fire once store_repo is done
}
sub cmd_done { # run_await cb for sort, xapian-delve, sed failures
my ($pid, $cmd, undef, undef, $run_on_destroy) = @_;
$? and die "fatal: @$cmd (\$?=$?)\n";
- # $run_on_destroy calls associate() or run_prune()
+ # $run_on_destroy calls do_join() or run_prune()
+}
+
+sub current_join_data ($) {
+ my ($self) = @_;
+ local $self->{xdb} = $RDONLY_XDB[0] // die 'BUG: shard[0] undef';
+ # we support multiple PI_CONFIG files for a cindex:
+ $self->join_data;
+}
+
+# combine previously stored stats with new
+sub score_old_join_data ($$$) {
+ my ($self, $score, $ekeys_new) = @_;
+ my $old = ($JOIN{reset} ? undef : current_join_data($self)) or return;
+	my @old = @$old{qw(ekeys roots ibx2root)};
+	(grep { defined } @old) == 3 or
+		return warn "W: ekeys/roots missing from old JOIN data\n";
+ progress($self, 'merging old join data...');
+ my ($ekeys_old, $roots_old, $ibx2root_old) = @old;
+ # score: "ibx_off root_off" => nr
+ my $i = -1;
+ my %root2id_new = map { $_ => ++$i } @OFF2ROOT;
+ $i = -1;
+ my %ekey2id_new = map { $_ => ++$i } @$ekeys_new;
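+	# old join_data stored offsets; remap via ekey and root OID hex,
+	# which are stable across runs (unlike the offsets themselves)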
+ for my $ibx_off_old (0..$#$ibx2root_old) {
+		my $root_offs_old = $ibx2root_old->[$ibx_off_old] // next;
+		my $ekey = $ekeys_old->[$ibx_off_old] // do {
+			warn "W: no ibx #$ibx_off_old in old JOIN data\n";
+			next;
+		};
+		my $ibx_off_new = $ekey2id_new{$ekey} // do {
+			warn "W: `$ekey' no longer exists\n";
+			next;
+		};
+		for (@$root_offs_old) {
+			my ($nr, $rid_old) = @$_;
+			my $root_old = $roots_old->[$rid_old] // do {
+				warn "W: no root #$rid_old in old JOIN data\n";
+				next;
+			};
+			my $rid_new = $root2id_new{$root_old} // do {
+				warn "W: root `$root_old' no longer exists\n";
+				next;
+			};
+			$score->{"$ibx_off_new $rid_new"} += $nr;
+ }
+ }
+}
+
+sub metadata_set { # via wq_do
+ my ($self, $key, $val, $commit) = @_;
+ $self->begin_txn_lazy;
+ $self->{xdb}->set_metadata($key, $val);
+ $self->commit_txn_lazy if $commit || defined(wantarray);
}
# runs once all inboxes and shards are dumped via OnDestroy
-sub associate {
+sub do_join {
my ($self) = @_;
return if $DO_QUIT;
$XHC = 0; # should not be recreated again
@IDX_SHARDS or return warn("# aborting on no shards\n");
unlink("$TMPDIR/root2id");
my @pending = keys %{$self->{PENDING}};
- die "E: pending=@pending jobs not done\n" if @pending;
- progress($self, 'associating...');
- my @join = (@JOIN, 'to_ibx_id', 'to_root_id');
+ die "BUG: pending=@pending jobs not done\n" if @pending;
+ progress($self, 'joining...');
+ my @join = (@JOIN, 'to_ibx_off', 'to_root_off');
+ if (my $time = which('time')) { unshift @join, $time };
my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
my %score;
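+	# join(1) emits the shared $PFX, then @IBX_OFFS (from to_ibx_off),
+	# then @ROOT_COMMIT_OID_OFFS (from to_root_off); a hypothetical
+	# line `$PFX 0,9 3 7' scores the pairings "0 3" "0 7" "9 3" "9 7"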
- while (<$rd>) { # PFX ibx_ids root_id
- my (undef, $ibx_ids, @root_ids) = split(/ /, $_);
- for my $ibx_id (split(/,/, $ibx_ids)) {
- ++$score{"$ibx_id $_"} for @root_ids;
+ while (<$rd>) { # PFX ibx_offs root_off
+ chop eq "\n" or die "no newline from @join: <$_>";
+ my (undef, $ibx_offs, @root_offs) = split / /, $_;
+ for my $ibx_off (split(/,/, $ibx_offs)) {
+ ++$score{"$ibx_off $_"} for @root_offs;
}
}
$rd->close or die "fatal: @join failed: \$?=$?";
- my $min = $self->{-opt}->{'assoc-min'} // 10;
- progress($self, scalar(keys %score).' potential pairings...');
- for my $k (keys %score) {
- my $nr = $score{$k};
- my ($ibx_id, $root) = split(/ /, $k);
- my $ekey = $IBX[$ibx_id]->eidx_key;
- $root = $ID2ROOT[$root];
+ my $nr = scalar(keys %score) or do {
+ delete $TODO{joining};
+ return progress($self, 'no potential new pairings');
+ };
+ progress($self, "$nr potential new pairings...");
+ my @ekeys = map { $_->eidx_key } @IBX;
+ score_old_join_data($self, \%score, \@ekeys);
+ my $new;
+ while (my ($k, $nr) = each %score) {
+ my ($ibx_off, $root_off) = split(/ /, $k);
+ my ($ekey, $root) = ($ekeys[$ibx_off], $OFF2ROOT[$root_off]);
progress($self, "$ekey => $root has $nr matches");
+ push @{$new->{ibx2root}->[$ibx_off]}, [ $nr, $root_off ];
+ }
+	for my $ary (values %$new) { # sort each ibx's roots by nr, descending
+		for (grep { defined } @$ary) {
+			@$_ = sort { $b->[0] <=> $a->[0] } @$_
+		}
}
- delete $TODO{associating}; # break out of shards_active()
- # TODO
- warn "# Waiting for $TMPDIR/cont @JOIN";
- system "ls -Rl $TMPDIR >&2";
- system "wc -l $TMPDIR/to_*_id >&2";
- #sleep(1) until -f "$TMPDIR/cont";
- # warn "# Waiting for $TMPDIR/cont";
- # sleep(1) until -f "$TMPDIR/cont";
+ $new->{ekeys} = \@ekeys;
+ $new->{roots} = \@OFF2ROOT;
+ $new->{dt} = \@JOIN_DT;
+ $new = compress(PublicInbox::Config::json()->encode($new));
+ my $key = $self->join_data_key;
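+	# capture the return value so the shard sees a defined wantarray
+	# in metadata_set and commits before wq_do returns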
+ my $wait = $IDX_SHARDS[0]->wq_do('metadata_set', $key, $new);
+ delete $TODO{joining};
}
sub require_progs {
}
}
-sub init_associate_postfork ($) {
+sub init_join_postfork ($) {
my ($self) = @_;
- return unless $self->{-opt}->{associate};
- require_progs('associate', join => \@JOIN);
- $QRY_STR = $self->{-opt}->{'associate-date-range'} // '1.year.ago..';
- substr($QRY_STR, 0, 0) = 'dt:';
- @{$self->{git_dirs} // []} or die "E: no coderepos to associate\n";
- @IBX or die "E: no inboxes to associate\n";
+ return unless $self->{-opt}->{join};
+ require_progs('join', join => \@JOIN);
+ my $d2 = '([0-9]{2})';
+ my $dt_re = qr!([0-9]{4})$d2$d2$d2$d2$d2!;
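+	# resume from 1s past the end of the previous --join's dt: range
+	# (stored in its join_data) unless the user supplied their own dt: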
+	if (my $cur = $JOIN{reset} ? undef : current_join_data($self)) {
+ if (($cur->{dt}->[1] // '') =~ m!\A$dt_re\z!o) {
+ my ($Y, $m, $d, $H, $M, $S) = ($1, $2, $3, $4, $5, $6);
+ my $t = timegm($S, $M, $H, $d, $m - 1, $Y);
+ $t = strftime('%Y%m%d%H%M%S', gmtime($t + 1));
+ $JOIN{dt} //= "$t..";
+ } else {
+ warn <<EOM;
+BUG?: previous --join invocation did not store usable `dt' key
+EOM
+ }
+ }
+ if ($JOIN{aggressive}) {
+ $JOIN{window} //= -1;
+ $JOIN{dt} //= '..1.month.ago';
+ }
+ $QRY_STR = $JOIN{dt} // '1.year.ago..';
+ index($QRY_STR, '..') >= 0 or die "E: dt:$QRY_STR is not a range\n";
+	# Account for send->apply delay (torvalds/linux.git mean is ~20 days
+	# from Author to CommitDate in cases where CommitDate > AuthorDate)
+ $QRY_STR .= '1.month.ago' if $QRY_STR =~ /\.\.\z/;
+ @{$self->{git_dirs} // []} or die "E: no coderepos to join\n";
+ @IBX or die "E: no inboxes to join\n";
my $approx_git = PublicInbox::Git->new($self->{git_dirs}->[0]); # ugh
+ substr($QRY_STR, 0, 0) = 'dt:';
$self->query_approxidate($approx_git, $QRY_STR); # in-place
- $TODO{associate} = PublicInbox::OnDestroy->new($$, \&associate, $self);
+ ($JOIN_DT[1]) = ($QRY_STR =~ /\.\.([0-9]{14})\z/); # YYYYmmddHHMMSS
+ ($JOIN_DT[0]) = ($QRY_STR =~ /\Adt:([0-9]{14})/); # YYYYmmddHHMMSS
+ $JOIN_DT[0] //= '19700101'.'000000'; # git uses unsigned times
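+	# e.g. `dt:20230101000000..20240101000000' would set
+	# @JOIN_DT = ('20230101000000', '20240101000000')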
+ $TODO{do_join} = PublicInbox::OnDestroy->new($$, \&do_join, $self);
+ $TODO{joining} = 1; # keep shards_active() happy
$TODO{dump_ibx_start} = PublicInbox::OnDestroy->new($$,
- \&dump_ibx_start, $self, $TODO{associate});
+ \&dump_ibx_start, $self, $TODO{do_join});
$TODO{dump_roots_start} = PublicInbox::OnDestroy->new($$,
- \&dump_roots_start, $self, $TODO{associate});
+ \&dump_roots_start, $self, $TODO{do_join});
+ progress($self, "will join in $QRY_STR date range...");
my $id = -1;
@IBXQ = map { ++$id } @IBX;
}
$IDX_SHARDS[$n]->wq_do('prune_one', 'P'.$git_dir);
}
my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{prune_done} = [ $self ];
- $c->{-cidx_dump_roots_start} = $drs;
+ $c->{ops}->{prune_done} = [ $self, $drs ];
$_->wq_io_do('prune_commit', [ $p->{op_p} ]) for @IDX_SHARDS;
}
-sub init_associate_prefork ($) {
+sub init_join_prefork ($) {
my ($self) = @_;
- return unless $self->{-opt}->{associate};
+ my $subopt = $self->{-opt}->{join} // return;
+ %JOIN = map {
+ my ($k, $v) = split /:/, $_, 2;
+ $k => $v // 1;
+ } split(/,/, join(',', @$subopt));
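+	# e.g. `--join=window:50000,prefixes:patchid+dfblob,reset' yields
+	# (window => 50000, prefixes => 'patchid+dfblob', reset => 1)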
require PublicInbox::CidxXapHelperAux;
require PublicInbox::XapClient;
- $self->{-pi_cfg} = PublicInbox::Config->new;
+ my $cfg = $self->{-opt}->{-pi_cfg} // die 'BUG: -pi_cfg unset';
+ $self->{-cfg_f} = $cfg->{-f} = rel2abs_collapsed($cfg->{-f});
my @unknown;
- my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
- @pfx = map { split(/\s*,\s*/) } @pfx;
- for (@pfx) {
+ my $pfx = $JOIN{prefixes} // 'patchid';
+ for (split /\+/, $pfx) {
		my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$_} //
			do { push(@unknown, $_); next }; # diagnose all below
- push(@ASSOC_PFX, split(/ /, $v));
+ push(@JOIN_PFX, split(/ /, $v));
}
- die <<EOM if @unknown;
---associate-prefixes contains unsupported prefixes: @unknown
+ @unknown and die <<EOM;
+E: --join=prefixes= contains unsupported prefixes: @unknown
EOM
- @ASSOC_PFX = uniqstr @ASSOC_PFX;
+ @JOIN_PFX = uniqstr @JOIN_PFX;
my %incl = map {
- rel2abs_collapsed($_) => undef;
+ if (-f "$_/inbox.lock" || -d "$_/public-inbox") {
+ rel2abs_collapsed($_) => undef;
+ } else {
+ warn "W: `$_' is not a public inbox, skipping\n";
+ ();
+ }
} (@{$self->{-opt}->{include} // []});
my $all = $self->{-opt}->{all};
if (my $only = $self->{-opt}->{only}) {
# --all implied since no inboxes were specified with --only or --include
EOM
}
- $self->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl, $all);
- my $nr = scalar(@IBX) or die "E: no inboxes to associate\n";
- progress($self, "will associate $nr inboxes in ",
- $self->{-pi_cfg}->{-f}, " using: @pfx");
+ $self->{-opt}->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl, $all);
+ my $nr = scalar(@IBX) or die "E: no inboxes to join with\n";
+ progress($self, "will join with $nr inboxes in ",
+ $self->{-opt}->{-pi_cfg}->{-f}, " using: $pfx");
}
sub _prep_ibx { # each_inbox callback
($all || exists($incl->{$ibx->{inboxdir}})) and push @IBX, $ibx;
}
-sub show_roots { # for diagnostics
+sub show_json { # for diagnostics (unstable output)
my ($self) = @_;
+	my $s = $self->{-opt}->{show} or return;
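+	# each --show= value requests one dump; an empty value expands
+	# to the default set handled below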
local $self->{xdb};
- my $cur = $self->xdb->allterms_begin('G');
- my $end = $self->{xdb}->allterms_end('G');
- my $qrepo = $PublicInbox::Search::X{Query}->new('T'.'r');
- my $enq = $PublicInbox::Search::X{Enquire}->new($self->{xdb});
- $enq->set_weighting_scheme($PublicInbox::Search::X{BoolWeight}->new);
- $enq->set_docid_order($PublicInbox::Search::ENQ_ASCENDING);
-	for (; $cur != $end; $cur++) {
-		my $G_oidhex = $cur->get_termname;
-		my $qry = $PublicInbox::Search::X{Query}->new(
-					PublicInbox::Search::OP_FILTER(),
-					$qrepo, $G_oidhex);
-		$enq->set_query($qry);
-		my ($off, $lim) = (0, 10000);
-		say 'commit ',substr($G_oidhex, 1), ' appears in:';
-		while (1) {
-			my $mset = $enq->get_mset($off, $lim);
-			my $size = $mset->size or last;
-			for my $x ($mset->items) {
-				my $doc = $x->get_document;
-				for (xap_terms('P', $x->get_document)) {
-					say '- /', substr($_, 1);
-				}
-			}
-			$off += $size;
+ my %ret;
+ my @todo = @$s;
+ while (defined(my $f = shift @todo)) {
+ if ($f =~ /\A(?:roots2paths|paths2roots|join_data)\z/) {
+ $ret{$f} = $self->$f;
+ } elsif ($f eq '') { # default --show (no args)
+ push @todo, qw(roots2paths join_data);
+ } else {
+ warn "E: cannot show `$f'\n";
}
}
+ my $json = ref(PublicInbox::Config::json())->new;
+ $json->utf8->canonical->pretty; # n.b. FS pathnames may not be UTF-8...
+ say $json->encode(\%ret);
}
sub do_inits { # called via PublicInbox::DS::add_timer
my ($self) = @_;
- init_associate_postfork($self);
+ init_join_postfork($self);
init_prune($self);
scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
- my $max = $TODO{associate} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
+ my $max = $TODO{do_join} ? max($LIVE_JOBS, $NPROC) : $LIVE_JOBS;
index_next($self) for (1..$max);
}
local $GIT_TODO = [];
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
- %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
- @ID2ROOT, $XHC, @SORT, $GITS_NR);
+ %TODO, @IBXQ, @IBX, @JOIN, %JOIN, @JOIN_PFX,
+ @JOIN_DT, $DUMP_IBX_WPIPE, @OFF2ROOT, $XHC, @SORT, $GITS_NR);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
local $MAX_SIZE = $self->{-opt}->{max_size};
- local $self->{ASSOC_PFX} = \@ASSOC_PFX;
- local $self->{PENDING} = {};
- local $self->{-pi_cfg};
- if ($self->{-opt}->{'associate-aggressive'}) { # shortcut
- $self->{-opt}->{'associate-date-range'} //= '19700101000000..';
- $self->{-opt}->{'associate-window'} //= -1;
- $self->{-opt}->{associate} //= 1;
- }
- if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
+ local $self->{PENDING} = {}; # used by PublicInbox::CidxXapHelperAux
+ local $self->{-cfg_f};
+ if (grep { $_ } @{$self->{-opt}}{qw(prune join)}) {
require File::Temp;
$TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
$CMD_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
- require_progs('(prune|associate)', sort => \@SORT);
+ require_progs('(prune|join)', sort => \@SORT);
for (qw(parallel compress-program buffer-size)) { # GNU sort
my $v = $self->{-opt}->{"sort-$_"};
push @SORT, "--$_=$v" if defined $v;
}
- init_associate_prefork($self)
+ init_join_prefork($self)
}
local @IDX_SHARDS = cidx_init($self); # forks workers
local $self->{current_info} = '';
PublicInbox::DS::event_loop($MY_SIG, $SIGSET);
PublicInbox::DS->Reset;
$self->lock_release(!!$NCHANGE);
- show_roots($self) if $self->{-opt}->{'show-roots'} # for diagnostics
+ show_json($self);
}
sub ipc_atfork_child { # @IDX_SHARDS
void *mm_ptr;
char **entries;
struct fbuf wbuf;
- int root2id_fd;
+ int root2off_fd;
};
// n.b. __cleanup__ works fine with C++ exceptions, but not longjmp
static void dump_roots_ensure(void *ptr)
{
struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr;
- if (drt->root2id_fd >= 0)
- xclose(drt->root2id_fd);
+ if (drt->root2off_fd >= 0)
+ xclose(drt->root2off_fd);
hdestroy(); // idempotent
if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size))
EABORT("BUG: munmap(%p, %zu)", drt->mm_ptr, drt->sb.st_size);
fbuf_ensure(&drt->wbuf);
}
-static bool root2ids_str(struct fbuf *root_ids, Xapian::Document *doc)
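+// map each root commit ("G" term) of @doc to its numeric offset via
+// the hsearch(3) table loaded from ROOT2ID_FILE, appending the
+// " $OFF0 $OFF1 ...\n" line suffix to @root_offs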
+static bool root2offs_str(struct fbuf *root_offs, Xapian::Document *doc)
{
Xapian::TermIterator cur = doc->termlist_begin();
Xapian::TermIterator end = doc->termlist_end();
ENTRY e, *ep;
- fbuf_init(root_ids);
+ fbuf_init(root_offs);
for (cur.skip_to("G"); cur != end; cur++) {
std::string tn = *cur;
if (!starts_with(&tn, "G", 1))
ep = hsearch(e, FIND);
if (!ep) ABORT("hsearch miss `%s'", e.key);
// ep->data is a NUL-terminated string matching /[0-9]+/
- fputc(' ', root_ids->fp);
- fputs((const char *)ep->data, root_ids->fp);
+ fputc(' ', root_offs->fp);
+ fputs((const char *)ep->data, root_offs->fp);
}
- fputc('\n', root_ids->fp);
- if (ferror(root_ids->fp) | fclose(root_ids->fp))
- err(EXIT_FAILURE, "ferror|fclose(root_ids)"); // ENOMEM
- root_ids->fp = NULL;
+ fputc('\n', root_offs->fp);
+ if (ferror(root_offs->fp) | fclose(root_offs->fp))
+ err(EXIT_FAILURE, "ferror|fclose(root_offs)"); // ENOMEM
+ root_offs->fp = NULL;
return true;
}
// writes term values matching @pfx for a given @doc, ending the line
-// with the contents of @root_ids
+// with the contents of @root_offs
static void dump_roots_term(struct req *req, const char *pfx,
struct dump_roots_tmp *drt,
- struct fbuf *root_ids,
+ struct fbuf *root_offs,
Xapian::Document *doc)
{
Xapian::TermIterator cur = doc->termlist_begin();
if (!starts_with(&tn, pfx, pfx_len))
continue;
fputs(tn.c_str() + pfx_len, drt->wbuf.fp);
- fwrite(root_ids->ptr, root_ids->len, 1, drt->wbuf.fp);
+ fwrite(root_offs->ptr, root_offs->len, 1, drt->wbuf.fp);
++req->nr_out;
}
}
err(EXIT_FAILURE, "ferror|fclose(drt->wbuf.fp)");
drt->wbuf.fp = NULL;
if (!drt->wbuf.len) goto done_free;
- while (flock(drt->root2id_fd, LOCK_EX)) {
+ while (flock(drt->root2off_fd, LOCK_EX)) {
if (errno == EINTR) continue;
err(EXIT_FAILURE, "LOCK_EX"); // ENOLCK?
}
return false;
}
} while (drt->wbuf.len);
- while (flock(drt->root2id_fd, LOCK_UN)) {
+ while (flock(drt->root2off_fd, LOCK_UN)) {
if (errno == EINTR) continue;
err(EXIT_FAILURE, "LOCK_UN"); // ENOLCK?
}
struct dump_roots_tmp *drt,
Xapian::MSetIterator *i)
{
- CLEANUP_FBUF struct fbuf root_ids = {}; // " $ID0 $ID1 $IDx..\n"
+	CLEANUP_FBUF struct fbuf root_offs = {}; // " $OFF0 $OFF1 $OFFx..\n"
try {
Xapian::Document doc = i->get_document();
- if (!root2ids_str(&root_ids, &doc))
+ if (!root2offs_str(&root_offs, &doc))
return ITER_ABORT; // bad request, abort
for (int p = 0; p < req->pfxc; p++)
dump_roots_term(req, req->pfxv[p], drt,
- &root_ids, &doc);
+ &root_offs, &doc);
} catch (const Xapian::DatabaseModifiedError & e) {
req->srch->db->reopen();
return ITER_RETRY;
static bool cmd_dump_roots(struct req *req)
{
CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt = {};
- drt.root2id_fd = -1;
+ drt.root2off_fd = -1;
if ((optind + 1) >= req->argc)
ABORT("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
if (!req->pfxc)
ABORT("dump_roots requires -A PREFIX");
- const char *root2id_file = req->argv[optind];
- drt.root2id_fd = open(root2id_file, O_RDONLY);
- if (drt.root2id_fd < 0)
- EABORT("open(%s)", root2id_file);
- if (fstat(drt.root2id_fd, &drt.sb)) // ENOMEM?
- err(EXIT_FAILURE, "fstat(%s)", root2id_file);
+ const char *root2off_file = req->argv[optind];
+ drt.root2off_fd = open(root2off_file, O_RDONLY);
+ if (drt.root2off_fd < 0)
+ EABORT("open(%s)", root2off_file);
+ if (fstat(drt.root2off_fd, &drt.sb)) // ENOMEM?
+ err(EXIT_FAILURE, "fstat(%s)", root2off_file);
// each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
// so /32 overestimates the number of expected entries by
	// ~25% (as recommended by the Linux hcreate(3) manpage)
size_t est = (drt.sb.st_size / 32) + 1; //+1 for "\0" termination
if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX)
		errx(EXIT_FAILURE, "%s size too big (%lld bytes > %zu)",
- root2id_file, (long long)drt.sb.st_size, SIZE_MAX);
+ root2off_file, (long long)drt.sb.st_size, SIZE_MAX);
drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ,
- MAP_PRIVATE, drt.root2id_fd, 0);
+ MAP_PRIVATE, drt.root2off_fd, 0);
if (drt.mm_ptr == MAP_FAILED)
- err(EXIT_FAILURE, "mmap(%zu, %s)", drt.sb.st_size,root2id_file);
+ err(EXIT_FAILURE, "mmap(%zu, %s)",
+ drt.sb.st_size, root2off_file);
drt.entries = (char **)calloc(est * 2, sizeof(char *));
if (!drt.entries)
err(EXIT_FAILURE, "calloc(%zu * 2, %zu)", est, sizeof(char *));