#
# * to_root_id - each line is of the format:
#
-# $PFX $ROOT_COMMIT_OID_ID
+# $PFX $ROOT_COMMIT_OID_IDS
#
# * to_ibx_id - each line is of the format:
#
-# $PFX $IBX_ID
+# $PFX $IBX_IDS
#
+# $IBX_IDS is a comma-delimited list of integers ($IBX_ID)
+# $ROOT_COMMIT_OID_IDS is space-delimited
# In both cases, $PFX is typically the value of the patchid (XDFID) but it
# can be configured to use any combination of patchid, dfpre, dfpost or
# dfblob.
$self;
}
+# This is similar to uniq(1) on the first column, but combines the
+# contents of subsequent columns using $OFS.
+our @UNIQ_FOLD = ($^X, $^W ? ('-w') : (), qw(-MList::Util=uniq -ane), <<'EOM');
+BEGIN { $ofs = $ENV{OFS} // ','; $apfx = '' }
+if ($F[0] eq $apfx) {
+ shift @F;
+ push @ids, @F;
+} else {
+ print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids;
+ ($apfx, @ids) = @F;
+}
+END { print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids }
+EOM
+
# TODO: may be used for reshard/compact
sub count_shards { scalar($_[0]->xdb_shards_flat) }
@ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
my $id = 0;
my %root2id = map { $_ => $id++ } @ID2ROOT;
- pipe(my ($r, $w)) or die "pipe: $!";
+ # dump_shard_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
+ pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+ pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
my @sort = (@SORT, '-k1,1');
my $dst = "$TMPDIR/to_root_id";
open my $fh, '>', $dst or die "open($dst): $!";
- my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $r, 1 => $fh });
- close $r or die "close: $!";
+ my $env = { %$CMD_ENV, OFS => ' ' };
+ my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
+ my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh });
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+ awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(shards)'], $associate);
my ($c, $p) = PublicInbox::PktOp->pair;
$c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
- my @arg = ('dump_shard_roots', [ $p->{op_p}, $w ], \%root2id, $QRY_STR);
+ my @arg = ('dump_shard_roots', [ $p->{op_p}, $sort_w ],
+ \%root2id, $QRY_STR);
$_->wq_io_do(@arg) for @IDX_SHARDS;
progress($self, 'waiting on dump_shard_roots sort');
}
sub dump_ibx_start {
my ($self, $associate) = @_;
pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!";
- my @sort = (@SORT, '-k1,1');
+ pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
+ my @sort = (@SORT, '-k1,1'); # sort only on ASSOC_PFX
+ # pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_id
my $dst = "$TMPDIR/to_ibx_id";
open my $fh, '>', $dst or die "open($dst): $!";
- my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
- close $sort_r or die "close: $!";
+ my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
+ my $fold_pid = spawn(\@UNIQ_FOLD, $CMD_ENV, { 0 => $fold_r, 1 => $fh });
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+ awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(ibx)'], $associate);
($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC");
}
my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
my %score;
- while (<$rd>) { # PFX ibx_id root_id
- my (undef, $ibx_id, @root_id) = split(/ /, $_);
- ++$score{"$ibx_id $_"} for @root_id;
+ while (<$rd>) { # PFX ibx_ids root_id
+ my (undef, $ibx_ids, @root_ids) = split(/ /, $_);
+ for my $ibx_id (split(/,/, $ibx_ids)) {
+ ++$score{"$ibx_id $_"} for @root_ids;
+ }
}
close $rd or die "@join failed: $?=$?";
my $min = $self->{-opt}->{'assoc-min'} // 10;
my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
+ my @sort_u = (@SORT, '-u');
open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
- my $pid = spawn(\@SORT, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
- awaitpid($pid, \&cmd_done, \@SORT, $run_prune);
+ my $pid = spawn(\@sort_u, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
+ awaitpid($pid, \&cmd_done, \@sort_u, $run_prune);
$pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out });
awaitpid($pid, \&cmd_done, \@sed, $run_prune);
$pid = spawn(\@delve, undef, { 1 => $delve_out });
pipe(my ($sort_in, $awk_out)) or die "pipe: $!";
pipe(my ($comm_in, $sort_out)) or die "pipe: $!";
my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out });
- my $sort_pid = spawn(\@SORT, $CMD_ENV,
+ my @sort_u = (@SORT, '-u');
+ my $sort_pid = spawn(\@sort_u, $CMD_ENV,
{ 0 => $sort_in, 1 => $sort_out });
my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $CMD_ENV,
{ 0 => $comm_in, -C => "$TMPDIR" });
awaitpid($awk_pid, \&cmd_done, \@AWK);
- awaitpid($sort_pid, \&cmd_done, \@SORT);
+ awaitpid($sort_pid, \&cmd_done, \@sort_u);
awaitpid($comm_pid, \&cmd_done, \@COMM);
PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
my $git_ver = PublicInbox::Git::git_version();
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
%TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
- @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC);
+ @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC, @SORT);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
- local @SORT = (undef, '-u');
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
local $self->{PENDING} = {};
local $self->{-pi_cfg};