package PublicInbox::Limiter;
use v5.12;
use PublicInbox::Spawn;
+use PublicInbox::OnDestroy;
sub new {
my ($class, $max) = @_;
}
}
-sub is_too_busy {
+sub _do_start ($$$$) {
+ my ($self, $start_cb, $ctx, $fail_cb) = @_;
+ $ctx->{"limiter.next.$self"} = on_destroy \&_start_next, $self;
+ ++$self->{running};
+ eval { $start_cb->($ctx, $self) };
+ if ($@) {
+ print { $ctx->{env}->{'psgi.errors'} } "E: $@\n";
+ $fail_cb->($ctx, 500, 'internal error');
+ }
+}
+
+sub _start_next { # on_destroy cb
my ($self) = @_;
- scalar(@{$self->{run_queue}}) > ($self->{depth} // 32)
+ --$self->{running};
+ my ($rec, $ck, $start_cb, $ctx, $fail_cb);
+ while (1) {
+ $rec = shift @{$self->{run_queue}} or return;
+ ($start_cb, $ctx, $fail_cb) = @$rec;
+ $ck = $ctx->{env}->{'pi-httpd.ckhup'} or last;
+ $ck->($ctx->{env}->{'psgix.io'}->{sock}) or last;
+ $fail_cb->($ctx, 499, 'client disconnected');
+ }
+ _do_start $self, $start_cb, $ctx, $fail_cb;
+}
+
+sub may_start {
+ my ($self, $start_cb, $ctx, $fail_cb) = @_;
+ if ($self->{running} < $self->{max}) {
+ _do_start $self, $start_cb, $ctx, $fail_cb;
+ } elsif (@{$self->{run_queue}} > ($self->{depth} // 32)) {
+ $fail_cb->($ctx, 503, 'too busy');
+ } else {
+ push @{$self->{run_queue}}, [ $start_cb, $ctx, $fail_cb ];
+ }
}
1;
package PublicInbox::PlackLimiter;
use v5.12;
use parent qw(Plack::Middleware);
-use PublicInbox::OnDestroy;
+use PublicInbox::Limiter;
sub prepare_app { # called via Plack::Component (used by Plack::Middleware)
my ($self) = @_;
$self->{max} //= 2;
$self->{run_queue} = [];
$self->{running} = 0;
- $self->{rejected} = 0;
- $self->{message} //= "too busy\n";
}
-sub r503 ($) {
- my @body = ($_[0]->{message});
- ++$_[0]->{rejected};
- [ 503, [ 'Content-Type' => 'text/plain',
- 'Content-Length' => length($body[0]) ], \@body ]
-}
-
-sub next_req { # on_destroy cb
- my ($self) = @_;
- --$self->{running};
- my $env = shift @{$self->{run_queue}} or return;
- my $wcb = delete $env->{'p-i.limiter.wcb'} // die 'BUG: no wcb';
- my $res = eval { call($self, $env) };
- return warn("W: $@") if $@;
- ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res);
+sub lim_fail { # limiter->may_start fail_cb
+ my ($ctx, $code, $msg) = @_;
+ delete($ctx->{psgi_wcb})->([ $code, [ 'Content-Type' => 'text/plain',
+ 'Content-Length' => length($msg) ], [ $msg ] ]);
}
sub stats ($) {
my $res = <<EOM;
running: $self->{running}
queued: $nq
-rejected: $self->{rejected}
max: $self->{max}
EOM
[ 200, [ 'Content-Type' => 'text/plain',
'Content-Length' => length($res) ], [ $res ] ]
}
+sub app_call { # limiter->may_start start_cb
+ my ($ctx, $self) = @_;
+ my $wcb = delete $ctx->{psgi_wcb};
+ my $env = delete $ctx->{env}; # avoid cyclic ref
+ push @{$env->{'limiter.ctx'}}, $ctx; # handoff limiter.next.$self
+ my $res = eval { $self->app->($env) };
+ return warn("W: $@") if $@;
+ ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res);
+}
+
sub call {
my ($self, $env) = @_;
if (defined $self->{stats_match_cb}) {
return stats $self if $self->{stats_match_cb}->($env);
}
return $self->app->($env) if !$self->{match_cb}->($env);
- return r503($self) if @{$self->{run_queue}} > ($self->{depth} // 32);
- if ($self->{running} < $self->{max}) {
- ++$self->{running};
- $env->{'p-i.limiter.next'} = on_destroy \&next_req, $self;
- $self->app->($env);
- } else { # capture write cb from PSGI server and queue up
- sub {
- $env->{'p-i.limiter.wcb'} = $_[0];
- push @{$self->{run_queue}}, $env;
- };
+ sub { # capture write cb from PSGI server
+ my $ctx = { env => $env, psgi_wcb => $_[0] };
+ PublicInbox::Limiter::may_start(
+ $self, \&app_call, $ctx, \&lim_fail);
}
}
}
sub _do_spawn {
- my ($self, $start_cb, $limiter) = @_;
+ my ($self) = @_;
my ($cmd, $cmd_env, $opt) = @{$self->{args}};
- my %o = %{$opt || {}};
- $self->{limiter} = $limiter;
- for my $k (@PublicInbox::Spawn::RLIMITS) {
- $opt->{$k} = $limiter->{$k} // next;
- }
- $self->{-quiet} = 1 if $o{quiet};
- $limiter->{running}++;
- if ($start_cb) {
+ $self->{-quiet} = 1 if $opt->{quiet};
+ if (my $start_cb = delete $self->{-start_cb}) {
eval { # popen_rd may die on EMFILE, ENFILE
$self->{rpipe} = popen_rd($cmd, $cmd_env, $opt,
\&waitpid_err, $self);
PublicInbox::WwwStatic::r($_[0] // 500);
}
-sub _finalize ($) {
+sub finalize ($) {
my ($self) = @_;
if (my $err = $self->{_err}) { # set by finish or waitpid_err
utf8::decode($err);
}
}
-sub finalize ($) {
- my ($self) = @_;
-
- # process is done, spawn whatever's in the queue
- my $limiter = delete $self->{limiter} or return;
- --$limiter->{running};
- my $next = shift @{$limiter->{run_queue}};
- _do_spawn(@$next, $limiter) if $next;
- _finalize $self;
-}
-
sub waitpid_err { # callback for awaitpid
my (undef, $self) = @_; # $_[0]: pid
$self->{_err} = ''; # for defined check in ->finish
finalize($self) if $closed_before || defined($self->{_err});
}
-sub start ($$$) {
+sub _qsp_fail { # limiter fail_cb
+ my ($self, $code, $msg) = @_;
+ $self->{env}->{'qspawn.fallback'} //= $code; # likely 503
+ finalize $self;
+}
+
+sub start ($$;$) {
my ($self, $limiter, $start_cb) = @_;
- if ($limiter->{running} < $limiter->{max}) {
- _do_spawn($self, $start_cb, $limiter);
- } elsif ($limiter->is_too_busy) {
- $self->{env}->{'qspawn.fallback'} //= 503 if
- $self->{env};
- _finalize $self;
- } else {
- push @{$limiter->{run_queue}}, [ $self, $start_cb ];
+ $self->{-start_cb} = $start_cb if $start_cb;
+ my %opt;
+ for (@PublicInbox::Spawn::RLIMITS) {
+ $opt{$_} = $limiter->{$_} // next;
}
+ %{$self->{args}->[2]} = (%{$self->{args}->[2]}, %opt) if keys %opt;
+ $limiter->may_start(\&_do_spawn, $self, \&_qsp_fail);
}
# Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
my ($self, $env, $limiter, @qx_cb_arg) = @_;
$self->{env} = $env;
$self->{qx_cb_arg} = \@qx_cb_arg;
- $limiter ||= $def_limiter ||= PublicInbox::Limiter->new;
- start($self, $limiter, undef);
+ start($self, $limiter ||= $def_limiter ||= PublicInbox::Limiter->new);
}
sub yield_pass {
'</code></pre></td></tr></table>'.dbg_log($ctx), @def);
}
-sub start_solver ($$) {
- my ($ctx, $limiter) = @_;
- $ctx->{-next_solver} = on_destroy \&next_solver, $limiter;
- ++$limiter->{running};
- if (my $ck = $ctx->{env}->{'pi-httpd.ckhup'}) {
- $ck->($ctx->{env}->{'psgix.io'}->{sock}) and
- return html_page $ctx, 499, 'client disconnected';
- }
-
+sub start_solver {
+ my ($ctx) = @_;
while (my ($from, $to) = each %QP_MAP) {
my $v = $ctx->{qp}->{$from} // next;
$ctx->{hints}->{$to} = $v if $v ne '';
$solver->solve(@$ctx{qw(env lh oid_b hints)});
}
-# run the next solver job when done and DESTROY-ed (on_destroy cb)
-sub next_solver {
- my ($limiter) = @_;
- --$limiter->{running};
- my $ctx = shift(@{$limiter->{run_queue}}) // return;
- eval { start_solver $ctx, $limiter };
- return unless $@;
- warn "W: start_solver: $@";
- html_page($ctx, 500) if $ctx->{-wcb};
-}
-
-sub may_start_solver ($) {
- my ($ctx) = @_;
+# GET /$INBOX/$GIT_OBJECT_ID/s/
+# GET /$INBOX/$GIT_OBJECT_ID/s/$FILENAME
+sub show ($$;$) {
+ my ($ctx, $oid_b, $fn) = @_;
+ @$ctx{qw(oid_b fn)} = ($oid_b, $fn);
my $limiter = $ctx->{www}->{pi_cfg}->limiter('-codeblob');
# {solver_limiter} just inherits rlimits from the configurable
$l->{$_} = $limiter->{$_} for grep /^RLIMIT_/, keys %$limiter;
$l;
};
- if ($limiter->{running} < $limiter->{max}) {
- start_solver $ctx, $limiter;
- } elsif ($limiter->is_too_busy) {
- html_page $ctx, 503, 'too busy';
- } else {
- push @{$limiter->{run_queue}}, $ctx;
- }
-}
-
-# GET /$INBOX/$GIT_OBJECT_ID/s/
-# GET /$INBOX/$GIT_OBJECT_ID/s/$FILENAME
-sub show ($$;$) {
- my ($ctx, $oid_b, $fn) = @_;
- @$ctx{qw(oid_b fn)} = ($oid_b, $fn);
sub {
$ctx->{-wcb} = $_[0]; # HTTP write callback
- may_start_solver $ctx;
+ $limiter->may_start(\&start_solver, $ctx, \&html_page);
};
}
ok(!finish_err($s), 'no error on sleep');
is_deeply([], \@err, 'no warnings');
}
- ok(!finish_err($_->[0]), "true $_->[1] succeeded") foreach @t;
+ undef $s;
+ for (@t) { # DESTROY in order
+ my ($qsp, $i) = (shift(@$_), shift(@$_));
+ ok !finish_err($qsp), "true $i succeeded";
+ }
is_deeply([qw(sleep 0 1 2)], \@run, 'ran in order');
}