From 4ed6b51f693657bccb78731fcf710e7494836bcf Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 21 Mar 2025 22:22:30 +0000 Subject: [PATCH] limiter: refactor to reduce code duplication PlackLimiter, Qspawn, and ViewVCS all have roughly the same code around our base Limiter package, so put everything around a new Limiter->may_start subroutine. PlackLimiter loses some stats as a result but that's logged anyways and I doubt the customizable error message was worth the effort. We now have ckhup and 499 (client disconnect) handling for all PSGI uses of Limiter, as well. t/qspawn.t changes were required since the original ->finalize logic now relies on on_destroy; but none of the existing PSGI code using Qspawn required changes. --- lib/PublicInbox/Limiter.pm | 36 ++++++++++++++++++++++-- lib/PublicInbox/PlackLimiter.pm | 48 +++++++++++++------------------ lib/PublicInbox/Qspawn.pm | 50 ++++++++++++--------------------- lib/PublicInbox/ViewVCS.pm | 45 ++++++----------------------- t/qspawn.t | 6 +++- 5 files changed, 84 insertions(+), 101 deletions(-) diff --git a/lib/PublicInbox/Limiter.pm b/lib/PublicInbox/Limiter.pm index fc62d0d40..f90c8b568 100644 --- a/lib/PublicInbox/Limiter.pm +++ b/lib/PublicInbox/Limiter.pm @@ -4,6 +4,7 @@ package PublicInbox::Limiter; use v5.12; use PublicInbox::Spawn; +use PublicInbox::OnDestroy; sub new { my ($class, $max) = @_; @@ -61,9 +62,40 @@ EOM } } -sub is_too_busy { +sub _do_start ($$$$) { + my ($self, $start_cb, $ctx, $fail_cb) = @_; + $ctx->{"limiter.next.$self"} = on_destroy \&_start_next, $self; + ++$self->{running}; + eval { $start_cb->($ctx, $self) }; + if ($@) { + print { $ctx->{env}->{'psgi.errors'} } "E: $@\n"; + $fail_cb->($ctx, 500, 'internal error'); + } +} + +sub _start_next { # on_destroy cb my ($self) = @_; - scalar(@{$self->{run_queue}}) > ($self->{depth} // 32) + --$self->{running}; + my ($rec, $ck, $start_cb, $ctx, $fail_cb); + while (1) { + $rec = shift @{$self->{run_queue}} or return; + ($start_cb, $ctx, $fail_cb) = @$rec; + $ck = $ctx->{env}->{'pi-httpd.ckhup'} or last; + $ck->($ctx->{env}->{'psgix.io'}->{sock}) or last; + $fail_cb->($ctx, 499, 'client disconnected'); + } + _do_start $self, $start_cb, $ctx, $fail_cb; +} + +sub may_start { + my ($self, $start_cb, $ctx, $fail_cb) = @_; + if ($self->{running} < $self->{max}) { + _do_start $self, $start_cb, $ctx, $fail_cb; + } elsif (@{$self->{run_queue}} > ($self->{depth} // 32)) { + $fail_cb->($ctx, 503, 'too busy'); + } else { + push @{$self->{run_queue}}, [ $start_cb, $ctx, $fail_cb ]; + } } 1; diff --git a/lib/PublicInbox/PlackLimiter.pm b/lib/PublicInbox/PlackLimiter.pm index a1cc51dcf..480392121 100644 --- a/lib/PublicInbox/PlackLimiter.pm +++ b/lib/PublicInbox/PlackLimiter.pm @@ -4,7 +4,7 @@ package PublicInbox::PlackLimiter; use v5.12; use parent qw(Plack::Middleware); -use PublicInbox::OnDestroy; +use PublicInbox::Limiter; sub prepare_app { # called via Plack::Component (used by Plack::Middleware) my ($self) = @_; @@ -12,25 +12,12 @@ sub prepare_app { # called via Plack::Component (used by Plack::Middleware) $self->{max} //= 2; $self->{run_queue} = []; $self->{running} = 0; - $self->{rejected} = 0; - $self->{message} //= "too busy\n"; } -sub r503 ($) { - my @body = ($_[0]->{message}); - ++$_[0]->{rejected}; - [ 503, [ 'Content-Type' => 'text/plain', - 'Content-Length' => length($body[0]) ], \@body ] -} - -sub next_req { # on_destroy cb - my ($self) = @_; - --$self->{running}; - my $env = shift @{$self->{run_queue}} or return; - my $wcb = delete $env->{'p-i.limiter.wcb'} // die 'BUG: no wcb'; - my $res = eval { call($self, $env) }; - return warn("W: $@") if $@; - ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res); +sub lim_fail { # limiter->may_start fail_cb + my ($ctx, $code, $msg) = @_; + delete($ctx->{psgi_wcb})->([ $code, [ 'Content-Type' => 'text/plain', + 'Content-Length' => length($msg) ], [ $msg ] ]); } sub stats ($) { @@ -39,29 +26,32 @@ sub stats ($) { my $res = <{running} queued: $nq -rejected: $self->{rejected} max: $self->{max} EOM [ 200, [ 'Content-Type' => 'text/plain', 'Content-Length' => length($res) ], [ $res ] ] } +sub app_call { # limiter->may_start start_cb + my ($ctx, $self) = @_; + my $wcb = delete $ctx->{psgi_wcb}; + my $env = delete $ctx->{env}; # avoid cyclic ref + push @{$env->{'limiter.ctx'}}, $ctx; # handoff limiter.next.$self + my $res = eval { $self->app->($env) }; + return warn("W: $@") if $@; + ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res); +} + sub call { my ($self, $env) = @_; if (defined $self->{stats_match_cb}) { return stats $self if $self->{stats_match_cb}->($env); } return $self->app->($env) if !$self->{match_cb}->($env); - return r503($self) if @{$self->{run_queue}} > ($self->{depth} // 32); - if ($self->{running} < $self->{max}) { - ++$self->{running}; - $env->{'p-i.limiter.next'} = on_destroy \&next_req, $self; - $self->app->($env); - } else { # capture write cb from PSGI server and queue up - sub { - $env->{'p-i.limiter.wcb'} = $_[0]; - push @{$self->{run_queue}}, $env; - }; + sub { # capture write cb from PSGI server + my $ctx = { env => $env, psgi_wcb => $_[0] }; + PublicInbox::Limiter::may_start( + $self, \&app_call, $ctx, \&lim_fail); } } diff --git a/lib/PublicInbox/Qspawn.pm b/lib/PublicInbox/Qspawn.pm index b882ea535..690860c17 100644 --- a/lib/PublicInbox/Qspawn.pm +++ b/lib/PublicInbox/Qspawn.pm @@ -51,16 +51,10 @@ sub new { } sub _do_spawn { - my ($self, $start_cb, $limiter) = @_; + my ($self) = @_; my ($cmd, $cmd_env, $opt) = @{$self->{args}}; - my %o = %{$opt || {}}; - $self->{limiter} = $limiter; - for my $k (@PublicInbox::Spawn::RLIMITS) { - $opt->{$k} = $limiter->{$k} // next; - } - $self->{-quiet} = 1 if $o{quiet}; - $limiter->{running}++; - if ($start_cb) { + $self->{-quiet} = 1 if $opt->{quiet}; + if (my $start_cb = delete $self->{-start_cb}) { eval { # popen_rd may die on EMFILE, ENFILE $self->{rpipe} = popen_rd($cmd, $cmd_env, $opt, \&waitpid_err, $self); @@ -78,7 +72,7 @@ sub psgi_status_err { # Qspawn itself is useful w/o PSGI PublicInbox::WwwStatic::r($_[0] // 500); } -sub _finalize ($) { +sub finalize ($) { my ($self) = @_; if (my $err = $self->{_err}) { # set by finish or waitpid_err utf8::decode($err); @@ -102,17 +96,6 @@ sub _finalize ($) { } } -sub finalize ($) { - my ($self) = @_; - - # process is done, spawn whatever's in the queue - my $limiter = delete $self->{limiter} or return; - --$limiter->{running}; - my $next = shift @{$limiter->{run_queue}}; - _do_spawn(@$next, $limiter) if $next; - _finalize $self; -} - sub waitpid_err { # callback for awaitpid my (undef, $self) = @_; # $_[0]: pid $self->{_err} = ''; # for defined check in ->finish @@ -156,17 +139,21 @@ sub finish ($;$) { finalize($self) if $closed_before || defined($self->{_err}); } -sub start ($$$) { +sub _qsp_fail { # limiter fail_cb + my ($self, $code, $msg) = @_; + $self->{env}->{'qspawn.fallback'} //= $code; # likely 503 + finalize $self; +} + +sub start ($$;$) { my ($self, $limiter, $start_cb) = @_; - if ($limiter->{running} < $limiter->{max}) { - _do_spawn($self, $start_cb, $limiter); - } elsif ($limiter->is_too_busy) { - $self->{env}->{'qspawn.fallback'} //= 503 if - $self->{env}; - _finalize $self; - } else { - push @{$limiter->{run_queue}}, [ $self, $start_cb ]; + $self->{-start_cb} = $start_cb if $start_cb; + my %opt; + for (@PublicInbox::Spawn::RLIMITS) { + $opt{$_} = $limiter->{$_} // next; } + %{$self->{args}->[2]} = (%{$self->{args}->[2]}, %opt) if keys %opt; + $limiter->may_start(\&_do_spawn, $self, \&_qsp_fail); } # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with @@ -177,8 +164,7 @@ sub psgi_qx { my ($self, $env, $limiter, @qx_cb_arg) = @_; $self->{env} = $env; $self->{qx_cb_arg} = \@qx_cb_arg; - $limiter ||= $def_limiter ||= PublicInbox::Limiter->new; - start($self, $limiter, undef); + start($self, $limiter ||= $def_limiter ||= PublicInbox::Limiter->new); } sub yield_pass { diff --git a/lib/PublicInbox/ViewVCS.pm b/lib/PublicInbox/ViewVCS.pm index 552e3241b..e9ed47115 100644 --- a/lib/PublicInbox/ViewVCS.pm +++ b/lib/PublicInbox/ViewVCS.pm @@ -636,15 +636,8 @@ sub show_blob { # git->cat_async callback ''.dbg_log($ctx), @def); } -sub start_solver ($$) { - my ($ctx, $limiter) = @_; - $ctx->{-next_solver} = on_destroy \&next_solver, $limiter; - ++$limiter->{running}; - if (my $ck = $ctx->{env}->{'pi-httpd.ckhup'}) { - $ck->($ctx->{env}->{'psgix.io'}->{sock}) and - return html_page $ctx, 499, 'client disconnected'; - } - +sub start_solver { + my ($ctx) = @_; while (my ($from, $to) = each %QP_MAP) { my $v = $ctx->{qp}->{$from} // next; $ctx->{hints}->{$to} = $v if $v ne ''; @@ -662,19 +655,11 @@ sub start_solver ($$) { $solver->solve(@$ctx{qw(env lh oid_b hints)}); } -# run the next solver job when done and DESTROY-ed (on_destroy cb) -sub next_solver { - my ($limiter) = @_; - --$limiter->{running}; - my $ctx = shift(@{$limiter->{run_queue}}) // return; - eval { start_solver $ctx, $limiter }; - return unless $@; - warn "W: start_solver: $@"; - html_page($ctx, 500) if $ctx->{-wcb}; -} - -sub may_start_solver ($) { - my ($ctx) = @_; +# GET /$INBOX/$GIT_OBJECT_ID/s/ +# GET /$INBOX/$GIT_OBJECT_ID/s/$FILENAME +sub show ($$;$) { + my ($ctx, $oid_b, $fn) = @_; + @$ctx{qw(oid_b fn)} = ($oid_b, $fn); my $limiter = $ctx->{www}->{pi_cfg}->limiter('-codeblob'); # {solver_limiter} just inherits rlimits from the configurable @@ -685,23 +670,9 @@ sub may_start_solver ($) { $l->{$_} = $limiter->{$_} for grep /^RLIMIT_/, keys %$limiter; $l; }; - if ($limiter->{running} < $limiter->{max}) { - start_solver $ctx, $limiter; - } elsif ($limiter->is_too_busy) { - html_page $ctx, 503, 'too busy'; - } else { - push @{$limiter->{run_queue}}, $ctx; - } -} - -# GET /$INBOX/$GIT_OBJECT_ID/s/ -# GET /$INBOX/$GIT_OBJECT_ID/s/$FILENAME -sub show ($$;$) { - my ($ctx, $oid_b, $fn) = @_; - @$ctx{qw(oid_b fn)} = ($oid_b, $fn); sub { $ctx->{-wcb} = $_[0]; # HTTP write callback - may_start_solver $ctx; + $limiter->may_start(\&start_solver, $ctx, \&html_page); }; } diff --git a/t/qspawn.t b/t/qspawn.t index 507f86a51..dfa33ada0 100644 --- a/t/qspawn.t +++ b/t/qspawn.t @@ -82,7 +82,11 @@ foreach my $cmd ([qw(sleep 1)], [qw(sh -c), 'sleep 1; false']) { ok(!finish_err($s), 'no error on sleep'); is_deeply([], \@err, 'no warnings'); } - ok(!finish_err($_->[0]), "true $_->[1] succeeded") foreach @t; + undef $s; + for (@t) { # DESTROY in order + my ($qsp, $i) = (shift(@$_), shift(@$_)); + ok !finish_err($qsp), "true $i succeeded"; + } is_deeply([qw(sleep 0 1 2)], \@run, 'ran in order'); } -- 2.39.5