sub _fd_constrained ($) {
my ($self) = @_;
$self->{-fd_constrained} //= do {
- my $soft;
- if (eval { require BSD::Resource; 1 }) {
- my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
- ($soft, undef) = BSD::Resource::getrlimit($NOFILE);
- } else {
- chomp($soft = `sh -c 'ulimit -n'`);
- }
+ my $soft = PublicInbox::Search::ulimit_n;
if (defined($soft)) {
# $want is an estimate
my $want = scalar(@{$self->{ibx_active}}) + 64;
#
# v1.6.0 adds BYTES, UID and THREADID values
SCHEMA_VERSION => 15,
+
+ # we may have up to 8 FDs per shard (depends on Xapian *shrug*)
+ SHARD_COST => 8,
};
use PublicInbox::Smsg;
}
}
+# not sure where best to put this...
+sub ulimit_n () {
+ my $n;
+ if (eval { require BSD::Resource; 1 }) {
+ my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
+ ($n, undef) = BSD::Resource::getrlimit($NOFILE);
+ } else {
+ require PublicInbox::Spawn;
+ $n = PublicInbox::Spawn::run_qx([qw(/bin/sh -c), 'ulimit -n']);
+ }
+ $n;
+}
+
1;
use Fcntl qw(LOCK_UN LOCK_EX);
use Carp qw(croak);
my $X = \%PublicInbox::Search::X;
-our (%SRCH, %WORKERS, $nworker, $workerset, $in);
+our (%SRCH, %WORKERS, $nworker, $workerset, $in, $SHARD_NFD, $MY_FD_MAX);
our $stderr = \*STDERR;
sub cmd_test_inspect {
my $key = "-d\0".join("\0-d\0", @$dirs);
$key .= "\0".join("\0", map { ('-Q', $_) } @{$req->{Q}}) if $req->{Q};
my $new;
- $req->{srch} = $SRCH{$key} //= do {
+ $req->{srch} = $SRCH{$key} // do {
$new = { qp_flags => $PublicInbox::Search::QP_FLAGS };
+ my $nfd = scalar(@$dirs) * PublicInbox::Search::SHARD_COST;
+ $SHARD_NFD += $nfd;
+ if ($SHARD_NFD > $MY_FD_MAX) {
+ $SHARD_NFD = $nfd;
+ %SRCH = ();
+ }
my $first = shift @$dirs;
my $slow_phrase = -f "$first/iamchert";
$new->{xdb} = $X->{Database}->new($first);
bless $new, $req->{c} ? 'PublicInbox::CodeSearch' :
'PublicInbox::Search';
$new->{qp} = $new->qparse_new;
- $new;
+ $SRCH{$key} = $new;
};
$req->{srch}->{xdb}->reopen unless $new;
$req->{Q} && !$req->{srch}->{qp_extra_done} and
my $c = getsockopt(local $in = \*STDIN, SOL_SOCKET, SO_TYPE);
unpack('i', $c) == SOCK_SEQPACKET or die 'stdin is not SOCK_SEQPACKET';
- local (%SRCH, %WORKERS);
+ local (%SRCH, %WORKERS, $SHARD_NFD, $MY_FD_MAX);
PublicInbox::Search::load_xapian();
$GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or
die 'bad args';
for (@PublicInbox::DS::UNBLOCKABLE, POSIX::SIGUSR1) {
$workerset->delset($_) or die "delset($_): $!";
}
+ $MY_FD_MAX = PublicInbox::Search::ulimit_n //
+ die "E: unable to get RLIMIT_NOFILE: $!";
+ warn "W: RLIMIT_NOFILE=$MY_FD_MAX too low\n" if $MY_FD_MAX < 72;
+ $MY_FD_MAX -= 64;
local $nworker = $opt->{j};
return recv_loop() if $nworker == 0;
$ldflags .= ' -Wl,--compress-debug-sections=zlib' if $^O ne 'openbsd';
my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -pipe') . ' ' .
' -DTHREADID=' . PublicInbox::Search::THREADID .
+ ' -DSHARD_COST=' . PublicInbox::Search::SHARD_COST .
' -DXH_SPEC="'.join('',
map { s/=.*/:/; $_ } @PublicInbox::Search::XH_SPEC) . '" ' .
($ENV{LDFLAGS} // $ldflags);
KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *,
srch_hash, srch_eq)
static srch_set *srch_cache;
+static long my_fd_max, shard_nfd;
// sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET
static volatile int sock_fd = STDIN_FILENO;
static sigset_t fullset, workerset;
return false;
}
+static void srch_free(struct srch *srch)
+{
+ delete srch->qp;
+ delete srch->db;
+ free(srch);
+}
+
+static void srch_cache_renew(struct srch *keep)
+{
+ khint_t k;
+
+ // can't delete while iterating, so just free each + clear
+ for (k = kh_begin(srch_cache); k != kh_end(srch_cache); k++) {
+ if (!kh_exist(srch_cache, k)) continue;
+ struct srch *cur = kh_key(srch_cache, k);
+
+ if (cur != keep)
+ srch_free(cur);
+ }
+ srch_set_cs_clear(srch_cache);
+ if (keep) {
+ int absent;
+ k = srch_set_put(srch_cache, keep, &absent);
+ assert(absent);
+ assert(k < kh_end(srch_cache));
+ }
+}
+
static bool srch_init(struct req *req)
{
int i;
Xapian::QueryParser::FLAG_WILDCARD;
if (is_chert(req->dirv[0]))
srch->qp_flags &= ~FLAG_PHRASE;
+ long nfd = req->dirc * SHARD_COST;
+
+ shard_nfd += nfd;
+ if (shard_nfd > my_fd_max) {
+ srch_cache_renew(srch);
+ shard_nfd = nfd;
+ }
try {
srch->db = new Xapian::Database(req->dirv[0]);
} catch (...) {
req->srch->qp_extra_done = true;
}
-static void srch_free(struct srch *srch)
-{
- delete srch->qp;
- delete srch->db;
- free(srch);
-}
-
static void dispatch(struct req *req)
{
int c;
cleanup_pids();
if (!srch_cache)
return;
-
- khint_t k;
- for (k = kh_begin(srch_cache); k != kh_end(srch_cache); k++) {
- if (kh_exist(srch_cache, k))
- srch_free(kh_key(srch_cache, k));
- }
+ srch_cache_renew(NULL);
srch_set_destroy(srch_cache);
srch_cache = NULL;
}
socklen_t slen = (socklen_t)sizeof(c);
stdout_path = getenv("STDOUT_PATH");
stderr_path = getenv("STDERR_PATH");
+ struct rlimit rl;
if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen))
err(EXIT_FAILURE, "getsockopt");
if (c != SOCK_SEQPACKET)
errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET");
+ if (getrlimit(RLIMIT_NOFILE, &rl))
+ err(EXIT_FAILURE, "getrlimit");
+ my_fd_max = rl.rlim_cur;
+ if (my_fd_max < 72)
+ warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max);
+ my_fd_max -= 64;
+
mail_nrp_init();
code_nrp_init();
srch_cache = srch_set_init();
}
}
+SKIP: {
+ my $nr = $ENV{TEST_XH_FDMAX} or
+ skip 'TEST_XH_FDMAX unset', 1;
+ my @xhc = map {
+ local $ENV{PI_NO_CXX} = $_;
+ PublicInbox::XapClient::start_helper('-j0');
+ } @NO_CXX;
+ my $n = 1;
+ my $exp;
+ for (0..(PublicInbox::Search::ulimit_n() * $nr)) {
+ for my $xhc (@xhc) {
+ my $r = $xhc->mkreq([], qw(mset -Q), "tst$n=XTST$n",
+ @ibx_shard_args, qw(rt:0..));
+ chomp(my @res = readline($r));
+ $exp //= $res[0];
+ $exp eq $res[0] or
+ is $exp, $res[0], "mset mismatch on n=$n";
+ ++$n;
+ }
+ }
+ ok $exp, "got expected entries ($n)";
+}
+
done_testing;