]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
cindex: implement dump_roots in C++
authorEric Wong <e@80x24.org>
Thu, 24 Aug 2023 01:22:35 +0000 (01:22 +0000)
committerEric Wong <e@80x24.org>
Thu, 24 Aug 2023 07:47:53 +0000 (07:47 +0000)
It's now just `dump_roots' instead of `dump_shard_roots', since
this doesn't need to be tied to the concept of shards.  I'm
still shaky with C++, but intend to keep using stuff like
hsearch(3) to make life easier for C hackers :P

MANIFEST
lib/PublicInbox/CidxDumpShardRoots.pm [deleted file]
lib/PublicInbox/CidxXapHelperAux.pm
lib/PublicInbox/CodeSearchIdx.pm
lib/PublicInbox/XapHelper.pm
lib/PublicInbox/xap_helper.h
t/xap_helper.t

index 4f61af4215f4e683d241553469b0b529824fee6e..4bccc8493cc076fa9a8d22a6e5ee76280d4b344e 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -162,7 +162,6 @@ lib/PublicInbox/AltId.pm
 lib/PublicInbox/AutoReap.pm
 lib/PublicInbox/Cgit.pm
 lib/PublicInbox/CidxComm.pm
-lib/PublicInbox/CidxDumpShardRoots.pm
 lib/PublicInbox/CidxLogP.pm
 lib/PublicInbox/CidxRecvIbx.pm
 lib/PublicInbox/CidxXapHelperAux.pm
diff --git a/lib/PublicInbox/CidxDumpShardRoots.pm b/lib/PublicInbox/CidxDumpShardRoots.pm
deleted file mode 100644 (file)
index 34afa41..0000000
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Intended for PublicInbox::DS::event_loop for -cindex --associate
-# Iterating through mset->items is slow in Perl due to method dispatch
-# and that loop may implemented in C++ using Xapian directly
-package PublicInbox::CidxDumpShardRoots;
-use v5.12;
-use PublicInbox::Lock;
-use PublicInbox::Search qw(xap_terms);
-use Socket qw(MSG_EOR);
-
-sub start {
-       my ($cidx, $root2id, $qry_str) = @_;
-       my $op_p = delete($cidx->{0}) // die 'BUG: no {0} op_p';
-       my $sort_w = delete($cidx->{1}) // die 'BUG: no {1} $w sort pipe';
-       # sort lock is necessary if we have may root ids which cause a
-       # row length to exceed POSIX PIPE_BUF (via `$G' below)
-       my $sort_lk = bless { lock_path => $cidx->tmpdir.'/to_root_id.lock' },
-               'PublicInbox::Lock';
-       $sort_w->autoflush(1);
-       $cidx->begin_txn_lazy; # only using txn to simplify writer subs
-       my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
-       my $self = bless {
-               cidx => $cidx,
-               op_p => $op_p,
-               iter => 0,
-               mset => $cidx->mset($qry_str, $opt),
-               root2id => $root2id,
-               sort_w => $sort_w,
-               sort_lk => $sort_lk,
-       }, __PACKAGE__;
-       event_step($self);
-}
-
-sub event_step {
-       my ($self) = @_;
-       my $cidx = $self->{cidx};
-       return if $cidx->do_quit;
-       my $last = $self->{mset}->size - 1;
-       my $cur = $self->{iter};
-       my $end = $cur + 9999;
-       $end = $last if $end > $last;
-       $self->{iter} = $end + 1;
-       local $0 = "dumping shard [$cidx->{shard}] $cur..$end";
-       $cidx->progress($0);
-
-       my $root2id = $self->{root2id};
-       my $buf = '';
-       for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
-               my $doc = $x->get_document;
-               my $G = join(' ', map {
-                       $root2id->{pack('H*', $_)};
-               } xap_terms('G', $doc));
-               for my $p (@{$cidx->{ASSOC_PFX}}) {
-                       $buf .= "$_ $G\n" for (xap_terms($p, $doc));
-               }
-       }
-       $self->{sort_lk}->lock_acquire_fast;
-       print { $self->{sort_w} } $buf or die "print: $!";
-       $self->{sort_lk}->lock_release_fast;
-       $end < $last && !$cidx->do_quit and
-               PublicInbox::DS::requeue($self);
-}
-
-sub DESTROY {
-       my ($self) = @_;
-       return if $self->{cidx}->do_quit;
-       send($self->{op_p},
-               "dump_shard_roots_done $self->{cidx}->{shard}", MSG_EOR);
-}
-
-1;
index c9a5ddad27dcff30ddde3dd39429df487868fba2..f402bde0bfb1795bad3bc5d2d3312de63d9d27c2 100644 (file)
@@ -10,12 +10,8 @@ use PublicInbox::Syscall qw(EPOLLIN);
 
 # rpipe connects to req->fp[1] in xap_helper.h
 sub new {
-       my ($cls, $rpipe, $cidx, $pfx, $associate) = @_;
-       my $self = bless {
-               cidx => $cidx,
-               pfx => $pfx,
-               associate => $associate
-       }, $cls;
+       my ($cls, $rpipe, $cidx, $pfx) = @_;
+       my $self = bless { cidx => $cidx, pfx => $pfx }, $cls;
        $rpipe->blocking(0);
        $self->SUPER::new($rpipe, EPOLLIN);
 }
@@ -36,7 +32,7 @@ sub event_step {
        my @lines = split(/^/m, $buf);
        $self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n";
        for my $l (@lines) {
-               if ($l =~ /\Amset\.size=[0-9]+\n\z/) {
+               if ($l =~ /\Amset\.size=[0-9]+ nr_out=[0-9]+\n\z/) {
                        delete $self->{cidx}->{PENDING}->{$pfx};
                        $self->{cidx}->index_next;
                }
index 4a41b1dad28f61a147dc94c8d8f0df12c403b183..404d682623fe6acf8647cf398f86abe2bebda33f 100644 (file)
@@ -87,7 +87,6 @@ our (
        @AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
        @ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
        $QRY_STR, # common query string for both code and inbox associations
-       @DUMP_SHARD_ROOTS_OK, # for associate
        $DUMP_IBX_WPIPE, # goes to sort(1)
        @ID2ROOT,
 );
@@ -505,14 +504,6 @@ sub shard_commit { # via wq_io_do
        send($op_p, "shard_done $self->{shard}", MSG_EOR);
 }
 
-sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion
-       my ($self, $associate, $n) = @_;
-       return if $DO_QUIT;
-       progress($self, "dump_shard_roots [$n] done");
-       $DUMP_SHARD_ROOTS_OK[$n] = 1;
-       # may run associate()
-}
-
 sub assoc_max_init ($) {
        my ($self) = @_;
        my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX;
@@ -520,38 +511,39 @@ sub assoc_max_init ($) {
        $max < 0 ? ((2 ** 31) - 1) : $max;
 }
 
-# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2..
-sub dump_shard_roots { # via wq_io_do for associate
-       my ($self, $root2id, $qry_str) = @_;
-       PublicInbox::CidxDumpShardRoots::start($self, $root2id, $qry_str);
-}
-
 sub dump_roots_once {
        my ($self, $associate) = @_;
        $associate // die 'BUG: no $associate';
        $TODO{associating} = 1; # keep shards_active() happy
        progress($self, 'dumping IDs from coderepos');
        local $self->{xdb};
-       @ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
-       my $id = 0;
-       my %root2id = map { $_ => $id++ } @ID2ROOT;
-       # dump_shard_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
+       @ID2ROOT = $self->all_terms('G');
+       my $root2id = "$TMPDIR/root2id";
+       open my $fh, '>', $root2id or die "open($root2id): $!";
+       my $nr = -1;
+       for (@ID2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly
+       close $fh or die "close: $!";
+       # dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
        pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
        pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
        my @sort = (@SORT, '-k1,1');
        my $dst = "$TMPDIR/to_root_id";
-       open my $fh, '>', $dst or die "open($dst): $!";
+       open $fh, '>', $dst or die "open($dst): $!";
        my $env = { %$CMD_ENV, OFS => ' ' };
        my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
        my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh });
        awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
        awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(shards)'], $associate);
-       my ($c, $p) = PublicInbox::PktOp->pair;
-       $c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
-       my @arg = ('dump_shard_roots', [ $p->{op_p}, $sort_w ],
-                       \%root2id, $QRY_STR);
-       $_->wq_io_do(@arg) for @IDX_SHARDS;
-       progress($self, 'waiting on dump_shard_roots sort');
+       my @arg = ((map { ('-A', $_) } @ASSOC_PFX), '-c',
+               '-m', assoc_max_init($self), $root2id, $QRY_STR);
+       for my $d ($self->shard_dirs) {
+               pipe(my ($err_r, $err_w)) or die "pipe: $!";
+               $XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg);
+               my $desc = "dump_roots $d";
+               $self->{PENDING}->{$desc} = $associate;
+               PublicInbox::CidxXapHelperAux->new($err_r, $self, $desc);
+       }
+       progress($self, 'waiting on dump_roots sort');
 }
 
 sub dump_ibx { # sends to xap_helper.h
@@ -563,8 +555,8 @@ sub dump_ibx { # sends to xap_helper.h
        pipe(my ($r, $w)) or die "pipe: $!";
        $XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
        my $ekey = $ibx->eidx_key;
-       $self->{PENDING}->{$ekey} = undef;
-       PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate});
+       $self->{PENDING}->{$ekey} = $TODO{associate};
+       PublicInbox::CidxXapHelperAux->new($r, $self, $ekey);
 }
 
 sub dump_ibx_start {
@@ -885,8 +877,7 @@ sub associate {
        my ($self) = @_;
        return if $DO_QUIT;
        @IDX_SHARDS or return warn("# aborting on no shards\n");
-       grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
-               die "E: shards not dumped properly\n";
+       unlink("$TMPDIR/root2id");
        my @pending = keys %{$self->{PENDING}};
        die "E: pending=@pending jobs not done\n" if @pending;
        progress($self, 'associating...');
@@ -906,7 +897,7 @@ sub associate {
                my $nr = $score{$k};
                my ($ibx_id, $root) = split(/ /, $k);
                my $ekey = $IBX[$ibx_id]->eidx_key;
-               $root = unpack('H*', $ID2ROOT[$root]);
+               $root = $ID2ROOT[$root];
                progress($self, "$ekey => $root has $nr matches");
        }
        delete $TODO{associating}; # break out of shards_active()
@@ -1048,7 +1039,6 @@ sub cidx_read_comm { # via PublicInbox::CidxComm::event_step
 sub init_associate_prefork ($) {
        my ($self) = @_;
        return unless $self->{-opt}->{associate};
-       require PublicInbox::CidxDumpShardRoots;
        require PublicInbox::CidxXapHelperAux;
        require PublicInbox::XapClient;
        $self->{-pi_cfg} = PublicInbox::Config->new;
@@ -1120,7 +1110,7 @@ sub cidx_run { # main entry point
        local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
                $REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
                %TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
-               @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC, @SORT);
+               @ID2ROOT, $XH_PID, $XHC, @SORT);
        local $BATCH_BYTES = $self->{-opt}->{batch_size} //
                                $PublicInbox::SearchIdx::BATCH_BYTES;
        local $self->{ASSOC_PFX} = \@ASSOC_PFX;
index bf2f99a23093c53551c2a8ceb92ff60a49533c59..c80be81058102e2387c413625d25c969c770dfbe 100644 (file)
@@ -8,7 +8,9 @@ use Getopt::Long (); # good API even if we only use short options
 our $GLP = Getopt::Long::Parser->new;
 $GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
 use PublicInbox::Search qw(xap_terms);
+use PublicInbox::CodeSearch;
 use PublicInbox::IPC;
+use Fcntl qw(LOCK_UN LOCK_EX);
 my $X = \%PublicInbox::Search::X;
 our (%SRCH, %PIDS, $parent_pid);
 our $stderr = \*STDERR;
@@ -44,15 +46,63 @@ sub cmd_dump_ibx {
        my $mset = $req->{srch}->mset($qry_str, $opt);
        my $out = $req->{0};
        $out->autoflush(1);
+       my $nr = 0;
        for my $it ($mset->items) {
                my $doc = $it->get_document;
                for my $p (@pfx) {
                        for (xap_terms($p, $doc)) {
                                print $out "$_ $ibx_id\n" or die "print: $!";
+                               ++$nr;
                        }
                }
        }
-       if (my $err = $req->{1}) { say $err 'mset.size=', $mset->size }
+       if (my $err = $req->{1}) {
+               say $err 'mset.size='.$mset->size.' nr_out='.$nr
+       }
+}
+
+sub cmd_dump_roots {
+       my ($req, $root2id_file, $qry_str) = @_;
+       $qry_str // return
+               warn('usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR');
+       my @pfx = @{$req->{A}} or return warn('dump_roots requires -A PREFIX');
+       open my $fh, '<', $root2id_file or die "open($root2id_file): $!";
+       my %root2id; # record format: $OIDHEX "\0" uint32_t
+       my @x = split(/\0/, do { local $/; <$fh> } // die "readline: $!");
+       while (@x) {
+               my $oidhex = shift @x;
+               $root2id{$oidhex} = shift @x;
+       }
+       my $opt = { relevance => -1, limit => $req->{'m'},
+                       offset => $req->{o} // 0 };
+       my $mset = $req->{srch}->mset($qry_str, $opt);
+       $req->{0}->autoflush(1);
+       my $buf = '';
+       my $nr = 0;
+       for my $it ($mset->items) {
+               my $doc = $it->get_document;
+               my $G = join(' ', map { $root2id{$_} } xap_terms('G', $doc));
+               for my $p (@pfx) {
+                       for (xap_terms($p, $doc)) {
+                               $buf .= "$_ $G\n";
+                               ++$nr;
+                       }
+               }
+               if (!($nr & 0x3fff)) {
+                       flock($fh, LOCK_EX) or die "flock: $!";
+                       print { $req->{0} } $buf or die "print: $!";
+                       flock($fh, LOCK_UN) or die "flock: $!";
+                       $buf = '';
+               }
+       }
+       if ($buf ne '') {
+               flock($fh, LOCK_EX) or die "flock: $!";
+               print { $req->{0} } $buf or die "print: $!";
+               flock($fh, LOCK_UN) or die "flock: $!";
+       }
+       if (my $err = $req->{1}) {
+               say $err 'mset.size='.$mset->size.' nr_out='.$nr
+       }
 }
 
 sub dispatch {
index 52db92b7b4e0c15b2a291855c57d545aa5db8a00..c9b4e0cc490baa8008af938be0a71cb986d0c823 100644 (file)
@@ -6,12 +6,16 @@
  * this is not linked to Perl in any way.
  * C (not C++) is used as much as possible to lower the contribution
  * barrier for hackers who mainly know C (this includes the maintainer).
+ * Yes, that means we use C stdlib stuff like hsearch and open_memstream
+ * instead their equivalents in the C++ stdlib :P
  * Everything here is an unstable internal API of public-inbox and
  * NOT intended for ordinary users; only public-inbox hackers
  */
 #ifndef _ALL_SOURCE
 #      define _ALL_SOURCE
 #endif
+#include <sys/file.h>
+#include <sys/mman.h>
 #include <sys/resource.h>
 #include <sys/socket.h>
 #include <sys/stat.h>
@@ -80,6 +84,7 @@ struct req { // argv and pfxv point into global rbuf
        unsigned long long max;
        unsigned long long off;
        unsigned long timeout_sec;
+       size_t nr_out;
        long sort_col; // value column, negative means BoolWeight
        int argc;
        int pfxc;
@@ -96,6 +101,28 @@ struct worker {
        unsigned nr;
 };
 
+#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
+static size_t split2argv(char **dst, char *buf, size_t len, size_t limit)
+{
+       if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
+               warnx("bogus argument given");
+               return 0;
+       }
+       size_t nr = 0;
+       char *c = buf;
+       for (size_t i = 1; i < len; i++) {
+               if (!buf[i]) {
+                       dst[nr++] = c;
+                       c = buf + i + 1;
+               }
+               if (nr == limit) {
+                       warnx("too many args: %zu", nr);
+                       return 0;
+               }
+       }
+       return (long)nr;
+}
+
 static bool has_threadid(const struct srch *srch)
 {
        return srch->db->get_metadata("has_threadid") == "1";
@@ -118,8 +145,12 @@ static Xapian::Enquire prep_enquire(const struct req *req)
 
 static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
 {
-       if (!req->max)
-               req->max = 50;
+       if (!req->max) {
+               switch (sizeof(Xapian::doccount)) {
+               case 4: req->max = UINT_MAX; break;
+               default: req->max = ULLONG_MAX;
+               }
+       }
        for (int i = 0; i < 9; i++) {
                try {
                        Xapian::MSet mset = enq->get_mset(req->off, req->max);
@@ -131,13 +162,13 @@ static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
        return enq->get_mset(req->off, req->max);
 }
 
+// for v1, v2, and extindex
 static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
 {
        struct srch *srch = req->srch;
        Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
        if (req->Oeidx_key) {
                req->Oeidx_key[0] = 'O'; // modifies static rbuf
-               fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key);
                qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
                                        Xapian::Query(req->Oeidx_key));
        }
@@ -150,6 +181,21 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
        return enquire_mset(req, &enq);
 }
 
+// for cindex
+static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
+{
+       struct srch *srch = req->srch;
+       Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
+       // TODO: git_dir + roots_filter
+
+       // we only want commits:
+       qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+                               Xapian::Query("T" "c"));
+       Xapian::Enquire enq = prep_enquire(req);
+       enq.set_query(qry);
+       return enquire_mset(req, &enq);
+}
+
 static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
 {
        return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
@@ -165,9 +211,11 @@ static void dump_ibx_term(struct req *req, const char *pfx,
        for (cur.skip_to(pfx); cur != end; cur++) {
                std::string tn = *cur;
 
-               if (starts_with(&tn, pfx, pfx_len))
+               if (starts_with(&tn, pfx, pfx_len)) {
                        fprintf(req->fp[0], "%s %s\n",
                                tn.c_str() + pfx_len, ibx_id);
+                       ++req->nr_out;
+               }
        }
 }
 
@@ -194,7 +242,6 @@ static bool cmd_dump_ibx(struct req *req)
        }
        req->asc = true;
        req->sort_col = -1;
-       req->max = (unsigned long long)req->srch->db->get_doccount();
        Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
        for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
                try {
@@ -208,8 +255,244 @@ static bool cmd_dump_ibx(struct req *req)
                }
        }
        if (req->fp[1])
-               fprintf(req->fp[1], "mset.size=%llu\n",
-                       (unsigned long long)mset.size());
+               fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+                       (unsigned long long)mset.size(), req->nr_out);
+       return true;
+}
+
+struct fbuf {
+       FILE *fp;
+       char *ptr;
+       size_t len;
+};
+
+struct dump_roots_tmp {
+       struct stat sb;
+       void *mm_ptr;
+       char **entries;
+       struct fbuf wbuf;
+       int root2id_fd;
+};
+
+#define CLEANUP_FBUF __attribute__((__cleanup__(fbuf_ensure)))
+static void fbuf_ensure(void *ptr)
+{
+       struct fbuf *fbuf = (struct fbuf *)ptr;
+       if (fbuf->fp && fclose(fbuf->fp))
+               perror("fclose(fbuf->fp)");
+       fbuf->fp = NULL;
+       free(fbuf->ptr);
+}
+
+static bool fbuf_init(struct fbuf *fbuf)
+{
+       assert(!fbuf->ptr);
+       fbuf->fp = open_memstream(&fbuf->ptr, &fbuf->len);
+       if (fbuf->fp) return true;
+       perror("open_memstream(fbuf)");
+       return false;
+}
+
+static void xclose(int fd)
+{
+       if (close(fd) < 0 && errno != EINTR)
+               err(EXIT_FAILURE, "BUG: close");
+}
+
+#define CLEANUP_DUMP_ROOTS __attribute__((__cleanup__(dump_roots_ensure)))
+static void dump_roots_ensure(void *ptr)
+{
+       struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr;
+       if (drt->root2id_fd >= 0)
+               xclose(drt->root2id_fd);
+       hdestroy(); // idempotent
+       if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size))
+               err(EXIT_FAILURE, "BUG: munmap");
+       free(drt->entries);
+       fbuf_ensure(&drt->wbuf);
+}
+
+static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt,
+                       Xapian::Document *doc)
+{
+       if (!fbuf_init(root_ids)) return false;
+
+       bool ok = true;
+       Xapian::TermIterator cur = doc->termlist_begin();
+       Xapian::TermIterator end = doc->termlist_end();
+       ENTRY e, *ep;
+       for (cur.skip_to("G"); cur != end; cur++) {
+               std::string tn = *cur;
+               if (!starts_with(&tn, "G", 1))
+                       continue;
+               union { const char *in; char *out; } u;
+               u.in = tn.c_str() + 1;
+               e.key = u.out;
+               ep = hsearch(e, FIND);
+               if (!ep) {
+                       warnx("hsearch miss `%s'", e.key);
+                       return false;
+               }
+               // ep->data is a NUL-terminated string matching /[0-9]+/
+               fputc(' ', root_ids->fp);
+               fputs((const char *)ep->data, root_ids->fp);
+       }
+       fputc('\n', root_ids->fp);
+       if (ferror(root_ids->fp) | fclose(root_ids->fp)) {
+               perror("ferror|fclose(root_ids)");
+               ok = false;
+       }
+       root_ids->fp = NULL;
+       return ok;
+}
+
+// writes term values matching @pfx for a given @doc, ending the line
+// with the contents of @root_ids
+static void dump_roots_term(struct req *req, const char *pfx,
+                               struct dump_roots_tmp *drt,
+                               struct fbuf *root_ids,
+                               Xapian::Document *doc)
+{
+       Xapian::TermIterator cur = doc->termlist_begin();
+       Xapian::TermIterator end = doc->termlist_end();
+       size_t pfx_len = strlen(pfx);
+
+       for (cur.skip_to(pfx); cur != end; cur++) {
+               std::string tn = *cur;
+               if (!starts_with(&tn, pfx, pfx_len))
+                       continue;
+               fputs(tn.c_str() + pfx_len, drt->wbuf.fp);
+               fwrite(root_ids->ptr, root_ids->len, 1, drt->wbuf.fp);
+               ++req->nr_out;
+       }
+}
+
+// we may have lines which exceed PIPE_BUF, so we do our own
+// buffering and rely on flock(2), here
+static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
+{
+       char *p;
+       int fd = fileno(req->fp[0]);
+       bool ok = true;
+
+       if (!drt->wbuf.fp) return true;
+       if (fd < 0) err(EXIT_FAILURE, "BUG: fileno");
+       if (fclose(drt->wbuf.fp)) {
+               warn("fclose(drt->wbuf.fp)"); // malloc failure?
+               return false;
+       }
+       drt->wbuf.fp = NULL;
+       if (!drt->wbuf.len) goto done_free;
+       if (flock(drt->root2id_fd, LOCK_EX)) {
+               perror("LOCK_EX");
+               return false;
+       }
+       p = drt->wbuf.ptr;
+       do {
+               ssize_t n = write(fd, p, drt->wbuf.len);
+               if (n > 0) {
+                       drt->wbuf.len -= n;
+                       p += n;
+               } else {
+                       perror(n ? "write" : "write (zero bytes)");
+                       return false;
+               }
+       } while (drt->wbuf.len);
+       if (flock(drt->root2id_fd, LOCK_UN)) {
+               perror("LOCK_UN");
+               return false;
+       }
+done_free:
+       free(drt->wbuf.ptr);
+       drt->wbuf.ptr = NULL;
+       return ok;
+}
+
+static bool cmd_dump_roots(struct req *req)
+{
+       CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 };
+       if ((optind + 1) >= req->argc) {
+               warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
+               return false; // need file + qry_str
+       }
+       if (!req->pfxc) {
+               warnx("dump_roots requires -A PREFIX");
+               return false;
+       }
+       const char *root2id_file = req->argv[optind];
+       drt.root2id_fd = open(root2id_file, O_RDONLY);
+       if (drt.root2id_fd < 0) {
+               warn("open(%s)", root2id_file);
+               return false;
+       }
+       if (fstat(drt.root2id_fd, &drt.sb)) {
+               warn("fstat(%s)", root2id_file);
+               return false;
+       }
+       // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
+       // so /32 overestimates the number of expected entries by
+       // ~%25 (as recommended by Linux hcreate(3) manpage)
+       size_t est = (drt.sb.st_size / 32) + 1;
+       if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) {
+               warnx("%s size too big (%lld bytes > %zu)", root2id_file,
+                       (long long)drt.sb.st_size, SIZE_MAX);
+               return false;
+       }
+       drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ,
+                               MAP_PRIVATE, drt.root2id_fd, 0);
+       if (drt.mm_ptr == MAP_FAILED) {
+               warn("mmap(%s)", root2id_file);
+               return false;
+       }
+       drt.entries = (char **)calloc(est * 2, sizeof(char *));
+       if (!drt.entries) {
+               warn("calloc(%zu * 2, %zu)", est, sizeof(char *));
+               return false;
+       }
+       size_t tot = split2argv(drt.entries, (char *)drt.mm_ptr,
+                               drt.sb.st_size, est * 2);
+       if (tot <= 0) return false; // split2argv already warned on error
+       if (!hcreate(est)) {
+               warn("hcreate(%zu)", est);
+               return false;
+       }
+       for (size_t i = 0; i < tot; ) {
+               ENTRY e;
+               e.key = drt.entries[i++];
+               e.data = drt.entries[i++];
+               if (!hsearch(e, ENTER)) {
+                       warn("hsearch(%s => %s, ENTER)", e.key,
+                               (const char *)e.data);
+                       return false;
+               }
+       }
+       req->asc = true;
+       req->sort_col = -1;
+       Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
+       for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+               CLEANUP_FBUF struct fbuf root_ids = { 0 };
+               if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf))
+                       return false;
+               try {
+                       Xapian::Document doc = i.get_document();
+                       if (!root2ids_str(&root_ids, &drt, &doc))
+                               return false;
+                       for (int p = 0; p < req->pfxc; p++)
+                               dump_roots_term(req, req->pfxv[p], &drt,
+                                               &root_ids, &doc);
+               } catch (const Xapian::Error & e) {
+                       fprintf(orig_err, "W: %s (#%ld)\n",
+                               e.get_description().c_str(), (long)(*i));
+                       continue;
+               }
+               if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt))
+                       return false;
+       }
+       if (!dump_roots_flush(req, &drt))
+               return false;
+       if (req->fp[1])
+               fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+                       (unsigned long long)mset.size(), req->nr_out);
        return true;
 }
 
@@ -228,7 +511,8 @@ static const struct cmd_entry {
        cmd fn;
 } cmds[] = { // should be small enough to not need bsearch || gperf
        // most common commands first
-       CMD(dump_ibx),
+       CMD(dump_ibx), // many inboxes
+       CMD(dump_roots), // per-cidx shard
        CMD(test_inspect), // least common commands last
 };
 
@@ -240,12 +524,6 @@ union my_cmsg {
        char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
 };
 
-static void xclose(int fd)
-{
-       if (close(fd) < 0 && errno != EINTR)
-               err(EXIT_FAILURE, "BUG: close");
-}
-
 static bool recv_req(struct req *req, char *rbuf, size_t *len)
 {
        union my_cmsg cmsg = { 0 };
@@ -306,28 +584,6 @@ static bool recv_req(struct req *req, char *rbuf, size_t *len)
        return false;
 }
 
-#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
-static int split2argv(char **dst, char *buf, size_t len, size_t limit)
-{
-       if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
-               warnx("bogus argument given");
-               return 0;
-       }
-       size_t nr = 0;
-       char *c = buf;
-       for (size_t i = 1; i < len; i++) {
-               if (!buf[i]) {
-                       dst[nr++] = c;
-                       c = buf + i + 1;
-               }
-               if (nr == limit) {
-                       warnx("too many args: %zu", nr);
-                       return 0;
-               }
-       }
-       return (int)nr;
-}
-
 static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
 {
        const struct srch *a = (const struct srch *)pa;
@@ -355,7 +611,7 @@ static bool srch_init(struct req *req)
        char *dirv[MY_ARG_MAX];
        int i;
        struct srch *srch = req->srch;
-       int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
+       int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
        const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
        srch->qp_flags = FLAG_PHRASE |
                        Xapian::QueryParser::FLAG_BOOLEAN |
@@ -538,7 +794,7 @@ static void recv_loop(void) // worker process loop
                                perror("W: setlinebuf(req.fp[1])");
                        stderr = req.fp[1];
                }
-               req.argc = SPLIT2ARGV(req.argv, rbuf, len);
+               req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
                if (req.argc > 0)
                        dispatch(&req);
                if (ferror(req.fp[0]) | fclose(req.fp[0]))
index f00a845affede7a8b2b62249424e162add4addb6..92da2e6d1edcf23a572d85bbabe86a6c8ee5789b 100644 (file)
@@ -91,7 +91,7 @@ my $test = sub {
        my $res = do { local $/; <$r> };
        is(join('', @res), $res, 'got identical response w/ error pipe');
        my $stats = do { local $/; <$err_rd> };
-       is($stats, "mset.size=6\n", 'mset.size reported');
+       is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported');
 
        if ($arg[-1] !~ /\('-j0'\)/) {
                kill('KILL', $cinfo{pid});
@@ -105,12 +105,14 @@ my $test = sub {
 };
 my $ar;
 
-$ar = $test->(qw[-MPublicInbox::XapHelper -e
-               PublicInbox::XapHelper::start('-j0')]);
-$ar = $test->(qw[-MPublicInbox::XapHelper -e
-               PublicInbox::XapHelper::start('-j1')]);
-
-my @NO_CXX = (0);
+my @NO_CXX;
+if (!$ENV{TEST_XH_CXX_ONLY}) {
+       $ar = $test->(qw[-MPublicInbox::XapHelper -e
+                       PublicInbox::XapHelper::start('-j0')]);
+       $ar = $test->(qw[-MPublicInbox::XapHelper -e
+                       PublicInbox::XapHelper::start('-j1')]);
+       push @NO_CXX, 0;
+}
 SKIP: {
        eval {
                require PublicInbox::XapHelperCxx;
@@ -125,6 +127,20 @@ SKIP: {
                        PublicInbox::XapHelperCxx::start('-j1')]);
 };
 
+require PublicInbox::CodeSearch;
+my $cs_int = PublicInbox::CodeSearch->new("$crepo/public-inbox-cindex");
+my $root2id_file = "$tmp/root2id";
+my @id2root;
+{
+       open my $fh, '>', $root2id_file;
+       my $i = -1;
+       for ($cs_int->all_terms('G')) {
+               print $fh $_, "\0", ++$i, "\0";
+               $id2root[$i] = $_;
+       }
+       close $fh;
+}
+
 for my $n (@NO_CXX) {
        local $ENV{PI_NO_CXX} = $n;
        my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');
@@ -141,7 +157,19 @@ for my $n (@NO_CXX) {
        my $res = do { local $/; <$r> };
        is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
        my $err = do { local $/; <$err_r> };
-       is($err, "mset.size=1\n", "got expected status ($xhc->{impl})");
+       is($err, "mset.size=1 nr_out=2\n", "got expected status ($xhc->{impl})");
+
+       pipe($err_r, $err_w);
+       $r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A XDFID),
+                       (map { ('-d', $_) } @int),
+                       $root2id_file, 'dt:19700101'.'000000..');
+       close $err_w;
+       my @res = <$r>;
+       is(scalar(@res), 5, 'got expected rows');
+       is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)),
+               'entries match format');
+       $err = do { local $/; <$err_r> };
+       is($err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})");
 }
 
 done_testing;