#define SOCKET_MAX_FDS 16
+#ifdef QEMU_MSG_ZEROCOPY
+static int qio_channel_socket_flush_internal(QIOChannel *ioc,
+ bool block,
+ Error **errp);
+#endif
+
SocketAddress *
qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
Error **errp)
sioc->zero_copy_queued = 0;
sioc->zero_copy_sent = 0;
sioc->blocking = false;
+ sioc->new_zero_copy_sent_success = false;
ioc = QIO_CHANNEL(sioc);
qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
size_t fdsize = sizeof(int) * nfds;
struct cmsghdr *cmsg;
int sflags = 0;
+#ifdef QEMU_MSG_ZEROCOPY
+ bool blocking = sioc->blocking;
+ bool zerocopy_flushed_once = false;
+#endif
memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
return QIO_CHANNEL_ERR_BLOCK;
case EINTR:
goto retry;
+#ifdef QEMU_MSG_ZEROCOPY
case ENOBUFS:
if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
- error_setg_errno(errp, errno,
- "Process can't lock enough memory for using MSG_ZEROCOPY");
- return -1;
+ /**
+ * Socket error queueing may exhaust the OPTMEM limit. Try
+ * flushing the error queue once.
+ */
+ if (!zerocopy_flushed_once) {
+ ret = qio_channel_socket_flush_internal(ioc, blocking,
+ errp);
+ if (ret < 0) {
+ return -1;
+ }
+ zerocopy_flushed_once = true;
+ goto retry;
+ } else {
+ error_setg_errno(errp, errno,
+ "Process can't lock enough memory for "
+ "using MSG_ZEROCOPY");
+ return -1;
+ }
}
break;
+#endif
}
error_setg_errno(errp, errno,
#ifdef QEMU_MSG_ZEROCOPY
-static int qio_channel_socket_flush(QIOChannel *ioc,
- Error **errp)
+static int qio_channel_socket_flush_internal(QIOChannel *ioc,
+ bool block,
+ Error **errp)
{
QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
struct msghdr msg = {};
struct cmsghdr *cm;
char control[CMSG_SPACE(sizeof(*serr))];
int received;
- int ret;
if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
return 0;
msg.msg_controllen = sizeof(control);
memset(control, 0, sizeof(control));
- ret = 1;
-
while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
if (received < 0) {
switch (errno) {
case EAGAIN:
- /* Nothing on errqueue, wait until something is available */
- qio_channel_wait(ioc, G_IO_ERR);
- continue;
+ if (block) {
+ /*
+ * Nothing on errqueue, wait until something is
+ * available.
+ *
+ * Use G_IO_ERR instead of G_IO_IN since MSG_ERRQUEUE reads
+ * are signaled via POLLERR, not POLLIN, as the kernel
+ * sets POLLERR when zero-copy notificatons appear on the
+ * socket error queue.
+ */
+ qio_channel_wait(ioc, G_IO_ERR);
+ continue;
+ }
+ return 0;
case EINTR:
continue;
default:
/* No errors, count successfully finished sendmsg()*/
sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;
- /* If any sendmsg() succeeded using zero copy, return 0 at the end */
+ /* If any sendmsg() succeeded using zero copy, mark zerocopy success */
if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
- ret = 0;
+ sioc->new_zero_copy_sent_success = true;
}
}
- return ret;
+ return 0;
+}
+
+static int qio_channel_socket_flush(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+ int ret;
+
+ ret = qio_channel_socket_flush_internal(ioc, true, errp);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (sioc->new_zero_copy_sent_success) {
+ sioc->new_zero_copy_sent_success = false;
+ return 0;
+ }
+
+ return 1;
}
#endif /* QEMU_MSG_ZEROCOPY */