]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
xap_helper: expire DB handles when FD table is near full
authorEric Wong <e@80x24.org>
Sun, 19 May 2024 21:55:06 +0000 (21:55 +0000)
committerEric Wong <e@80x24.org>
Mon, 20 May 2024 18:29:46 +0000 (18:29 +0000)
For long-lived daemons across config reloads, we shouldn't keep
Xapian DBs open forever under FD pressure.  So estimate the
number of FDs we need per-shard and start clearing some out
if we have too many open.

While we're at it, hoist out our ulimit_n helper and share it
across extindex and the Perl XapHelper implementation.

lib/PublicInbox/ExtSearchIdx.pm
lib/PublicInbox/Search.pm
lib/PublicInbox/XapHelper.pm
lib/PublicInbox/XapHelperCxx.pm
lib/PublicInbox/xap_helper.h
t/xap_helper.t

index 883dbea3d6cd580ad3857328be568102bc8efa20..934197c05bab9e1f5972a490f06c8efa3d4493e9 100644 (file)
@@ -543,13 +543,7 @@ sub _ibx_for ($$$) {
 sub _fd_constrained ($) {
        my ($self) = @_;
        $self->{-fd_constrained} //= do {
-               my $soft;
-               if (eval { require BSD::Resource; 1 }) {
-                       my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
-                       ($soft, undef) = BSD::Resource::getrlimit($NOFILE);
-               } else {
-                       chomp($soft = `sh -c 'ulimit -n'`);
-               }
+               my $soft = PublicInbox::Search::ulimit_n;
                if (defined($soft)) {
                        # $want is an estimate
                        my $want = scalar(@{$self->{ibx_active}}) + 64;
index e5c5d6abbf722cfbba54d5c2ae31dc2dc36de3cb..25ef49c5ee2aef58293a523bcccf4758b26e7a0f 100644 (file)
@@ -54,6 +54,9 @@ use constant {
        #
        #      v1.6.0 adds BYTES, UID and THREADID values
        SCHEMA_VERSION => 15,
+
+       # we may have up to 8 FDs per shard (depends on Xapian *shrug*)
+       SHARD_COST => 8,
 };
 
 use PublicInbox::Smsg;
@@ -729,4 +732,17 @@ sub get_doc ($$) {
        }
 }
 
+# not sure where best to put this...
+sub ulimit_n () {
+       my $n;
+       if (eval { require BSD::Resource; 1 }) {
+               my $NOFILE = BSD::Resource::RLIMIT_NOFILE();
+               ($n, undef) = BSD::Resource::getrlimit($NOFILE);
+       } else {
+               require PublicInbox::Spawn;
+               $n = PublicInbox::Spawn::run_qx([qw(/bin/sh -c), 'ulimit -n']);
+       }
+       $n;
+}
+
 1;
index f1311bd4b2f54b76044da91d43348fef5d3327f6..db9e99ae2257787048f004162a941705341c8861 100644 (file)
@@ -18,7 +18,7 @@ use POSIX qw(:signal_h);
 use Fcntl qw(LOCK_UN LOCK_EX);
 use Carp qw(croak);
 my $X = \%PublicInbox::Search::X;
-our (%SRCH, %WORKERS, $nworker, $workerset, $in);
+our (%SRCH, %WORKERS, $nworker, $workerset, $in, $SHARD_NFD, $MY_FD_MAX);
 our $stderr = \*STDERR;
 
 sub cmd_test_inspect {
@@ -193,8 +193,14 @@ sub dispatch {
        my $key = "-d\0".join("\0-d\0", @$dirs);
        $key .= "\0".join("\0", map { ('-Q', $_) } @{$req->{Q}}) if $req->{Q};
        my $new;
-       $req->{srch} = $SRCH{$key} //= do {
+       $req->{srch} = $SRCH{$key} // do {
                $new = { qp_flags => $PublicInbox::Search::QP_FLAGS };
+               my $nfd = scalar(@$dirs) * PublicInbox::Search::SHARD_COST;
+               $SHARD_NFD += $nfd;
+               if ($SHARD_NFD > $MY_FD_MAX) {
+                       $SHARD_NFD = $nfd;
+                       %SRCH = ();
+               }
                my $first = shift @$dirs;
                my $slow_phrase = -f "$first/iamchert";
                $new->{xdb} = $X->{Database}->new($first);
@@ -207,7 +213,7 @@ sub dispatch {
                bless $new, $req->{c} ? 'PublicInbox::CodeSearch' :
                                        'PublicInbox::Search';
                $new->{qp} = $new->qparse_new;
-               $new;
+               $SRCH{$key} = $new;
        };
        $req->{srch}->{xdb}->reopen unless $new;
        $req->{Q} && !$req->{srch}->{qp_extra_done} and
@@ -305,7 +311,7 @@ sub start (@) {
        my $c = getsockopt(local $in = \*STDIN, SOL_SOCKET, SO_TYPE);
        unpack('i', $c) == SOCK_SEQPACKET or die 'stdin is not SOCK_SEQPACKET';
 
-       local (%SRCH, %WORKERS);
+       local (%SRCH, %WORKERS, $SHARD_NFD, $MY_FD_MAX);
        PublicInbox::Search::load_xapian();
        $GLP->getoptionsfromarray(\@argv, my $opt = { j => 1 }, 'j=i') or
                die 'bad args';
@@ -314,6 +320,10 @@ sub start (@) {
        for (@PublicInbox::DS::UNBLOCKABLE, POSIX::SIGUSR1) {
                $workerset->delset($_) or die "delset($_): $!";
        }
+       $MY_FD_MAX = PublicInbox::Search::ulimit_n //
+               die "E: unable to get RLIMIT_NOFILE: $!";
+       warn "W: RLIMIT_NOFILE=$MY_FD_MAX too low\n" if $MY_FD_MAX < 72;
+       $MY_FD_MAX -= 64;
 
        local $nworker = $opt->{j};
        return recv_loop() if $nworker == 0;
index 74852ad19ac5b4ad86afe4fd193c9cc7d5c3ece7..922bd583ba0a911a96c8d8080c0bf9b7a54d9917 100644 (file)
@@ -34,6 +34,7 @@ my $ldflags = '-Wl,-O1';
 $ldflags .= ' -Wl,--compress-debug-sections=zlib' if $^O ne 'openbsd';
 my $xflags = ($ENV{CXXFLAGS} // '-Wall -ggdb3 -pipe') . ' ' .
        ' -DTHREADID=' . PublicInbox::Search::THREADID .
+       ' -DSHARD_COST=' . PublicInbox::Search::SHARD_COST .
        ' -DXH_SPEC="'.join('',
                map { s/=.*/:/; $_ } @PublicInbox::Search::XH_SPEC) . '" ' .
        ($ENV{LDFLAGS} // $ldflags);
index 44e0d63e01d727c0ca8c831d675a54a9ed826ce9..c71ac06d29e5c454ef4aff54ed5efcf995317329 100644 (file)
@@ -140,6 +140,7 @@ static int srch_eq(const struct srch *a, const struct srch *b)
 KHASHL_CSET_INIT(KH_LOCAL, srch_set, srch_set, struct srch *,
                srch_hash, srch_eq)
 static srch_set *srch_cache;
+static long my_fd_max, shard_nfd;
 // sock_fd is modified in signal handler, yes, it's SOCK_SEQPACKET
 static volatile int sock_fd = STDIN_FILENO;
 static sigset_t fullset, workerset;
@@ -552,6 +553,34 @@ static bool is_chert(const char *dir)
        return false;
 }
 
+static void srch_free(struct srch *srch)
+{
+       delete srch->qp;
+       delete srch->db;
+       free(srch);
+}
+
+static void srch_cache_renew(struct srch *keep)
+{
+       khint_t k;
+
+       // can't delete while iterating, so just free each + clear
+       for (k = kh_begin(srch_cache); k != kh_end(srch_cache); k++) {
+               if (!kh_exist(srch_cache, k)) continue;
+               struct srch *cur = kh_key(srch_cache, k);
+
+               if (cur != keep)
+                       srch_free(cur);
+       }
+       srch_set_cs_clear(srch_cache);
+       if (keep) {
+               int absent;
+               k = srch_set_put(srch_cache, keep, &absent);
+               assert(absent);
+               assert(k < kh_end(srch_cache));
+       }
+}
+
 static bool srch_init(struct req *req)
 {
        int i;
@@ -563,6 +592,13 @@ static bool srch_init(struct req *req)
                        Xapian::QueryParser::FLAG_WILDCARD;
        if (is_chert(req->dirv[0]))
                srch->qp_flags &= ~FLAG_PHRASE;
+       long nfd = req->dirc * SHARD_COST;
+
+       shard_nfd += nfd;
+       if (shard_nfd > my_fd_max) {
+               srch_cache_renew(srch);
+               shard_nfd = nfd;
+       }
        try {
                srch->db = new Xapian::Database(req->dirv[0]);
        } catch (...) {
@@ -629,13 +665,6 @@ static void srch_init_extra(struct req *req)
        req->srch->qp_extra_done = true;
 }
 
-static void srch_free(struct srch *srch)
-{
-       delete srch->qp;
-       delete srch->db;
-       free(srch);
-}
-
 static void dispatch(struct req *req)
 {
        int c;
@@ -900,12 +929,7 @@ static void cleanup_all(void)
        cleanup_pids();
        if (!srch_cache)
                return;
-
-       khint_t k;
-       for (k = kh_begin(srch_cache); k != kh_end(srch_cache); k++) {
-               if (kh_exist(srch_cache, k))
-                       srch_free(kh_key(srch_cache, k));
-       }
+       srch_cache_renew(NULL);
        srch_set_destroy(srch_cache);
        srch_cache = NULL;
 }
@@ -1041,12 +1065,20 @@ int main(int argc, char *argv[])
        socklen_t slen = (socklen_t)sizeof(c);
        stdout_path = getenv("STDOUT_PATH");
        stderr_path = getenv("STDERR_PATH");
+       struct rlimit rl;
 
        if (getsockopt(sock_fd, SOL_SOCKET, SO_TYPE, &c, &slen))
                err(EXIT_FAILURE, "getsockopt");
        if (c != SOCK_SEQPACKET)
                errx(EXIT_FAILURE, "stdin is not SOCK_SEQPACKET");
 
+       if (getrlimit(RLIMIT_NOFILE, &rl))
+               err(EXIT_FAILURE, "getrlimit");
+       my_fd_max = rl.rlim_cur;
+       if (my_fd_max < 72)
+               warnx("W: RLIMIT_NOFILE=%ld too low\n", my_fd_max);
+       my_fd_max -= 64;
+
        mail_nrp_init();
        code_nrp_init();
        srch_cache = srch_set_init();
index 78be8539cbbdf6d5b9991ad85bad13f2aae65481..b0fa75a2e011c84e0a20e07cdc5065260f2bc84e 100644 (file)
@@ -284,4 +284,27 @@ for my $n (@NO_CXX) {
        }
 }
 
+SKIP: {
+       my $nr = $ENV{TEST_XH_FDMAX} or
+               skip 'TEST_XH_FDMAX unset', 1;
+       my @xhc = map {
+               local $ENV{PI_NO_CXX} = $_;
+               PublicInbox::XapClient::start_helper('-j0');
+       } @NO_CXX;
+       my $n = 1;
+       my $exp;
+       for (0..(PublicInbox::Search::ulimit_n() * $nr)) {
+               for my $xhc (@xhc) {
+                       my $r = $xhc->mkreq([], qw(mset -Q), "tst$n=XTST$n",
+                                       @ibx_shard_args, qw(rt:0..));
+                       chomp(my @res = readline($r));
+                       $exp //= $res[0];
+                       $exp eq $res[0] or
+                               is $exp, $res[0], "mset mismatch on n=$n";
+                       ++$n;
+               }
+       }
+       ok $exp, "got expected entries ($n)";
+}
+
 done_testing;