]> git.ipfire.org Git - thirdparty/public-inbox.git/commitdiff
send_cmd: use (practically) infinite retries for writers
authorEric Wong <e@80x24.org>
Fri, 29 Nov 2024 23:53:59 +0000 (23:53 +0000)
committerEric Wong <e@80x24.org>
Sat, 30 Nov 2024 22:43:35 +0000 (22:43 +0000)
Write tools (-*index, -watch, -mda, lei) should never croak due
to the system being busy.  So make the retry infinite to benefit
users who run several parallel imports at once on a slower
system.  The previous 5s timeout was too close to failing in
my own experience using `lei import' on an old, busy machine.

For lei (inotify || EVFILT_VNODE) watches, we now retry on busy
sockets to avoid loss of FS change notifications.

On the contrary, public-facing read-only interfaces have always
been assumed to constantly be under attack.  Thus continuing to
drop requests due to a lack of kernel memory/buffers is probably
prudent.

lib/PublicInbox/CmdIPC4.pm
lib/PublicInbox/IPC.pm
lib/PublicInbox/Spawn.pm
lib/PublicInbox/Syscall.pm
lib/PublicInbox/WQBlocked.pm
lib/PublicInbox/XapClient.pm

index fc77bd03e1821506eb0a2cb20b061b4af58d61cb..bdf17a3f58d7f3f2fa7196bb61802a711bdad3c2 100644 (file)
@@ -11,7 +11,7 @@ use Socket qw(SOL_SOCKET SCM_RIGHTS);
 sub sendmsg_retry ($) {
        return 1 if $!{EINTR};
        return unless ($!{ENOMEM} || $!{ENOBUFS} || $!{ETOOMANYREFS});
-       return if --$_[0] < 0;
+       return if $_[0]-- == 0;
        warn "# sleeping on sendmsg: $! ($_[0] tries left)\n";
        select(undef, undef, undef, 0.1);
        1;
@@ -24,7 +24,7 @@ no warnings 'once';
 # any number of FDs per-sendmsg(2) + buffer
 *send_cmd4 = sub ($$$$;$) { # (sock, fds, buf, flags) = @_;
        my ($sock, $fds, undef, $flags, $tries) = @_;
-       $tries //= 50;
+       $tries //= -1; # infinite
        my $mh = Socket::MsgHdr->new(buf => $_[2]);
        $mh->cmsghdr(SOL_SOCKET, SCM_RIGHTS, pack('i' x scalar(@$fds), @$fds));
        my $s;
index 13a897be3658fd480a1f9f1080d9f3d093e87061..806653dca2a3929905c6423d3fedc4ec6e84c5d9 100644 (file)
@@ -331,11 +331,13 @@ sub wq_nonblock_do { # always async
        my $buf = ipc_freeze([$sub, @args]);
        if ($self->{wqb}) { # saturated once, assume saturated forever
                $self->{wqb}->flush_send($buf);
-       } else {
-               $send_cmd->($self->{-wq_s1}, [], $buf, 0) //
-                       ($!{EAGAIN} ? PublicInbox::WQBlocked->new($self, $buf)
-                                       : croak("sendmsg: $!"));
-       }
+       } elsif (!defined $send_cmd->($self->{-wq_s1}, [], $buf, 0)) {
+               if ($!{EAGAIN} || $!{ENOBUFS} || $!{ENOMEM}) {
+                       PublicInbox::WQBlocked->new($self, $buf);
+               } else {
+                       croak "sendmsg: $!";
+               }
+       } # else success
 }
 
 sub _wq_worker_start {
index 19bedb1079ccab270add5ba89eeceebff72b263f..1c6cc5becb826feb2fdcd0a9cb9574f6b50d1117 100644 (file)
@@ -176,15 +176,15 @@ out:
        return (int)pid;
 }
 
-static int sendmsg_retry(int *tries)
+static int sendmsg_retry(long *tries)
 {
        const struct timespec req = { 0, 100000000 }; /* 100ms */
        int err = errno;
        switch (err) {
        case EINTR: PERL_ASYNC_CHECK(); return 1;
        case ENOBUFS: case ENOMEM: case ETOOMANYREFS:
-               if (--*tries < 0) return 0;
-               fprintf(stderr, "# sleeping on sendmsg: %s (%d tries left)\n",
+               if (*tries-- == 0) return 0;
+               fprintf(stderr, "# sleeping on sendmsg: %s (%ld tries left)\n",
                        strerror(err), *tries);
                nanosleep(&req, NULL);
                PERL_ASYNC_CHECK();
@@ -201,7 +201,7 @@ union my_cmsg {
        char pad[sizeof(struct cmsghdr) + 16 + SEND_FD_SPACE];
 };
 
-SV *send_cmd4_(PerlIO *s, SV *svfds, SV *data, int flags, int tries)
+SV *send_cmd4_(PerlIO *s, SV *svfds, SV *data, int flags, long tries)
 {
        struct msghdr msg = { 0 };
        union my_cmsg cmsg = { 0 };
index ebcedb8909c1889b7abbe3bca9d9d988ee608ff0..e5e3340fa11123e52c38d48135330196a6bf6154 100644 (file)
@@ -502,7 +502,7 @@ require PublicInbox::CmdIPC4;
                        $msg_controllen,
                        0); # msg_flags
        my $s;
-       $tries //= 50;
+       $tries //= -1;
        do {
                $s = syscall($SYS_sendmsg, fileno($sock), $mh, $flags);
        } while ($s < 0 && PublicInbox::CmdIPC4::sendmsg_retry($tries));
index 8d931fa9d1ad8af9b941eceb9ad9e2dbf37ad68e..b2463898c24dd2f52ad915c5d57500926ece680c 100644 (file)
@@ -26,8 +26,15 @@ sub flush_send {
                        my $n = $PublicInbox::IPC::send_cmd->($wq_s1, [], $buf,
                                                                0);
                        next if defined($n);
-                       Carp::croak("sendmsg: $!") unless $!{EAGAIN};
-                       PublicInbox::DS::epwait($wq_s1, EPOLLOUT|EPOLLONESHOT);
+                       if ($!{EAGAIN}) {
+                               PublicInbox::DS::epwait($wq_s1,
+                                                       EPOLLOUT|EPOLLONESHOT);
+                       } elsif ($!{ENOBUFS} || $!{ENOMEM}) {
+                               PublicInbox::DS::add_timer(0.1, \&flush_send,
+                                                       $self);
+                       } else {
+                               Carp::croak("sendmsg: $!");
+                       }
                        unshift @{$self->{msgq}}, $buf;
                        last; # wait for ->event_step
                }
index 24b3f45e0c14a06204e54361bf0a5546d5f13363..786982d87238e32067230fce50590cd1184e6638 100644 (file)
@@ -12,7 +12,7 @@ use PublicInbox::Spawn qw(spawn);
 use Socket qw(AF_UNIX SOCK_SEQPACKET);
 use PublicInbox::IPC;
 use autodie qw(pipe socketpair);
-our $tries = 50;
+our $tries = -1; # set to zero by read-only daemon
 
 sub mkreq {
        my ($self, $ios, @arg) = @_;