]> git.ipfire.org Git - thirdparty/libvirt.git/commitdiff
Handle outgoing data streams in libvirtd
authorDaniel P. Berrange <berrange@redhat.com>
Mon, 24 Aug 2009 19:57:16 +0000 (20:57 +0100)
committerDaniel P. Berrange <berrange@redhat.com>
Tue, 29 Sep 2009 14:48:58 +0000 (15:48 +0100)
* daemon/dispatch.c: Set streamTX flag on outgoing data packets
* daemon/qemud.h: Add streamTX flag to track outgoing data
* daemon/qemud.c: Re-enable further TX when outgoing data packet
  has been fully sent.
* daemon/stream.h, daemon/stream.c: Add method for enabling TX.
  Support reading from streams and transmitting data out to client

daemon/dispatch.c
daemon/libvirtd.c
daemon/libvirtd.h
daemon/stream.c
daemon/stream.h

index 1934d244e34714f73509b7ca91bb6af5cdfb7126..7417001e1422b4cf192372f48b7b12bde0ed7ea1 100644 (file)
@@ -636,6 +636,8 @@ remoteSendStreamData(struct qemud_client *client,
 
         DEBUG("Total %d", msg->bufferOffset);
     }
+    if (data)
+        msg->streamTX = 1;
 
     /* Reset ready for I/O */
     msg->bufferLength = msg->bufferOffset;
index e36151f6fad99f764a74e2fd3940b41996e2e2d7..78dfb2d092ac37f632bda630e2039b33fc09e4b9 100644 (file)
@@ -1898,7 +1898,9 @@ void
 qemudClientMessageRelease(struct qemud_client *client,
                           struct qemud_client_message *msg)
 {
-    if (!msg->async)
+    if (msg->streamTX) {
+        remoteStreamMessageFinished(client, msg);
+    } else if (!msg->async)
         client->nrequests--;
 
     /* See if the recv queue is currently throttled */
index aae23fcacff50487a2684dd082fe7ef489c49986..579e1c4144ae6b295801db00b04bf5e1b721aa01 100644 (file)
@@ -130,6 +130,7 @@ struct qemud_client_message {
     unsigned int bufferOffset;
 
     unsigned int async : 1;
+    unsigned int streamTX : 1;
 
     remote_message_header hdr;
 
index 1fe0e58db6e31795e0bf3f2168b3d9757c1435a2..584268dc4256cd072586e5fa5d3a5cd1b3d26855 100644 (file)
@@ -32,6 +32,9 @@ static int
 remoteStreamHandleWrite(struct qemud_client *client,
                         struct qemud_client_stream *stream);
 static int
+remoteStreamHandleRead(struct qemud_client *client,
+                       struct qemud_client_stream *stream);
+static int
 remoteStreamHandleFinish(struct qemud_client *client,
                          struct qemud_client_stream *stream,
                          struct qemud_client_message *msg);
@@ -48,6 +51,8 @@ remoteStreamUpdateEvents(struct qemud_client_stream *stream)
     int newEvents = 0;
     if (stream->rx)
         newEvents |= VIR_STREAM_EVENT_WRITABLE;
+    if (stream->tx && !stream->recvEOF)
+        newEvents |= VIR_STREAM_EVENT_READABLE;
 
     virStreamEventUpdateCallback(stream->st, newEvents);
 }
@@ -87,6 +92,16 @@ remoteStreamEvent(virStreamPtr st, int events, void *opaque)
         }
     }
 
+    if (!stream->recvEOF &&
+        (events & (VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP))) {
+        events = events & ~(VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP);
+        if (remoteStreamHandleRead(client, stream) < 0) {
+            remoteRemoveClientStream(client, stream);
+            qemudDispatchClientFailure(client);
+            goto cleanup;
+        }
+    }
+
     if (!stream->closed &&
         (events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
         int ret;
@@ -507,3 +522,84 @@ remoteStreamHandleWrite(struct qemud_client *client,
 
     return 0;
 }
+
+
+
+/*
+ * Invoked when a stream is signalled as having data
+ * available to read. This reads upto one message
+ * worth of data, and then queues that for transmission
+ * to the client.
+ *
+ * Returns 0 if data was queued for TX, or a error RPC
+ * was sent, or -1 on fatal error, indicating client should
+ * be killed
+ */
+static int
+remoteStreamHandleRead(struct qemud_client *client,
+                       struct qemud_client_stream *stream)
+{
+    char *buffer;
+    size_t bufferLen = REMOTE_MESSAGE_PAYLOAD_MAX;
+    int ret;
+
+    DEBUG("stream=%p", stream);
+
+    /* Shouldn't ever be called unless we're marked able to
+     * transmit, but doesn't hurt to check */
+    if (!stream->tx)
+        return 0;
+
+    if (VIR_ALLOC_N(buffer, bufferLen) < 0)
+        return -1;
+
+    ret = virStreamRecv(stream->st, buffer, bufferLen);
+    if (ret == -2) {
+        /* Should never get this, since we're only called when we know
+         * we're readable, but hey things change... */
+        ret = 0;
+    } else if (ret < 0) {
+        remote_error rerr;
+        memset(&rerr, 0, sizeof rerr);
+        remoteDispatchConnError(&rerr, NULL);
+
+        ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial);
+    } else {
+        stream->tx = 0;
+        if (ret == 0)
+            stream->recvEOF = 1;
+        ret = remoteSendStreamData(client, stream, buffer, ret);
+    }
+
+    VIR_FREE(buffer);
+    return ret;
+}
+
+
+/*
+ * Invoked when an outgoing data packet message has been fully sent.
+ * This simply re-enables TX of further data.
+ *
+ * The idea is to stop the daemon growing without bound due to
+ * fast stream, but slow client
+ */
+void
+remoteStreamMessageFinished(struct qemud_client *client,
+                            struct qemud_client_message *msg)
+{
+    struct qemud_client_stream *stream = client->streams;
+
+    while (stream) {
+        if (msg->hdr.proc == stream->procedure &&
+            msg->hdr.serial == stream->serial)
+            break;
+        stream = stream->next;
+    }
+
+    DEBUG("Message client=%p stream=%p proc=%d serial=%d", client, stream, msg->hdr.proc, msg->hdr.serial);
+
+    if (stream) {
+        stream->tx = 1;
+        remoteStreamUpdateEvents(stream);
+    }
+}
index 250498472eeb6c280bcc4e0570a6e06e8af8bb96..2e2d249517fdb298e6a13134aecea5460d7a50a4 100644 (file)
@@ -46,4 +46,8 @@ int
 remoteRemoveClientStream(struct qemud_client *client,
                          struct qemud_client_stream *stream);
 
+void
+remoteStreamMessageFinished(struct qemud_client *client,
+                            struct qemud_client_message *msg);
+
 #endif /* __LIBVIRTD_STREAM_H__ */