From: Eric Wong Date: Mon, 12 May 2025 20:45:00 +0000 (+0000) Subject: www: extmsg: async partial Message-ID search X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=c3d0295bd96297600a98b1d6aa6d79790518d906;p=thirdparty%2Fpublic-inbox.git www: extmsg: async partial Message-ID search While partial Message-ID matches tend to be relatively fast, there's still some potential for pathological matches taking a long time and blocking our event loop. So throw them over to the external xap_helper process to ensure our main event loop can handle other HTTP/NNTP/POP3/IMAP requests while potentially waiting on Xapian disk seeks. --- diff --git a/lib/PublicInbox/ExtMsg.pm b/lib/PublicInbox/ExtMsg.pm index c486467c3..7453df1ef 100644 --- a/lib/PublicInbox/ExtMsg.pm +++ b/lib/PublicInbox/ExtMsg.pm @@ -29,11 +29,30 @@ our @EXT_URL = map { ascii_html($_) } ( sub PARTIAL_MAX () { 100 } -sub search_partial ($$) { - my ($ibx, $mid) = @_; +sub partial_cb { # async_mset cb + my ($ctx, $srch, $mset, $err) = @_; + if ($err) { + my $msg = "W: query failed: $! ($err)"; + ++$ctx->{ext_msg_partial_fail}; + warn($msg); + } else { + my $ibx = $ctx->{partial_ibx}; + my @mid = map { $_->{mid} } @{$srch->mset_to_smsg($ibx, $mset)}; + if (scalar @mid) { + push @{$ctx->{partial}}, [ $ibx, \@mid ]; + (($ctx->{n_partial} += scalar(@mid)) >= PARTIAL_MAX) and + delete $ctx->{again}; # done + } + } + PublicInbox::DS::requeue($ctx) if $ctx->{env}->{'pi-httpd.app'}; +} + +# returns true if started asynchronously +sub partial_ibx_start ($$) { + my ($ctx, $ibx) = @_; + my $mid = $ctx->{mid}; return if length($mid) < $MIN_PARTIAL_LEN; my $srch = $ibx->isrch or return; - my $opt = { limit => PARTIAL_MAX, sort_col => -1, asc => 1 }; my @try = ("m:$mid*"); my $chop = $mid; if ($chop =~ s/(\W+)(\w*)\z//) { @@ -59,18 +78,9 @@ sub search_partial ($$) { push(@try, join(' ', map { "m:$_" } @long)); } } - - foreach my $m (@try) { - # If Xapian can't handle the wildcard since it - # has too many results. $@ can be - # Search::Xapian::QueryParserError or even: - # "something terrible happened at ../Search/Xapian/Enquire.pm" - my $mset = eval { $srch->mset($m, $opt) } or next; - my @mids = map { - $_->{mid} - } @{$srch->mset_to_smsg($ibx, $mset)}; - return \@mids if scalar(@mids); - } + $ctx->{partial_ibx} = $ibx; + my $opt = { limit => PARTIAL_MAX, sort_col => -1, asc => 1 }; + $srch->async_mset(\@try, $opt, \&partial_cb, $ctx, $srch); } sub ext_msg_i { @@ -104,19 +114,28 @@ sub ext_msg_step { } } -sub try_partial ($) { +sub partial_enter ($) { my ($ctx) = @_; bless $ctx, __PACKAGE__; # for ExtMsg->event_step return $ctx->event_step if $ctx->{env}->{'pi-httpd.app'}; $ctx->event_step(1) while $ctx->{-wcb}; } +sub partial_prepare ($@) { + my ($ctx, @try_ibxish) = @_; + $ctx->{again} = \@try_ibxish; + sub { + $ctx->{-wcb} = $_[0]; # HTTP server write callback + partial_enter $ctx; + } +} + sub ext_msg_ALL ($) { my ($ctx) = @_; my $ALL = $ctx->{www}->{pi_cfg}->ALL or return; + return partial_prepare($ctx, $ALL) if $ALL == $ctx->{ibx}; my $by_eidx_key = $ctx->{www}->{pi_cfg}->{-by_eidx_key}; - my $cur_key = eval { $ctx->{ibx}->eidx_key } // - return partial_response($ctx); # $cur->{ibx} == $ALL + my $cur_key = $ctx->{ibx}->eidx_key; my %seen = ($cur_key => 1); my ($id, $prev); while (my $x = $ALL->over->next_by_mid($ctx->{mid}, \$id, \$prev)) { @@ -129,14 +148,7 @@ sub ext_msg_ALL ($) { push(@{$ctx->{found}}, $ibx) unless $seen{$k}++; } } - return exact($ctx) if $ctx->{found}; - - # fall back to partial MID matching - $ctx->{again} = [ $ctx->{ibx}, $ALL ]; - sub { - $ctx->{-wcb} = $_[0]; # HTTP server write callback - try_partial $ctx; - } + $ctx->{found} ? exact($ctx) : partial_prepare($ctx, $ctx->{ibx}, $ALL); } # only public entry point @@ -165,30 +177,19 @@ sub event_step { my ($ctx, $sync) = @_; # can't find a partial match in current inbox, try the others: my $ibx = shift @{$ctx->{again}} or return finalize_partial($ctx); - my $mids = search_partial($ibx, $ctx->{mid}) or - return ($sync ? undef : PublicInbox::DS::requeue($ctx)); - $ctx->{n_partial} += scalar(@$mids); - push @{$ctx->{partial}}, [ $ibx, $mids ]; - $ctx->{n_partial} >= PARTIAL_MAX ? finalize_partial($ctx) - : ($sync ? undef : PublicInbox::DS::requeue($ctx)); + unless (partial_ibx_start($ctx, $ibx)) { + PublicInbox::DS::requeue($ctx) unless $sync; + } } sub finalize_exact { my ($ctx) = @_; - - return delete($ctx->{-wcb})->(exact($ctx)) if $ctx->{found}; - - # fall back to partial MID matching - my $mid = $ctx->{mid}; - my $cur = $ctx->{ibx}; - my $mids = search_partial($cur, $mid); - if ($mids) { - $ctx->{n_partial} = scalar(@$mids); - push @{$ctx->{partial}}, [ $cur, $mids ]; - } elsif ($ctx->{again} && length($mid) >= $MIN_PARTIAL_LEN) { - return try_partial $ctx; + if ($ctx->{found}) { + delete($ctx->{-wcb})->(exact($ctx)); + } else { # no exact matches? fall back to partial msgid matching + $ctx->{again} = [ $ctx->{ibx} ]; + partial_enter $ctx; } - finalize_partial($ctx); } sub _url_pfx ($$;$) { @@ -230,6 +231,13 @@ sub partial_response ($) { $s .= $ext; $code = 300; } + if (my $nr = delete $ctx->{ext_msg_partial_fail}) { + $s .= <{-html_tip} = $s .= ''; $ctx->{-title_html} = $title; html_oneshot($ctx, $code); diff --git a/lib/PublicInbox/Isearch.pm b/lib/PublicInbox/Isearch.pm index 8fe3c54a1..bcda9ae48 100644 --- a/lib/PublicInbox/Isearch.pm +++ b/lib/PublicInbox/Isearch.pm @@ -72,10 +72,10 @@ sub mset { } sub async_mset { - my ($self, $str, $opt, $cb, @args) = @_; + my ($self, $qry, $opt, $cb, @args) = @_; $opt = eidx_mset_prep $self, $opt; local $self->{es}->{-extra} = $self->{-extra} if $self->{-extra}; - $self->{es}->async_mset($str, $opt, $cb, @args); + $self->{es}->async_mset($qry, $opt, $cb, @args); } sub mset_to_artnums { diff --git a/lib/PublicInbox/LeiRemote.pm b/lib/PublicInbox/LeiRemote.pm index 71aef8099..3d06f59c0 100644 --- a/lib/PublicInbox/LeiRemote.pm +++ b/lib/PublicInbox/LeiRemote.pm @@ -58,8 +58,13 @@ sub mset { # fake support for async API sub async_mset { - my ($self, $qstr, undef, $cb, @arg) = @_; # $opt ($_[2]) ignored - $cb->(@arg, mset($self, $qstr)); + my ($self, $qry, undef, $cb, @arg) = @_; # $opt ($_[2]) ignored + my $mset; + for my $qstr (ref($qry) eq 'ARRAY' ? @$qry : ($qry)) { + $mset = eval { mset($self, $qstr) }; + last if $mset && $mset->size; + } + $mset ? $cb->(@arg, $mset) : $cb->(@arg, undef, $@ || 'err'); undef; } diff --git a/lib/PublicInbox/Search.pm b/lib/PublicInbox/Search.pm index 389a95012..0caa915e2 100644 --- a/lib/PublicInbox/Search.pm +++ b/lib/PublicInbox/Search.pm @@ -482,15 +482,15 @@ sub xh_opt ($$) { # returns a true value if actually handled asynchronously, # and a falsy value if handled synchronously sub async_mset { - my ($self, $qry_str, $opt, $cb, @args) = @_; + my ($self, $qry, $opt, $cb, @args) = @_; if ($XHC) { # unconditionally retrieving pct + rank for now xdb($self); # populate {nshards} my @margs = ($self->xh_args, xh_opt($self, $opt), '--'); my $ret = eval { pipe my $out_rd, my $out_wr; open my $err_rw, '+>', undef; - $XHC->mkreq([ $out_wr, $err_rw ], - 'mset', @margs, $qry_str); + $XHC->mkreq([ $out_wr, $err_rw ], 'mset', @margs, + (ref($qry) eq 'ARRAY' ? @$qry : $qry)); undef $out_wr; PublicInbox::XhcMset->maybe_new($out_rd, $err_rw, $self, $cb, @args); @@ -498,8 +498,12 @@ sub async_mset { $cb->(@args, undef, $@) if $@; $ret; } else { # synchronous - my $mset = eval { $self->mset($qry_str, $opt) }; - $@ ? $cb->(@args, undef, $@) : $cb->(@args, $mset); + my $mset; + for my $qstr (ref($qry) eq 'ARRAY' ? @$qry : ($qry)) { + $mset = eval { $self->mset($qstr, $opt) }; + last if $mset && $mset->size; + } + $mset ? $cb->(@args, $mset) : $cb->(@args, undef, $@ || 'err'); undef; } }