remoteStreamHandleWrite(struct qemud_client *client,
struct qemud_client_stream *stream);
static int
+remoteStreamHandleRead(struct qemud_client *client,
+ struct qemud_client_stream *stream);
+static int
remoteStreamHandleFinish(struct qemud_client *client,
struct qemud_client_stream *stream,
struct qemud_client_message *msg);
int newEvents = 0;
if (stream->rx)
newEvents |= VIR_STREAM_EVENT_WRITABLE;
+ if (stream->tx && !stream->recvEOF)
+ newEvents |= VIR_STREAM_EVENT_READABLE;
virStreamEventUpdateCallback(stream->st, newEvents);
}
}
}
+ if (!stream->recvEOF &&
+ (events & (VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP))) {
+ events = events & ~(VIR_STREAM_EVENT_READABLE | VIR_STREAM_EVENT_HANGUP);
+ if (remoteStreamHandleRead(client, stream) < 0) {
+ remoteRemoveClientStream(client, stream);
+ qemudDispatchClientFailure(client);
+ goto cleanup;
+ }
+ }
+
if (!stream->closed &&
(events & (VIR_STREAM_EVENT_ERROR | VIR_STREAM_EVENT_HANGUP))) {
int ret;
return 0;
}
+
+
+
+/*
+ * Invoked when a stream is signalled as having data
+ * available to read. This reads upto one message
+ * worth of data, and then queues that for transmission
+ * to the client.
+ *
+ * Returns 0 if data was queued for TX, or a error RPC
+ * was sent, or -1 on fatal error, indicating client should
+ * be killed
+ */
+static int
+remoteStreamHandleRead(struct qemud_client *client,
+ struct qemud_client_stream *stream)
+{
+ char *buffer;
+ size_t bufferLen = REMOTE_MESSAGE_PAYLOAD_MAX;
+ int ret;
+
+ DEBUG("stream=%p", stream);
+
+ /* Shouldn't ever be called unless we're marked able to
+ * transmit, but doesn't hurt to check */
+ if (!stream->tx)
+ return 0;
+
+ if (VIR_ALLOC_N(buffer, bufferLen) < 0)
+ return -1;
+
+ ret = virStreamRecv(stream->st, buffer, bufferLen);
+ if (ret == -2) {
+ /* Should never get this, since we're only called when we know
+ * we're readable, but hey things change... */
+ ret = 0;
+ } else if (ret < 0) {
+ remote_error rerr;
+ memset(&rerr, 0, sizeof rerr);
+ remoteDispatchConnError(&rerr, NULL);
+
+ ret = remoteSerializeStreamError(client, &rerr, stream->procedure, stream->serial);
+ } else {
+ stream->tx = 0;
+ if (ret == 0)
+ stream->recvEOF = 1;
+ ret = remoteSendStreamData(client, stream, buffer, ret);
+ }
+
+ VIR_FREE(buffer);
+ return ret;
+}
+
+
+/*
+ * Invoked when an outgoing data packet message has been fully sent.
+ * This simply re-enables TX of further data.
+ *
+ * The idea is to stop the daemon growing without bound due to
+ * fast stream, but slow client
+ */
+void
+remoteStreamMessageFinished(struct qemud_client *client,
+ struct qemud_client_message *msg)
+{
+ struct qemud_client_stream *stream = client->streams;
+
+ while (stream) {
+ if (msg->hdr.proc == stream->procedure &&
+ msg->hdr.serial == stream->serial)
+ break;
+ stream = stream->next;
+ }
+
+ DEBUG("Message client=%p stream=%p proc=%d serial=%d", client, stream, msg->hdr.proc, msg->hdr.serial);
+
+ if (stream) {
+ stream->tx = 1;
+ remoteStreamUpdateEvents(stream);
+ }
+}