]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
limiter: refactor to reduce code duplication
authorEric Wong <e@80x24.org>
Fri, 21 Mar 2025 22:22:30 +0000 (22:22 +0000)
committerEric Wong <e@80x24.org>
Sun, 23 Mar 2025 22:05:44 +0000 (22:05 +0000)
PlackLimiter, Qspawn, and ViewVCS all have roughly the same
code around our base Limiter package, so put everything around
a new Limiter->may_start subroutine.  PlackLimiter loses some
stats as a result but that's logged anyways and I doubt the
customizable error message was worth the effort.

We now have ckhup and 499 (client disconnect) handling for all
PSGI uses of Limiter, as well.

t/qspawn.t changes were required since the original ->finalize
logic now relies on on_destroy; but none of the existing PSGI
code using Qspawn required changes.

lib/PublicInbox/Limiter.pm
lib/PublicInbox/PlackLimiter.pm
lib/PublicInbox/Qspawn.pm
lib/PublicInbox/ViewVCS.pm
t/qspawn.t

index fc62d0d40b7948f39026db0b97b0ba7142faca36..f90c8b568f07e9bd8de700fcd0530c9823ff37a7 100644 (file)
@@ -4,6 +4,7 @@
 package PublicInbox::Limiter;
 use v5.12;
 use PublicInbox::Spawn;
+use PublicInbox::OnDestroy;
 
 sub new {
        my ($class, $max) = @_;
@@ -61,9 +62,40 @@ EOM
        }
 }
 
-sub is_too_busy {
+sub _do_start ($$$$) {
+       my ($self, $start_cb, $ctx, $fail_cb) = @_;
+       $ctx->{"limiter.next.$self"} = on_destroy \&_start_next, $self;
+       ++$self->{running};
+       eval { $start_cb->($ctx, $self) };
+       if ($@) {
+               print { $ctx->{env}->{'psgi.errors'} } "E: $@\n";
+               $fail_cb->($ctx, 500, 'internal error');
+       }
+}
+
+sub _start_next { # on_destroy cb
        my ($self) = @_;
-       scalar(@{$self->{run_queue}}) > ($self->{depth} // 32)
+       --$self->{running};
+       my ($rec, $ck, $start_cb, $ctx, $fail_cb);
+       while (1) {
+               $rec = shift @{$self->{run_queue}} or return;
+               ($start_cb, $ctx, $fail_cb) = @$rec;
+               $ck = $ctx->{env}->{'pi-httpd.ckhup'} or last;
+               $ck->($ctx->{env}->{'psgix.io'}->{sock}) or last;
+               $fail_cb->($ctx, 499, 'client disconnected');
+       }
+       _do_start $self, $start_cb, $ctx, $fail_cb;
+}
+
+sub may_start {
+       my ($self, $start_cb, $ctx, $fail_cb) = @_;
+       if ($self->{running} < $self->{max}) {
+               _do_start $self, $start_cb, $ctx, $fail_cb;
+       } elsif (@{$self->{run_queue}} > ($self->{depth} // 32)) {
+               $fail_cb->($ctx, 503, 'too busy');
+       } else {
+               push @{$self->{run_queue}}, [ $start_cb, $ctx, $fail_cb ];
+       }
 }
 
 1;
index a1cc51dcf49f7a178c7f4a986044aef021c1fd26..4803921219acc7a616729f3901c79d34906e4c07 100644 (file)
@@ -4,7 +4,7 @@
 package PublicInbox::PlackLimiter;
 use v5.12;
 use parent qw(Plack::Middleware);
-use PublicInbox::OnDestroy;
+use PublicInbox::Limiter;
 
 sub prepare_app { # called via Plack::Component (used by Plack::Middleware)
        my ($self) = @_;
@@ -12,25 +12,12 @@ sub prepare_app { # called via Plack::Component (used by Plack::Middleware)
        $self->{max} //= 2;
        $self->{run_queue} = [];
        $self->{running} = 0;
-       $self->{rejected} = 0;
-       $self->{message} //= "too busy\n";
 }
 
-sub r503 ($) {
-       my @body = ($_[0]->{message});
-       ++$_[0]->{rejected};
-       [ 503, [ 'Content-Type' => 'text/plain',
-               'Content-Length' => length($body[0]) ], \@body ]
-}
-
-sub next_req { # on_destroy cb
-       my ($self) = @_;
-       --$self->{running};
-       my $env = shift @{$self->{run_queue}} or return;
-       my $wcb = delete $env->{'p-i.limiter.wcb'} // die 'BUG: no wcb';
-       my $res = eval { call($self, $env) };
-       return warn("W: $@") if $@;
-       ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res);
+sub lim_fail { # limiter->may_start fail_cb
+       my ($ctx, $code, $msg) = @_;
+       delete($ctx->{psgi_wcb})->([ $code, [ 'Content-Type' => 'text/plain',
+               'Content-Length' => length($msg) ], [ $msg ] ]);
 }
 
 sub stats ($) {
@@ -39,29 +26,32 @@ sub stats ($) {
        my $res = <<EOM;
 running: $self->{running}
 queued: $nq
-rejected: $self->{rejected}
 max: $self->{max}
 EOM
        [ 200, [ 'Content-Type' => 'text/plain',
                'Content-Length' => length($res) ], [ $res ] ]
 }
 
+sub app_call { # limiter->may_start start_cb
+       my ($ctx, $self) = @_;
+       my $wcb = delete $ctx->{psgi_wcb};
+       my $env = delete $ctx->{env}; # avoid cyclic ref
+       push @{$env->{'limiter.ctx'}}, $ctx; # handoff limiter.next.$self
+       my $res = eval { $self->app->($env) };
+       return warn("W: $@") if $@;
+       ref($res) eq 'CODE' ? $res->($wcb) : $wcb->($res);
+}
+
 sub call {
        my ($self, $env) = @_;
        if (defined $self->{stats_match_cb}) {
                return stats $self if $self->{stats_match_cb}->($env);
        }
        return $self->app->($env) if !$self->{match_cb}->($env);
-       return r503($self) if @{$self->{run_queue}} > ($self->{depth} // 32);
-       if ($self->{running} < $self->{max}) {
-               ++$self->{running};
-               $env->{'p-i.limiter.next'} = on_destroy \&next_req, $self;
-               $self->app->($env);
-       } else { # capture write cb from PSGI server and queue up
-               sub {
-                       $env->{'p-i.limiter.wcb'} = $_[0];
-                       push @{$self->{run_queue}}, $env;
-               };
+       sub { # capture write cb from PSGI server
+               my $ctx = { env => $env, psgi_wcb => $_[0] };
+               PublicInbox::Limiter::may_start(
+                               $self, \&app_call, $ctx, \&lim_fail);
        }
 }
 
index b882ea535f21e0a3b3e45744e82899e7774b01a9..690860c177f2ab7d29ea4aaaa6e01f3f5d54a7b9 100644 (file)
@@ -51,16 +51,10 @@ sub new {
 }
 
 sub _do_spawn {
-       my ($self, $start_cb, $limiter) = @_;
+       my ($self) = @_;
        my ($cmd, $cmd_env, $opt) = @{$self->{args}};
-       my %o = %{$opt || {}};
-       $self->{limiter} = $limiter;
-       for my $k (@PublicInbox::Spawn::RLIMITS) {
-               $opt->{$k} = $limiter->{$k} // next;
-       }
-       $self->{-quiet} = 1 if $o{quiet};
-       $limiter->{running}++;
-       if ($start_cb) {
+       $self->{-quiet} = 1 if $opt->{quiet};
+       if (my $start_cb = delete $self->{-start_cb}) {
                eval { # popen_rd may die on EMFILE, ENFILE
                        $self->{rpipe} = popen_rd($cmd, $cmd_env, $opt,
                                                        \&waitpid_err, $self);
@@ -78,7 +72,7 @@ sub psgi_status_err { # Qspawn itself is useful w/o PSGI
        PublicInbox::WwwStatic::r($_[0] // 500);
 }
 
-sub _finalize ($) {
+sub finalize ($) {
        my ($self) = @_;
        if (my $err = $self->{_err}) { # set by finish or waitpid_err
                utf8::decode($err);
@@ -102,17 +96,6 @@ sub _finalize ($) {
        }
 }
 
-sub finalize ($) {
-       my ($self) = @_;
-
-       # process is done, spawn whatever's in the queue
-       my $limiter = delete $self->{limiter} or return;
-       --$limiter->{running};
-       my $next = shift @{$limiter->{run_queue}};
-       _do_spawn(@$next, $limiter) if $next;
-       _finalize $self;
-}
-
 sub waitpid_err { # callback for awaitpid
        my (undef, $self) = @_; # $_[0]: pid
        $self->{_err} = ''; # for defined check in ->finish
@@ -156,17 +139,21 @@ sub finish ($;$) {
        finalize($self) if $closed_before || defined($self->{_err});
 }
 
-sub start ($$$) {
+sub _qsp_fail { # limiter fail_cb
+       my ($self, $code, $msg) = @_;
+       $self->{env}->{'qspawn.fallback'} //= $code; # likely 503
+       finalize $self;
+}
+
+sub start ($$;$) {
        my ($self, $limiter, $start_cb) = @_;
-       if ($limiter->{running} < $limiter->{max}) {
-               _do_spawn($self, $start_cb, $limiter);
-       } elsif ($limiter->is_too_busy) {
-               $self->{env}->{'qspawn.fallback'} //= 503 if
-                       $self->{env};
-               _finalize $self;
-       } else {
-               push @{$limiter->{run_queue}}, [ $self, $start_cb ];
+       $self->{-start_cb} = $start_cb if $start_cb;
+       my %opt;
+       for (@PublicInbox::Spawn::RLIMITS) {
+               $opt{$_} = $limiter->{$_} // next;
        }
+       %{$self->{args}->[2]} = (%{$self->{args}->[2]}, %opt) if keys %opt;
+       $limiter->may_start(\&_do_spawn, $self, \&_qsp_fail);
 }
 
 # Similar to `backtick` or "qx" ("perldoc -f qx"), it calls @qx_cb_arg with
@@ -177,8 +164,7 @@ sub psgi_qx {
        my ($self, $env, $limiter, @qx_cb_arg) = @_;
        $self->{env} = $env;
        $self->{qx_cb_arg} = \@qx_cb_arg;
-       $limiter ||= $def_limiter ||= PublicInbox::Limiter->new;
-       start($self, $limiter, undef);
+       start($self, $limiter ||= $def_limiter ||= PublicInbox::Limiter->new);
 }
 
 sub yield_pass {
index 552e3241b11e16dd313865752fe6090083090f5d..e9ed471157b22c3a2fee2d233615cbafc4dadaa2 100644 (file)
@@ -636,15 +636,8 @@ sub show_blob { # git->cat_async callback
                '</code></pre></td></tr></table>'.dbg_log($ctx), @def);
 }
 
-sub start_solver ($$) {
-       my ($ctx, $limiter) = @_;
-       $ctx->{-next_solver} = on_destroy \&next_solver, $limiter;
-       ++$limiter->{running};
-       if (my $ck = $ctx->{env}->{'pi-httpd.ckhup'}) {
-               $ck->($ctx->{env}->{'psgix.io'}->{sock}) and
-                       return html_page $ctx, 499, 'client disconnected';
-       }
-
+sub start_solver {
+       my ($ctx) = @_;
        while (my ($from, $to) = each %QP_MAP) {
                my $v = $ctx->{qp}->{$from} // next;
                $ctx->{hints}->{$to} = $v if $v ne '';
@@ -662,19 +655,11 @@ sub start_solver ($$) {
        $solver->solve(@$ctx{qw(env lh oid_b hints)});
 }
 
-# run the next solver job when done and DESTROY-ed (on_destroy cb)
-sub next_solver {
-       my ($limiter) = @_;
-       --$limiter->{running};
-       my $ctx = shift(@{$limiter->{run_queue}}) // return;
-       eval { start_solver $ctx, $limiter };
-       return unless $@;
-       warn "W: start_solver: $@";
-       html_page($ctx, 500) if $ctx->{-wcb};
-}
-
-sub may_start_solver ($) {
-       my ($ctx) = @_;
+# GET /$INBOX/$GIT_OBJECT_ID/s/
+# GET /$INBOX/$GIT_OBJECT_ID/s/$FILENAME
+sub show ($$;$) {
+       my ($ctx, $oid_b, $fn) = @_;
+       @$ctx{qw(oid_b fn)} = ($oid_b, $fn);
        my $limiter = $ctx->{www}->{pi_cfg}->limiter('-codeblob');
 
        # {solver_limiter} just inherits rlimits from the configurable
@@ -685,23 +670,9 @@ sub may_start_solver ($) {
                $l->{$_} = $limiter->{$_} for grep /^RLIMIT_/, keys %$limiter;
                $l;
        };
-       if ($limiter->{running} < $limiter->{max}) {
-               start_solver $ctx, $limiter;
-       } elsif ($limiter->is_too_busy) {
-               html_page $ctx, 503, 'too busy';
-       } else {
-               push @{$limiter->{run_queue}}, $ctx;
-       }
-}
-
-# GET /$INBOX/$GIT_OBJECT_ID/s/
-# GET /$INBOX/$GIT_OBJECT_ID/s/$FILENAME
-sub show ($$;$) {
-       my ($ctx, $oid_b, $fn) = @_;
-       @$ctx{qw(oid_b fn)} = ($oid_b, $fn);
        sub {
                $ctx->{-wcb} = $_[0]; # HTTP write callback
-               may_start_solver $ctx;
+               $limiter->may_start(\&start_solver, $ctx, \&html_page);
        };
 }
 
index 507f86a515888027e2013d905a29596bab71d57d..dfa33ada06d03b223c48bc33151a81472fed4ecc 100644 (file)
@@ -82,7 +82,11 @@ foreach my $cmd ([qw(sleep 1)], [qw(sh -c), 'sleep 1; false']) {
                ok(!finish_err($s), 'no error on sleep');
                is_deeply([], \@err, 'no warnings');
        }
-       ok(!finish_err($_->[0]), "true $_->[1] succeeded") foreach @t;
+       undef $s;
+       for (@t) { # DESTROY in order
+               my ($qsp, $i) = (shift(@$_), shift(@$_));
+               ok !finish_err($qsp), "true $i succeeded";
+       }
        is_deeply([qw(sleep 0 1 2)], \@run, 'ran in order');
 }