]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
cindex: fix sorting and uniqueness
authorEric Wong <e@80x24.org>
Thu, 24 Aug 2023 01:22:34 +0000 (01:22 +0000)
committerEric Wong <e@80x24.org>
Thu, 24 Aug 2023 07:47:52 +0000 (07:47 +0000)
We can't rely on combining the `-u' and `-k1,1' switches of POSIX
sort(1) to do what we want.  So only rely on `sort -k1,1' while
introducing a small Perl helper to fold identical prefixes into
one line.  In other words, input such as:

  deadbeef 0
  deadbeef 1
  deadbeef 2

Was getting deduplicated into a single line:

  deadbeef 0

... with `sort -u -k1,1'
This makes puts the output into a more optimal form for eventual
(not-fully-implemented-yet) parsing:

  deadbeef 0,1,2

ORS is current the comma (`,') for inbox IDs, but it'll be a
space (` ') for coderepo root IDs.  This implementation also
combines identical IDs in the 2nd column.  Thus:

  deadbeef 0
  deadbeef 0

Becomes a single `deadbeef 0' line thanks to the use of
XS List::Util::uniq (which beats a pure Perl hash).

I attempted to implement this in awk but Perl is close enough to
gawk in performance while being shorter and easier-to-understand
due to List::Util::uniq.  mawk was faster, but still not enough
to matter as the bottleneck is from iterating through Xapian
MSets.

lib/PublicInbox/CodeSearchIdx.pm

index b8afecd2c68b17733006d3ceb20e73b8e4c2e069..4a41b1dad28f61a147dc94c8d8f0df12c403b183 100644 (file)
 #
 # * to_root_id - each line is of the format:
 #
-#      $PFX $ROOT_COMMIT_OID_ID
+#      $PFX $ROOT_COMMIT_OID_IDS
 #
 # * to_ibx_id - each line is of the format:
 #
-#      $PFX $IBX_ID
+#      $PFX $IBX_IDS
 #
+# $IBX_IDS is a comma-delimited list of integers ($IBX_ID)
+# $ROOT_COMMIT_OID_IDS is space-delimited
 # In both cases, $PFX is typically the value of the patchid (XDFID) but it
 # can be configured to use any combination of patchid, dfpre, dfpost or
 # dfblob.
@@ -134,6 +136,20 @@ sub new {
        $self;
 }
 
+# This is similar to uniq(1) on the first column, but combines the
+# contents of subsequent columns using $OFS.
+our @UNIQ_FOLD = ($^X, $^W ? ('-w') : (), qw(-MList::Util=uniq -ane), <<'EOM');
+BEGIN { $ofs = $ENV{OFS} // ','; $apfx = '' }
+if ($F[0] eq $apfx) {
+       shift @F;
+       push @ids, @F;
+} else {
+       print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids;
+       ($apfx, @ids) = @F;
+}
+END { print $apfx.' '.join($ofs, uniq(@ids))."\n" if @ids }
+EOM
+
 # TODO: may be used for reshard/compact
 sub count_shards { scalar($_[0]->xdb_shards_flat) }
 
@@ -519,16 +535,21 @@ sub dump_roots_once {
        @ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
        my $id = 0;
        my %root2id = map { $_ => $id++ } @ID2ROOT;
-       pipe(my ($r, $w)) or die "pipe: $!";
+       # dump_shard_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
+       pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+       pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
        my @sort = (@SORT, '-k1,1');
        my $dst = "$TMPDIR/to_root_id";
        open my $fh, '>', $dst or die "open($dst): $!";
-       my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $r, 1 => $fh });
-       close $r or die "close: $!";
+       my $env = { %$CMD_ENV, OFS => ' ' };
+       my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
+       my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh });
        awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+       awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(shards)'], $associate);
        my ($c, $p) = PublicInbox::PktOp->pair;
        $c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
-       my @arg = ('dump_shard_roots', [ $p->{op_p}, $w ], \%root2id, $QRY_STR);
+       my @arg = ('dump_shard_roots', [ $p->{op_p}, $sort_w ],
+                       \%root2id, $QRY_STR);
        $_->wq_io_do(@arg) for @IDX_SHARDS;
        progress($self, 'waiting on dump_shard_roots sort');
 }
@@ -549,12 +570,15 @@ sub dump_ibx { # sends to xap_helper.h
 sub dump_ibx_start {
        my ($self, $associate) = @_;
        pipe(my $sort_r, $DUMP_IBX_WPIPE) or die "pipe: $!";
-       my @sort = (@SORT, '-k1,1');
+       pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
+       my @sort = (@SORT, '-k1,1'); # sort only on ASSOC_PFX
+       # pipeline: dump_ibx | sort -k1,1 | uniq_fold >to_ibx_id
        my $dst = "$TMPDIR/to_ibx_id";
        open my $fh, '>', $dst or die "open($dst): $!";
-       my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
-       close $sort_r or die "close: $!";
+       my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
+       my $fold_pid = spawn(\@UNIQ_FOLD, $CMD_ENV, { 0 => $fold_r, 1 => $fh });
        awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+       awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(ibx)'], $associate);
        ($XHC, $XH_PID) = PublicInbox::XapClient::start_helper("-j$NPROC");
 }
 
@@ -869,9 +893,11 @@ sub associate {
        my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
        my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
        my %score;
-       while (<$rd>) { # PFX ibx_id root_id
-               my (undef, $ibx_id, @root_id) = split(/ /, $_);
-               ++$score{"$ibx_id $_"} for @root_id;
+       while (<$rd>) { # PFX ibx_ids root_id
+               my (undef, $ibx_ids, @root_ids) = split(/ /, $_);
+               for my $ibx_id (split(/,/, $ibx_ids)) {
+                       ++$score{"$ibx_id $_"} for @root_ids;
+               }
        }
        close $rd or die "@join failed: $?=$?";
        my $min = $self->{-opt}->{'assoc-min'} // 10;
@@ -939,9 +965,10 @@ sub init_prune ($) {
        my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
        pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
        pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
+       my @sort_u = (@SORT, '-u');
        open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
-       my $pid = spawn(\@SORT, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
-       awaitpid($pid, \&cmd_done, \@SORT, $run_prune);
+       my $pid = spawn(\@sort_u, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
+       awaitpid($pid, \&cmd_done, \@sort_u, $run_prune);
        $pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out });
        awaitpid($pid, \&cmd_done, \@sed, $run_prune);
        $pid = spawn(\@delve, undef, { 1 => $delve_out });
@@ -971,12 +998,13 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
        pipe(my ($sort_in, $awk_out)) or die "pipe: $!";
        pipe(my ($comm_in, $sort_out)) or die "pipe: $!";
        my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out });
-       my $sort_pid = spawn(\@SORT, $CMD_ENV,
+       my @sort_u = (@SORT, '-u');
+       my $sort_pid = spawn(\@sort_u, $CMD_ENV,
                                { 0 => $sort_in, 1 => $sort_out });
        my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $CMD_ENV,
                                { 0 => $comm_in, -C => "$TMPDIR" });
        awaitpid($awk_pid, \&cmd_done, \@AWK);
-       awaitpid($sort_pid, \&cmd_done, \@SORT);
+       awaitpid($sort_pid, \&cmd_done, \@sort_u);
        awaitpid($comm_pid, \&cmd_done, \@COMM);
        PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
        my $git_ver = PublicInbox::Git::git_version();
@@ -1092,10 +1120,9 @@ sub cidx_run { # main entry point
        local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
                $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
                %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
-               @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC);
+               @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC, @SORT);
        local $BATCH_BYTES = $self->{-opt}->{batch_size} //
                                $PublicInbox::SearchIdx::BATCH_BYTES;
-       local @SORT = (undef, '-u');
        local $self->{ASSOC_PFX} = \@ASSOC_PFX;
        local $self->{PENDING} = {};
        local $self->{-pi_cfg};