From: Eric Wong Date: Fri, 29 Nov 2024 23:53:59 +0000 (+0000) Subject: send_cmd: use (practically) infinite retries for writers X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=237eb43a50ef7a9bd117635077421d7463158cbe;p=thirdparty%2Fpublic-inbox.git send_cmd: use (practically) infinite retries for writers Write tools (-*index, -watch, -mda, lei) should never croak due to the system being busy. So make the retry infinite to benefit users who run several parallel imports at once on a slower system. The previous 5s timeout was too close to failing in my own experience using `lei import' on an old, busy machine. For lei (inotify || EVFILT_VNODE) watches, we now retry on busy sockets to avoid loss of FS change notifications. On the contrary, public-facing read-only interfaces have always been assumed to constantly be under attack. Thus continuing to drop requests due to a lack of kernel memory/buffers is probably prudent. --- diff --git a/lib/PublicInbox/CmdIPC4.pm b/lib/PublicInbox/CmdIPC4.pm index fc77bd03e..bdf17a3f5 100644 --- a/lib/PublicInbox/CmdIPC4.pm +++ b/lib/PublicInbox/CmdIPC4.pm @@ -11,7 +11,7 @@ use Socket qw(SOL_SOCKET SCM_RIGHTS); sub sendmsg_retry ($) { return 1 if $!{EINTR}; return unless ($!{ENOMEM} || $!{ENOBUFS} || $!{ETOOMANYREFS}); - return if --$_[0] < 0; + return if $_[0]-- == 0; warn "# sleeping on sendmsg: $! 
($_[0] tries left)\n"; select(undef, undef, undef, 0.1); 1; @@ -24,7 +24,7 @@ no warnings 'once'; # any number of FDs per-sendmsg(2) + buffer *send_cmd4 = sub ($$$$;$) { # (sock, fds, buf, flags) = @_; my ($sock, $fds, undef, $flags, $tries) = @_; - $tries //= 50; + $tries //= -1; # infinite my $mh = Socket::MsgHdr->new(buf => $_[2]); $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('i' x scalar(@$fds), @$fds)); my $s; diff --git a/lib/PublicInbox/IPC.pm b/lib/PublicInbox/IPC.pm index 13a897be3..806653dca 100644 --- a/lib/PublicInbox/IPC.pm +++ b/lib/PublicInbox/IPC.pm @@ -331,11 +331,13 @@ sub wq_nonblock_do { # always async my $buf = ipc_freeze([$sub, @args]); if ($self->{wqb}) { # saturated once, assume saturated forever $self->{wqb}->flush_send($buf); - } else { - $send_cmd->($self->{-wq_s1}, [], $buf, 0) // - ($!{EAGAIN} ? PublicInbox::WQBlocked->new($self, $buf) - : croak("sendmsg: $!")); - } + } elsif (!defined $send_cmd->($self->{-wq_s1}, [], $buf, 0)) { + if ($!{EAGAIN} || $!{ENOBUFS} || $!{ENOMEM}) { + PublicInbox::WQBlocked->new($self, $buf); + } else { + croak "sendmsg: $!"; + } + } # else success } sub _wq_worker_start { diff --git a/lib/PublicInbox/Spawn.pm b/lib/PublicInbox/Spawn.pm index 19bedb107..1c6cc5bec 100644 --- a/lib/PublicInbox/Spawn.pm +++ b/lib/PublicInbox/Spawn.pm @@ -176,15 +176,15 @@ out: return (int)pid; } -static int sendmsg_retry(int *tries) +static int sendmsg_retry(long *tries) { const struct timespec req = { 0, 100000000 }; /* 100ms */ int err = errno; switch (err) { case EINTR: PERL_ASYNC_CHECK(); return 1; case ENOBUFS: case ENOMEM: case ETOOMANYREFS: - if (--*tries < 0) return 0; - fprintf(stderr, "# sleeping on sendmsg: %s (%d tries left)\n", + if ((*tries)-- == 0) return 0; /* parenthesized: postfix -- binds tighter than unary *, so a bare *tries-- would move the pointer instead of counting down the caller's retry budget */ + fprintf(stderr, "# sleeping on sendmsg: %s (%ld tries left)\n", strerror(err), *tries); nanosleep(&req, NULL); PERL_ASYNC_CHECK(); @@ -201,7 +201,7 @@ union my_cmsg { char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE]; }; -SV *send_cmd4_(PerlIO *s, SV *svfds, 
SV *data, int flags, int tries) +SV *send_cmd4_(PerlIO *s, SV *svfds, SV *data, int flags, long tries) { struct msghdr msg = { 0 }; union my_cmsg cmsg = { 0 }; diff --git a/lib/PublicInbox/Syscall.pm b/lib/PublicInbox/Syscall.pm index ebcedb890..e5e3340fa 100644 --- a/lib/PublicInbox/Syscall.pm +++ b/lib/PublicInbox/Syscall.pm @@ -502,7 +502,7 @@ require PublicInbox::CmdIPC4; $msg_controllen, 0); # msg_flags my $s; - $tries //= 50; + $tries //= -1; do { $s = syscall($SYS_sendmsg, fileno($sock), $mh, $flags); } while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($tries)); diff --git a/lib/PublicInbox/WQBlocked.pm b/lib/PublicInbox/WQBlocked.pm index 8d931fa9d..b2463898c 100644 --- a/lib/PublicInbox/WQBlocked.pm +++ b/lib/PublicInbox/WQBlocked.pm @@ -26,8 +26,15 @@ sub flush_send { my $n = $PublicInbox::IPC::send_cmd->($wq_s1, [], $buf, 0); next if defined($n); - Carp::croak("sendmsg: $!") unless $!{EAGAIN}; - PublicInbox::DS::epwait($wq_s1, EPOLLOUT|EPOLLONESHOT); + if ($!{EAGAIN}) { + PublicInbox::DS::epwait($wq_s1, + EPOLLOUT|EPOLLONESHOT); + } elsif ($!{ENOBUFS} || $!{ENOMEM}) { + PublicInbox::DS::add_timer(0.1, \&flush_send, + $self); + } else { + Carp::croak("sendmsg: $!"); + } unshift @{$self->{msgq}}, $buf; last; # wait for ->event_step } diff --git a/lib/PublicInbox/XapClient.pm b/lib/PublicInbox/XapClient.pm index 24b3f45e0..786982d87 100644 --- a/lib/PublicInbox/XapClient.pm +++ b/lib/PublicInbox/XapClient.pm @@ -12,7 +12,7 @@ use PublicInbox::Spawn qw(spawn); use Socket qw(AF_UNIX SOCK_SEQPACKET); use PublicInbox::IPC; use autodie qw(pipe socketpair); -our $tries = 50; +our $tries = -1; # set to zero by read-only daemon sub mkreq { my ($self, $ios, @arg) = @_;