From: Eric Wong Date: Thu, 20 Feb 2025 22:14:29 +0000 (+0000) Subject: xap_helper: support thread:{SUBQUERY} via C++ X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4a682a8519187b13425a47ed1b6b25c4ea3b3f30;p=thirdparty%2Fpublic-inbox.git xap_helper: support thread:{SUBQUERY} via C++ Stealing the idea and code from notmuch to perform subqueries within search. One major internal difference from notmuch is that we store THREADID as a numeric column in Xapian whereas notmuch stores a boolean term. The use of a column lets us use set_collapse_key to deduplicate results within Xapian itself. The other difference from notmuch is that we avoid exposing the numeric THREADID since it is unstable and not reproducible in mirrors, thus we also support `thread:MSGID' instead of `thread:THREADID' in brace-less queries. --- diff --git a/MANIFEST b/MANIFEST index d45350381..ce1b2fddc 100644 --- a/MANIFEST +++ b/MANIFEST @@ -393,6 +393,7 @@ lib/PublicInbox/lg2.h lib/PublicInbox/xap_helper.h lib/PublicInbox/xh_cidx.h lib/PublicInbox/xh_mset.h +lib/PublicInbox/xh_thread_fp.h sa_config/Makefile sa_config/README sa_config/root/etc/spamassassin/public-inbox.pre diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm index 922bd583b..817abcfb4 100644 --- a/lib/PublicInbox/XapHelperCxx.pm +++ b/lib/PublicInbox/XapHelperCxx.pm @@ -28,7 +28,8 @@ $idir //= $ENV{PERL_INLINE_DIRECTORY} // substr($dir, 0, 0) = "$idir/"; my $bin = "$dir/xap_helper"; my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!); -my @srcs = map { $srcpfx.$_ } qw(xh_mset.h xh_cidx.h xap_helper.h); +my @srcs = map { $srcpfx.$_ } + qw(xh_mset.h xh_cidx.h xh_thread_fp.h xap_helper.h); my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm); my $ldflags = '-Wl,-O1'; $ldflags .= ' -Wl,--compress-debug-sections=zlib' if $^O ne 'openbsd'; diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 95896725c..7e48de8ae 100644 --- a/lib/PublicInbox/xap_helper.h +++ 
b/lib/PublicInbox/xap_helper.h @@ -139,6 +139,7 @@ static int srch_eq(const struct srch *a, const struct srch *b) KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *, srch_hash, srch_eq) static srch_set *srch_cache; +static struct srch *cur_srch; // for ThreadFieldProcessor static long my_fd_max, shard_nfd; // sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET static volatile int sock_fd = STDIN_FILENO; @@ -580,6 +581,8 @@ static void srch_cache_renew(struct srch *keep) } } +#include "xh_thread_fp.h" // ThreadFieldProcessor + static void srch_init(struct req *req) { int i; @@ -634,10 +637,16 @@ static void srch_init(struct req *req) srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME); srch->qp->SET_MAX_EXPANSION(100); - if (req->code_search) + if (req->code_search) { qp_init_code_search(srch->qp); // CodeSearch.pm - else + } else { + Xapian::FieldProcessor *fp; + qp_init_mail_search(srch->qp); // Search.pm + // n.b. ->release() starts Xapian refcounting + fp = (new ThreadFieldProcessor(*srch->qp))->release(); + srch->qp->add_boolean_prefix("thread", fp); + } } // setup query parser for altid and arbitrary headers @@ -773,6 +782,7 @@ static void dispatch(struct req *req) if (req->timeout_sec) alarm(req->timeout_sec > UINT_MAX ? 
UINT_MAX : (unsigned)req->timeout_sec); + cur_srch = req->srch; // set global for *FieldProcessor try { if (!req->fn(req)) warnx("`%s' failed", req->argv[0]); @@ -834,6 +844,7 @@ static void req_cleanup(void *ptr) { struct req *req = (struct req *)ptr; free(req->lenv); + cur_srch = NULL; } static void reopen_logs(void) diff --git a/lib/PublicInbox/xh_thread_fp.h b/lib/PublicInbox/xh_thread_fp.h new file mode 100644 index 000000000..c7d36c362 --- /dev/null +++ b/lib/PublicInbox/xh_thread_fp.h @@ -0,0 +1,75 @@ +// thread field processor from notmuch - Copyright 2018 David Bremner +// License: GPL-3.0+ +// Disclaimer: Eric doesn't know C++ + +class ThreadFieldProcessor : public Xapian::FieldProcessor { +protected: + Xapian::QueryParser &qp; +public: + ThreadFieldProcessor(Xapian::QueryParser &qp_) : qp(qp_) {}; + Xapian::Query operator()(const std::string &str); +}; + +static enum exc_iter xpand_col_iter(std::set &vals, + Xapian::MSetIterator *i, + unsigned column) +{ + try { + Xapian::Document doc = i->get_document(); + vals.insert(doc.get_value(column)); + } catch (const Xapian::DatabaseModifiedError &e) { + cur_srch->db->reopen(); + return ITER_RETRY; + } catch (const Xapian::DocNotFoundError &e) { // oh well... 
+ warnx("doc not found: %s", e.get_description().c_str()); + } + return ITER_OK; +} + +static Xapian::Query qry_xpand_col(Xapian::Query qry, unsigned column) +{ + Xapian::Query xqry = Xapian::Query::MatchNothing; + + Xapian::Enquire enq(*cur_srch->db); + std::set vals; // serialised Xapian column + + enq.set_weighting_scheme(Xapian::BoolWeight()); + enq.set_query(qry); + enq.set_collapse_key(column); + + Xapian::MSet mset = enq.get_mset(0, cur_srch->db->get_doccount()); + + for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) { + for (int t = 10; t > 0; --t) + switch (xpand_col_iter(vals, &i, column)) { + case ITER_OK: t = 0; break; // leave inner loop + case ITER_RETRY: break; // continue for-loop + case ITER_ABORT: return xqry; // impossible + } + } + + std::set::const_iterator tid; + for (tid = vals.begin(); tid != vals.end(); tid++) + xqry = Xapian::Query(Xapian::Query::OP_OR, xqry, + Xapian::Query( + Xapian::Query::OP_VALUE_RANGE, + column, *tid, *tid)); + return xqry; +} + +// Xapian calls this when processing queries since it's registered by +// ->add_boolean_prefix +Xapian::Query ThreadFieldProcessor::operator()(const std::string &str) +{ + Xapian::Query qry; + + if (str.at(0) != '{') { // thread:$MSGID (no `{'/`}' encasement) + qry = Xapian::Query("Q" + str); + } else if (str.size() <= 1 || str.at(str.size() - 1) != '}') { + throw Xapian::QueryParserError("missing } in '" + str + "'"); + } else { // thread:"{hello world}" + std::string qstr = str.substr(1, str.size() - 2); + qry = cur_srch->qp->parse_query(qstr, cur_srch->qp_flags); + } + return qry_xpand_col(qry, THREADID); +} diff --git a/t/xap_helper.t b/t/xap_helper.t index b0fa75a2e..3e8176a01 100644 --- a/t/xap_helper.t +++ b/t/xap_helper.t @@ -40,6 +40,41 @@ my $v2 = create_inbox 'v2', indexlevel => 'medium', version => 2, } }; +my $thr = create_inbox 'thr', indexlevel => 'medium', version => 2, + tmpdir => "$tmp/thr", sub { + my ($im) = @_; + my $common = < +To: meta\@public-inbox.org 
+Date: Mon, 1 Apr 2019 08:15:21 +0000 +EOM + $im->add(PublicInbox::Eml->new(< + +hi +EOM + my @t = qw(wildfires earthquake flood asteroid drought plague); + my $nr = 0; + for my $x (@t) { + ++$nr; + $im->add(PublicInbox::Eml->new(< +Message-ID: + +$x +EOM + $im->add(PublicInbox::Eml->new(< +Message-ID: + +$x +EOM + } +}; + my @ibx_idx = glob("$v2->{inboxdir}/xap*/?"); my @ibx_shard_args = map { ('-d', $_) } @ibx_idx; my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?"); @@ -269,6 +304,53 @@ for my $n (@NO_CXX) { is $nr_out, scalar(@oids), "output count matches $xhc->{impl}" or diag explain(\@res, \@err); + SKIP: { + $xhc->{impl} =~ /cxx/i or + skip "`thread:' field processor requires C++", 1; + require PublicInbox::XhcMset; + my $over = $thr->over; + my @thr_idx = glob("$thr->{inboxdir}/xap*/?"); + my @thr_shard_args = map { ('-d', $_) } @thr_idx; + my (@art, $mset, $err); + my $capture = sub { ($mset, $err) = @_ }; + my $retrieve = sub { + my ($qstr) = @_; + $r = $xhc->mkreq(undef, 'mset', @thr_shard_args, $qstr); + PublicInbox::XhcMset->maybe_new($r, undef, $capture); + map { $over->get_art($_->get_docid) } $mset->items; + }; + @art = $retrieve->('thread:thread-root@example wildfires'); + is scalar(@art), 1, 'got 1 result'; + is scalar(grep { $_->{mid} =~ /thread-miss/ } @art), 0, + 'no thread misses in result'; + ok !$err, 'no error from thread:MSGID search'; + + @art = $retrieve->('thread:thread-root@example'); + is scalar(@art), 7, + 'expected number of results for thread:MSGID'; + is scalar(grep { + $_->{mid} eq 'thread-root@example' || + $_->{references} =~ // + } @art), + scalar(@art), + 'got all matching results for thread:MSGID'; + + @art = $retrieve->('thread:"{ s:broken }"'); + is scalar(@art), 6, + 'expected number of results for thread:"{ SUBQUERY }"'; + is scalar(grep { $_->{subject} =~ /broken/ } @art), + scalar(@art), + 'expected matches for thread:"{ SUBQUERY }"'; + + my $nr = $ENV{TEST_LEAK_NR} or skip 'TEST_LEAK_NR unset', 1; + $ENV{VALGRIND} 
or diag +"W: `VALGRIND=' unset w/ TEST_LEAK_NR (using -fsanitize?)"; + for (1..$nr) { + $retrieve->('thread:thread-root@example wildfires'); + $retrieve->('thread:"{ s:broken }" wildfires'); + } + } + if ($ENV{TEST_XH_TIMEOUT}) { diag 'testing timeouts...'; for my $j (qw(0 1)) {