C<RLIMIT_*> keys may be set to enforce resource limits for
a particular limiter (L<BSD::Resource(3pm)> is required).
-Default named-limiters are prefixed with "-". Currently,
+Default named-limiters are prefixed with C<->. Currently,
the C<-cgit> named limiter is reserved for instances spawning
cgit via C<publicinbox.cgitrc>. The C<-httpbackend> named
limiter (in public-inbox 2.0+) governs all L<git-http-backend(1)>
processes for inboxes and coderepos for a given public-inbox
-config file.
+config file. The C<-codeblob> limiter (in 2.0+) governs the
+entire series of git commands used for blob reconstruction from
+patches.
=over 8
The maximum number of parallel processes for the given limiter.
+Default: 32 for C<-httpbackend>, 1 for everything else
+
+=item publicinboxlimiter.<name>.depth
+
+The maximum queue depth for the given limiter. HTTP C<503> errors
+will be returned when the queue depth is exceeded.
+
+Default: 32
+
+New in public-inbox 2.0+ (PENDING)
+
=item publicinboxlimiter.<name>.rlimitCore
=item publicinboxlimiter.<name>.rlimitCPU
xt/create-many-inboxes.t
xt/eml_check_limits.t
xt/eml_octet-stream.t
+xt/git-http-backend-parallel.t
xt/git-http-backend.t
xt/git_async_cmp.t
xt/httpd-async-stream.t
my ($self, $name, $max_default) = @_;
$self->{-limiters}->{$name} //= do {
require PublicInbox::Limiter;
- my $max = $self->{"publicinboxlimiter.$name.max"};
- my $l = PublicInbox::Limiter->new($max || $max_default || 1);
- $l->setup_rlimit($name, $self);
+ my $n = $self->{"publicinboxlimiter.$name.max"};
+ my $l = PublicInbox::Limiter->new($n || $max_default || 1);
+ $l->setup_limiter($name, $self);
$l;
};
}
sub new {
my ($class, $max) = @_;
bless {
- # 32 is same as the git-daemon connection limit
+ # 32 is same as the git-daemon connection limit, but
+ # -cgit and -codeblob internal limiters default to 1
max => $max || 32,
running => 0,
run_queue => [],
}, $class;
}
-sub setup_rlimit {
+# Apply the publicinboxlimiter.$name.* settings found in $cfg to $self:
+# an optional queue {depth} and, for every name in
+# @PublicInbox::Spawn::RLIMITS, a 2-element array stored in $self->{$rlim}.
+# Unparseable values are warned about and ignored.
+sub setup_limiter {
my ($self, $name, $cfg) = @_;
+ my $k = "publicinboxlimiter.$name.depth";
+ my $v = $cfg->{$k};
+ if (defined $v) {
+ if ($v =~ /\A[1-9][0-9]*\z/) { # positive decimal integers only
+ $self->{depth} = $v + 0;
+ } else { # {depth} stays unset; name the offending key
+ warn "W: `$k=$v' is not a positive integer ",
+ "in $cfg->{-f}\n";
+ }
+ }
for my $rlim (@PublicInbox::Spawn::RLIMITS) {
- my $k = lc($rlim);
+ $k = lc($rlim);
$k =~ tr/_//d;
$k = "publicinboxlimiter.$name.$k";
- my $v = $cfg->{$k} // next;
+ $v = $cfg->{$k} // next;
my @rlimit = split(/\s*,\s*/, $v);
if (scalar(@rlimit) == 1) {
- push @rlimit, $rlimit[0];
+ $rlimit[1] = $rlimit[0]; # single value: reuse it for both slots
} elsif (scalar(@rlimit) != 2) {
warn "W: could not parse $k: $v (ignored)\n";
next;
warn "BSD::Resource missing for $rlim";
next;
} : undef;
- for my $i (0..$#rlimit) {
- next if $rlimit[$i] ne 'INFINITY';
- $rlimit[$i] = $inf;
+ for (@rlimit) { # $_ aliases each element, so @rlimit is updated
+ $_ = $inf if $_ eq 'INFINITY';
}
$self->{$rlim} = \@rlimit;
}
}
+sub is_too_busy { # true if run_queue holds more than {depth} entries
+ my ($self) = @_;
+ scalar(@{$self->{run_queue}}) > ($self->{depth} // 32) # 32 if unset
+}
+
1;
PublicInbox::WwwStatic::r($_[0] // 500);
}
-sub finalize ($) {
+sub _finalize ($) {
my ($self) = @_;
-
- # process is done, spawn whatever's in the queue
- my $limiter = delete $self->{limiter} or return;
- my $running = --$limiter->{running};
-
- if ($running < $limiter->{max}) {
- if (my $next = shift(@{$limiter->{run_queue}})) {
- _do_spawn(@$next, $limiter);
- }
- }
if (my $err = $self->{_err}) { # set by finish or waitpid_err
utf8::decode($err);
if (my $dst = $self->{qsp_err}) {
}
}
+sub finalize ($) { # give back our limiter slot, then run _finalize
+ my ($self) = @_;
+
+ # process is done, spawn whatever's in the queue
+ my $limiter = delete $self->{limiter} or return; # no limiter: done
+ my $running = --$limiter->{running};
+
+ if ($running < $limiter->{max}) {
+ if (my $next = shift(@{$limiter->{run_queue}})) { # oldest entry
+ _do_spawn(@$next, $limiter); # $next is [ $self, $start_cb ]
+ }
+ }
+ _finalize $self;
+}
+
sub waitpid_err { # callback for awaitpid
my (undef, $self) = @_; # $_[0]: pid
$self->{_err} = ''; # for defined check in ->finish
my ($self, $limiter, $start_cb) = @_;
if ($limiter->{running} < $limiter->{max}) {
_do_spawn($self, $start_cb, $limiter);
+ } elsif ($limiter->is_too_busy) {
+ $self->{psgi_env}->{'qspawn.fallback'} //= 503 if
+ $self->{psgi_env};
+ _finalize $self;
} else {
push @{$limiter->{run_queue}}, [ $self, $start_cb ];
}
'160000' => 'g', # commit (gitlink)
);
-# TODO: not fork safe, but we don't fork w/o exec in PublicInbox::WWW
-my (@solver_q, $solver_lim);
-my $solver_nr = 0;
-
sub html_page ($$;@) {
my ($ctx, $code) = @_[0, 1];
my $wcb = delete $ctx->{-wcb};
'</code></pre></td></tr></table>'.dbg_log($ctx), @def);
}
-sub start_solver ($) {
- my ($ctx) = @_;
- $ctx->{-next_solver} = on_destroy \&next_solver;
- ++$solver_nr;
+sub start_solver ($$) {
+ my ($ctx, $limiter) = @_;
+ $ctx->{-next_solver} = on_destroy \&next_solver, $limiter;
+ ++$limiter->{running};
if (my $ck = $ctx->{env}->{'pi-httpd.ckhup'}) {
$ck->($ctx->{env}->{'psgix.io'}->{sock}) and
return html_page $ctx, 499, 'client disconnected';
$ctx->{lh} or open $ctx->{lh}, '+>>', "$ctx->{-tmp}/solve.log";
my $solver = PublicInbox::SolverGit->new($ctx->{ibx},
\&solve_result, $ctx);
- $solver->{limiter} = $solver_lim;
+ $solver->{limiter} = $ctx->{www}->{solver_limiter};
$solver->{gits} //= [ $ctx->{git} ];
$solver->{tmp} = $ctx->{-tmp}; # share tmpdir
# PSGI server will call this immediately and give us a callback (-wcb)
# run the next solver job when done and DESTROY-ed (on_destroy cb)
sub next_solver {
- --$solver_nr;
- my $ctx = shift(@solver_q) // return;
- eval { start_solver($ctx) };
+ my ($limiter) = @_;
+ --$limiter->{running};
+ my $ctx = shift(@{$limiter->{run_queue}}) // return;
+ eval { start_solver $ctx, $limiter };
return unless $@;
warn "W: start_solver: $@";
html_page($ctx, 500) if $ctx->{-wcb};
sub may_start_solver ($) {
my ($ctx) = @_;
- $solver_lim //= $ctx->{www}->{pi_cfg}->limiter('codeblob');
- if ($solver_nr >= $solver_lim->{max}) {
- @solver_q > 128 ? html_page($ctx, 503, 'too busy')
- : push(@solver_q, $ctx);
+ my $limiter = $ctx->{www}->{pi_cfg}->limiter('-codeblob');
+
+ # {solver_limiter} just inherits rlimits from the configurable
+ # -codeblob limiter. The parallelism and depth management is
+ # redundant since -codeblob $limiter encompasses {solver_limiter}.
+ $ctx->{www}->{solver_limiter} //= do { # built once per {www}
+ my $l = PublicInbox::Limiter->new;
+ $l->{$_} = $limiter->{$_} for grep /^RLIMIT_/, keys %$limiter;
+ $l;
+ };
+ if ($limiter->{running} < $limiter->{max}) { # free slot: start now
+ start_solver $ctx, $limiter;
+ } elsif ($limiter->is_too_busy) { # run_queue is over its depth
+ html_page $ctx, 503, 'too busy';
} else {
- start_solver($ctx);
+ push @{$limiter->{run_queue}}, $ctx; # shifted by next_solver
}
}
is $counts{499} + $counts{200}, 31,
'all 1.0 connections logged for disconnects';
}
+ undef $c0;
+ kill 'STOP', $td_pid;
+ @c = map {
+ my $c = tcp_connect($lis);
+ print $c $req[0];
+ $c;
+ } (0..66);
+ print $_ $req[1] for @c;
+ kill 'CONT', $td_pid;
+ my %codes;
+ for (@c) {
+ read $_, $buf, 16384;
+ my $code = $buf =~ m!\AHTTP/1\.0 (\d+) ! ? $1 : '';
+ ++$codes{$code};
+ }
+ is $codes{''}, undef, 'all valid '.scalar(@c).' HTTP responses';
+ ok $codes{503}, 'got some 503 too busy errors';
+ is $codes{503} + $codes{200},
+ scalar(@c), 'only got 200 and 503 codes';
require_cmd('curl', 1) or skip 'no curl', 1;
mkdir "$tmpdir/ext";
--- /dev/null
+#!perl -w
+# Copyright (C) all contributors <meta@public-inbox.org>
+# License: AGPL-3.0+ <https://www.gnu.org/licenses/agpl-3.0.txt>
+# ensure publicinboxlimiter.-httpbackend.{depth,max} knobs work
+# and returns 503 (too busy) errors on overload.
+use v5.12;
+use autodie;
+use PublicInbox::TestCommon;
+use File::Path qw(remove_tree);
+use PublicInbox::IO qw(write_file try_cat);
+use PublicInbox::Spawn qw(spawn);
+use PublicInbox::Git qw(git_exe);
+require PublicInbox::Sigfd;
+my $git_dir = $ENV{GIANT_GIT_DIR} //
+ plan 'skip_all' => 'GIANT_GIT_DIR not defined';
+require_mods qw(-httpd psgi);
+my $tmpdir = tmpdir;
+my $henv = { TMPDIR => $tmpdir, PI_CONFIG => "$tmpdir/cfg" };
+write_file '>', $henv->{PI_CONFIG}, <<EOM;
+[coderepo "giant.git"]
+ dir = $git_dir
+EOM
+my @tail = split ' ', $PublicInbox::TestCommon::tail_cmd // '';
+my $uri;
+
+# Spawn $max parallel `git clone -q --mirror' processes against $uri and
+# reap all of them. Returns a hashref mapping each wait status ($?) to
+# an arrayref of the clone numbers (1..$max) which exited with it.
+# Each clone's stderr goes to "$tmpdir/$n.err"; when @tail is non-empty,
+# that command is spawned on those files for the duration of the run.
+my $run_clones = sub {
+ my ($max) = @_;
+ my (%wait, $tpid, %chld_status);
+ if (@tail) {
+ my @f = map {
+ my $f = "$tmpdir/$_.err";
+ open my $fh, '>', $f; # create the log before @tail starts
+ $f;
+ } (1..$max);
+ $tpid = spawn([ @tail, @f ], undef, { 1 => 2 });
+ }
+ for my $n (1..$max) {
+ my $rdr;
+ my $err = "$tmpdir/$n.err";
+ open $rdr->{2}, '>', $err;
+ my $pid = spawn([git_exe, qw(clone -q --mirror), $uri,
+ "$tmpdir/$n.git"], undef, $rdr);
+ $wait{$pid} = $n;
+ }
+ while (keys %wait) {
+ my $pid = waitpid(-1, 0);
+ # -1 means no children are left to reap; die instead of
+ # spinning forever on clones we can no longer account for
+ die "waitpid: $! (".scalar(keys %wait)." clones unreaped)\n"
+ if $pid < 0;
+ my $n = delete $wait{$pid} // next; # not a clone (e.g. @tail)
+ push @{$chld_status{$?}}, $n;
+ remove_tree "$tmpdir/$n.git";
+ }
+ if ($tpid) {
+ kill 'TERM', $tpid;
+ waitpid($tpid, 0);
+ }
+ \%chld_status;
+};
+
+my $sigfd = PublicInbox::Sigfd->new;
+my $reload_cfg = sub { # add @_ to PI_CONFIG (mode '>>'), HUP the daemon
+ write_file '>>', $henv->{PI_CONFIG}, @_;
+ kill 'HUP', $PublicInbox::TestCommon::CURRENT_DAEMON->{pid};
+ # signalfd/EVFILT_SIGNAL platforms should handle signals more
+ # predictably and not need tick
+ tick(1) unless $sigfd;
+};
+
+my $ck_503 = sub { # each clone listed must have 503 in its .err log
+ my ($chld_status) = @_; # callers delete the {0} entry beforehand
+ ok scalar(keys %$chld_status), 'got non-zero statuses from git clone';
+ for my $s (keys %$chld_status) {
+ my @unexpected; # .err contents which lack a 503
+ for my $n (@{$chld_status->{$s}}) {
+ my $msg = try_cat "$tmpdir/$n.err";
+ push @unexpected, $msg if $msg !~ /\b503\b/s;
+ }
+ ok !@unexpected, 'no unexpected errors' or
+ diag explain([ "status $s", \@unexpected ]);
+ diag "chld_status=$s: ".scalar(@{$chld_status->{$s}})
+ .' instances'
+ }
+};
+
+my $client = sub { # 3 rounds: no limiter config, depth=1, then max=1
+ $uri = "$ENV{PLACK_TEST_EXTERNALSERVER_URI}/giant.git/";
+ my $chld_status = $run_clones->(64); # no limiter section yet
+ my $ok = delete $chld_status->{0} // []; # clones with $? == 0
+ is scalar(@$ok), 64, 'got all successes';
+ is keys(%$chld_status), 0, 'no other statuses' or
+ diag explain($chld_status);
+
+ $reload_cfg->(<<EOM);
+[publicinboxlimiter "-httpbackend"]
+ depth = 1
+EOM
+
+ $chld_status = $run_clones->(40); # more than depth=1 should allow
+ $ok = delete $chld_status->{0} // [];
+ ok scalar(@$ok) >= 33, 'got at least 33 successes' or
+ diag 'got '.scalar(@$ok).' successes';
+ $ck_503->($chld_status); # the remaining clones must show 503
+
+ $reload_cfg->("\tmax = 1\n"); # added after the depth = 1 line
+
+ $chld_status = $run_clones->(10);
+ $ok = delete $chld_status->{0} // [];
+ ok scalar(@$ok) >= 2, 'got at least 2 successes' or
+ diag 'got '.scalar(@$ok).' successes';
+ $ck_503->($chld_status);
+};
+test_httpd $henv, $client;
+done_testing;