lib/PublicInbox/AutoReap.pm
lib/PublicInbox/Cgit.pm
lib/PublicInbox/CidxComm.pm
-lib/PublicInbox/CidxDumpShardRoots.pm
lib/PublicInbox/CidxLogP.pm
lib/PublicInbox/CidxRecvIbx.pm
lib/PublicInbox/CidxXapHelperAux.pm
+++ /dev/null
-# Copyright (C) all contributors <meta@public-inbox.org>
-# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
-
-# Intended for PublicInbox::DS::event_loop for -cindex --associate
-# Iterating through mset->items is slow in Perl due to method dispatch
-# and that loop may implemented in C++ using Xapian directly
-package PublicInbox::CidxDumpShardRoots;
-use v5.12;
-use PublicInbox::Lock;
-use PublicInbox::Search qw(xap_terms);
-use Socket qw(MSG_EOR);
-
-sub start {
- my ($cidx, $root2id, $qry_str) = @_;
- my $op_p = delete($cidx->{0}) // die 'BUG: no {0} op_p';
- my $sort_w = delete($cidx->{1}) // die 'BUG: no {1} $w sort pipe';
- # sort lock is necessary if we have may root ids which cause a
- # row length to exceed POSIX PIPE_BUF (via `$G' below)
- my $sort_lk = bless { lock_path => $cidx->tmpdir.'/to_root_id.lock' },
- 'PublicInbox::Lock';
- $sort_w->autoflush(1);
- $cidx->begin_txn_lazy; # only using txn to simplify writer subs
- my $opt = { limit => $cidx->assoc_max_init, relevance => -2 };
- my $self = bless {
- cidx => $cidx,
- op_p => $op_p,
- iter => 0,
- mset => $cidx->mset($qry_str, $opt),
- root2id => $root2id,
- sort_w => $sort_w,
- sort_lk => $sort_lk,
- }, __PACKAGE__;
- event_step($self);
-}
-
-sub event_step {
- my ($self) = @_;
- my $cidx = $self->{cidx};
- return if $cidx->do_quit;
- my $last = $self->{mset}->size - 1;
- my $cur = $self->{iter};
- my $end = $cur + 9999;
- $end = $last if $end > $last;
- $self->{iter} = $end + 1;
- local $0 = "dumping shard [$cidx->{shard}] $cur..$end";
- $cidx->progress($0);
-
- my $root2id = $self->{root2id};
- my $buf = '';
- for my $x (($self->{mset}->items)[$cur..$end]) { # FIXME: slow loop
- my $doc = $x->get_document;
- my $G = join(' ', map {
- $root2id->{pack('H*', $_)};
- } xap_terms('G', $doc));
- for my $p (@{$cidx->{ASSOC_PFX}}) {
- $buf .= "$_ $G\n" for (xap_terms($p, $doc));
- }
- }
- $self->{sort_lk}->lock_acquire_fast;
- print { $self->{sort_w} } $buf or die "print: $!";
- $self->{sort_lk}->lock_release_fast;
- $end < $last && !$cidx->do_quit and
- PublicInbox::DS::requeue($self);
-}
-
-sub DESTROY {
- my ($self) = @_;
- return if $self->{cidx}->do_quit;
- send($self->{op_p},
- "dump_shard_roots_done $self->{cidx}->{shard}", MSG_EOR);
-}
-
-1;
# rpipe connects to req->fp[1] in xap_helper.h
sub new {
- my ($cls, $rpipe, $cidx, $pfx, $associate) = @_;
- my $self = bless {
- cidx => $cidx,
- pfx => $pfx,
- associate => $associate
- }, $cls;
+ my ($cls, $rpipe, $cidx, $pfx) = @_;
+ my $self = bless { cidx => $cidx, pfx => $pfx }, $cls;
$rpipe->blocking(0);
$self->SUPER::new($rpipe, EPOLLIN);
}
my @lines = split(/^/m, $buf);
$self->{buf} = pop @lines if substr($lines[-1], -1) ne "\n";
for my $l (@lines) {
- if ($l =~ /\Amset\.size=[0-9]+\n\z/) {
+ if ($l =~ /\Amset\.size=[0-9]+ nr_out=[0-9]+\n\z/) {
delete $self->{cidx}->{PENDING}->{$pfx};
$self->{cidx}->index_next;
}
@AWK, @COMM, @SORT, # awk(1), comm(1), sort(1) commands
@ASSOC_PFX, # any combination of XDFID, XDFPRE, XDFPOST
$QRY_STR, # common query string for both code and inbox associations
- @DUMP_SHARD_ROOTS_OK, # for associate
$DUMP_IBX_WPIPE, # goes to sort(1)
@ID2ROOT,
);
send($op_p, "shard_done $self->{shard}", MSG_EOR);
}
-sub dump_shard_roots_done { # via PktOp on dump_shard_roots completion
- my ($self, $associate, $n) = @_;
- return if $DO_QUIT;
- progress($self, "dump_shard_roots [$n] done");
- $DUMP_SHARD_ROOTS_OK[$n] = 1;
- # may run associate()
-}
-
sub assoc_max_init ($) {
my ($self) = @_;
my $max = $self->{-opt}->{'associate-max'} // $ASSOC_MAX;
$max < 0 ? ((2 ** 31) - 1) : $max;
}
-# dump the patchids of each shard: $XDFID $ROOT1 $ROOT2..
-sub dump_shard_roots { # via wq_io_do for associate
- my ($self, $root2id, $qry_str) = @_;
- PublicInbox::CidxDumpShardRoots::start($self, $root2id, $qry_str);
-}
-
sub dump_roots_once {
my ($self, $associate) = @_;
$associate // die 'BUG: no $associate';
$TODO{associating} = 1; # keep shards_active() happy
progress($self, 'dumping IDs from coderepos');
local $self->{xdb};
- @ID2ROOT = map { pack('H*', $_) } $self->all_terms('G');
- my $id = 0;
- my %root2id = map { $_ => $id++ } @ID2ROOT;
- # dump_shard_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
+ @ID2ROOT = $self->all_terms('G');
+ my $root2id = "$TMPDIR/root2id";
+ open my $fh, '>', $root2id or die "open($root2id): $!";
+ my $nr = -1;
+ for (@ID2ROOT) { print $fh $_, "\0", ++$nr, "\0" } # mmap-friendly
+ close $fh or die "close: $!";
+ # dump_roots | sort -k1,1 | OFS=' ' uniq_fold >to_root_id
pipe(my ($sort_r, $sort_w)) or die "pipe: $!";
pipe(my ($fold_r, $fold_w)) or die "pipe: $!";
my @sort = (@SORT, '-k1,1');
my $dst = "$TMPDIR/to_root_id";
- open my $fh, '>', $dst or die "open($dst): $!";
+ open $fh, '>', $dst or die "open($dst): $!";
my $env = { %$CMD_ENV, OFS => ' ' };
my $sort_pid = spawn(\@sort, $CMD_ENV, { 0 => $sort_r, 1 => $fold_w });
my $fold_pid = spawn(\@UNIQ_FOLD, $env, { 0 => $fold_r, 1 => $fh });
awaitpid($sort_pid, \&cmd_done, \@sort, $associate);
awaitpid($fold_pid, \&cmd_done, [@UNIQ_FOLD, '(shards)'], $associate);
- my ($c, $p) = PublicInbox::PktOp->pair;
- $c->{ops}->{dump_shard_roots_done} = [ $self, $associate ];
- my @arg = ('dump_shard_roots', [ $p->{op_p}, $sort_w ],
- \%root2id, $QRY_STR);
- $_->wq_io_do(@arg) for @IDX_SHARDS;
- progress($self, 'waiting on dump_shard_roots sort');
+ my @arg = ((map { ('-A', $_) } @ASSOC_PFX), '-c',
+ '-m', assoc_max_init($self), $root2id, $QRY_STR);
+ for my $d ($self->shard_dirs) {
+ pipe(my ($err_r, $err_w)) or die "pipe: $!";
+ $XHC->mkreq([$sort_w, $err_w], qw(dump_roots -d), $d, @arg);
+ my $desc = "dump_roots $d";
+ $self->{PENDING}->{$desc} = $associate;
+ PublicInbox::CidxXapHelperAux->new($err_r, $self, $desc);
+ }
+ progress($self, 'waiting on dump_roots sort');
}
sub dump_ibx { # sends to xap_helper.h
pipe(my ($r, $w)) or die "pipe: $!";
$XHC->mkreq([$DUMP_IBX_WPIPE, $w], @cmd);
my $ekey = $ibx->eidx_key;
- $self->{PENDING}->{$ekey} = undef;
- PublicInbox::CidxXapHelperAux->new($r, $self, $ekey, $TODO{associate});
+ $self->{PENDING}->{$ekey} = $TODO{associate};
+ PublicInbox::CidxXapHelperAux->new($r, $self, $ekey);
}
sub dump_ibx_start {
my ($self) = @_;
return if $DO_QUIT;
@IDX_SHARDS or return warn("# aborting on no shards\n");
- grep(defined, @DUMP_SHARD_ROOTS_OK) == @IDX_SHARDS or
- die "E: shards not dumped properly\n";
+ unlink("$TMPDIR/root2id");
my @pending = keys %{$self->{PENDING}};
die "E: pending=@pending jobs not done\n" if @pending;
progress($self, 'associating...');
my $nr = $score{$k};
my ($ibx_id, $root) = split(/ /, $k);
my $ekey = $IBX[$ibx_id]->eidx_key;
- $root = unpack('H*', $ID2ROOT[$root]);
+ $root = $ID2ROOT[$root];
progress($self, "$ekey => $root has $nr matches");
}
delete $TODO{associating}; # break out of shards_active()
sub init_associate_prefork ($) {
my ($self) = @_;
return unless $self->{-opt}->{associate};
- require PublicInbox::CidxDumpShardRoots;
require PublicInbox::CidxXapHelperAux;
require PublicInbox::XapClient;
$self->{-pi_cfg} = PublicInbox::Config->new;
local ($DO_QUIT, $REINDEX, $TXN_BYTES, @GIT_DIR_GONE, @PRUNE_QUEUE,
$REPO_CTX, %ALT_FH, $TMPDIR, @AWK, @COMM, $CMD_ENV,
%TODO, @IBXQ, @IBX, @JOIN, @ASSOC_PFX, $DUMP_IBX_WPIPE,
- @ID2ROOT, @DUMP_SHARD_ROOTS_OK, $XH_PID, $XHC, @SORT);
+ @ID2ROOT, $XH_PID, $XHC, @SORT);
local $BATCH_BYTES = $self->{-opt}->{batch_size} //
$PublicInbox::SearchIdx::BATCH_BYTES;
local $self->{ASSOC_PFX} = \@ASSOC_PFX;
our $GLP = Getopt::Long::Parser->new;
$GLP->configure(qw(require_order bundling no_ignore_case no_auto_abbrev));
use PublicInbox::Search qw(xap_terms);
+use PublicInbox::CodeSearch;
use PublicInbox::IPC;
+use Fcntl qw(LOCK_UN LOCK_EX);
my $X = \%PublicInbox::Search::X;
our (%SRCH, %PIDS, $parent_pid);
our $stderr = \*STDERR;
my $mset = $req->{srch}->mset($qry_str, $opt);
my $out = $req->{0};
$out->autoflush(1);
+ my $nr = 0;
for my $it ($mset->items) {
my $doc = $it->get_document;
for my $p (@pfx) {
for (xap_terms($p, $doc)) {
print $out "$_ $ibx_id\n" or die "print: $!";
+ ++$nr;
}
}
}
- if (my $err = $req->{1}) { say $err 'mset.size=', $mset->size }
+ if (my $err = $req->{1}) {
+ say $err 'mset.size='.$mset->size.' nr_out='.$nr
+ }
+}
+
+sub cmd_dump_roots {
+ my ($req, $root2id_file, $qry_str) = @_;
+ $qry_str // return
+ warn('usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR');
+ my @pfx = @{$req->{A}} or return warn('dump_roots requires -A PREFIX');
+ open my $fh, '<', $root2id_file or die "open($root2id_file): $!";
+ my %root2id; # record format: $OIDHEX "\0" uint32_t
+ my @x = split(/\0/, do { local $/; <$fh> } // die "readline: $!");
+ while (@x) {
+ my $oidhex = shift @x;
+ $root2id{$oidhex} = shift @x;
+ }
+ my $opt = { relevance => -1, limit => $req->{'m'},
+ offset => $req->{o} // 0 };
+ my $mset = $req->{srch}->mset($qry_str, $opt);
+ $req->{0}->autoflush(1);
+ my $buf = '';
+ my $nr = 0;
+ for my $it ($mset->items) {
+ my $doc = $it->get_document;
+ my $G = join(' ', map { $root2id{$_} } xap_terms('G', $doc));
+ for my $p (@pfx) {
+ for (xap_terms($p, $doc)) {
+ $buf .= "$_ $G\n";
+ ++$nr;
+ }
+ }
+ if (!($nr & 0x3fff)) {
+ flock($fh, LOCK_EX) or die "flock: $!";
+ print { $req->{0} } $buf or die "print: $!";
+ flock($fh, LOCK_UN) or die "flock: $!";
+ $buf = '';
+ }
+ }
+ if ($buf ne '') {
+ flock($fh, LOCK_EX) or die "flock: $!";
+ print { $req->{0} } $buf or die "print: $!";
+ flock($fh, LOCK_UN) or die "flock: $!";
+ }
+ if (my $err = $req->{1}) {
+ say $err 'mset.size='.$mset->size.' nr_out='.$nr
+ }
}
sub dispatch {
* this is not linked to Perl in any way.
* C (not C++) is used as much as possible to lower the contribution
* barrier for hackers who mainly know C (this includes the maintainer).
+ * Yes, that means we use C stdlib stuff like hsearch and open_memstream
+ * instead their equivalents in the C++ stdlib :P
* Everything here is an unstable internal API of public-inbox and
* NOT intended for ordinary users; only public-inbox hackers
*/
#ifndef _ALL_SOURCE
# define _ALL_SOURCE
#endif
+#include <sys/file.h>
+#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/stat.h>
unsigned long long max;
unsigned long long off;
unsigned long timeout_sec;
+ size_t nr_out;
long sort_col; // value column, negative means BoolWeight
int argc;
int pfxc;
unsigned nr;
};
+#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
+static size_t split2argv(char **dst, char *buf, size_t len, size_t limit)
+{
+ if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
+ warnx("bogus argument given");
+ return 0;
+ }
+ size_t nr = 0;
+ char *c = buf;
+ for (size_t i = 1; i < len; i++) {
+ if (!buf[i]) {
+ dst[nr++] = c;
+ c = buf + i + 1;
+ }
+ if (nr == limit) {
+ warnx("too many args: %zu", nr);
+ return 0;
+ }
+ }
+ return (long)nr;
+}
+
static bool has_threadid(const struct srch *srch)
{
return srch->db->get_metadata("has_threadid") == "1";
static Xapian::MSet enquire_mset(struct req *req, Xapian::Enquire *enq)
{
- if (!req->max)
- req->max = 50;
+ if (!req->max) {
+ switch (sizeof(Xapian::doccount)) {
+ case 4: req->max = UINT_MAX; break;
+ default: req->max = ULLONG_MAX;
+ }
+ }
for (int i = 0; i < 9; i++) {
try {
Xapian::MSet mset = enq->get_mset(req->off, req->max);
return enq->get_mset(req->off, req->max);
}
+// for v1, v2, and extindex
static Xapian::MSet mail_mset(struct req *req, const char *qry_str)
{
struct srch *srch = req->srch;
Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
if (req->Oeidx_key) {
req->Oeidx_key[0] = 'O'; // modifies static rbuf
- fprintf(stderr, "dbg eidxkey:%s>\n", req->Oeidx_key);
qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
Xapian::Query(req->Oeidx_key));
}
return enquire_mset(req, &enq);
}
+// for cindex
+static Xapian::MSet commit_mset(struct req *req, const char *qry_str)
+{
+ struct srch *srch = req->srch;
+ Xapian::Query qry = srch->qp->parse_query(qry_str, srch->qp_flags);
+ // TODO: git_dir + roots_filter
+
+ // we only want commits:
+ qry = Xapian::Query(Xapian::Query::OP_FILTER, qry,
+ Xapian::Query("T" "c"));
+ Xapian::Enquire enq = prep_enquire(req);
+ enq.set_query(qry);
+ return enquire_mset(req, &enq);
+}
+
static bool starts_with(const std::string *s, const char *pfx, size_t pfx_len)
{
return s->size() >= pfx_len && !memcmp(pfx, s->c_str(), pfx_len);
for (cur.skip_to(pfx); cur != end; cur++) {
std::string tn = *cur;
- if (starts_with(&tn, pfx, pfx_len))
+ if (starts_with(&tn, pfx, pfx_len)) {
fprintf(req->fp[0], "%s %s\n",
tn.c_str() + pfx_len, ibx_id);
+ ++req->nr_out;
+ }
}
}
}
req->asc = true;
req->sort_col = -1;
- req->max = (unsigned long long)req->srch->db->get_doccount();
Xapian::MSet mset = mail_mset(req, req->argv[optind + 1]);
for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
try {
}
}
if (req->fp[1])
- fprintf(req->fp[1], "mset.size=%llu\n",
- (unsigned long long)mset.size());
+ fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+ (unsigned long long)mset.size(), req->nr_out);
+ return true;
+}
+
+struct fbuf {
+ FILE *fp;
+ char *ptr;
+ size_t len;
+};
+
+struct dump_roots_tmp {
+ struct stat sb;
+ void *mm_ptr;
+ char **entries;
+ struct fbuf wbuf;
+ int root2id_fd;
+};
+
+#define CLEANUP_FBUF __attribute__((__cleanup__(fbuf_ensure)))
+static void fbuf_ensure(void *ptr)
+{
+ struct fbuf *fbuf = (struct fbuf *)ptr;
+ if (fbuf->fp && fclose(fbuf->fp))
+ perror("fclose(fbuf->fp)");
+ fbuf->fp = NULL;
+ free(fbuf->ptr);
+}
+
+static bool fbuf_init(struct fbuf *fbuf)
+{
+ assert(!fbuf->ptr);
+ fbuf->fp = open_memstream(&fbuf->ptr, &fbuf->len);
+ if (fbuf->fp) return true;
+ perror("open_memstream(fbuf)");
+ return false;
+}
+
+static void xclose(int fd)
+{
+ if (close(fd) < 0 && errno != EINTR)
+ err(EXIT_FAILURE, "BUG: close");
+}
+
+#define CLEANUP_DUMP_ROOTS __attribute__((__cleanup__(dump_roots_ensure)))
+static void dump_roots_ensure(void *ptr)
+{
+ struct dump_roots_tmp *drt = (struct dump_roots_tmp *)ptr;
+ if (drt->root2id_fd >= 0)
+ xclose(drt->root2id_fd);
+ hdestroy(); // idempotent
+ if (drt->mm_ptr && munmap(drt->mm_ptr, drt->sb.st_size))
+ err(EXIT_FAILURE, "BUG: munmap");
+ free(drt->entries);
+ fbuf_ensure(&drt->wbuf);
+}
+
+static bool root2ids_str(struct fbuf *root_ids, struct dump_roots_tmp *drt,
+ Xapian::Document *doc)
+{
+ if (!fbuf_init(root_ids)) return false;
+
+ bool ok = true;
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ ENTRY e, *ep;
+ for (cur.skip_to("G"); cur != end; cur++) {
+ std::string tn = *cur;
+ if (!starts_with(&tn, "G", 1))
+ continue;
+ union { const char *in; char *out; } u;
+ u.in = tn.c_str() + 1;
+ e.key = u.out;
+ ep = hsearch(e, FIND);
+ if (!ep) {
+ warnx("hsearch miss `%s'", e.key);
+ return false;
+ }
+ // ep->data is a NUL-terminated string matching /[0-9]+/
+ fputc(' ', root_ids->fp);
+ fputs((const char *)ep->data, root_ids->fp);
+ }
+ fputc('\n', root_ids->fp);
+ if (ferror(root_ids->fp) | fclose(root_ids->fp)) {
+ perror("ferror|fclose(root_ids)");
+ ok = false;
+ }
+ root_ids->fp = NULL;
+ return ok;
+}
+
+// writes term values matching @pfx for a given @doc, ending the line
+// with the contents of @root_ids
+static void dump_roots_term(struct req *req, const char *pfx,
+ struct dump_roots_tmp *drt,
+ struct fbuf *root_ids,
+ Xapian::Document *doc)
+{
+ Xapian::TermIterator cur = doc->termlist_begin();
+ Xapian::TermIterator end = doc->termlist_end();
+ size_t pfx_len = strlen(pfx);
+
+ for (cur.skip_to(pfx); cur != end; cur++) {
+ std::string tn = *cur;
+ if (!starts_with(&tn, pfx, pfx_len))
+ continue;
+ fputs(tn.c_str() + pfx_len, drt->wbuf.fp);
+ fwrite(root_ids->ptr, root_ids->len, 1, drt->wbuf.fp);
+ ++req->nr_out;
+ }
+}
+
+// we may have lines which exceed PIPE_BUF, so we do our own
+// buffering and rely on flock(2), here
+static bool dump_roots_flush(struct req *req, struct dump_roots_tmp *drt)
+{
+ char *p;
+ int fd = fileno(req->fp[0]);
+ bool ok = true;
+
+ if (!drt->wbuf.fp) return true;
+ if (fd < 0) err(EXIT_FAILURE, "BUG: fileno");
+ if (fclose(drt->wbuf.fp)) {
+ warn("fclose(drt->wbuf.fp)"); // malloc failure?
+ return false;
+ }
+ drt->wbuf.fp = NULL;
+ if (!drt->wbuf.len) goto done_free;
+ if (flock(drt->root2id_fd, LOCK_EX)) {
+ perror("LOCK_EX");
+ return false;
+ }
+ p = drt->wbuf.ptr;
+ do {
+ ssize_t n = write(fd, p, drt->wbuf.len);
+ if (n > 0) {
+ drt->wbuf.len -= n;
+ p += n;
+ } else {
+ perror(n ? "write" : "write (zero bytes)");
+ return false;
+ }
+ } while (drt->wbuf.len);
+ if (flock(drt->root2id_fd, LOCK_UN)) {
+ perror("LOCK_UN");
+ return false;
+ }
+done_free:
+ free(drt->wbuf.ptr);
+ drt->wbuf.ptr = NULL;
+ return ok;
+}
+
+static bool cmd_dump_roots(struct req *req)
+{
+ CLEANUP_DUMP_ROOTS struct dump_roots_tmp drt { .root2id_fd = -1 };
+ if ((optind + 1) >= req->argc) {
+ warnx("usage: dump_roots [OPTIONS] ROOT2ID_FILE QRY_STR");
+ return false; // need file + qry_str
+ }
+ if (!req->pfxc) {
+ warnx("dump_roots requires -A PREFIX");
+ return false;
+ }
+ const char *root2id_file = req->argv[optind];
+ drt.root2id_fd = open(root2id_file, O_RDONLY);
+ if (drt.root2id_fd < 0) {
+ warn("open(%s)", root2id_file);
+ return false;
+ }
+ if (fstat(drt.root2id_fd, &drt.sb)) {
+ warn("fstat(%s)", root2id_file);
+ return false;
+ }
+ // each entry is at least 43 bytes ({OIDHEX}\0{INT}\0),
+ // so /32 overestimates the number of expected entries by
+ // ~%25 (as recommended by Linux hcreate(3) manpage)
+ size_t est = (drt.sb.st_size / 32) + 1;
+ if ((uint64_t)drt.sb.st_size > (uint64_t)SIZE_MAX) {
+ warnx("%s size too big (%lld bytes > %zu)", root2id_file,
+ (long long)drt.sb.st_size, SIZE_MAX);
+ return false;
+ }
+ drt.mm_ptr = mmap(NULL, drt.sb.st_size, PROT_READ,
+ MAP_PRIVATE, drt.root2id_fd, 0);
+ if (drt.mm_ptr == MAP_FAILED) {
+ warn("mmap(%s)", root2id_file);
+ return false;
+ }
+ drt.entries = (char **)calloc(est * 2, sizeof(char *));
+ if (!drt.entries) {
+ warn("calloc(%zu * 2, %zu)", est, sizeof(char *));
+ return false;
+ }
+ size_t tot = split2argv(drt.entries, (char *)drt.mm_ptr,
+ drt.sb.st_size, est * 2);
+ if (tot <= 0) return false; // split2argv already warned on error
+ if (!hcreate(est)) {
+ warn("hcreate(%zu)", est);
+ return false;
+ }
+ for (size_t i = 0; i < tot; ) {
+ ENTRY e;
+ e.key = drt.entries[i++];
+ e.data = drt.entries[i++];
+ if (!hsearch(e, ENTER)) {
+ warn("hsearch(%s => %s, ENTER)", e.key,
+ (const char *)e.data);
+ return false;
+ }
+ }
+ req->asc = true;
+ req->sort_col = -1;
+ Xapian::MSet mset = commit_mset(req, req->argv[optind + 1]);
+ for (Xapian::MSetIterator i = mset.begin(); i != mset.end(); i++) {
+ CLEANUP_FBUF struct fbuf root_ids = { 0 };
+ if (!drt.wbuf.fp && !fbuf_init(&drt.wbuf))
+ return false;
+ try {
+ Xapian::Document doc = i.get_document();
+ if (!root2ids_str(&root_ids, &drt, &doc))
+ return false;
+ for (int p = 0; p < req->pfxc; p++)
+ dump_roots_term(req, req->pfxv[p], &drt,
+ &root_ids, &doc);
+ } catch (const Xapian::Error & e) {
+ fprintf(orig_err, "W: %s (#%ld)\n",
+ e.get_description().c_str(), (long)(*i));
+ continue;
+ }
+ if (!(req->nr_out & 0x3fff) && !dump_roots_flush(req, &drt))
+ return false;
+ }
+ if (!dump_roots_flush(req, &drt))
+ return false;
+ if (req->fp[1])
+ fprintf(req->fp[1], "mset.size=%llu nr_out=%zu\n",
+ (unsigned long long)mset.size(), req->nr_out);
return true;
}
cmd fn;
} cmds[] = { // should be small enough to not need bsearch || gperf
// most common commands first
- CMD(dump_ibx),
+ CMD(dump_ibx), // many inboxes
+ CMD(dump_roots), // per-cidx shard
CMD(test_inspect), // least common commands last
};
char pad[sizeof(struct cmsghdr) + 16 + RECV_FD_SPACE];
};
-static void xclose(int fd)
-{
- if (close(fd) < 0 && errno != EINTR)
- err(EXIT_FAILURE, "BUG: close");
-}
-
static bool recv_req(struct req *req, char *rbuf, size_t *len)
{
union my_cmsg cmsg = { 0 };
return false;
}
-#define SPLIT2ARGV(dst,buf,len) split2argv(dst,buf,len,MY_ARRAY_SIZE(dst))
-static int split2argv(char **dst, char *buf, size_t len, size_t limit)
-{
- if (buf[0] == 0 || len == 0 || buf[len - 1] != 0) {
- warnx("bogus argument given");
- return 0;
- }
- size_t nr = 0;
- char *c = buf;
- for (size_t i = 1; i < len; i++) {
- if (!buf[i]) {
- dst[nr++] = c;
- c = buf + i + 1;
- }
- if (nr == limit) {
- warnx("too many args: %zu", nr);
- return 0;
- }
- }
- return (int)nr;
-}
-
static int srch_cmp(const void *pa, const void *pb) // for tfind|tsearch
{
const struct srch *a = (const struct srch *)pa;
char *dirv[MY_ARG_MAX];
int i;
struct srch *srch = req->srch;
- int dirc = SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
+ int dirc = (int)SPLIT2ARGV(dirv, srch->paths, (size_t)srch->paths_len);
const unsigned FLAG_PHRASE = Xapian::QueryParser::FLAG_PHRASE;
srch->qp_flags = FLAG_PHRASE |
Xapian::QueryParser::FLAG_BOOLEAN |
perror("W: setlinebuf(req.fp[1])");
stderr = req.fp[1];
}
- req.argc = SPLIT2ARGV(req.argv, rbuf, len);
+ req.argc = (int)SPLIT2ARGV(req.argv, rbuf, len);
if (req.argc > 0)
dispatch(&req);
if (ferror(req.fp[0]) | fclose(req.fp[0]))
my $res = do { local $/; <$r> };
is(join('', @res), $res, 'got identical response w/ error pipe');
my $stats = do { local $/; <$err_rd> };
- is($stats, "mset.size=6\n", 'mset.size reported');
+ is($stats, "mset.size=6 nr_out=6\n", 'mset.size reported');
if ($arg[-1] !~ /\('-j0'\)/) {
kill('KILL', $cinfo{pid});
};
my $ar;
-$ar = $test->(qw[-MPublicInbox::XapHelper -e
- PublicInbox::XapHelper::start('-j0')]);
-$ar = $test->(qw[-MPublicInbox::XapHelper -e
- PublicInbox::XapHelper::start('-j1')]);
-
-my @NO_CXX = (0);
+my @NO_CXX;
+if (!$ENV{TEST_XH_CXX_ONLY}) {
+ $ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j0')]);
+ $ar = $test->(qw[-MPublicInbox::XapHelper -e
+ PublicInbox::XapHelper::start('-j1')]);
+ push @NO_CXX, 0;
+}
SKIP: {
eval {
require PublicInbox::XapHelperCxx;
PublicInbox::XapHelperCxx::start('-j1')]);
};
+require PublicInbox::CodeSearch;
+my $cs_int = PublicInbox::CodeSearch->new("$crepo/public-inbox-cindex");
+my $root2id_file = "$tmp/root2id";
+my @id2root;
+{
+ open my $fh, '>', $root2id_file;
+ my $i = -1;
+ for ($cs_int->all_terms('G')) {
+ print $fh $_, "\0", ++$i, "\0";
+ $id2root[$i] = $_;
+ }
+ close $fh;
+}
+
for my $n (@NO_CXX) {
local $ENV{PI_NO_CXX} = $n;
my ($xhc, $pid) = PublicInbox::XapClient::start_helper('-j0');
my $res = do { local $/; <$r> };
is($res, "$dfid 9\n$mid 9\n", "got expected result ($xhc->{impl})");
my $err = do { local $/; <$err_r> };
- is($err, "mset.size=1\n", "got expected status ($xhc->{impl})");
+ is($err, "mset.size=1 nr_out=2\n", "got expected status ($xhc->{impl})");
+
+ pipe($err_r, $err_w);
+ $r = $xhc->mkreq([ undef, $err_w ], qw(dump_roots -c -A XDFID),
+ (map { ('-d', $_) } @int),
+ $root2id_file, 'dt:19700101'.'000000..');
+ close $err_w;
+ my @res = <$r>;
+ is(scalar(@res), 5, 'got expected rows');
+ is(scalar(@res), scalar(grep(/\A[0-9a-f]{40,} [0-9]+\n\z/, @res)),
+ 'entries match format');
+ $err = do { local $/; <$err_r> };
+ is($err, "mset.size=6 nr_out=5\n", "got expected status ($xhc->{impl})");
}
done_testing;