From: Eric Wong Date: Tue, 11 Feb 2025 03:55:36 +0000 (+0000) Subject: daemon: make xap_helper socket non-blocking X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=2a3c504f6e10c0eb81ff2c4289166396584f9a04;p=thirdparty%2Fpublic-inbox.git daemon: make xap_helper socket non-blocking A non-blocking socket for public-facing searches is ideal on overloaded servers which receive more searches than it can handle in a short time span. A non-blocking socket here prevents a sendmsg(2) call from blocking the main event loop and ensures non-search requests can still be processed even if all xap_helper subprocesses are overwhelmed. $search_xh_pid is hoisted out to TestCommon for use with the new slow-xh-search test and renamed $find_xh_pid to avoid confusion with Xapian search functionality. We'll also drop a redundant call to find the xap_helper PID in psgi_v2.t while transitioning to $find_xh_pid. --- diff --git a/MANIFEST b/MANIFEST index 8bddb7519..2ca7755ce 100644 --- a/MANIFEST +++ b/MANIFEST @@ -678,5 +678,6 @@ xt/perf-msgview.t xt/perf-nntpd.t xt/perf-threading.t xt/pop3d-mpop.t +xt/slow-xh-search.t xt/solver.t xt/stress-sharedkv.t diff --git a/lib/PublicInbox/Daemon.pm b/lib/PublicInbox/Daemon.pm index 84147b684..e00e0a78f 100644 --- a/lib/PublicInbox/Daemon.pm +++ b/lib/PublicInbox/Daemon.pm @@ -400,9 +400,12 @@ sub spawn_xh () { local $ENV{STDOUT_PATH} = $stdout; PublicInbox::XapClient::start_helper('-j', $xh_workers) }; - warn "E: $@" if $@; - awaitpid($PublicInbox::Search::XHC->{io}->attached_pid, \&respawn_xh) - if $PublicInbox::Search::XHC; + if ($@) { + warn "E: $@"; + } elsif (my $xhc = $PublicInbox::Search::XHC) { + $xhc->{io}->blocking(0); + awaitpid($xhc->{io}->attached_pid, \&respawn_xh); + } } sub reopen_logs { diff --git a/lib/PublicInbox/SearchView.pm b/lib/PublicInbox/SearchView.pm index 1213b3d37..8ae422f91 100644 --- a/lib/PublicInbox/SearchView.pm +++ b/lib/PublicInbox/SearchView.pm @@ -16,6 +16,11 @@ use PublicInbox::SearchThread; use PublicInbox::SearchQuery; use PublicInbox::Search qw(get_pct); my %rmap_inc; +use Errno (); + +# possible busy errors from async_mset (via pipe, sendmsg, epoll_ctl) +my %BUSY = map { (Errno->$_, 1) } qw(EAGAIN ENOBUFS ENOMEM ETOOMANYREFS + EMFILE ENFILE ENOSPC); sub mbox_results { my ($ctx) = @_; @@ -60,12 +65,21 @@ sub sres_top_html { sub sres_html_cb { # async_mset cb my ($ctx, $opt, $q, $mset, $err) = @_; my $code = 200; + if ($err) { + if ($BUSY{$! + 0}) { + warn "W: query failed: $!"; + $code = 503; + } else { # bad query by user + $code = 400; + } + } my $total = $mset ? $mset->get_matches_estimated : undef; ctx_prepare($q, $ctx); my ($res, $html); - if ($err) { - $code = 400; + if ($code == 400) { $html = '
'.err_txt($ctx, $err).'

'; + } elsif ($code == 503) { + $html = "
\ntoo busy

"; } elsif ($total == 0) { if (defined($ctx->{-uxs_retried})) { # undo retry damage: $q->{'q'} = $ctx->{-uxs_retried}; diff --git a/lib/PublicInbox/TestCommon.pm b/lib/PublicInbox/TestCommon.pm index cebd09367..cbcfc008c 100644 --- a/lib/PublicInbox/TestCommon.pm +++ b/lib/PublicInbox/TestCommon.pm @@ -15,7 +15,7 @@ use Carp (); our @EXPORT; my $lei_loud = $ENV{TEST_LEI_ERR_LOUD}; our $tail_cmd = $ENV{TAIL}; -our ($lei_opt, $lei_out, $lei_err); +our ($lei_opt, $lei_out, $lei_err, $find_xh_pid); use autodie qw(chdir close fcntl mkdir open opendir seek unlink); $ENV{XDG_CACHE_HOME} //= "$ENV{HOME}/.cache"; # reuse C++ xap_helper builds $ENV{GIT_TEST_FSYNC} = 0; # hopefully reduce wear @@ -32,7 +32,7 @@ BEGIN { tcp_host_port test_lei lei lei_ok $lei_out $lei_err $lei_opt test_httpd no_httpd_errors xbail require_cmd is_xdeeply tail_f ignore_inline_c_missing no_pollerfd no_coredump cfg_new - strace strace_inject lsof_pid oct_is); + strace strace_inject lsof_pid oct_is $find_xh_pid); require Test::More; my @methods = grep(!/\W/, @Test::More::EXPORT); eval(join('', map { "*$_=\\&Test::More::$_;" } @methods)); @@ -1086,6 +1086,23 @@ sub oct_is ($$$) { goto &is; # tail recursion to get lineno from callers on failure } +$find_xh_pid = $^O eq 'linux' && -r "/proc/$$/stat" ? sub { + my ($ppid) = @_; + my ($cmdline, $fh, @s); + for (glob('/proc/*/stat')) { + CORE::open $fh, '<', $_ or next; + @s = split /\s+/, readline($fh) // next; + next if $s[3] ne $ppid; # look for matching PPID + CORE::open $fh, '<', "/proc/$s[0]/cmdline" or next; + $cmdline = readline($fh) // next; + if ($cmdline =~ /\0-MPublicInbox::XapHelper\0-e\0/ || + $cmdline =~ m!/xap_helper\0!) { + return $s[0]; + } + } + undef; +} : 'xap_helper PID lookup currently depends on Linux /proc'; + package PublicInbox::TestCommon::InboxWakeup; use strict; sub on_inbox_unlock { ${$_[0]}->($_[1]) } diff --git a/t/psgi_v2.t b/t/psgi_v2.t index 2027037dc..dfec5abcb 100644 --- a/t/psgi_v2.t +++ b/t/psgi_v2.t @@ -393,23 +393,9 @@ my $client3 = sub { test_psgi(sub { $www->call(@_) }, $client3); test_httpd($env, $client3, 4); -if ($^O eq 'linux' && -r "/proc/$$/stat") { +SKIP: { + ref($find_xh_pid) or skip $find_xh_pid, 1; my $args; - my $search_xh_pid = sub { - my ($pid) = @_; - for my $f (glob('/proc/*/stat')) { - open my $fh, '<', $f or next; - my @s = split /\s+/, readline($fh) // next; - next if $s[3] ne $pid; # look for matching PPID - open $fh, '<', "/proc/$s[0]/cmdline" or next; - my $cmdline = readline($fh) // next; - if ($cmdline =~ /\0-MPublicInbox::XapHelper\0-e\0/ || - $cmdline =~ m!/xap_helper\0!) { - return $s[0]; - } - } - undef; - }; my $usr1_test = sub { my ($cb) = @_; my $td = $PublicInbox::TestCommon::CURRENT_DAEMON; @@ -417,8 +403,7 @@ if ($^O eq 'linux' && -r "/proc/$$/stat") { my $res = $cb->(GET('/v2test/?q=m:a-mid@b')); is $res->code, 200, '-httpd is running w/ search'; - $search_xh_pid->($pid); - my $xh_pid = $search_xh_pid->($pid) or + my $xh_pid = $find_xh_pid->($pid) or BAIL_OUT "can't find XH pid with $args"; my $xh_err = readlink "/proc/$xh_pid/fd/2"; is $xh_err, "$env->{TMPDIR}/stderr.log", @@ -432,7 +417,7 @@ if ($^O eq 'linux' && -r "/proc/$$/stat") { tick; $res = $cb->(GET('/v2test/?q=m:a-mid@b')); is $res->code, 200, '-httpd still running w/ search'; - my $new_xh_pid = $search_xh_pid->($pid) or + my $new_xh_pid = $find_xh_pid->($pid) or BAIL_OUT "can't find new XH pid with $args"; is $new_xh_pid, $xh_pid, "XH pid unchanged ($args)"; $xh_err = readlink "/proc/$new_xh_pid/fd/2"; diff --git a/xt/slow-xh-search.t b/xt/slow-xh-search.t new file mode 100644 index 000000000..f86028640 --- /dev/null +++ b/xt/slow-xh-search.t @@ -0,0 +1,65 @@ +#!perl -w +# Copyright (C) all contributors +# License: AGPL-3.0+ +use v5.12; +use autodie; +use PublicInbox::TestCommon; +use Socket (); +ref($find_xh_pid) or plan skip_all => $find_xh_pid; +require_mods qw(-httpd v2 psgi Xapian); +my ($ro_home, $cfg_path) = setup_public_inboxes; +my $tmpdir = tmpdir; +my $unix = "$tmpdir/unix.sock"; +my $out = "$tmpdir/out.log"; +my $err = "$tmpdir/err.log"; +my $cmd = [ qw(-httpd -W0 -X0), "--stdout=$out", "--stderr=$err" ]; +my $env = { PI_CONFIG => $cfg_path }; +my $srv = IO::Socket::UNIX->new(Listen => 4096, Local => $unix, + Type => Socket::SOCK_STREAM); +my $rdr = { 3 => $srv }; +$srv->blocking(0); +my $td = start_script($cmd, $env, $rdr); +chomp(my $nfd = `/bin/sh -c 'ulimit -n'`); +diag "RLIMIT_NOFILE=$nfd, consider increasing if test fails"; +fail "RLIMIT_NOFILE=$nfd too small" if $nfd < 11; +$nfd -= 10; + +my $req = "GET /t2/?q=m:testmessage\@example.com HTTP/1.0\r\n\r\n"; +my @opt = (Peer => $unix, Type => Socket::SOCK_STREAM); +my $start_req = sub { + my $c = IO::Socket::UNIX->new(@opt) or xbail "connect: $!"; + $c->autoflush(1); + print $c $req; + $c; +}; + +# make a search request/response to ensure xap_helper is started +read $start_req->(), my $buf, 16384; + +my $xh_pid = $find_xh_pid->($td->{pid}) or + xbail "can't find XH pid"; +kill 'STOP', $xh_pid; +diag "starting $nfd requests..."; +my @c = map { $start_req->() } (1..$nfd); +diag "all $nfd requested"; +tick 1; +kill 'CONT', $xh_pid; +my %codes = (200 => 0, 503 => 0); +diag "reading $nfd responses"; +for my $c (@c) { + read $c, $buf, 16384; + undef $c; + if ($buf =~ m!\AHTTP/1\.[01] (\d+)!) { + $codes{$1}++; + } else { + diag explain($buf); + fail 'bad response'; + } +} +ok $codes{200}, 'got some 200 responses' or diag explain(\%codes); +ok $codes{503}, 'got some 503 errors' or diag explain(\%codes); +is $codes{200} + $codes{503}, $nfd, + 'only got 503 and 200 codes' or diag explain(\%codes); + +$td->join('TERM'); +done_testing;