]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
cindex: read-only association dump
authorEric Wong <e@80x24.org>
Thu, 24 Aug 2023 01:22:31 +0000 (01:22 +0000)
committerEric Wong <e@80x24.org>
Thu, 24 Aug 2023 07:47:49 +0000 (07:47 +0000)
This will eventually allow associating coderepos with inboxes
and vice-versa; avoiding the need for manual configuration via
tedious publicinbox.*.coderepo directives.

I'm not sure how this should be stored for WWW, yet, but it's
required since it takes about 8 hours to do this fully across
lore and git.kernel.org.

MANIFEST
lib/PublicInbox/CidxDumpIbx.pm [new file with mode: 0644]
lib/PublicInbox/CidxDumpShardRoots.pm [new file with mode: 0644]
lib/PublicInbox/CidxRecvIbx.pm [new file with mode: 0644]
lib/PublicInbox/CodeSearchIdx.pm
lib/PublicInbox/Config.pm
lib/PublicInbox/Search.pm
script/public-inbox-cindex

index 1001ca08253d6ac9910ba6010dbda6aef5caf0b6..162e30381250b807b2e060df64fc830e88667504 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,7 +162,10 @@ lib/PublicInbox/AltId.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
 lib/PublicInbox/CidxComm.pm
+lib/PublicInbox/CidxDumpIbx.pm
+lib/PublicInbox/CidxDumpShardRoots.pm
 lib/PublicInbox/CidxLogP.pm
+lib/PublicInbox/CidxRecvIbx.pm
 lib/PublicInbox/CmdIPC4.pm
 lib/PublicInbox/CodeSearch.pm
 lib/PublicInbox/CodeSearchIdx.pm
diff --git a/lib/PublicInbox/CidxDumpIbx.pm b/lib/PublicInbox/CidxDumpIbx.pm
new file mode 100644 (file)
index 0000000..e1bc273
--- /dev/null
@@ -0,0 +1,59 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate
+# Iterating through mset->items is slow in Perl due to method dispatch
+# and that loop may implemented in C++ using Xapian directly
+package PublicInbox::CidxDumpIbx;
+use v5.12;
+use PublicInbox::Search qw(xap_terms);
+use PublicInbox::DS;
+use Socket qw(MSG_EOR);
+
+sub start {
+       my ($rcvibx, $ibx_id) = @_;
+       my $cidx = $rcvibx->{cidx};
+       my $ibx = $cidx->{IBX}->[$ibx_id] // die "BUG: no IBX[$ibx_id]";
+       my $self = bless { rcvibx => $rcvibx, ekey => $ibx->eidx_key,
+               ibx_id => $ibx_id }, __PACKAGE__;
+       $self->{srch} = $ibx->isrch // do {
+               warn("W: $self->{ekey} has no search index (ignoring)\n");
+               return undef;
+       };
+       my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
+       $self->{mset} = $self->{srch}->mset($rcvibx->{qry_str}, $opt);
+       $self->{iter} = 0;
+       event_step($self);
+}
+
+sub event_step {
+       my ($self) = @_;
+       my $rcvibx = $self->{rcvibx} // die 'BUG: no rcvibx';
+       return if $rcvibx->{cidx}->do_quit;
+       my $last = $self->{mset}->size - 1;
+       my $cur = $self->{iter};
+       my $end = $cur + 9999;
+       if ($end >= $last) {
+               send($rcvibx->{op_p}, 'index_next', MSG_EOR);
+               $end = $last;
+       }
+       $self->{iter} = $end + 1;
+       local $0 = "dumping $self->{ekey} $cur..$end";
+
+       my $sort_w = $rcvibx->{sort_w};
+       my $ibx_id = $self->{ibx_id};
+       local $0 = "dumping $self->{ekey} $cur..$end";
+       $rcvibx->{cidx}->progress($0);
+       for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
+               my $doc = $x->get_document;
+               for my $p (@{$rcvibx->{cidx}->{ASSOC_PFX}}) {
+                       for (xap_terms($p, $doc)) {
+                               print $sort_w "$_ $ibx_id\n" or die "print: $!";
+                       }
+               }
+       }
+       $end < $last && !$rcvibx->{cidx}->do_quit and
+               PublicInbox::DS::requeue($self);
+}
+
+1;
diff --git a/lib/PublicInbox/CidxDumpShardRoots.pm b/lib/PublicInbox/CidxDumpShardRoots.pm
new file mode 100644 (file)
index 0000000..34afa41
--- /dev/null
@@ -0,0 +1,73 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+
+# Intended for PublicInbox::DS::event_loop for -cindex --associate
+# Iterating through mset->items is slow in Perl due to method dispatch
+# and that loop may implemented in C++ using Xapian directly
+package PublicInbox::CidxDumpShardRoots;
+use v5.12;
+use PublicInbox::Lock;
+use PublicInbox::Search qw(xap_terms);
+use Socket qw(MSG_EOR);
+
+sub start {
+       my ($cidx, $root2id, $qry_str) = @_;
+       my $op_p = delete($cidx->{0}) // die 'BUG: no {0} op_p';
+       my $sort_w = delete($cidx->{1}) // die 'BUG: no {1} $w sort pipe';
+       # sort lock is necessary if we have may root ids which cause a
+       # row length to exceed POSIX PIPE_BUF (via `$G' below)
+       my $sort_lk = bless { lock_path => $cidx->tmpdir.'/to_root_id.lock' },
+               'PublicInbox::Lock';
+       $sort_w->autoflush(1);
+       $cidx->begin_txn_lazy; # only using txn to simplify writer subs
+       my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
+       my $self = bless {
+               cidx => $cidx,
+               op_p => $op_p,
+               iter => 0,
+               mset => $cidx->mset($qry_str, $opt),
+               root2id => $root2id,
+               sort_w => $sort_w,
+               sort_lk => $sort_lk,
+       }, __PACKAGE__;
+       event_step($self);
+}
+
+sub event_step {
+       my ($self) = @_;
+       my $cidx = $self->{cidx};
+       return if $cidx->do_quit;
+       my $last = $self->{mset}->size - 1;
+       my $cur = $self->{iter};
+       my $end = $cur + 9999;
+       $end = $last if $end > $last;
+       $self->{iter} = $end + 1;
+       local $0 = "dumping shard [$cidx->{shard}] $cur..$end";
+       $cidx->progress($0);
+
+       my $root2id = $self->{root2id};
+       my $buf = '';
+       for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
+               my $doc = $x->get_document;
+               my $G = join(' ', map {
+                       $root2id->{pack('H*', $_)};
+               } xap_terms('G', $doc));
+               for my $p (@{$cidx->{ASSOC_PFX}}) {
+                       $buf .= "$_ $G\n" for (xap_terms($p, $doc));
+               }
+       }
+       $self->{sort_lk}->lock_acquire_fast;
+       print { $self->{sort_w} } $buf or die "print: $!";
+       $self->{sort_lk}->lock_release_fast;
+       $end < $last && !$cidx->do_quit and
+               PublicInbox::DS::requeue($self);
+}
+
+sub DESTROY {
+       my ($self) = @_;
+       return if $self->{cidx}->do_quit;
+       send($self->{op_p},
+               "dump_shard_roots_done $self->{cidx}->{shard}", MSG_EOR);
+}
+
+1;
diff --git a/lib/PublicInbox/CidxRecvIbx.pm b/lib/PublicInbox/CidxRecvIbx.pm
new file mode 100644 (file)
index 0000000..6add8e5
--- /dev/null
@@ -0,0 +1,46 @@
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+#
+# dumps all per-inbox info for -cindex --associate
+# integrated into the event loop for signalfd SIGINT handling
+package PublicInbox::CidxRecvIbx;
+use v5.12;
+use parent qw(PublicInbox::DS);
+use PublicInbox::Syscall qw(EPOLLIN EPOLLEXCLUSIVE);
+use Socket qw(MSG_EOR);
+use PublicInbox::CidxDumpIbx;
+
+sub new {
+       my ($cls, $cidx, $qry_str) = @_;
+       my ($op_p, $r_ibx, $sort_w) = delete @$cidx{0..2};
+       $op_p // die 'BUG: no $op_p';
+       $r_ibx // die 'BUG: no $r_ibx';
+       $sort_w // die 'BUG: no $sort_w';
+       my $self = bless {}, $cls;
+       $self->SUPER::new($r_ibx, EPOLLIN|EPOLLEXCLUSIVE);
+       $self->{cidx} = $cidx;
+       $self->{sort_w} = $sort_w;
+       $self->{op_p} = $op_p; # PublicInbox::CidxDumpIbx uses this
+       $self->{qry_str} = $qry_str;
+       # writes to this pipe are never longer than POSIX PIPE_BUF,
+       # so rely on POSIX atomicity guarantees
+       $sort_w->autoflush(1);
+       $self;
+}
+
+sub event_step {
+       my ($self) = @_;
+       recv($self->{sock}, my $ibx_id, 25, 0) // die "recv: $!";
+       return $self->close if $ibx_id eq '' || $self->{cidx}->do_quit;
+       PublicInbox::CidxDumpIbx::start($self, $ibx_id);
+}
+
+sub close {
+       my ($self) = @_;
+       $self->{cidx}->do_quit or
+               send($self->{op_p},
+                       "recv_ibx_done $self->{cidx}->{shard}", MSG_EOR);
+       $self->SUPER::close; # PublicInbox::DS::close
+}
+
+1;
index ba14e52ae4680484c716e3ccec61632c30d9e7ab..2480dbd20564ccd41847ca53fdeae5e74a01b6d7 100644 (file)
 #
 # We shard repos using the first 32-bits of sha256($ABS_GIT_DIR)
 #
+# --associate joins root commits of coderepos to inboxes based on prefixes.
+#
+# Internally, each inbox is assigned a non-negative integer index ($IBX_ID),
+# and each root commit object ID (SHA-1/SHA-256 hex) is also assigned
+# a non-negative integer index ($ROOT_COMMIT_OID_ID).
+#
+# associate dumps to 2 intermediate files in $TMPDIR:
+#
+# * to_root_id - each line is of the format:
+#
+#      $PFX $ROOT_COMMIT_OID_ID
+#
+# * to_ibx_id - each line is of the format:
+#
+#      $PFX $IBX_ID
+#
+# In both cases, $PFX is typically the value of the patchid (XDFID) but it
+# can be configured to use any combination of patchid, dfpre, dfpost or
+# dfblob.
+#
 # See PublicInbox::CodeSearch (read-only API) for more
 package PublicInbox::CodeSearchIdx;
 use v5.12;
@@ -25,14 +45,14 @@ use File::Spec ();
 use PublicInbox::SHA qw(sha256_hex);
 use PublicInbox::Search qw(xap_terms);
 use PublicInbox::SearchIdx qw(add_val);
-use PublicInbox::Config qw(glob2re);
+use PublicInbox::Config qw(glob2re rel2abs_collapsed);
 use PublicInbox::Spawn qw(which spawn popen_rd);
 use PublicInbox::OnDestroy;
 use PublicInbox::CidxLogP;
 use PublicInbox::CidxComm;
 use PublicInbox::Git qw(%OFMT2HEXLEN);
 use PublicInbox::Compat qw(uniqstr);
-use Socket qw(MSG_EOR);
+use Socket qw(MSG_EOR AF_UNIX SOCK_SEQPACKET);
 use Carp ();
 our (
        $LIVE, # pid => cmd
@@ -55,8 +75,15 @@ our (
        %ALT_FH, # hexlen => tmp IO for TMPDIR git alternates
        $TMPDIR, # File::Temp->newdir object for prune
        @PRUNE_QUEUE, # GIT_DIRs to prepare for pruning
-       $PRUNE_ENV, # env for awk(1), comm(1), sort(1) commands during prune
+       %TODO, @IBXQ, @IBX,
+       @JOIN, # join(1) command for associate
+       $CMD_ENV, # env for awk(1), comm(1), sort(1) commands during prune
        @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
+       @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
+       $QRY_STR, # common query string for both code and inbox associations
+       $IBXDIR_FEED, # SOCK_SEQPACKET
+       @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK, # for associate
+       @ID2ROOT,
 );
 
 # stop walking history if we see >$SEEN_MAX existing commits, this assumes
@@ -64,6 +91,9 @@ our (
 # git walks commits quickly if it doesn't have to read trees
 our $SEEN_MAX = 100000;
 
+# window for commits/emails to determine a inbox <-> coderepo association
+my $ASSOC_MAX = 50000;
+
 our @PRUNE_BATCH = qw(git _ cat-file --batch-all-objects --batch-check);
 
 # TODO: do we care about committer name + email? or tree OID?
@@ -455,6 +485,91 @@ sub shard_commit { # via wq_io_do
        send($op_p, "shard_done $self->{shard}", MSG_EOR);
 }
 
+sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion
+       my ($self, $associate, $n) = @_;
+       return if $DO_QUIT;
+       progress($self, "dump_shard_roots [$n] done");
+       $DUMP_SHARD_ROOTS_OK[$n] = 1;
+       # may run associate()
+}
+
+sub assoc_max_init ($) {
+       my ($self) = @_;
+       my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX;
+       $max = $ASSOC_MAX if !$max;
+       $max < 0 ? ((2 ** 31) - 1) : $max;
+}
+
+# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2..
+sub dump_shard_roots { # via wq_io_do for associate
+       my ($self, $root2id, $qry_str) = @_;
+       PublicInbox::CidxDumpShardRoots::start($self, $root2id, $qry_str);
+}
+
+sub dump_roots_once {
+       my ($self, $associate) = @_;
+       $associate // die 'BUG: no $associate';
+       $TODO{associating} = 1; # keep shards_active() happy
+       progress($self, 'dumping IDs from coderepos');
+       local $self->{xdb};
+       @ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
+       my $id = 0;
+       my %root2id = map { $_ => $id++ } @ID2ROOT;
+       pipe(my ($r, $w)) or die "pipe: $!";
+       my @sort = (@SORT, '-k1,1');
+       my $dst = "$TMPDIR/to_root_id";
+       open my $fh, '>', $dst or die "open($dst): $!";
+       my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $r, 1 => $fh });
+       close $r or die "close: $!";
+       awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+       my ($c, $p) = PublicInbox::PktOp->pair;
+       $c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
+       my @arg = ('dump_shard_roots', [ $p->{op_p}, $w ], \%root2id, $QRY_STR);
+       $_->wq_io_do(@arg) for @IDX_SHARDS;
+       progress($self, 'waiting on dump_shard_roots sort');
+}
+
+sub recv_ibx_done { # via PktOp on recv_ibx completion
+       my ($self, $pid, $n) = @_;
+       return if $DO_QUIT;
+       progress($self, "recv_ibx [$n] done");
+       $RECV_IBX_OK[$n] = 1;
+}
+
+# causes a worker to become a dumper for inbox/extindex
+sub recv_ibx { # wq_io_do
+       my ($self, $qry_str) = @_;
+       PublicInbox::CidxRecvIbx->new($self, $qry_str);
+}
+
+sub dump_ibx { # sends to PublicInbox::CidxRecvIbx::event_step
+       my ($self, $id_dir) = @_; # id_dir: "$IBX_ID=$INBOXDIR"
+       my $n = length($id_dir);
+       my $w = send($IBXDIR_FEED, $id_dir, MSG_EOR) // die "send: $!";
+       $n == $w or die "send($id_dir) $w != $n";
+}
+
+# repurpose shard workers to dump inbox patchids with perfect balance
+sub dump_ibx_start {
+       my ($self, $associate) = @_;
+       pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
+       my @sort = (@SORT, '-k1,1');
+       my $dst = "$TMPDIR/to_ibx_id";
+       open my $fh, '>', $dst or die "open($dst): $!";
+       my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fh });
+       close $sort_r or die "close: $!";
+       awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
+
+       my ($r, $w);
+       socketpair($r, $w, AF_UNIX, SOCK_SEQPACKET, 0) or die "socketpair: $!";
+       my ($c, $p) = PublicInbox::PktOp->pair;
+       $c->{ops}->{recv_ibx_done} = [ $self, $associate ];
+       $c->{ops}->{index_next} = [ $self ];
+       my $io = [ $p->{op_p}, $r, $sort_w ];
+       $_->wq_io_do('recv_ibx', $io, $QRY_STR) for @IDX_SHARDS;
+       $IBXDIR_FEED = $w;
+}
+
 sub index_next ($) {
        my ($self) = @_;
        return if $DO_QUIT;
@@ -466,6 +581,12 @@ sub index_next ($) {
                                                        $self, $git);
                fp_start($self, $git, $prep_repo);
                ct_start($self, $git, $prep_repo);
+       } elsif ($TMPDIR) {
+               delete $TODO{dump_ibx_start}; # runs OnDestroy once
+               return dump_ibx($self, shift @IBXQ) if @IBXQ;
+               progress($self, 'done dumping inboxes') if $IBXDIR_FEED;
+               undef $IBXDIR_FEED; # done dumping inboxes, dump roots
+               dump_roots_once($self, delete($TODO{associate}) // return);
        }
        # else: wait for shards_active (post_loop_do) callback
 }
@@ -502,7 +623,7 @@ sub commit_shard { # OnDestroy cb
        for my $n (keys %$active) {
                $IDX_SHARDS[$n]->wq_io_do('shard_commit', [ $p->{op_p} ]);
        }
-       undef $p; # shard_done fires when all shards are committed
+       # shard_done fires when all shards are committed
 }
 
 sub index_repo { # cidx_await cb
@@ -628,8 +749,8 @@ EOM
 
 sub scan_git_dirs ($) {
        my ($self) = @_;
-       @$GIT_TODO = @{$self->{git_dirs}};
-       index_next($self) for (1..$LIVE_JOBS);
+       my $n = @$GIT_TODO = @{$self->{git_dirs}};
+       progress($self, "scanning $n code repositories...");
 }
 
 sub prune_do { # via wq_io_do in IDX_SHARDS
@@ -661,7 +782,7 @@ sub shards_active { # post_loop_do
        return if grep(defined, $PRUNE_DONE, $GIT_TODO, $IDX_TODO, $LIVE) != 4;
        return 1 if grep(defined, @$PRUNE_DONE) != @IDX_SHARDS;
        return 1 if scalar(@$GIT_TODO) || scalar(@$IDX_TODO) || $REPO_CTX;
-       return 1 if keys(%$LIVE);
+       return 1 if keys(%$LIVE) || @IBXQ || keys(%TODO);
        for my $s (grep { $_->{-wq_s1} } @IDX_SHARDS) {
                $s->{-cidx_quit} = 1 if defined($s->{-wq_s1});
                $s->wq_close; # may recurse via awaitpid outside of event_loop
@@ -674,6 +795,7 @@ sub kill_shards { $_->wq_kill(@_) for (@IDX_SHARDS) }
 
 sub parent_quit {
        $DO_QUIT = POSIX->can("SIG$_[0]")->();
+       $IBXDIR_FEED = undef;
        kill_shards(@_);
        warn "# SIG$_[0] received, quitting...\n";
 }
@@ -717,6 +839,7 @@ sub prep_alternate_end { # awaitpid callback for config extensions.objectFormat
 E: ignoring objdir=$objdir, unknown extensions.objectFormat=$fmt
 EOM
        unless ($ALT_FH{$hexlen}) {
+               require PublicInbox::Import;
                my $git_dir = "$TMPDIR/hexlen$hexlen.git";
                PublicInbox::Import::init_bare($git_dir, 'cidx-all', $fmt);
                my $f = "$git_dir/objects/info/alternates";
@@ -739,52 +862,102 @@ sub prep_alternate_start {
        awaitpid($pid, \&prep_alternate_end, $o, $out, $run_prune);
 }
 
-sub prune_cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
-       my ($pid, $cmd, $run_prune) = @_;
+sub cmd_done { # awaitpid cb for sort, xapian-delve, sed failures
+       my ($pid, $cmd, $run_on_destroy) = @_;
        $? and die "@$cmd failed: \$?=$?";
+       # $run_on_destroy calls associate() or run_prune()
+}
+
+# runs once all inboxes and shards are dumped via OnDestroy
+sub associate {
+       my ($self) = @_;
+       return if $DO_QUIT;
+       @IDX_SHARDS or return warn("# aborting on no shards\n");
+       grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
+               die "E: shards not dumped properly\n";
+       grep(defined, @RECV_IBX_OK) == @IDX_SHARDS or
+               die "E: inboxes not dumped properly\n";
+       progress($self, 'associating...');
+       my @join = ('time', @JOIN, 'to_ibx_id', 'to_root_id');
+       my $rd = popen_rd(\@join, $CMD_ENV, { -C => "$TMPDIR" });
+       my %score;
+       while (<$rd>) { # PFX ibx_id root_id
+               my (undef, $ibx_id, @root_id) = split(/ /, $_);
+               ++$score{"$ibx_id $_"} for @root_id;
+       }
+       close $rd or die "@join failed: $?=$?";
+       my $min = $self->{-opt}->{'assoc-min'} // 10;
+       progress($self, scalar(keys %score).' potential pairings...');
+       for my $k (keys %score) {
+               my $nr = $score{$k};
+               my ($ibx_id, $root) = split(/ /, $k);
+               my $ekey = $IBX[$ibx_id]->eidx_key;
+               $root = unpack('H*', $ID2ROOT[$root]);
+               progress($self, "$ekey => $root has $nr matches");
+       }
+       delete $TODO{associating}; # break out of shards_active()
+       # TODO
+       warn "# Waiting for $TMPDIR/cont @JOIN";
+       system "ls -Rl $TMPDIR >&2";
+       system "wc -l $TMPDIR/to_*_id >&2";
+       #sleep(1) until -f "$TMPDIR/cont";
+       # warn "# Waiting for $TMPDIR/cont";
+       # sleep(1) until -f "$TMPDIR/cont";
+}
+
+sub require_progs {
+       my $op = shift;
+       while (my ($x, $argv) = splice(@_, 0, 2)) {
+               my $e = $x;
+               $e =~ tr/a-z-/A-Z_/;
+               my $c = $ENV{$e} // $x;
+               $argv->[0] //= which($c) // die "E: `$x' required for --$op\n";
+       }
+}
+
+sub init_associate_postfork ($) {
+       my ($self) = @_;
+       return unless $self->{-opt}->{associate};
+       require_progs('associate', join => \@JOIN);
+       $QRY_STR = $self->{-opt}->{'associate-date-range'} // '1.year.ago..';
+       substr($QRY_STR, 0, 0) = 'dt:';
+       scalar(@{$self->{git_dirs} //  []}) or die <<EOM;
+E: no coderepos to associate
+EOM
+       my $approx_git = PublicInbox::Git->new($self->{git_dirs}->[0]); # ugh
+       $self->query_approxidate($approx_git, $QRY_STR); # in-place
+       $TODO{associate} = PublicInbox::OnDestroy->new($$, \&associate, $self);
+       $TODO{dump_ibx_start} = PublicInbox::OnDestroy->new($$,
+                               \&dump_ibx_start, $self, $TODO{associate});
+       my $id = -1;
+       @IBXQ = map { ++$id } @IBX;
 }
 
 sub init_prune ($) {
        my ($self) = @_;
        return (@$PRUNE_DONE = map { 1 } @IDX_SHARDS) if !$self->{-opt}->{prune};
 
-       require File::Temp;
-       require PublicInbox::Import;
-       $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
-
        # Dealing with millions of commits here at once, so use faster tools.
        # xapian-delve is nearly an order-of-magnitude faster than Xapian Perl
        # bindings.  sed/awk are faster than Perl for simple stream ops, and
        # sort+comm are more memory-efficient with gigantic lists
        my @delve = (undef, qw(-A Q -1));
        my @sed = (undef, '-ne', 's/^Q//p');
-       @SORT = (undef, '-u');
        @COMM = (undef, qw(-2 -3 indexed_commits -));
        @AWK = (undef, '$2 == "commit" { print $1 }'); # --batch-check output
-       my @x = ('xapian-delve' => \@delve, sed => \@sed,
-               sort => \@SORT, comm => \@COMM, awk => \@AWK);
-       while (my ($x, $argv) = splice(@x, 0, 2)) {
-               my $e = $x;
-               $e =~ tr/a-z-/A-Z_/;
-               my $c = $ENV{$e} // $x;
-               $argv->[0] = which($c) // die "E: `$x' required for --prune\n";
-       }
+       require_progs('prune', 'xapian-delve' => \@delve, sed => \@sed,
+                       comm => \@COMM, awk => \@AWK);
        for (0..$#IDX_SHARDS) { push @delve, "$self->{xpfx}/$_" }
-       for (qw(parallel compress-program buffer-size)) { # GNU sort options
-               my $v = $self->{-opt}->{"sort-$_"};
-               push @SORT, "--$_=$v" if defined $v;
-       }
        my $run_prune = PublicInbox::OnDestroy->new($$, \&run_prune, $self);
        pipe(my ($sed_in, $delve_out)) or die "pipe: $!";
        pipe(my ($sort_in, $sed_out)) or die "pipe: $!";
        open(my $sort_out, '+>', "$TMPDIR/indexed_commits") or die "open: $!";
-       $PRUNE_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
-       my $pid = spawn(\@SORT, $PRUNE_ENV, { 0 => $sort_in, 1 => $sort_out });
-       awaitpid($pid, \&prune_cmd_done, \@SORT, $run_prune);
-       $pid = spawn(\@sed, $PRUNE_ENV, { 0 => $sed_in, 1 => $sed_out });
-       awaitpid($pid, \&prune_cmd_done, \@sed, $run_prune);
+       my $pid = spawn(\@SORT, $CMD_ENV, { 0 => $sort_in, 1 => $sort_out });
+       awaitpid($pid, \&cmd_done, \@SORT, $run_prune);
+       $pid = spawn(\@sed, $CMD_ENV, { 0 => $sed_in, 1 => $sed_out });
+       awaitpid($pid, \&cmd_done, \@sed, $run_prune);
        $pid = spawn(\@delve, undef, { 1 => $delve_out });
-       awaitpid($pid, \&prune_cmd_done, \@delve, $run_prune);
+       awaitpid($pid, \&cmd_done, \@delve, $run_prune);
        @PRUNE_QUEUE = @{$self->{git_dirs}};
        for (1..$LIVE_JOBS) {
                prep_alternate_start(shift(@PRUNE_QUEUE) // last, $run_prune);
@@ -809,14 +982,14 @@ sub run_prune { # OnDestroy when `git config extensions.objectFormat' are done
        pipe(my ($awk_in, $batch_out)) or die "pipe: $!";
        pipe(my ($sort_in, $awk_out)) or die "pipe: $!";
        pipe(my ($comm_in, $sort_out)) or die "pipe: $!";
-       my $awk_pid = spawn(\@AWK, $PRUNE_ENV, { 0 => $awk_in, 1 => $awk_out });
-       my $sort_pid = spawn(\@SORT, $PRUNE_ENV,
+       my $awk_pid = spawn(\@AWK, $CMD_ENV, { 0 => $awk_in, 1 => $awk_out });
+       my $sort_pid = spawn(\@SORT, $CMD_ENV,
                                { 0 => $sort_in, 1 => $sort_out });
-       my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $PRUNE_ENV,
+       my ($comm_rd, $comm_pid) = popen_rd(\@COMM, $CMD_ENV,
                                { 0 => $comm_in, -C => "$TMPDIR" });
-       awaitpid($awk_pid, \&prune_cmd_done, \@AWK);
-       awaitpid($sort_pid, \&prune_cmd_done, \@SORT);
-       awaitpid($comm_pid, \&prune_cmd_done, \@COMM);
+       awaitpid($awk_pid, \&cmd_done, \@AWK);
+       awaitpid($sort_pid, \&cmd_done, \@SORT);
+       awaitpid($comm_pid, \&cmd_done, \@COMM);
        PublicInbox::CidxComm->new($comm_rd, $self); # calls cidx_read_comm
        my $git_ver = PublicInbox::Git::git_version();
        push @PRUNE_BATCH, '--buffer' if $git_ver ge v2.6;
@@ -856,6 +1029,35 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
        for (@gone) { close $_ or die "close: $!" };
 }
 
+sub init_associate_prefork ($) {
+       my ($self) = @_;
+       return unless $self->{-opt}->{associate};
+       require PublicInbox::CidxRecvIbx;
+       require PublicInbox::CidxDumpShardRoots;
+       $self->{-pi_cfg} = PublicInbox::Config->new;
+       my @unknown;
+       my @pfx = @{$self->{-opt}->{'associate-prefixes'} // [ 'patchid' ]};
+       for (map { split(/\s*,\s*/) } @pfx) {
+               my $v = $PublicInbox::Search::PATCH_BOOL_COMMON{$_} //
+                       push(@unknown, $_);
+               push(@ASSOC_PFX, split(/ /, $v));
+       }
+       die <<EOM if @unknown;
+--associate-prefixes contains unsupported prefixes: @unknown
+EOM
+       @ASSOC_PFX = uniqstr @ASSOC_PFX;
+       my %incl = map {
+               rel2abs_collapsed($_) => undef;
+       } (@{$self->{-opt}->{include} // []});
+       $self->{-pi_cfg}->each_inbox(\&_prep_ibx, $self, \%incl);
+}
+
+sub _prep_ibx { # each_inbox callback
+       my ($ibx, $self, $incl) = @_;
+       ($self->{-opt}->{all} || exists($incl->{$ibx->{inboxdir}})) and
+               push @{$self->{IBX}}, $ibx;
+}
+
 sub cidx_run { # main entry point
        my ($self) = @_;
        my $restore_umask = prep_umask($self);
@@ -868,10 +1070,27 @@ sub cidx_run { # main entry point
        local $IDX_TODO = [];
        local $GIT_TODO = [];
        local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
-               $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, @SORT, $PRUNE_ENV);
+               $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
+               %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $IBXDIR_FEED, @ID2ROOT,
+               @DUMP_SHARD_ROOTS_OK, @RECV_IBX_OK);
        local $BATCH_BYTES = $self->{-opt}->{batch_size} //
                                $PublicInbox::SearchIdx::BATCH_BYTES;
-       local @IDX_SHARDS = cidx_init($self);
+       local @SORT = (undef, '-u');
+       local $self->{IBX} = \@IBX;
+       local $self->{ASSOC_PFX} = \@ASSOC_PFX;
+       local $self->{-pi_cfg};
+       if (grep { $_ } @{$self->{-opt}}{qw(prune associate)}) {
+               require File::Temp;
+               $TMPDIR = File::Temp->newdir('cidx-all-git-XXXX', TMPDIR => 1);
+               $CMD_ENV = { TMPDIR => "$TMPDIR", LC_ALL => 'C', LANG => 'C' };
+               require_progs('(prune|associate)', sort => \@SORT);
+               for (qw(parallel compress-program buffer-size)) { # GNU sort
+                       my $v = $self->{-opt}->{"sort-$_"};
+                       push @SORT, "--$_=$v" if defined $v;
+               }
+               init_associate_prefork($self)
+       }
+       local @IDX_SHARDS = cidx_init($self); # forks workers
        local $self->{current_info} = '';
        local $MY_SIG = {
                CHLD => \&PublicInbox::DS::enqueue_reap,
@@ -919,7 +1138,9 @@ sub cidx_run { # main entry point
                        PublicInbox::IPC::detect_nproc() || 2;
        local @RDONLY_XDB = $self->xdb_shards_flat;
        init_prune($self);
+       init_associate_postfork($self);
        scan_git_dirs($self) if $self->{-opt}->{scan} // 1;
+       index_next($self) for (1..$LIVE_JOBS);
 
        # FreeBSD ignores/discards SIGCHLD while signals are blocked and
        # EVFILT_SIGNAL is inactive, so we pretend we have a SIGCHLD pending
@@ -954,4 +1175,9 @@ sub shard_done_wait { # awaitpid cb via ipc_worker_reap
        PublicInbox::DS::enqueue_reap() if !shards_active(); # once more for PLC
 }
 
+sub do_quit { $DO_QUIT }
+
+sub tmpdir { $TMPDIR }
+
+
 1;
index 2f1b41227e9cb09f512784c09d0a2fedb65017f7..0a6b210fe2fa8ab21a3c761340961ac3d33b9ffa 100644 (file)
@@ -11,7 +11,7 @@ package PublicInbox::Config;
 use strict;
 use v5.10.1;
 use parent qw(Exporter);
-our @EXPORT_OK = qw(glob2re);
+our @EXPORT_OK = qw(glob2re rel2abs_collapsed);
 use PublicInbox::Inbox;
 use PublicInbox::Spawn qw(popen_rd);
 our $LD_PRELOAD = $ENV{LD_PRELOAD}; # only valid at startup
index 1559d9b31c0d1cbeeaebfed23a9d85606b9d4bc7..d5b0bceb19f15565253715dcf1aac9f38227dbd5 100644 (file)
@@ -197,7 +197,7 @@ sub xdir ($;$) {
        my ($self, $rdonly) = @_;
        if ($rdonly || !defined($self->{shard})) {
                $self->{xpfx};
-       } else { # v2 + extindex only:
+       } else { # v2, extindex, cindex only:
                "$self->{xpfx}/$self->{shard}";
        }
 }
index 2f7796e75faea8ec03c36908bbf7c65dd45effeb..888c8b1030c8872a86bdc0bac81af8fad4a1c5a1 100755 (executable)
@@ -26,8 +26,10 @@ See public-inbox-cindex(1) man page for full documentation.
 EOF
 my $opt = { fsync => 1, scan => 1 }; # --no-scan is hidden
 GetOptions($opt, qw(quiet|q verbose|v+ reindex jobs|j=i fsync|sync! dangerous
-               indexlevel|index-level|L=s
+               indexlevel|index-level|L=s associate associate-max=i
+               associate-date-range=s associate-prefixes=s@
                batch_size|batch-size=s max_size|max-size=s
+               include|I=s@ only=s@ all
                project-list=s exclude=s@
                sort-parallel=s sort-compress-program=s sort-buffer-size=s
                d=s update|u scan! prune dry-run|n C=s@ help|h))