t/solver_git.t
t/spamcheck_spamc.t
t/spawn.t
+t/syscall.t
t/tail_notify.t
t/thread-cycle.t
t/thread-index-gap.t
use PublicInbox::OnDestroy;
use Errno qw(EAGAIN EINVAL ECHILD);
use Carp qw(carp croak);
+use List::Util qw(sum);
our @EXPORT_OK = qw(now msg_more awaitpid add_timer add_uniq_timer);
+my $sendmsg_more = PublicInbox::Syscall->can('sendmsg_more');
my $nextq; # queue for next_tick
my $reap_armed;
undef;
}
-sub tmpio ($$$) {
- my ($self, $bref, $off) = @_;
+sub tmpio ($$$;@) {
+ my ($self, $bref, $off, @rest) = @_;
my $fh = tmpfile 'wbuf', $self->{sock}, 1 or
return drop $self, "tmpfile $!";
$fh->autoflush(1);
my $n = syswrite($fh, $$bref, $len, $off) //
return drop $self, "write ($len): $!";
$n == $len or return drop $self, "wrote $n < $len bytes";
+ @rest and (print $fh @rest or return drop $self, "print rest: $!");
[ $fh, 0 ] # [1] = offset, [2] = length, not set by us
}
}
}
-use constant MSG_MORE => ($^O eq 'linux') ? 0x8000 : 0;
-
-sub msg_more ($$) {
- my $self = $_[0]; # $_[1] = buf
- my ($sock, $wbuf, $n, $nlen, $tmpio);
+sub msg_more ($@) {
+ my $self = shift;
+ my ($sock, $wbuf);
$sock = $self->{sock} or return 1;
$wbuf = $self->{wbuf};
- if (MSG_MORE && (!defined($wbuf) || !scalar(@$wbuf)) &&
+ if ($sendmsg_more && (!defined($wbuf) || !scalar(@$wbuf)) &&
!$sock->can('stop_SSL')) {
- $n = send($sock, $_[1], MSG_MORE);
- if (defined $n) {
- $nlen = length($_[1]) - $n;
- return 1 if $nlen == 0; # all done!
- # queue up the unwritten substring:
- $tmpio = tmpio($self, \($_[1]), $n) or return 0;
- push @{$self->{wbuf}}, $tmpio; # autovivifies
- epwait $sock, EPOLLOUT|EPOLLONESHOT;
- return 0;
+ my ($s, $tip, $tmpio);
+ $s = $sendmsg_more->($sock, @_);
+ if (defined $s) {
+ my $exp = sum(map length, @_);
+ return 1 if $s == $exp;
+ while (@_) {
+ $tip = shift;
+ if ($s >= length($tip)) { # fully written
+ $s -= length($tip);
+ } else { # first partial write
+ $tmpio = tmpio $self, \$tip, $s, @_
+ or return 0;
+ last;
+ }
+ }
+ $tmpio // return drop $self, "BUG: tmpio on $s != $exp";
+ } elsif ($! == EAGAIN) {
+ $tip = shift;
+ $tmpio = tmpio $self, \$tip, 0, @_ or return 0;
+ } else { # client disconnected
+ return $self->close;
}
+ push @{$self->{wbuf}}, $tmpio; # autovivifies
+ epwait $sock, EPOLLOUT|EPOLLONESHOT;
+ 0;
+ } else {
+ # don't redispatch into NNTPdeflate::write
+ PublicInbox::DS::write($self, join('', @_));
}
-
- # don't redispatch into NNTPdeflate::write
- PublicInbox::DS::write($self, \($_[1]));
}
# return true if complete, false if incomplete (or failure)
}
# override PublicInbox::DS::msg_more
-sub msg_more ($$) {
- my $self = $_[0];
+sub msg_more ($@) {
+ my $self = shift;
# $_[1] may be a reference or not for ->deflate
- my $err = $zout->deflate($_[1], $zbuf);
- $err == Z_OK or die "->deflate failed $err";
+ my $err;
+ for (@_) {
+ $err = $zout->deflate($_, $zbuf);
+ $err == Z_OK or die "->deflate failed $err";
+ }
1;
}
# middlewares such as Deflater may write empty strings
sub chunked_write ($$) {
- my $self = $_[0];
- return if $_[1] eq '';
- msg_more($self, sprintf("%x\r\n", length($_[1])));
- msg_more($self, $_[1]);
-
- # use $self->write(\"\n\n") if you care about real-time
- # streaming responses, public-inbox WWW does not.
- msg_more($self, "\r\n");
+ my ($self, $buf) = @_;
+ $buf eq '' or
+ msg_more $self, sprintf("%x\r\n", length($buf)), $buf, "\r\n";
}
sub identity_write ($$) {
sub emit_rfc822 {
my ($self, $k, undef, $bref) = @_;
- $self->msg_more(" $k {" . length($$bref)."}\r\n");
- $self->msg_more($$bref);
+ $self->msg_more(" $k {" . length($$bref)."}\r\n", $$bref);
}
# Mail::IMAPClient::message_string cares about this by default,
sub emit_envelope {
my ($self, undef, undef, undef, $eml) = @_;
- $self->msg_more(' ENVELOPE '.eml_envelope($eml));
+ $self->msg_more(' ENVELOPE ', eml_envelope($eml));
}
sub emit_rfc822_header {
my ($self, $k, undef, undef, $eml) = @_;
- $self->msg_more(" $k {".length(${$eml->{hdr}})."}\r\n");
- $self->msg_more(${$eml->{hdr}});
+ $self->msg_more(" $k {".length(${$eml->{hdr}})."}\r\n", ${$eml->{hdr}});
}
# n.b. this is sorted to be after any emit_eml_new ops
sub emit_rfc822_text {
my ($self, $k, undef, $bref) = @_;
- $self->msg_more(" $k {".length($$bref)."}\r\n");
- $self->msg_more($$bref);
+ $self->msg_more(" $k {".length($$bref)."}\r\n", $$bref);
}
sub emit_bodystructure {
} else {
$len = length($str);
}
- $self->msg_more(" $k {$len}\r\n");
- $self->msg_more($str);
+ $self->msg_more(" $k {$len}\r\n", $str);
}
}
use Socket qw(SOL_SOCKET SCM_RIGHTS);
use Config;
our %SIGNUM = (WINCH => 28); # most Linux, {Free,Net,Open}BSD, *Darwin
-our ($INOTIFY, %PACK);
+our ($INOTIFY, %CONST);
+use List::Util qw(sum);
# $VERSION = '0.25'; # Sys::Syscall version
our @EXPORT_OK = qw(epoll_ctl epoll_create epoll_wait
BEGIN {
if ($^O eq 'linux') {
- %PACK = (
+ %CONST = (
+ MSG_MORE => 0x8000,
TMPL_cmsg_len => TMPL_size_t,
# cmsg_len, cmsg_level, cmsg_type
SIZEOF_cmsghdr => SIZEOF_int * 2 + SIZEOF_size_t,
'i', # msg_flags
);
} elsif ($^O =~ /\A(?:freebsd|openbsd|netbsd|dragonfly)\z/) {
- %PACK = (
+ %CONST = (
TMPL_cmsg_len => 'L', # socklen_t
SIZEOF_cmsghdr => SIZEOF_int * 3,
CMSG_DATA_off => SIZEOF_ptr == 8 ? '@16' : '',
)
}
- $PACK{CMSG_ALIGN_size} = SIZEOF_size_t;
- $PACK{SIZEOF_cmsghdr} //= 0;
- $PACK{TMPL_cmsg_len} //= undef;
- $PACK{CMSG_DATA_off} //= undef;
- $PACK{TMPL_msghdr} //= undef;
+ $CONST{CMSG_ALIGN_size} = SIZEOF_size_t;
+ $CONST{SIZEOF_cmsghdr} //= 0;
+ $CONST{TMPL_cmsg_len} //= undef;
+ $CONST{CMSG_DATA_off} //= undef;
+ $CONST{TMPL_msghdr} //= undef;
+ $CONST{MSG_MORE} //= 0;
}
# SFD_CLOEXEC is arch-dependent, so IN_CLOEXEC may be, too
if (open my $fh, '<', $_[0]) { nodatacow_fh($fh) }
}
-use constant \%PACK;
+use constant \%CONST;
sub CMSG_ALIGN ($) { ($_[0] + CMSG_ALIGN_size - 1) & ~(CMSG_ALIGN_size - 1) }
use constant CMSG_ALIGN_SIZEOF_cmsghdr => CMSG_ALIGN(SIZEOF_cmsghdr);
sub CMSG_SPACE ($) { CMSG_ALIGN($_[0]) + CMSG_ALIGN_SIZEOF_cmsghdr }
}
@ret;
};
+
+*sendmsg_more = sub ($@) {
+ use bytes qw(length substr);
+ my $sock = shift;
+ my $iov = join('', map { pack 'P'.TMPL_size_t, $_, length } @_);
+ my $mh = pack(TMPL_msghdr,
+ undef, 0, # msg_name, msg_namelen (unused)
+ $iov, scalar(@_), # msg_iov, msg_iovlen
+ undef, 0, # msg_control, msg_controllen (unused),
+ 0); # msg_flags (unused)
+ my $s;
+ do {
+ $s = syscall($SYS_sendmsg, fileno($sock), $mh, MSG_MORE);
+ } while ($s < 0 && $!{EINTR});
+ $s < 0 ? undef : $s;
+};
}
1;
--- /dev/null
+use v5.12;
+use autodie;
+use Test::More;
+use PublicInbox::Syscall;
+use Socket qw(AF_UNIX SOCK_STREAM);
+my $sendmsg_more = PublicInbox::Syscall->can('sendmsg_more') or
+ plan skip_all => "sendmsg syscalls not defined on $^O";
+
+socketpair(my $s1, my $s2, AF_UNIX, SOCK_STREAM, 0);
+is $sendmsg_more->($s1, 'hello', 'world'), 10, 'sendmsg_more expected size';
+is sysread($s2, my $buf, 11), 10, 'reader got expected size from sendmsg_more';
+is $buf, 'helloworld', 'sendmsg_more sent expected message';
+
+done_testing;