From: Eric Wong Date: Sun, 19 May 2024 21:55:06 +0000 (+0000) Subject: xap_helper: expire DB handles when FD table is near full X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cc970c759776fb9a4705af6a61ce9f65ca48c07b;p=thirdparty%2Fpublic-inbox.git xap_helper: expire DB handles when FD table is near full For long-lived daemons across config reloads, we shouldn't keep Xapian DBs open forever under FD pressure. So estimate the number of FDs we need per-shard and start clearing some out if we have too many open. While we're at it, hoist out our ulimit_n helper and share it across extindex and the Perl XapHelper implementation. --- diff --git a/lib/PublicInbox/ExtSearchIdx.pm b/lib/PublicInbox/ExtSearchIdx.pm index 883dbea3d..934197c05 100644 --- a/lib/PublicInbox/ExtSearchIdx.pm +++ b/lib/PublicInbox/ExtSearchIdx.pm @@ -543,13 +543,7 @@ sub _ibx_for ($$$) { sub _fd_constrained ($) { my ($self) = @_; $self->{-fd_constrained} //= do { - my $soft; - if (eval { require BSD::Resource; 1 }) { - my $NOFILE = BSD::Resource::RLIMIT_NOFILE(); - ($soft, undef) = BSD::Resource::getrlimit($NOFILE); - } else { - chomp($soft = `sh -c 'ulimit -n'`); - } + my $soft = PublicInbox::Search::ulimit_n; if (defined($soft)) { # $want is an estimate my $want = scalar(@{$self->{ibx_active}}) + 64; diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index e5c5d6abb..25ef49c5e 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -54,6 +54,9 @@ use constant { # # v1.6.0 adds BYTES, UID and THREADID values SCHEMA_VERSION => 15, + + # we may have up to 8 FDs per shard (depends on Xapian *shrug*) + SHARD_COST => 8, }; use PublicInbox::Smsg; @@ -729,4 +732,17 @@ sub get_doc ($$) { } } +# not sure where best to put this... +sub ulimit_n () { + my $n; + if (eval { require BSD::Resource; 1 }) { + my $NOFILE = BSD::Resource::RLIMIT_NOFILE(); + ($n, undef) = BSD::Resource::getrlimit($NOFILE); + } else { + require PublicInbox::Spawn; + $n = PublicInbox::Spawn::run_qx([qw(/bin/sh -c), 'ulimit -n']); + } + $n; +} + 1; diff --git a/lib/PublicInbox/XapHelper.pm b/lib/PublicInbox/XapHelper.pm index f1311bd4b..db9e99ae2 100644 --- a/lib/PublicInbox/XapHelper.pm +++ b/lib/PublicInbox/XapHelper.pm @@ -18,7 +18,7 @@ use POSIX qw(:signal_h); use Fcntl qw(LOCK_UN LOCK_EX); use Carp qw(croak); my $X = \%PublicInbox::Search::X; -our (%SRCH, %WORKERS, $nworker, $workerset, $in); +our (%SRCH, %WORKERS, $nworker, $workerset, $in, $SHARD_NFD, $MY_FD_MAX); our $stderr = \*STDERR; sub cmd_test_inspect { @@ -193,8 +193,14 @@ sub dispatch { my $key = "-d\0".join("\0-d\0", @$dirs); $key .= "\0".join("\0", map { ('-Q', $_) } @{$req->{Q}}) if $req->{Q}; my $new; - $req->{srch} = $SRCH{$key} //= do { + $req->{srch} = $SRCH{$key} // do { $new = { qp_flags => $PublicInbox::Search::QP_FLAGS }; + my $nfd = scalar(@$dirs) * PublicInbox::Search::SHARD_COST; + $SHARD_NFD += $nfd; + if ($SHARD_NFD > $MY_FD_MAX) { + $SHARD_NFD = $nfd; + %SRCH = (); + } my $first = shift @$dirs; my $slow_phrase = -f "$first/iamchert"; $new->{xdb} = $X->{Database}->new($first); @@ -207,7 +213,7 @@ sub dispatch { bless $new, $req->{c} ? 'PublicInbox::CodeSearch' : 'PublicInbox::Search'; $new->{qp} = $new->qparse_new; - $new; + $SRCH{$key} = $new; }; $req->{srch}->{xdb}->reopen unless $new; $req->{Q} && !$req->{srch}->{qp_extra_done} and @@ -305,7 +311,7 @@ sub start (@) { my $c = getsockopt(local $in = \*STDIN, SOL_SOCKET, SO_TYPE); unpack('i', $c) == SOCK_SEQPACKET or die 'stdin is not SOCK_SEQPACKET'; - local (%SRCH, %WORKERS); + local (%SRCH, %WORKERS, $SHARD_NFD, $MY_FD_MAX); PublicInbox::Search::load_xapian(); $GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or die 'bad args'; @@ -314,6 +320,10 @@ sub start (@) { for (@PublicInbox::DS::UNBLOCKABLE, POSIX::SIGUSR1) { $workerset->delset($_) or die "delset($_): $!"; } + $MY_FD_MAX = PublicInbox::Search::ulimit_n // + die "E: unable to get RLIMIT_NOFILE: $!"; + warn "W: RLIMIT_NOFILE=$MY_FD_MAX too low\n" if $MY_FD_MAX < 72; + $MY_FD_MAX -= 64; local $nworker = $opt->{j}; return recv_loop() if $nworker == 0; diff --git a/lib/PublicInbox/XapHelperCxx.pm b/lib/PublicInbox/XapHelperCxx.pm index 74852ad19..922bd583b 100644 --- a/lib/PublicInbox/XapHelperCxx.pm +++ b/lib/PublicInbox/XapHelperCxx.pm @@ -34,6 +34,7 @@ my $ldflags = '-Wl,-O1'; $ldflags .= ' -Wl,--compress-debug-sections=zlib' if $^O ne 'openbsd'; my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -pipe') . ' ' . ' -DTHREADID=' . PublicInbox::Search::THREADID . + ' -DSHARD_COST=' . PublicInbox::Search::SHARD_COST . ' -DXH_SPEC="'.join('', map { s/=.*/:/; $_ } @PublicInbox::Search::XH_SPEC) . '" ' . ($ENV{LDFLAGS} // $ldflags); diff --git a/lib/PublicInbox/xap_helper.h b/lib/PublicInbox/xap_helper.h index 44e0d63e0..c71ac06d2 100644 --- a/lib/PublicInbox/xap_helper.h +++ b/lib/PublicInbox/xap_helper.h @@ -140,6 +140,7 @@ static int srch_eq(const struct srch *a, const struct srch *b) KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *, srch_hash, srch_eq) static srch_set *srch_cache; +static long my_fd_max, shard_nfd; // sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET static volatile int sock_fd = STDIN_FILENO; static sigset_t fullset, workerset; @@ -552,6 +553,34 @@ static bool is_chert(const char *dir) return false; } +static void srch_free(struct srch *srch) +{ + delete srch->qp; + delete srch->db; + free(srch); +} + +static void srch_cache_renew(struct srch *keep) +{ + khint_t k; + + // can't delete while iterating, so just free each + clear + for (k = kh_begin(srch_cache); k != kh_end(srch_cache); k++) { + if (!kh_exist(srch_cache, k)) continue; + struct srch *cur = kh_key(srch_cache, k); + + if (cur != keep) + srch_free(cur); + } + srch_set_cs_clear(srch_cache); + if (keep) { + int absent; + k = srch_set_put(srch_cache, keep, &absent); + assert(absent); + assert(k < kh_end(srch_cache)); + } +} + static bool srch_init(struct req *req) { int i; @@ -563,6 +592,13 @@ static bool srch_init(struct req *req) Xapian::QueryParser::FLAG_WILDCARD; if (is_chert(req->dirv[0])) srch->qp_flags &= ~FLAG_PHRASE; + long nfd = req->dirc * SHARD_COST; + + shard_nfd += nfd; + if (shard_nfd > my_fd_max) { + srch_cache_renew(srch); + shard_nfd = nfd; + } try { srch->db = new Xapian::Database(req->dirv[0]); } catch (...) { @@ -629,13 +665,6 @@ static void srch_init_extra(struct req *req) req->srch->qp_extra_done = true; } -static void srch_free(struct srch *srch) -{ - delete srch->qp; - delete srch->db; - free(srch); -} - static void dispatch(struct req *req) { int c; @@ -900,12 +929,7 @@ static void cleanup_all(void) cleanup_pids(); if (!srch_cache) return; - - khint_t k; - for (k = kh_begin(srch_cache); k != kh_end(srch_cache); k++) { - if (kh_exist(srch_cache, k)) - srch_free(kh_key(srch_cache, k)); - } + srch_cache_renew(NULL); srch_set_destroy(srch_cache); srch_cache = NULL; } @@ -1041,12 +1065,20 @@ int main(int argc, char *argv[]) socklen_t slen = (socklen_t)sizeof(c); stdout_path = getenv("STDOUT_PATH"); stderr_path = getenv("STDERR_PATH"); + struct rlimit rl; if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen)) err(EXIT_FAILURE, "getsockopt"); if (c != SOCK_SEQPACKET) errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET"); + if (getrlimit(RLIMIT_NOFILE, &rl)) + err(EXIT_FAILURE, "getrlimit"); + my_fd_max = rl.rlim_cur; + if (my_fd_max < 72) + warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max); + my_fd_max -= 64; + mail_nrp_init(); code_nrp_init(); srch_cache = srch_set_init(); diff --git a/t/xap_helper.t b/t/xap_helper.t index 78be8539c..b0fa75a2e 100644 --- a/t/xap_helper.t +++ b/t/xap_helper.t @@ -284,4 +284,27 @@ for my $n (@NO_CXX) { } } +SKIP: { + my $nr = $ENV{TEST_XH_FDMAX} or + skip 'TEST_XH_FDMAX unset', 1; + my @xhc = map { + local $ENV{PI_NO_CXX} = $_; + PublicInbox::XapClient::start_helper('-j0'); + } @NO_CXX; + my $n = 1; + my $exp; + for (0..(PublicInbox::Search::ulimit_n() * $nr)) { + for my $xhc (@xhc) { + my $r = $xhc->mkreq([], qw(mset -Q), "tst$n=XTST$n", + @ibx_shard_args, qw(rt:0..)); + chomp(my @res = readline($r)); + $exp //= $res[0]; + $exp eq $res[0] or + is $exp, $res[0], "mset mismatch on n=$n"; + ++$n; + } + } + ok $exp, "got expected entries ($n)"; +} + done_testing;