]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
xap_helper: support thread:{SUBQUERY} via C++
authorEric Wong <e@80x24.org>
Thu, 20 Feb 2025 22:14:29 +0000 (22:14 +0000)
committerEric Wong <e@80x24.org>
Sun, 23 Feb 2025 12:39:52 +0000 (12:39 +0000)
Stealing the idea and code from notmuch to perform subqueries
within search.  One major internal difference from notmuch is we
store THREADID as numeric column in Xapian whereas notmuch
stores a boolean term.  The use of a column lets us use
set_collapse_key to deduplicate results within Xapian itself.

The other difference from notmuch is we avoid exposing the
numeric THREADID since they're unstable and not reproducible in
mirrors, thus we also support `thread:MSGID' instead of
`thread:THREADID' in brace-less queries.

MANIFEST
lib/PublicInbox/XapHelperCxx.pm
lib/PublicInbox/xap_helper.h
lib/PublicInbox/xh_thread_fp.h [new file with mode: 0644]
t/xap_helper.t

index d45350381150a422fd7afd0375d80c66b8019f2d..ce1b2fddc9a883f92c2d0c5edbb92b88fdec6591 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -393,6 +393,7 @@ lib/PublicInbox/lg2.h
 lib/PublicInbox/xap_helper.h
 lib/PublicInbox/xh_cidx.h
 lib/PublicInbox/xh_mset.h
+lib/PublicInbox/xh_thread_fp.h
 sa_config/Makefile
 sa_config/README
 sa_config/root/etc/spamassassin/public-inbox.pre
index 922bd583ba0a911a96c8d8080c0bf9b7a54d9917..817abcfb417b7ba9f965c63f7150a6385d8a3f61 100644 (file)
@@ -28,7 +28,8 @@ $idir //= $ENV{PERL_INLINE_DIRECTORY} //
 substr($dir, 0, 0) = "$idir/";
 my $bin = "$dir/xap_helper";
 my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!);
-my @srcs = map { $srcpfx.$_ } qw(xh_mset.h xh_cidx.h xap_helper.h);
+my @srcs = map { $srcpfx.$_ }
+       qw(xh_mset.h xh_cidx.h xh_thread_fp.h xap_helper.h);
 my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm);
 my $ldflags = '-Wl,-O1';
 $ldflags .= ' -Wl,--compress-debug-sections=zlib' if $^O ne 'openbsd';
index 95896725cde2e32897394c14980d46b69891fbb7..7e48de8ae46adb4b7a650a11d801de54d0e6f5d4 100644 (file)
@@ -139,6 +139,7 @@ static int srch_eq(const struct srch *a, const struct srch *b)
 KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *,
                srch_hash, srch_eq)
 static srch_set *srch_cache;
+static struct srch *cur_srch; // for ThreadFieldProcessor
 static long my_fd_max, shard_nfd;
 // sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET
 static volatile int sock_fd = STDIN_FILENO;
@@ -580,6 +581,8 @@ static void srch_cache_renew(struct srch *keep)
        }
 }
 
+#include "xh_thread_fp.h" // ThreadFieldProcessor
+
 static void srch_init(struct req *req)
 {
        int i;
@@ -634,10 +637,16 @@ static void srch_init(struct req *req)
        srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME);
        srch->qp->SET_MAX_EXPANSION(100);
 
-       if (req->code_search)
+       if (req->code_search) {
                qp_init_code_search(srch->qp); // CodeSearch.pm
-       else
+       } else {
+               Xapian::FieldProcessor *fp;
+
                qp_init_mail_search(srch->qp); // Search.pm
+               // n.b. ->release() starts Xapian refcounting
+               fp = (new ThreadFieldProcessor(*srch->qp))->release();
+               srch->qp->add_boolean_prefix("thread", fp);
+       }
 }
 
 // setup query parser for altid and arbitrary headers
@@ -773,6 +782,7 @@ static void dispatch(struct req *req)
        if (req->timeout_sec)
                alarm(req->timeout_sec > UINT_MAX ?
                        UINT_MAX : (unsigned)req->timeout_sec);
+       cur_srch = req->srch; // set global for *FieldProcessor
        try {
                if (!req->fn(req))
                        warnx("`%s' failed", req->argv[0]);
@@ -834,6 +844,7 @@ static void req_cleanup(void *ptr)
 {
        struct req *req = (struct req *)ptr;
        free(req->lenv);
+       cur_srch = NULL;
 }
 
 static void reopen_logs(void)
diff --git a/lib/PublicInbox/xh_thread_fp.h b/lib/PublicInbox/xh_thread_fp.h
new file mode 100644 (file)
index 0000000..c7d36c3
--- /dev/null
@@ -0,0 +1,75 @@
+// thread field processor from notmuch - Copyright 2018 David Bremner
+// License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+// Disclaimer: Eric doesn't know C++
+
+class ThreadFieldProcessor : public Xapian::FieldProcessor {
+protected:
+       Xapian::QueryParser &qp;
+public:
+       ThreadFieldProcessor(Xapian::QueryParser &qp_) : qp(qp_) {};
+       Xapian::Query operator()(const std::string &str);
+};
+
+static enum exc_iter xpand_col_iter(std::set<std::string> &vals,
+                                       Xapian::MSetIterator *i,
+                                       unsigned column)
+{
+       try {
+               Xapian::Document doc = i->get_document();
+               vals.insert(doc.get_value(column));
+       } catch (const Xapian::DatabaseModifiedError &e) {
+               cur_srch->db->reopen();
+               return ITER_RETRY;
+       } catch (const Xapian::DocNotFoundError &e) { // oh well...
+               warnx("doc not found: %s", e.get_description().c_str());
+       }
+       return ITER_OK;
+}
+
+static Xapian::Query qry_xpand_col(Xapian::Query qry, unsigned column)
+{
+       Xapian::Query xqry = Xapian::Query::MatchNothing;
+
+       Xapian::Enquire enq(*cur_srch->db);
+       std::set<std::string> vals; // serialised Xapian column
+
+       enq.set_weighting_scheme(Xapian::BoolWeight());
+       enq.set_query(qry);
+       enq.set_collapse_key(column);
+
+       Xapian::MSet mset = enq.get_mset(0, cur_srch->db->get_doccount());
+
+       for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++)  {
+               for (int t = 10; t > 0; --t)
+                       switch (xpand_col_iter(vals, &i, column)) {
+                       case ITER_OK: t = 0; break; // leave inner loop
+                       case ITER_RETRY: break; // continue for-loop
+                       case ITER_ABORT: return xqry; // impossible
+                       }
+       }
+
+       std::set<std::string>::const_iterator tid;
+       for (tid = vals.begin(); tid != vals.end(); tid++)
+               xqry = Xapian::Query(Xapian::Query::OP_OR, xqry,
+                               Xapian::Query(
+                                       Xapian::Query::OP_VALUE_RANGE,
+                                       column, *tid, *tid));
+       return xqry;
+}
+
+// Xapian calls this when processing queries since it's registered by
+// ->add_boolean_prefix
+Xapian::Query ThreadFieldProcessor::operator()(const std::string &str)
+{
+       Xapian::Query qry;
+
+       if (str.at(0) != '{') { // thread:$MSGID (no `{'/`}' encasement)
+               qry = Xapian::Query("Q" + str);
+       } else if (str.size() <= 1 || str.at(str.size() - 1) != '}') {
+               throw Xapian::QueryParserError("missing } in '" + str + "'");
+       } else { // thread:"{hello world}"
+               std::string qstr = str.substr(1, str.size() - 2);
+               qry = cur_srch->qp->parse_query(qstr, cur_srch->qp_flags);
+       }
+       return qry_xpand_col(qry, THREADID);
+}
index b0fa75a2e011c84e0a20e07cdc5065260f2bc84e..3e8176a011fa54bd2ede671d7dcf7f6c75643a45 100644 (file)
@@ -40,6 +40,41 @@ my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2,
        }
 };
 
+my $thr = create_inbox 'thr', indexlevel => 'medium', version => 2,
+                       tmpdir => "$tmp/thr", sub {
+       my ($im) = @_;
+       my $common = <<EOM;
+From: <BOFH\@YHBT.net>
+To: meta\@public-inbox.org
+Date: Mon, 1 Apr 2019 08:15:21 +0000
+EOM
+       $im->add(PublicInbox::Eml->new(<<EOM));
+${common}Subject: root message
+Message-ID: <thread-root\@example>
+
+hi
+EOM
+       my @t = qw(wildfires earthquake flood asteroid drought plague);
+       my $nr = 0;
+       for my $x (@t) {
+               ++$nr;
+               $im->add(PublicInbox::Eml->new(<<EOM)) or xbail;
+${common}Subject: Re: root reply
+References: <thread-root\@example>
+Message-ID: <thread-hit-$nr\@example>
+
+$x
+EOM
+               $im->add(PublicInbox::Eml->new(<<EOM)) or xbail;
+${common}Subject: broken thread from $x
+References: <ghost-root\@example>
+Message-ID: <thread-miss-$nr\@example>
+
+$x
+EOM
+       }
+};
+
 my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
 my @ibx_shard_args = map { ('-d', $_) } @ibx_idx;
 my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
@@ -269,6 +304,53 @@ for my $n (@NO_CXX) {
        is $nr_out, scalar(@oids), "output count matches $xhc->{impl}" or
                diag explain(\@res, \@err);
 
+       SKIP: {
+               $xhc->{impl} =~ /cxx/i or
+                       skip "`thread:' field processor requires C++", 1;
+               require PublicInbox::XhcMset;
+               my $over = $thr->over;
+               my @thr_idx = glob("$thr->{inboxdir}/xap*/?");
+               my @thr_shard_args = map { ('-d', $_) } @thr_idx;
+               my (@art, $mset, $err);
+               my $capture = sub { ($mset, $err) = @_ };
+               my $retrieve = sub {
+                       my ($qstr) = @_;
+                       $r = $xhc->mkreq(undef, 'mset', @thr_shard_args, $qstr);
+                       PublicInbox::XhcMset->maybe_new($r, undef, $capture);
+                       map { $over->get_art($_->get_docid) } $mset->items;
+               };
+               @art = $retrieve->('thread:thread-root@example wildfires');
+               is scalar(@art), 1, 'got 1 result';
+               is scalar(grep { $_->{mid} =~ /thread-miss/ } @art), 0,
+                       'no thread misses in result';
+               ok !$err, 'no error from thread:MSGID search';
+
+               @art = $retrieve->('thread:thread-root@example');
+               is scalar(@art), 7,
+                       'expected number of results for thread:MSGID';
+               is scalar(grep {
+                               $_->{mid} eq 'thread-root@example' ||
+                               $_->{references} =~ /<thread-root\@example>/
+                       } @art),
+                       scalar(@art),
+                       'got all matching results for thread:MSGID';
+
+               @art = $retrieve->('thread:"{ s:broken }"');
+               is scalar(@art), 6,
+                       'expected number of results for thread:"{ SUBQUERY }"';
+               is scalar(grep { $_->{subject} =~ /broken/ } @art),
+                       scalar(@art),
+                       'expected matches for thread:"{ SUBQUERY }"';
+
+               my $nr = $ENV{TEST_LEAK_NR} or skip 'TEST_LEAK_NR unset', 1;
+               $ENV{VALGRIND} or diag
+"W: `VALGRIND=' unset w/ TEST_LEAK_NR (using -fsanitize?)";
+               for (1..$nr) {
+                       $retrieve->('thread:thread-root@example wildfires');
+                       $retrieve->('thread:"{ s:broken }" wildfires');
+               }
+       }
+
        if ($ENV{TEST_XH_TIMEOUT}) {
                diag 'testing timeouts...';
                for my $j (qw(0 1)) {