lib/PublicInbox/xap_helper.h
lib/PublicInbox/xh_cidx.h
lib/PublicInbox/xh_mset.h
+lib/PublicInbox/xh_thread_fp.h
sa_config/Makefile
sa_config/README
sa_config/root/etc/spamassassin/public-inbox.pre
substr($dir, 0, 0) = "$idir/";
my $bin = "$dir/xap_helper";
my ($srcpfx) = (__FILE__ =~ m!\A(.+/)[^/]+\z!);
-my @srcs = map { $srcpfx.$_ } qw(xh_mset.h xh_cidx.h xap_helper.h);
+my @srcs = map { $srcpfx.$_ }
+ qw(xh_mset.h xh_cidx.h xh_thread_fp.h xap_helper.h);
my @pm_dep = map { $srcpfx.$_ } qw(Search.pm CodeSearch.pm);
my $ldflags = '-Wl,-O1';
$ldflags .= ' -Wl,--compress-debug-sections=zlib' if $^O ne 'openbsd';
KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *,
srch_hash, srch_eq)
static srch_set *srch_cache;
+static struct srch *cur_srch; // for ThreadFieldProcessor
static long my_fd_max, shard_nfd;
// sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET
static volatile int sock_fd = STDIN_FILENO;
}
}
+#include "xh_thread_fp.h" // ThreadFieldProcessor
+
static void srch_init(struct req *req)
{
int i;
srch->qp->set_stemming_strategy(Xapian::QueryParser::STEM_SOME);
srch->qp->SET_MAX_EXPANSION(100);
- if (req->code_search)
+ if (req->code_search) {
qp_init_code_search(srch->qp); // CodeSearch.pm
- else
+ } else {
+ Xapian::FieldProcessor *fp;
+
qp_init_mail_search(srch->qp); // Search.pm
+ // n.b. ->release() starts Xapian refcounting
+ fp = (new ThreadFieldProcessor(*srch->qp))->release();
+ srch->qp->add_boolean_prefix("thread", fp);
+ }
}
// setup query parser for altid and arbitrary headers
if (req->timeout_sec)
alarm(req->timeout_sec > UINT_MAX ?
UINT_MAX : (unsigned)req->timeout_sec);
+ cur_srch = req->srch; // set global for *FieldProcessor
try {
if (!req->fn(req))
warnx("`%s' failed", req->argv[0]);
{
struct req *req = (struct req *)ptr;
free(req->lenv);
+ cur_srch = NULL;
}
static void reopen_logs(void)
--- /dev/null
+// thread field processor from notmuch - Copyright 2018 David Bremner
+// License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+// Disclaimer: Eric doesn't know C++
+
+// Xapian::FieldProcessor subclass whose operator() handles the argument
+// of a query prefix; it holds a reference to the QueryParser it was
+// constructed with.
+class ThreadFieldProcessor : public Xapian::FieldProcessor {
+public:
+	ThreadFieldProcessor(Xapian::QueryParser &parser) : qp(parser) {}
+
+	// invoked with the text following the prefix, defined out-of-line
+	Xapian::Query operator()(const std::string &str);
+protected:
+	Xapian::QueryParser &qp;
+};
+
+// Insert the value stored in slot `column' of the document at `*i'
+// into `vals'.
+//
+// Returns ITER_RETRY after reopening cur_srch->db when Xapian reports
+// DatabaseModifiedError.  A DocNotFoundError is only warned about and
+// the result is still ITER_OK.
+static enum exc_iter xpand_col_iter(std::set<std::string> &vals,
+					Xapian::MSetIterator *i,
+					unsigned column)
+{
+	enum exc_iter ret = ITER_OK;
+
+	try {
+		vals.insert(i->get_document().get_value(column));
+	} catch (const Xapian::DatabaseModifiedError &e) {
+		cur_srch->db->reopen();
+		ret = ITER_RETRY;
+	} catch (const Xapian::DocNotFoundError &e) { // oh well...
+		warnx("doc not found: %s", e.get_description().c_str());
+	}
+	return ret;
+}
+
+// Run `qry' against cur_srch->db using boolean weighting and collapsing
+// on `column', gather the distinct `column' values of the matches, and
+// return an OR of single-value range queries (one per gathered value).
+// The result is Xapian::Query::MatchNothing when no value was gathered.
+static Xapian::Query qry_xpand_col(Xapian::Query qry, unsigned column)
+{
+	Xapian::Query xqry = Xapian::Query::MatchNothing;
+	Xapian::Enquire enq(*cur_srch->db);
+	std::set<std::string> vals; // serialised Xapian column
+
+	enq.set_weighting_scheme(Xapian::BoolWeight());
+	enq.set_query(qry);
+	enq.set_collapse_key(column);
+
+	// request up to every document in the DB
+	Xapian::MSet mset = enq.get_mset(0, cur_srch->db->get_doccount());
+
+	for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); ++i) {
+		// up to 10 attempts per match; retries happen after the
+		// DB was reopened by xpand_col_iter
+		for (int left = 10; left > 0; --left) {
+			enum exc_iter rc = xpand_col_iter(vals, &i, column);
+
+			if (rc == ITER_ABORT)
+				return xqry; // impossible
+			if (rc == ITER_OK)
+				break; // done with this match
+			// ITER_RETRY: go around again
+		}
+	}
+
+	std::set<std::string>::const_iterator tid = vals.begin();
+	while (tid != vals.end()) {
+		// begin == end: matches only documents with exactly this value
+		Xapian::Query exact(Xapian::Query::OP_VALUE_RANGE,
+					column, *tid, *tid);
+
+		xqry = Xapian::Query(Xapian::Query::OP_OR, xqry, exact);
+		++tid;
+	}
+	return xqry;
+}
+
+// Xapian calls this when processing queries since it's registered by
+// ->add_boolean_prefix
+//
+// `str' is the text following the prefix, in one of two forms:
+//   $MSGID        - looked up as the term "Q" + $MSGID
+//   {SUBQUERY}    - SUBQUERY is parsed with cur_srch->qp
+// Either way, the resulting query is expanded via qry_xpand_col on the
+// THREADID column.
+//
+// Throws Xapian::QueryParserError if `str' is empty or starts with `{'
+// without a closing `}'.
+Xapian::Query ThreadFieldProcessor::operator()(const std::string &str)
+{
+	// An empty argument used to reach str.at(0), which throws
+	// std::out_of_range rather than a Xapian error; report it like
+	// any other malformed query instead.
+	if (str.empty())
+		throw Xapian::QueryParserError("missing argument in 'thread:'");
+
+	Xapian::Query qry;
+	const std::string::size_type last = str.size() - 1;
+
+	if (str[0] != '{') { // thread:$MSGID (no `{'/`}' encasement)
+		qry = Xapian::Query("Q" + str);
+	} else if (last == 0 || str[last] != '}') {
+		throw Xapian::QueryParserError("missing } in '" + str + "'");
+	} else { // thread:"{hello world}"
+		// strip the leading `{' and trailing `}'
+		std::string qstr = str.substr(1, last - 1);
+		qry = cur_srch->qp->parse_query(qstr, cur_srch->qp_flags);
+	}
+	return qry_xpand_col(qry, THREADID);
+}
}
};
+# Fixture inbox `thr' (v2, indexlevel=medium) holding 13 messages:
+# - 1 root message <thread-root@example>
+# - 6 replies <thread-hit-N@example> referencing the root
+# - 6 messages <thread-miss-N@example> referencing <ghost-root@example>,
+#   which is never added
+# The body of each hit/miss pair is one distinct keyword from @t.
+my $thr = create_inbox 'thr', indexlevel => 'medium', version => 2,
+			tmpdir => "$tmp/thr", sub {
+	my ($im) = @_;
+	my $common = <<EOM;
+From: <BOFH\@YHBT.net>
+To: meta\@public-inbox.org
+Date: Mon, 1 Apr 2019 08:15:21 +0000
+EOM
+	# bail out here like the other ->add calls do, rather than letting
+	# a missing root surface later as a confusing result-count mismatch
+	$im->add(PublicInbox::Eml->new(<<EOM)) or xbail;
+${common}Subject: root message
+Message-ID: <thread-root\@example>
+
+hi
+EOM
+	my @t = qw(wildfires earthquake flood asteroid drought plague);
+	my $nr = 0;
+	for my $x (@t) {
+		++$nr;
+		$im->add(PublicInbox::Eml->new(<<EOM)) or xbail;
+${common}Subject: Re: root reply
+References: <thread-root\@example>
+Message-ID: <thread-hit-$nr\@example>
+
+$x
+EOM
+		$im->add(PublicInbox::Eml->new(<<EOM)) or xbail;
+${common}Subject: broken thread from $x
+References: <ghost-root\@example>
+Message-ID: <thread-miss-$nr\@example>
+
+$x
+EOM
+	}
+};
+
my @ibx_idx = glob("$v2->{inboxdir}/xap*/?");
my @ibx_shard_args = map { ('-d', $_) } @ibx_idx;
my (@int) = glob("$crepo/public-inbox-cindex/cidx*/?");
is $nr_out, scalar(@oids), "output count matches $xhc->{impl}" or
diag explain(\@res, \@err);
+ # Exercise the `thread:' field processor against the `thr' inbox;
+ # skipped unless $xhc->{impl} matches /cxx/i.
+ SKIP: {
+ $xhc->{impl} =~ /cxx/i or
+ skip "`thread:' field processor requires C++", 1;
+ require PublicInbox::XhcMset;
+ my $over = $thr->over;
+ my @thr_idx = glob("$thr->{inboxdir}/xap*/?");
+ my @thr_shard_args = map { ('-d', $_) } @thr_idx;
+ my (@art, $mset, $err);
+ # callback handed to XhcMset->maybe_new; stores its two arguments
+ my $capture = sub { ($mset, $err) = @_ };
+ # run $qstr as an `mset' request over all `thr' shards and map each
+ # item of the captured $mset to its article via $over->get_art
+ # NOTE(review): reads $mset right after maybe_new returns, so this
+ # presumably relies on $capture having run by then -- confirm
+ my $retrieve = sub {
+ my ($qstr) = @_;
+ $r = $xhc->mkreq(undef, 'mset', @thr_shard_args, $qstr);
+ PublicInbox::XhcMset->maybe_new($r, undef, $capture);
+ map { $over->get_art($_->get_docid) } $mset->items;
+ };
+ # thread:MSGID combined with a keyword that appears in the body of
+ # one reply and one ghost-root ("thread-miss") message
+ @art = $retrieve->('thread:thread-root@example wildfires');
+ is scalar(@art), 1, 'got 1 result';
+ is scalar(grep { $_->{mid} =~ /thread-miss/ } @art), 0,
+ 'no thread misses in result';
+ ok !$err, 'no error from thread:MSGID search';
+
+ # thread:MSGID alone: 7 = the root message + its 6 replies
+ @art = $retrieve->('thread:thread-root@example');
+ is scalar(@art), 7,
+ 'expected number of results for thread:MSGID';
+ is scalar(grep {
+ $_->{mid} eq 'thread-root@example' ||
+ $_->{references} =~ /<thread-root\@example>/
+ } @art),
+ scalar(@art),
+ 'got all matching results for thread:MSGID';
+
+ # thread:"{ SUBQUERY }": 6 = the messages with "broken" in Subject
+ @art = $retrieve->('thread:"{ s:broken }"');
+ is scalar(@art), 6,
+ 'expected number of results for thread:"{ SUBQUERY }"';
+ is scalar(grep { $_->{subject} =~ /broken/ } @art),
+ scalar(@art),
+ 'expected matches for thread:"{ SUBQUERY }"';
+
+ # optional leak check: repeat both query forms TEST_LEAK_NR times
+ my $nr = $ENV{TEST_LEAK_NR} or skip 'TEST_LEAK_NR unset', 1;
+ $ENV{VALGRIND} or diag
+"W: `VALGRIND=' unset w/ TEST_LEAK_NR (using -fsanitize?)";
+ for (1..$nr) {
+ $retrieve->('thread:thread-root@example wildfires');
+ $retrieve->('thread:"{ s:broken }" wildfires');
+ }
+ }
+
if ($ENV{TEST_XH_TIMEOUT}) {
diag 'testing timeouts...';
for my $j (qw(0 1)) {