]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
xap_helper: implement mset endpoint for WWW, IMAP, etc...
authorEric Wong <e@80x24.org>
Tue, 28 Nov 2023 14:56:19 +0000 (14:56 +0000)
committerEric Wong <e@80x24.org>
Wed, 29 Nov 2023 02:13:20 +0000 (02:13 +0000)
The C++ version will allow us to take full advantage of Xapian's
APIs for better queries, and the Perl bindings version can still
be advantageous in the future since we'll be able to support
timeouts effectively.

MANIFEST
Makefile.PL
lib/PublicInbox/Search.pm
lib/PublicInbox/XapHelper.pm
lib/PublicInbox/XapHelperCxx.pm
lib/PublicInbox/xap_helper.h
lib/PublicInbox/xh_cidx.h
lib/PublicInbox/xh_mset.h [new file with mode: 0644]
t/cindex.t
t/xap_helper.t

index bbbe0b9195405ef06423b4adb33a1f2e508dc4cb..7b6178f9101d27fa05828e1ecf25eebafc78ecb4 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -379,6 +379,7 @@ lib/PublicInbox/Xapcmd.pm
 lib/PublicInbox/gcf2_libgit2.h
 lib/PublicInbox/xap_helper.h
 lib/PublicInbox/xh_cidx.h
+lib/PublicInbox/xh_mset.h
 sa_config/Makefile
 sa_config/README
 sa_config/root/etc/spamassassin/public-inbox.pre
index 38e030f556edba24b42efd8f1b33b5ddc74b10fd..28f8263e4d7bb694ddf8599a5c60c089b8f363bc 100644 (file)
@@ -273,14 +273,16 @@ pm_to_blib : lib/PublicInbox.pm
 lib/PublicInbox.pm : FORCE
        VERSION=\$(VERSION) \$(PERL) -w ./version-gen.perl
 
+XH_TESTS = t/xap_helper.t t/cindex.t
+
 test-asan : pure_all
-       TEST_XH_CXX_ONLY=1 CXXFLAGS='-O0 -Wall -ggdb3 -fsanitize=address' \\
-               prove -bvw t/xap_helper.t
+       TEST_XH_CXX_ONLY=1 CXXFLAGS='-Wall -ggdb3 -fsanitize=address' \\
+               prove -bvw \$(XH_TESTS)
 
 VG_OPT = -v --trace-children=yes --track-fds=yes
 VG_OPT += --leak-check=yes --track-origins=yes
 test-valgrind : pure_all
        TEST_XH_CXX_ONLY=1 VALGRIND="valgrind \$(VG_OPT)" \\
-               prove -bvw t/xap_helper.t
+               prove -bvw \$(XH_TESTS)
 EOF
 }
index 477f77dcffebcd87f6bf829ec87290699ce5cc07..6145b027edb7cd40ffad3718346ba1d1d2209421 100644 (file)
@@ -76,6 +76,25 @@ our @MAIL_VMAP = (
 );
 our @MAIL_NRP;
 
+# Getopt::Long spec, only short options for portability in C++ implementation
+our @XH_SPEC = (
+       'a', # ascending sort
+       'c', # code search
+       'd=s@', # shard dirs
+       'g=s', # git dir (with -c)
+       'k=i', # sort column (like sort(1))
+       'm=i', # maximum number of results
+       'o=i', # offset
+       'p', # show percent
+       'r', # 1=relevance then column
+       't', # collapse threads
+       'A=s@', # prefixes
+       'D', # emit docdata
+       'K=i', # timeout kill after i seconds
+       'O=s', # eidx_key
+       'T=i', # threadid
+);
+
 sub load_xapian () {
        return 1 if defined $Xap;
        # n.b. PI_XAPIAN is intended for development use only
@@ -247,6 +266,12 @@ sub mdocid {
        int(($docid - 1) / $nshard) + 1;
 }
 
+sub docids_to_artnums {
+       my $nshard = shift->{nshard};
+       # XXX does array vs arrayref make a difference in modern Perls?
+       map { int(($_ - 1) / $nshard) + 1 } @_;
+}
+
 sub mset_to_artnums {
        my ($self, $mset) = @_;
        my $nshard = $self->{nshard};
index fe831b8f6b6819638857fe4ea99414cf41603e8a..b21e70a29028108ae34d2b446ffb75507465f49c 100644 (file)
@@ -21,21 +21,6 @@ my $X = \%PublicInbox::Search::X;
 our (%SRCH, %WORKERS, $nworker, $workerset, $in);
 our $stderr = \*STDERR;
 
-# only short options for portability in C++ implementation
-our @SPEC = (
-       'a', # ascending sort
-       'c', # code search
-       'd=s@', # shard dirs
-       'k=i', # sort column (like sort(1))
-       'm=i', # maximum number of results
-       'o=i', # offset
-       'r', # 1=relevance then column
-       't', # collapse threads
-       'A=s@', # prefixes
-       'O=s', # eidx_key
-       'T=i', # timeout in seconds
-);
-
 sub cmd_test_inspect {
        my ($req) = @_;
        print { $req->{0} } "pid=$$ has_threadid=",
@@ -144,10 +129,44 @@ sub cmd_dump_roots {
        emit_mset_stats($req, $mset);
 }
 
+sub mset_iter ($$) {
+       my ($req, $it) = @_;
+       eval {
+               my $buf = $it->get_docid;
+               $buf .= "\0".$it->get_percent if $req->{p};
+               my $doc = ($req->{A} || $req->{D}) ? $it->get_document : undef;
+               for my $p (@{$req->{A}}) {
+                       $buf .= "\0".$p.$_ for xap_terms($p, $doc);
+               }
+               $buf .= "\0".$doc->get_data if $req->{D};
+               say { $req->{0} } $buf;
+       };
+       $@ ? iter_retry_check($req) : 0;
+}
+
+sub cmd_mset { # to be used by WWW + IMAP
+       my ($req, $qry_str) = @_;
+       $qry_str // die 'usage: mset [OPTIONS] QRY_STR';
+       my $opt = { limit => $req->{'m'}, offset => $req->{o} // 0 };
+       $opt->{relevance} = 1 if $req->{r};
+       $opt->{threads} = 1 if defined $req->{t};
+       $opt->{git_dir} = $req->{g} if defined $req->{g};
+       $opt->{eidx_key} = $req->{O} if defined $req->{O};
+       $opt->{threadid} = $req->{T} if defined $req->{T};
+       my $mset = $req->{srch}->mset($qry_str, $opt);
+       say { $req->{0} } 'mset.size=', $mset->size;
+       for my $it ($mset->items) {
+               for (my $t = 10; $t > 0; --$t) {
+                       $t = mset_iter($req, $it) // $t;
+               }
+       }
+}
+
 sub dispatch {
        my ($req, $cmd, @argv) = @_;
        my $fn = $req->can("cmd_$cmd") or return;
-       $GLP->getoptionsfromarray(\@argv, $req, @SPEC) or return;
+       $GLP->getoptionsfromarray(\@argv, $req, @PublicInbox::Search::XH_SPEC)
+               or return;
        my $dirs = delete $req->{d} or die 'no -d args';
        my $key = join("\0", @$dirs);
        $req->{srch} = $SRCH{$key} //= do {
index 8a66fdcd0e77e1932d1049fb85139fe387d7d335..1aa75f2a9cdc98170110c380e988ff5f2e8e618d 100644 (file)
@@ -20,13 +20,15 @@ $ENV{PERL_INLINE_DIRECTORY} // die('BUG: PERL_INLINE_DIRECTORY unset');
 substr($dir, 0, 0) = "$ENV{PERL_INLINE_DIRECTORY}/";
 my $bin = "$dir/xap_helper";
 my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!);
-my @srcs = map { $srcpfx.$_ } qw(xap_helper.h xh_cidx.h);
+my @srcs = map { $srcpfx.$_ } qw(xh_mset.h xh_cidx.h xap_helper.h);
 my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm);
 my $ldflags = '-Wl,-O1';
 $ldflags .= ' -Wl,--compress-debug-sections=zlib' if $^O ne 'openbsd';
 my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -pipe') . ' ' .
        ' -DTHREADID=' . PublicInbox::Search::THREADID .
-       ' ' . ($ENV{LDFLAGS} // $ldflags);
+       ' -DXH_SPEC="'.join('',
+               map { s/=.*/:/; $_ } @PublicInbox::Search::XH_SPEC) . '" ' .
+       ($ENV{LDFLAGS} // $ldflags);
 my $xap_modversion;
 
 sub xap_cfg (@) {
index 89d151d97badde692d8a187c7433ee5c1a367711..1866556740bbf127e4a5998bfe24c6d8f15e2db4 100644 (file)
@@ -124,10 +124,12 @@ struct req { // argv and pfxv point into global rbuf
        char *argv[MY_ARG_MAX];
        char *pfxv[MY_ARG_MAX]; // -A <prefix>
        struct srch *srch;
+       char *Pgit_dir;
        char *Oeidx_key;
        cmd fn;
        unsigned long long max;
        unsigned long long off;
+       unsigned long long threadid;
        unsigned long timeout_sec;
        size_t nr_out;
        long sort_col; // value column, negative means BoolWeight
@@ -138,6 +140,8 @@ struct req { // argv and pfxv point into global rbuf
        bool collapse_threads;
        bool code_search;
        bool relevance; // sort by relevance before column
+       bool emit_percent;
+       bool emit_docdata;
        bool asc; // ascending sort
 };
 
@@ -230,12 +234,53 @@ static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
        return enquire_mset(req, &enq);
 }
 
+static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
+{
+       return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
+}
+
+static void apply_roots_filter(struct req *req, Xapian::Query *qry)
+{
+       if (!req->Pgit_dir) return;
+       req->Pgit_dir[0] = 'P'; // modifies static rbuf
+       Xapian::Database *xdb = req->srch->db;
+       for (int i = 0; i < 9; i++) {
+               try {
+                       std::string P = req->Pgit_dir;
+                       Xapian::PostingIterator p = xdb->postlist_begin(P);
+                       if (p == xdb->postlist_end(P)) {
+                               warnx("W: %s not indexed?", req->Pgit_dir + 1);
+                               return;
+                       }
+                       Xapian::TermIterator cur = xdb->termlist_begin(*p);
+                       Xapian::TermIterator end = xdb->termlist_end(*p);
+                       cur.skip_to("G");
+                       if (cur == end) {
+                               warnx("W: %s has no root commits?",
+                                       req->Pgit_dir + 1);
+                               return;
+                       }
+                       Xapian::Query f = Xapian::Query(*cur);
+                       for (++cur; cur != end; ++cur) {
+                               std::string tn = *cur;
+                               if (!starts_with(&tn, "G", 1))
+                                       continue;
+                               f = Xapian::Query(Xapian::Query::OP_OR, f, tn);
+                       }
+                       *qry = Xapian::Query(Xapian::Query::OP_FILTER, *qry, f);
+                       return;
+               } catch (const Xapian::DatabaseModifiedError & e) {
+                       xdb->reopen();
+               }
+       }
+}
+
 // for cindex
 static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
 {
        struct srch *srch = req->srch;
        Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
-       // TODO: git_dir + roots_filter
+       apply_roots_filter(req, &qry);
 
        // we only want commits:
        qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
@@ -254,11 +299,6 @@ static void emit_mset_stats(struct req *req, const Xapian::MSet *mset)
                ABORT("BUG: %s caller only passed 1 FD", req->argv[0]);
 }
 
-static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
-{
-       return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
-}
-
 static int my_setlinebuf(FILE *fp) // glibc setlinebuf(3) can't report errors
 {
        return setvbuf(fp, NULL, _IOLBF, 0);
@@ -284,6 +324,32 @@ static void fbuf_init(struct fbuf *fbuf)
        if (!fbuf->fp) err(EXIT_FAILURE, "open_memstream(fbuf)");
 }
 
+static bool write_all(int fd, const struct fbuf *wbuf, size_t len)
+{
+       const char *p = wbuf->ptr;
+       assert(wbuf->len >= len);
+       do { // write to client FD
+               ssize_t n = write(fd, p, len);
+               if (n > 0) {
+                       len -= n;
+                       p += n;
+               } else {
+                       perror(n ? "write" : "write (zero bytes)");
+                       return false;
+               }
+       } while (len);
+       return true;
+}
+
+#define ERR_FLUSH(f) do { \
+       if (ferror(f) | fflush(f)) err(EXIT_FAILURE, "ferror|fflush "#f); \
+} while (0)
+
+#define ERR_CLOSE(f, e) do { \
+       if (ferror(f) | fclose(f)) \
+               e ? err(e, "ferror|fclose "#f) : perror("ferror|fclose "#f); \
+} while (0)
+
 static void xclose(int fd)
 {
        if (close(fd) < 0 && errno != EINTR)
@@ -339,6 +405,7 @@ static bool cmd_test_inspect(struct req *req)
        return false;
 }
 
+#include "xh_mset.h" // read-only (WWW, IMAP, lei) stuff
 #include "xh_cidx.h" // CodeSearchIdx.pm stuff
 
 #define CMD(n) { .fn_len = sizeof(#n) - 1, .fn_name = #n, .fn = cmd_##n }
@@ -348,6 +415,7 @@ static const struct cmd_entry {
        cmd fn;
 } cmds[] = { // should be small enough to not need bsearch || gperf
        // most common commands first
+       CMD(mset), // WWW and IMAP requests
        CMD(dump_ibx), // many inboxes
        CMD(dump_roots), // per-cidx shard
        CMD(test_inspect), // least common commands last
@@ -520,7 +588,7 @@ static void dispatch(struct req *req)
        char *end;
        FILE *kfp;
        struct srch **s;
-       req->fn = NULL;
+       req->threadid = ULLONG_MAX;
        for (c = 0; c < (int)MY_ARRAY_SIZE(cmds); c++) {
                if (cmds[c].fn_len == size &&
                        !memcmp(cmds[c].fn_name, req->argv[0], size)) {
@@ -540,12 +608,13 @@ static void dispatch(struct req *req)
        optarg = NULL;
        MY_DO_OPTRESET();
 
-       // keep sync with @PublicInbox::XapHelper::SPEC
-       while ((c = getopt(req->argc, req->argv, "acd:k:m:o:rtA:O:T:")) != -1) {
+       // XH_SPEC is generated from @PublicInbox::Search::XH_SPEC
+       while ((c = getopt(req->argc, req->argv, XH_SPEC)) != -1) {
                switch (c) {
                case 'a': req->asc = true; break;
                case 'c': req->code_search = true; break;
                case 'd': fwrite(optarg, strlen(optarg) + 1, 1, kfp); break;
+               case 'g': req->Pgit_dir = optarg - 1; break; // pad "P" prefix
                case 'k':
                        req->sort_col = strtol(optarg, &end, 10);
                        if (*end) ABORT("-k %s", optarg);
@@ -563,6 +632,7 @@ static void dispatch(struct req *req)
                        if (*end || req->off == ULLONG_MAX)
                                ABORT("-o %s", optarg);
                        break;
+               case 'p': req->emit_percent = true; break;
                case 'r': req->relevance = true; break;
                case 't': req->collapse_threads = true; break;
                case 'A':
@@ -570,17 +640,22 @@ static void dispatch(struct req *req)
                        if (MY_ARG_MAX == req->pfxc)
                                ABORT("too many -A");
                        break;
-               case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix
-               case 'T':
+               case 'D': req->emit_docdata = true; break;
+               case 'K':
                        req->timeout_sec = strtoul(optarg, &end, 10);
                        if (*end || req->timeout_sec == ULONG_MAX)
+                               ABORT("-K %s", optarg);
+                       break;
+               case 'O': req->Oeidx_key = optarg - 1; break; // pad "O" prefix
+               case 'T':
+                       req->threadid = strtoull(optarg, &end, 10);
+                       if (*end || req->threadid == ULLONG_MAX)
                                ABORT("-T %s", optarg);
                        break;
                default: ABORT("bad switch `-%c'", c);
                }
        }
-       if (ferror(kfp) | fclose(kfp)) /* sets kbuf.srch */
-               err(EXIT_FAILURE, "ferror|fclose"); // likely ENOMEM
+       ERR_CLOSE(kfp, EXIT_FAILURE); // may ENOMEM, sets kbuf.srch
        kbuf.srch->db = NULL;
        kbuf.srch->qp = NULL;
        kbuf.srch->paths_len = size - offsetof(struct srch, paths);
@@ -639,8 +714,7 @@ static void stderr_restore(FILE *tmp_err)
        stderr = orig_err;
        return;
 #endif
-       if (ferror(stderr) | fflush(stderr))
-               err(EXIT_FAILURE, "ferror|fflush stderr");
+       ERR_CLOSE(stderr, EXIT_FAILURE);
        while (dup2(orig_err_fd, STDERR_FILENO) < 0) {
                if (errno != EINTR)
                        err(EXIT_FAILURE, "dup2(%d => 2)", orig_err_fd);
@@ -670,12 +744,10 @@ static void recv_loop(void) // worker process loop
                        stderr_set(req.fp[1]);
                req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
                dispatch(&req);
-               if (ferror(req.fp[0]) | fclose(req.fp[0]))
-                       perror("ferror|fclose fp[0]");
+               ERR_CLOSE(req.fp[0], 0);
                if (req.fp[1]) {
                        stderr_restore(req.fp[1]);
-                       if (ferror(req.fp[1]) | fclose(req.fp[1]))
-                               perror("ferror|fclose fp[1]");
+                       ERR_CLOSE(req.fp[1], 0);
                }
        }
 }
index c2d9416266c958d286a48d92908f090adb620079..1980f9f60014d52d6eb182536905dc324e7ec434 100644 (file)
@@ -107,8 +107,7 @@ static bool root2offs_str(struct fbuf *root_offs, Xapian::Document *doc)
                fputs((const char *)ep->data, root_offs->fp);
        }
        fputc('\n', root_offs->fp);
-       if (ferror(root_offs->fp) | fclose(root_offs->fp))
-               err(EXIT_FAILURE, "ferror|fclose(root_offs)"); // ENOMEM
+       ERR_CLOSE(root_offs->fp, EXIT_FAILURE); // ENOMEM
        root_offs->fp = NULL;
        return true;
 }
@@ -138,38 +137,24 @@ static void dump_roots_term(struct req *req, const char *pfx,
 // buffering and rely on flock(2), here
 static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
 {
-       char *p;
-       int fd = fileno(req->fp[0]);
        bool ok = true;
+       off_t off = ftello(drt->wbuf.fp);
+       if (off < 0) EABORT("ftello");
+       if (!off) return ok;
+
+       ERR_FLUSH(drt->wbuf.fp); // ENOMEM
+       int fd = fileno(req->fp[0]);
 
-       if (!drt->wbuf.fp) return true;
-       if (fd < 0) EABORT("BUG: fileno");
-       if (ferror(drt->wbuf.fp) | fclose(drt->wbuf.fp)) // ENOMEM?
-               err(EXIT_FAILURE, "ferror|fclose(drt->wbuf.fp)");
-       drt->wbuf.fp = NULL;
-       if (!drt->wbuf.len) goto done_free;
        while (flock(drt->root2off_fd, LOCK_EX)) {
                if (errno == EINTR) continue;
                err(EXIT_FAILURE, "LOCK_EX"); // ENOLCK?
        }
-       p = drt->wbuf.ptr;
-       do { // write to client FD
-               ssize_t n = write(fd, p, drt->wbuf.len);
-               if (n > 0) {
-                       drt->wbuf.len -= n;
-                       p += n;
-               } else {
-                       perror(n ? "write" : "write (zero bytes)");
-                       return false;
-               }
-       } while (drt->wbuf.len);
+       ok = write_all(fd, &drt->wbuf, (size_t)off);
        while (flock(drt->root2off_fd, LOCK_UN)) {
                if (errno == EINTR) continue;
                err(EXIT_FAILURE, "LOCK_UN"); // ENOLCK?
        }
-done_free: // OK to skip on errors, dump_roots_ensure calls fbuf_ensure
-       free(drt->wbuf.ptr);
-       drt->wbuf.ptr = NULL;
+       if (fseeko(drt->wbuf.fp, 0, SEEK_SET)) EABORT("fseeko");
        return ok;
 }
 
@@ -238,11 +223,11 @@ static bool cmd_dump_roots(struct req *req)
        req->sort_col = -1;
        Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
 
+       fbuf_init(&drt.wbuf);
+
        // @UNIQ_FOLD in CodeSearchIdx.pm can handle duplicate lines fine
        // in case we need to retry on DB reopens
        for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
-               if (!drt.wbuf.fp)
-                       fbuf_init(&drt.wbuf);
                for (int t = 10; t > 0; --t)
                        switch (dump_roots_iter(req, &drt, &i)) {
                        case ITER_OK: t = 0; break; // leave inner loop
diff --git a/lib/PublicInbox/xh_mset.h b/lib/PublicInbox/xh_mset.h
new file mode 100644 (file)
index 0000000..056fe22
--- /dev/null
@@ -0,0 +1,96 @@
+// Copyright (C) all contributors <meta@public-inbox.org>
+// License: GPL-2.0+ <https://www.gnu.org/licenses/gpl-2.0.txt>
+// This file is only intended to be included by xap_helper.h
+// it implements pieces used by WWW, IMAP and lei
+
+static void emit_doc_term(FILE *fp, const char *pfx, Xapian::Document *doc)
+{
+       Xapian::TermIterator cur = doc->termlist_begin();
+       Xapian::TermIterator end = doc->termlist_end();
+       size_t pfx_len = strlen(pfx);
+
+       for (cur.skip_to(pfx); cur != end; cur++) {
+               std::string tn = *cur;
+               if (!starts_with(&tn, pfx, pfx_len)) continue;
+               fputc(0, fp);
+               fwrite(tn.data(), tn.size(), 1, fp);
+       }
+}
+
+static enum exc_iter mset_iter(const struct req *req, FILE *fp, off_t off,
+                               Xapian::MSetIterator *i)
+{
+       try {
+               fprintf(fp, "%llu", (unsigned long long)(*(*i))); // get_docid
+               if (req->emit_percent)
+                       fprintf(fp, "%c%d", 0, i->get_percent());
+               if (req->pfxc || req->emit_docdata) {
+                       Xapian::Document doc = i->get_document();
+                       for (int p = 0; p < req->pfxc; p++)
+                               emit_doc_term(fp, req->pfxv[p], &doc);
+                       if (req->emit_docdata) {
+                               std::string d = doc.get_data();
+                               fputc(0, fp);
+                               fwrite(d.data(), d.size(), 1, fp);
+                       }
+               }
+               fputc('\n', fp);
+       } catch (const Xapian::DatabaseModifiedError & e) {
+               req->srch->db->reopen();
+               if (fseeko(fp, off, SEEK_SET) < 0) EABORT("fseeko");
+               return ITER_RETRY;
+       } catch (const Xapian::DocNotFoundError & e) { // oh well...
+               warnx("doc not found: %s", e.get_description().c_str());
+               if (fseeko(fp, off, SEEK_SET) < 0) EABORT("fseeko");
+       }
+       return ITER_OK;
+}
+
+#ifndef WBUF_FLUSH_THRESHOLD
+#      define WBUF_FLUSH_THRESHOLD (BUFSIZ - 1000)
+#endif
+#if WBUF_FLUSH_THRESHOLD < 0
+#      undef WBUF_FLUSH_THRESHOLD
+#      define WBUF_FLUSH_THRESHOLD BUFSIZ
+#endif
+
+static bool cmd_mset(struct req *req)
+{
+       if (optind >= req->argc) ABORT("usage: mset [OPTIONS] WANT QRY_STR");
+       if (req->fp[1]) ABORT("mset only accepts 1 FD");
+       const char *qry_str = req->argv[optind];
+       CLEANUP_FBUF struct fbuf wbuf = {};
+       Xapian::MSet mset = req->code_search ? commit_mset(req, qry_str) :
+                                               mail_mset(req, qry_str);
+       fbuf_init(&wbuf);
+       fprintf(wbuf.fp, "mset.size=%llu\n", (unsigned long long)mset.size());
+       int fd = fileno(req->fp[0]);
+       for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+               off_t off = ftello(wbuf.fp);
+               if (off < 0) EABORT("ftello");
+               /*
+                * TODO verify our fflush + fseeko use isn't affected by a
+                * glibc <2.25 bug:
+                * https://sourceware.org/bugzilla/show_bug.cgi?id=20181
+                * CentOS 7.x only has glibc 2.17.  In any case, bug #20181
+                * shouldn't affect us since our use of fseeko is used to
+                * effectively discard data.
+                */
+               if (off > WBUF_FLUSH_THRESHOLD) {
+                       ERR_FLUSH(wbuf.fp);
+                       if (!write_all(fd, &wbuf, (size_t)off)) return false;
+                       if (fseeko(wbuf.fp, 0, SEEK_SET)) EABORT("fseeko");
+                       off = 0;
+               }
+               for (int t = 10; t > 0; --t)
+                       switch (mset_iter(req, wbuf.fp, off, &i)) {
+                       case ITER_OK: t = 0; break; // leave inner loop
+                       case ITER_RETRY: break; // continue for-loop
+                       case ITER_ABORT: return false; // error
+                       }
+       }
+       off_t off = ftello(wbuf.fp);
+       if (off < 0) EABORT("ftello");
+       ERR_FLUSH(wbuf.fp);
+       return off > 0 ? write_all(fd, &wbuf, (size_t)off) : true;
+}
index 261945bfac293a8fa5619c1a13fabbbb1543eea0..a90750920ef94facb59e4351f85e3c6237d5b40c 100644 (file)
@@ -121,22 +121,70 @@ my $no_metadata_set = sub {
 
 use_ok 'PublicInbox::CodeSearch';
 
+
+my @xh_args;
+my $exp = [ 'initial with NUL character', 'remove NUL character' ];
+my $zp_git = abs_path("$zp/.git");
 if ('multi-repo search') {
        my $csrch = PublicInbox::CodeSearch->new("$tmp/ext");
        my $mset = $csrch->mset('NUL');
        is(scalar($mset->items), 2, 'got results');
-       my $exp = [ 'initial with NUL character', 'remove NUL character' ];
        my @have = sort(map { $_->get_document->get_data } $mset->items);
        is_xdeeply(\@have, $exp, 'got expected subjects');
 
        $mset = $csrch->mset('NUL', { git_dir => "$tmp/wt0/.git" });
        is(scalar($mset->items), 0, 'no results with other GIT_DIR');
 
-       $mset = $csrch->mset('NUL', { git_dir => abs_path("$zp/.git") });
+       $mset = $csrch->mset('NUL', { git_dir => $zp_git });
        @have = sort(map { $_->get_document->get_data } $mset->items);
        is_xdeeply(\@have, $exp, 'got expected subjects w/ GIT_DIR filter');
        my @xdb = $csrch->xdb_shards_flat;
        $no_metadata_set->(0, ['indexlevel'], \@xdb);
+       @xh_args = $csrch->xh_args;
+}
+
+my $test_xhc = sub {
+       my ($xhc) = @_;
+       my $impl = $xhc->{impl};
+       my ($r, @l);
+       $r = $xhc->mkreq([], qw(mset -D -c -g), $zp_git, @xh_args, 'NUL');
+       chomp(@l = <$r>);
+       is(shift(@l), 'mset.size=2', "got expected header $impl");
+       my %docid2data;
+       my @got = sort map {
+               my @f = split /\0/;
+               is scalar(@f), 2, 'got 2 entries';
+               $docid2data{$f[0]} = $f[1];
+               $f[1];
+       } @l;
+       is_deeply(\@got, $exp, "expected doc_data $impl");
+
+       $r = $xhc->mkreq([], qw(mset -c -g), "$tmp/wt0/.git", @xh_args, 'NUL');
+       chomp(@l = <$r>);
+       is(shift(@l), 'mset.size=0', "got miss in wrong dir $impl");
+       is_deeply(\@l, [], "no extra lines $impl");
+
+       my $csrch = PublicInbox::CodeSearch->new("$tmp/ext");
+       while (my ($did, $expect) = each %docid2data) {
+               is_deeply($csrch->xdb->get_document($did)->get_data,
+                       $expect, "docid=$did data matches");
+       }
+       ok(!$xhc->{io}->close, "$impl close");
+       is($?, 66 << 8, "got EX_NOINPUT from $impl exit");
+};
+
+SKIP: {
+       require_mods('+SCM_RIGHTS', 1);
+       require PublicInbox::XapClient;
+       my $xhc = PublicInbox::XapClient::start_helper('-j0');
+       $test_xhc->($xhc);
+       skip 'PI_NO_CXX set', 1 if $ENV{PI_NO_CXX};
+       $xhc->{impl} =~ /Cxx/ or
+               skip 'C++ compiler or xapian development libs missing', 1;
+       skip 'TEST_XH_CXX_ONLY set', 1 if $ENV{TEST_XH_CXX_ONLY};
+       local $ENV{PI_NO_CXX} = 1; # force XS or SWIG binding test
+       $xhc = PublicInbox::XapClient::start_helper('-j0');
+       $test_xhc->($xhc);
 }
 
 if ('--update') {
index e3abedede77fd8ee97823671c26a6966f798b4e4..ee25b2dc4913161dc1be6393730f5842ab8029ae 100644 (file)
@@ -40,6 +40,7 @@ my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2,
 };
 
 my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
+my @ibx_shard_args = map { ('-d', $_) } @ibx_idx;
 my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
 my (@ext) = glob("$crepo/cidx-ext/cidx*/?");
 is(scalar(@ext), 2, 'have 2 external shards') or diag explain(\@ext);
@@ -76,8 +77,7 @@ my $test = sub {
        is($cinfo{has_threadid}, '0', 'has_threadid false for cindex');
        is($cinfo{pid}, $info{pid}, 'PID unchanged for cindex');
 
-       my @dump = (qw(dump_ibx -A XDFID), (map { ('-d', $_) } @ibx_idx),
-                       qw(13 rt:0..));
+       my @dump = (qw(dump_ibx -A XDFID), @ibx_shard_args, qw(13 rt:0..));
        $r = $doreq->($s, @dump);
        my @res;
        while (sysread($r, my $buf, 512) != 0) { push @res, $buf }
@@ -89,7 +89,8 @@ my $test = sub {
        my $res = do { local $/; <$r> };
        is(join('', @res), $res, 'got identical response w/ error pipe');
        my $stats = do { local $/; <$err_rd> };
-       is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported');
+       is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported') or
+               diag "res=$res";
 
        return wantarray ? ($ar, $s) : $ar if $cinfo{pid} == $pid;
 
@@ -198,7 +199,47 @@ for my $n (@NO_CXX) {
        is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)),
                'entries match format');
        $err = do { local $/; <$err_r> };
-       is($err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})");
+       is $err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})";
+
+       $r = $xhc->mkreq([], qw(mset -p -A XDFID -A Q), @ibx_shard_args,
+                               'dfn:lib/PublicInbox/Search.pm');
+       chomp((my $hdr, @res) = readline($r));
+       is $hdr, 'mset.size=1', "got expected header via mset ($xhc->{impl}";
+       is scalar(@res), 1, 'got one result';
+       @res = split /\0/, $res[0];
+       {
+               my $doc = $v2->search->xdb->get_document($res[0]);
+               my @q = PublicInbox::Search::xap_terms('Q', $doc);
+               is_deeply \@q, [ $mid ], 'docid usable';
+       }
+       ok $res[1] > 0 && $res[1] <= 100, 'pct > 0 && <= 100';
+       is $res[2], 'XDFID'.$dfid, 'XDFID result matches';
+       is $res[3], 'Q'.$mid, 'Q (msgid) mset result matches';
+       is scalar(@res), 4, 'only 4 columns in result';
+
+       $r = $xhc->mkreq([], qw(mset -p -A XDFID -A Q), @ibx_shard_args,
+                               'dt:19700101'.'000000..');
+       chomp(($hdr, @res) = readline($r));
+       is $hdr, 'mset.size=6',
+               "got expected header via multi-result mset ($xhc->{impl}";
+       is(scalar(@res), 6, 'got 6 rows');
+       for my $r (@res) {
+               my ($docid, $pct, @rest) = split /\0/, $r;
+               my $doc = $v2->search->xdb->get_document($docid);
+               ok $pct > 0 && $pct <= 100,
+                       "pct > 0 && <= 100 #$docid ($xhc->{impl})";
+               my %terms;
+               for (@rest) {
+                       s/\A([A-Z]+)// or xbail 'no prefix=', \@rest;
+                       push @{$terms{$1}}, $_;
+               }
+               while (my ($pfx, $vals) = each %terms) {
+                       @$vals = sort @$vals;
+                       my @q = PublicInbox::Search::xap_terms($pfx, $doc);
+                       is_deeply $vals, \@q,
+                               "#$docid $pfx as expected ($xhc->{impl})";
+               }
+       }
 }
 
 done_testing;