]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
daemon: make xap_helper socket non-blocking
authorEric Wong <e@80x24.org>
Tue, 11 Feb 2025 03:55:36 +0000 (03:55 +0000)
committerEric Wong <e@80x24.org>
Tue, 11 Feb 2025 19:08:17 +0000 (19:08 +0000)
A non-blocking socket for public-facing searches is ideal on
overloaded servers which receive more searches than it can
handle in a short time span.  A non-blocking socket here
prevents a sendmsg(2) call from blocking the main event loop
and ensures non-search requests can still be processed even
if all xap_helper subprocesses are overwhelmed.

$search_xh_pid is hoisted out to TestCommon for use with the new
slow-xh-search test and renamed $find_xh_pid to avoid confusion
with Xapian search functionality.  We'll also drop a redundant
call to find the xap_helper PID in psgi_v2.t while transitioning
to $find_xh_pid.

MANIFEST
lib/PublicInbox/Daemon.pm
lib/PublicInbox/SearchView.pm
lib/PublicInbox/TestCommon.pm
t/psgi_v2.t
xt/slow-xh-search.t [new file with mode: 0644]

index 8bddb75190aef73b533f152f81fe6a99201cc519..2ca7755ce46b2e5fb302f0a99682b94e89d96883 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -678,5 +678,6 @@ xt/perf-msgview.t
 xt/perf-nntpd.t
 xt/perf-threading.t
 xt/pop3d-mpop.t
+xt/slow-xh-search.t
 xt/solver.t
 xt/stress-sharedkv.t
index 84147b6841dbc2e76fe9daa5e9d5b6b96c490e1a..e00e0a78f2302b39eabd6f57777b8711cb70ed34 100644 (file)
@@ -400,9 +400,12 @@ sub spawn_xh () {
                local $ENV{STDOUT_PATH} = $stdout;
                PublicInbox::XapClient::start_helper('-j', $xh_workers)
        };
-       warn "E: $@" if $@;
-       awaitpid($PublicInbox::Search::XHC->{io}->attached_pid, \&respawn_xh)
-               if $PublicInbox::Search::XHC;
+       if ($@) {
+               warn "E: $@";
+       } elsif (my $xhc = $PublicInbox::Search::XHC) {
+               $xhc->{io}->blocking(0);
+               awaitpid($xhc->{io}->attached_pid, \&respawn_xh);
+       }
 }
 
 sub reopen_logs {
index 1213b3d3748f502c5e193890789d4f38a29d7a8a..8ae422f91c985bca8dadfc9c11d1e59effc78803 100644 (file)
@@ -16,6 +16,11 @@ use PublicInbox::SearchThread;
 use PublicInbox::SearchQuery;
 use PublicInbox::Search qw(get_pct);
 my %rmap_inc;
+use Errno ();
+
+# possible busy errors from async_mset (via pipe, sendmsg, epoll_ctl)
+my %BUSY = map { (Errno->$_, 1) } qw(EAGAIN ENOBUFS ENOMEM ETOOMANYREFS
+                                       EMFILE ENFILE ENOSPC);
 
 sub mbox_results {
        my ($ctx) = @_;
@@ -60,12 +65,21 @@ sub sres_top_html {
 sub sres_html_cb { # async_mset cb
        my ($ctx, $opt, $q, $mset, $err) = @_;
        my $code = 200;
+       if ($err) {
+               if ($BUSY{$! + 0}) {
+                       warn "W: query failed: $!";
+                       $code = 503;
+               } else { # bad query by user
+                       $code = 400;
+               }
+       }
        my $total = $mset ? $mset->get_matches_estimated : undef;
        ctx_prepare($q, $ctx);
        my ($res, $html);
-       if ($err) {
-               $code = 400;
+       if ($code == 400) {
                $html = '<pre>'.err_txt($ctx, $err).'</pre><hr>';
+       } elsif ($code == 503) {
+               $html = "<pre>\ntoo busy</pre><hr>";
        } elsif ($total == 0) {
                if (defined($ctx->{-uxs_retried})) { # undo retry damage:
                        $q->{'q'} = $ctx->{-uxs_retried};
index cebd093676c7c4ed466063e138959c3c59e6967e..cbcfc008c9e6af3c2c0a251a869f4d7ad8fb30c8 100644 (file)
@@ -15,7 +15,7 @@ use Carp ();
 our @EXPORT;
 my $lei_loud = $ENV{TEST_LEI_ERR_LOUD};
 our $tail_cmd = $ENV{TAIL};
-our ($lei_opt, $lei_out, $lei_err);
+our ($lei_opt, $lei_out, $lei_err, $find_xh_pid);
 use autodie qw(chdir close fcntl mkdir open opendir seek unlink);
 $ENV{XDG_CACHE_HOME} //= "$ENV{HOME}/.cache"; # reuse C++ xap_helper builds
 $ENV{GIT_TEST_FSYNC} = 0; # hopefully reduce wear
@@ -32,7 +32,7 @@ BEGIN {
                tcp_host_port test_lei lei lei_ok $lei_out $lei_err $lei_opt
                test_httpd no_httpd_errors xbail require_cmd is_xdeeply tail_f
                ignore_inline_c_missing no_pollerfd no_coredump cfg_new
-               strace strace_inject lsof_pid oct_is);
+               strace strace_inject lsof_pid oct_is $find_xh_pid);
        require Test::More;
        my @methods = grep(!/\W/, @Test::More::EXPORT);
        eval(join('', map { "*$_=\\&Test::More::$_;" } @methods));
@@ -1086,6 +1086,23 @@ sub oct_is ($$$) {
        goto &is; # tail recursion to get lineno from callers on failure
 }
 
+$find_xh_pid = $^O eq 'linux' && -r "/proc/$$/stat" ? sub {
+       my ($ppid) = @_;
+       my ($cmdline, $fh, @s);
+       for (glob('/proc/*/stat')) {
+               CORE::open $fh, '<', $_ or next;
+               @s = split /\s+/, readline($fh) // next;
+               next if $s[3] ne $ppid; # look for matching PPID
+               CORE::open $fh, '<', "/proc/$s[0]/cmdline" or next;
+               $cmdline = readline($fh) // next;
+               if ($cmdline =~ /\0-MPublicInbox::XapHelper\0-e\0/ ||
+                               $cmdline =~ m!/xap_helper\0!) {
+                       return $s[0];
+               }
+       }
+       undef;
+} : 'xap_helper PID lookup currently depends on Linux /proc';
+
 package PublicInbox::TestCommon::InboxWakeup;
 use strict;
 sub on_inbox_unlock { ${$_[0]}->($_[1]) }
index 2027037dc42026957dcec2660892a10e3d7a86f7..dfec5abcb47b0489a61551be40ccff9076259aa0 100644 (file)
@@ -393,23 +393,9 @@ my $client3 = sub {
 test_psgi(sub { $www->call(@_) }, $client3);
 test_httpd($env, $client3, 4);
 
-if ($^O eq 'linux' && -r "/proc/$$/stat") {
+SKIP: {
+       ref($find_xh_pid) or skip $find_xh_pid, 1;
        my $args;
-       my $search_xh_pid = sub {
-               my ($pid) = @_;
-               for my $f (glob('/proc/*/stat')) {
-                       open my $fh, '<', $f or next;
-                       my @s = split /\s+/, readline($fh) // next;
-                       next if $s[3] ne $pid; # look for matching PPID
-                       open $fh, '<', "/proc/$s[0]/cmdline" or next;
-                       my $cmdline = readline($fh) // next;
-                       if ($cmdline =~ /\0-MPublicInbox::XapHelper\0-e\0/ ||
-                                       $cmdline =~ m!/xap_helper\0!) {
-                               return $s[0];
-                       }
-               }
-               undef;
-       };
        my $usr1_test = sub {
                my ($cb) = @_;
                my $td = $PublicInbox::TestCommon::CURRENT_DAEMON;
@@ -417,8 +403,7 @@ if ($^O eq 'linux' && -r "/proc/$$/stat") {
                my $res = $cb->(GET('/v2test/?q=m:a-mid@b'));
                is $res->code, 200, '-httpd is running w/ search';
 
-               $search_xh_pid->($pid);
-               my $xh_pid = $search_xh_pid->($pid) or
+               my $xh_pid = $find_xh_pid->($pid) or
                        BAIL_OUT "can't find XH pid with $args";
                my $xh_err = readlink "/proc/$xh_pid/fd/2";
                is $xh_err, "$env->{TMPDIR}/stderr.log",
@@ -432,7 +417,7 @@ if ($^O eq 'linux' && -r "/proc/$$/stat") {
                tick;
                $res = $cb->(GET('/v2test/?q=m:a-mid@b'));
                is $res->code, 200, '-httpd still running w/ search';
-               my $new_xh_pid = $search_xh_pid->($pid) or
+               my $new_xh_pid = $find_xh_pid->($pid) or
                        BAIL_OUT "can't find new XH pid with $args";
                is $new_xh_pid, $xh_pid, "XH pid unchanged ($args)";
                $xh_err = readlink "/proc/$new_xh_pid/fd/2";
diff --git a/xt/slow-xh-search.t b/xt/slow-xh-search.t
new file mode 100644 (file)
index 0000000..f860286
--- /dev/null
@@ -0,0 +1,65 @@
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+use v5.12;
+use autodie;
+use PublicInbox::TestCommon;
+use Socket ();
+ref($find_xh_pid) or plan skip_all => $find_xh_pid;
+require_mods qw(-httpd v2 psgi Xapian);
+my ($ro_home, $cfg_path) = setup_public_inboxes;
+my $tmpdir = tmpdir;
+my $unix = "$tmpdir/unix.sock";
+my $out = "$tmpdir/out.log";
+my $err = "$tmpdir/err.log";
+my $cmd = [ qw(-httpd -W0 -X0), "--stdout=$out", "--stderr=$err" ];
+my $env = { PI_CONFIG => $cfg_path };
+my $srv = IO::Socket::UNIX->new(Listen => 4096, Local => $unix,
+                       Type => Socket::SOCK_STREAM);
+my $rdr = { 3 => $srv };
+$srv->blocking(0);
+my $td = start_script($cmd, $env, $rdr);
+chomp(my $nfd = `/bin/sh -c 'ulimit -n'`);
+diag "RLIMIT_NOFILE=$nfd, consider increasing if test fails";
+fail "RLIMIT_NOFILE=$nfd too small" if $nfd < 11;
+$nfd -= 10;
+
+my $req = "GET /t2/?q=m:testmessage\@example.com HTTP/1.0\r\n\r\n";
+my @opt = (Peer => $unix, Type => Socket::SOCK_STREAM);
+my $start_req = sub {
+       my $c = IO::Socket::UNIX->new(@opt) or xbail "connect: $!";
+       $c->autoflush(1);
+       print $c $req;
+       $c;
+};
+
+# make a search request/response to ensure xap_helper is started
+read $start_req->(), my $buf, 16384;
+
+my $xh_pid = $find_xh_pid->($td->{pid}) or
+       xbail "can't find XH pid";
+kill 'STOP', $xh_pid;
+diag "starting $nfd requests...";
+my @c = map { $start_req->() } (1..$nfd);
+diag "all $nfd requested";
+tick 1;
+kill 'CONT', $xh_pid;
+my %codes = (200 => 0, 503 => 0);
+diag "reading $nfd responses";
+for my $c (@c) {
+       read $c, $buf, 16384;
+       undef $c;
+       if ($buf =~ m!\AHTTP/1\.[01] (\d+)!) {
+               $codes{$1}++;
+       } else {
+               diag explain($buf);
+               fail 'bad response';
+       }
+}
+ok $codes{200}, 'got some 200 responses' or diag explain(\%codes);
+ok $codes{503}, 'got some 503 errors' or diag explain(\%codes);
+is $codes{200} + $codes{503}, $nfd,
+       'only got 503 and 200 codes' or diag explain(\%codes);
+
+$td->join('TERM');
+done_testing;