VIR_LOG_INIT("fdstream");
+typedef enum {
+ VIR_FDSTREAM_MSG_TYPE_DATA,
+} virFDStreamMsgType;
+
+typedef struct _virFDStreamMsg virFDStreamMsg;
+typedef virFDStreamMsg *virFDStreamMsgPtr;
+struct _virFDStreamMsg {
+ virFDStreamMsgPtr next;
+
+ virFDStreamMsgType type;
+
+ union {
+ struct {
+ char *buf;
+ size_t len;
+ size_t offset;
+ } data;
+ } stream;
+};
+
+
/* Tunnelled migration stream support */
typedef struct virFDStreamData virFDStreamData;
typedef virFDStreamData *virFDStreamDataPtr;
/* Thread data */
virThreadPtr thread;
+ virCond threadCond;
int threadErr;
bool threadQuit;
+ bool threadAbort;
+ bool threadDoRead;
+ virFDStreamMsgPtr msg;
};
static virClassPtr virFDStreamDataClass;
+static void virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue);
+
static void
virFDStreamDataDispose(void *obj)
{
virFDStreamDataPtr fdst = obj;
VIR_DEBUG("obj=%p", fdst);
+ virFDStreamMsgQueueFree(&fdst->msg);
}
static int virFDStreamDataOnceInit(void)
VIR_ONCE_GLOBAL_INIT(virFDStreamData)
+static int
+virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
+ virFDStreamMsgPtr msg,
+ int fd,
+ const char *fdname)
+{
+ virFDStreamMsgPtr *tmp = &fdst->msg;
+ char c = '1';
+
+ while (*tmp)
+ tmp = &(*tmp)->next;
+
+ *tmp = msg;
+ virCondSignal(&fdst->threadCond);
+
+ if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
+ virReportSystemError(errno,
+ _("Unable to write to %s"),
+ fdname);
+ return -1;
+ }
+
+ return 0;
+}
+
+
+static virFDStreamMsgPtr
+virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
+ int fd,
+ const char *fdname)
+{
+ virFDStreamMsgPtr tmp = fdst->msg;
+ char c;
+
+ if (tmp) {
+ fdst->msg = tmp->next;
+ tmp->next = NULL;
+ }
+
+ virCondSignal(&fdst->threadCond);
+
+ if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
+ virReportSystemError(errno,
+ _("Unable to read from %s"),
+ fdname);
+ return NULL;
+ }
+
+ return tmp;
+}
+
+
+static void
+virFDStreamMsgFree(virFDStreamMsgPtr msg)
+{
+ if (!msg)
+ return;
+
+ switch (msg->type) {
+ case VIR_FDSTREAM_MSG_TYPE_DATA:
+ VIR_FREE(msg->stream.data.buf);
+ break;
+ }
+
+ VIR_FREE(msg);
+}
+
+
+static void
+virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue)
+{
+ virFDStreamMsgPtr tmp = *queue;
+
+ while (tmp) {
+ virFDStreamMsgPtr next = tmp->next;
+ virFDStreamMsgFree(tmp);
+ tmp = next;
+ }
+
+ *queue = NULL;
+}
+
+
static int virFDStreamRemoveCallback(virStreamPtr stream)
{
virFDStreamDataPtr fdst = stream->privateData;
struct _virFDStreamThreadData {
virStreamPtr st;
size_t length;
+ bool doRead;
int fdin;
char *fdinname;
int fdout;
}
+static ssize_t
+virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+ const int fdin,
+ const int fdout,
+ const char *fdinname,
+ const char *fdoutname,
+ size_t buflen)
+{
+ virFDStreamMsgPtr msg = NULL;
+ char *buf = NULL;
+ ssize_t got;
+
+ if (VIR_ALLOC(msg) < 0)
+ goto error;
+
+ if (VIR_ALLOC_N(buf, buflen) < 0)
+ goto error;
+
+ if ((got = saferead(fdin, buf, buflen)) < 0) {
+ virReportSystemError(errno,
+ _("Unable to read %s"),
+ fdinname);
+ goto error;
+ }
+
+ msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+ msg->stream.data.buf = buf;
+ msg->stream.data.len = got;
+ buf = NULL;
+
+ virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
+ msg = NULL;
+
+ return got;
+
+ error:
+ VIR_FREE(buf);
+ virFDStreamMsgFree(msg);
+ return -1;
+}
+
+
+static ssize_t
+virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
+ const int fdin,
+ const int fdout,
+ const char *fdinname,
+ const char *fdoutname)
+{
+ ssize_t got = 0;
+ virFDStreamMsgPtr msg = fdst->msg;
+ bool pop = false;
+
+ switch (msg->type) {
+ case VIR_FDSTREAM_MSG_TYPE_DATA:
+ got = safewrite(fdout,
+ msg->stream.data.buf + msg->stream.data.offset,
+ msg->stream.data.len - msg->stream.data.offset);
+ if (got < 0) {
+ virReportSystemError(errno,
+ _("Unable to write %s"),
+ fdoutname);
+ return -1;
+ }
+
+ msg->stream.data.offset += got;
+
+ pop = msg->stream.data.offset == msg->stream.data.len;
+ break;
+ }
+
+ if (pop) {
+ virFDStreamMsgQueuePop(fdst, fdin, fdinname);
+ virFDStreamMsgFree(msg);
+ }
+
+ return got;
+}
+
+
static void
virFDStreamThread(void *opaque)
{
int fdout = data->fdout;
char *fdoutname = data->fdoutname;
virFDStreamDataPtr fdst = st->privateData;
- char *buf = NULL;
+ bool doRead = fdst->threadDoRead;
size_t buflen = 256 * 1024;
size_t total = 0;
virObjectRef(fdst);
-
- if (VIR_ALLOC_N(buf, buflen) < 0)
- goto error;
+ virObjectLock(fdst);
while (1) {
ssize_t got;
if (buflen == 0)
break; /* End of requested data from client */
- if ((got = saferead(fdin, buf, buflen)) < 0) {
- virReportSystemError(errno,
- _("Unable to read %s"),
- fdinname);
- goto error;
+ while (doRead == (fdst->msg != NULL) &&
+ !fdst->threadQuit) {
+ if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
+ virReportSystemError(errno, "%s",
+ _("failed to wait on condition"));
+ goto error;
+ }
+ }
+
+ if (fdst->threadQuit) {
+ /* If stream abort was requested, quit early. */
+ if (fdst->threadAbort)
+ goto cleanup;
+
+ /* Otherwise flush buffers and quit gracefully. */
+ if (doRead == (fdst->msg != NULL))
+ break;
}
+ if (doRead)
+ got = virFDStreamThreadDoRead(fdst,
+ fdin, fdout,
+ fdinname, fdoutname,
+ buflen);
+ else
+ got = virFDStreamThreadDoWrite(fdst,
+ fdin, fdout,
+ fdinname, fdoutname);
+
+ if (got < 0)
+ goto error;
+
if (got == 0)
break;
total += got;
-
- if (safewrite(fdout, buf, got) < 0) {
- virReportSystemError(errno,
- _("Unable to write %s"),
- fdoutname);
- goto error;
- }
}
cleanup:
+ fdst->threadQuit = true;
+ virObjectUnlock(fdst);
if (!virObjectUnref(fdst))
st->privateData = NULL;
VIR_FORCE_CLOSE(fdin);
VIR_FORCE_CLOSE(fdout);
virFDStreamThreadDataFree(data);
- VIR_FREE(buf);
return;
error:
- virObjectLock(fdst);
fdst->threadErr = errno;
- virObjectUnlock(fdst);
goto cleanup;
}
if (!fdst->thread)
return 0;
+ fdst->threadAbort = streamAbort;
+ fdst->threadQuit = true;
+ virCondSignal(&fdst->threadCond);
+
/* Give the thread a chance to lock the FD stream object. */
virObjectUnlock(fdst);
virThreadJoin(fdst->thread);
ret = 0;
cleanup:
VIR_FREE(fdst->thread);
+ virCondDestroy(&fdst->threadCond);
return ret;
}
fdst->abortCallbackDispatching = false;
}
- /* mutex locked */
- ret = VIR_CLOSE(fdst->fd);
if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
ret = -1;
+ /* mutex locked */
+ if ((ret = VIR_CLOSE(fdst->fd)) < 0)
+ virReportSystemError(errno, "%s",
+ _("Unable to close"));
+
st->privateData = NULL;
/* call the internal stream closing callback */
static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
{
virFDStreamDataPtr fdst = st->privateData;
- int ret;
+ virFDStreamMsgPtr msg = NULL;
+ int ret = -1;
if (nbytes > INT_MAX) {
virReportSystemError(ERANGE, "%s",
nbytes = fdst->length - fdst->offset;
}
- retry:
- ret = write(fdst->fd, bytes, nbytes);
- if (ret < 0) {
- VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- VIR_WARNINGS_RESET
- ret = -2;
- } else if (errno == EINTR) {
- goto retry;
- } else {
- ret = -1;
- virReportSystemError(errno, "%s",
+ if (fdst->thread) {
+ char *buf;
+
+ if (fdst->threadQuit) {
+ virReportSystemError(EBADF, "%s",
_("cannot write to stream"));
+ return -1;
+ }
+
+ if (VIR_ALLOC(msg) < 0 ||
+ VIR_ALLOC_N(buf, nbytes) < 0)
+ goto cleanup;
+
+ memcpy(buf, bytes, nbytes);
+ msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+ msg->stream.data.buf = buf;
+ msg->stream.data.len = nbytes;
+
+ virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe");
+ msg = NULL;
+ ret = nbytes;
+ } else {
+ retry:
+ ret = write(fdst->fd, bytes, nbytes);
+ if (ret < 0) {
+ VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ VIR_WARNINGS_RESET
+ ret = -2;
+ } else if (errno == EINTR) {
+ goto retry;
+ } else {
+ ret = -1;
+ virReportSystemError(errno, "%s",
+ _("cannot write to stream"));
+ }
}
- } else if (fdst->length) {
- fdst->offset += ret;
}
+ if (fdst->length)
+ fdst->offset += ret;
+
+ cleanup:
virObjectUnlock(fdst);
+ virFDStreamMsgFree(msg);
return ret;
}
static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
{
virFDStreamDataPtr fdst = st->privateData;
- int ret;
+ int ret = -1;
if (nbytes > INT_MAX) {
virReportSystemError(ERANGE, "%s",
nbytes = fdst->length - fdst->offset;
}
- retry:
- ret = read(fdst->fd, bytes, nbytes);
- if (ret < 0) {
- VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- VIR_WARNINGS_RESET
- ret = -2;
- } else if (errno == EINTR) {
- goto retry;
- } else {
- ret = -1;
- virReportSystemError(errno, "%s",
- _("cannot read from stream"));
+ if (fdst->thread) {
+ virFDStreamMsgPtr msg = NULL;
+
+ while (!(msg = fdst->msg)) {
+ if (fdst->threadQuit) {
+ if (nbytes) {
+ virReportSystemError(EBADF, "%s",
+ _("stream is not open"));
+ } else {
+ ret = 0;
+ }
+ goto cleanup;
+ } else {
+ virObjectUnlock(fdst);
+ virCondSignal(&fdst->threadCond);
+ virObjectLock(fdst);
+ }
+ }
+
+ if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
+ /* Nope, nope, I'm outta here */
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("unexpected message type"));
+ goto cleanup;
+ }
+
+ if (nbytes > msg->stream.data.len - msg->stream.data.offset)
+ nbytes = msg->stream.data.len - msg->stream.data.offset;
+
+ memcpy(bytes,
+ msg->stream.data.buf + msg->stream.data.offset,
+ nbytes);
+
+ msg->stream.data.offset += nbytes;
+ if (msg->stream.data.offset == msg->stream.data.len) {
+ virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");
+ virFDStreamMsgFree(msg);
+ }
+
+ ret = nbytes;
+
+ } else {
+ retry:
+ ret = read(fdst->fd, bytes, nbytes);
+ if (ret < 0) {
+ VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ VIR_WARNINGS_RESET
+ ret = -2;
+ } else if (errno == EINTR) {
+ goto retry;
+ } else {
+ ret = -1;
+ virReportSystemError(errno, "%s",
+ _("cannot read from stream"));
+ }
+ goto cleanup;
}
- } else if (fdst->length) {
- fdst->offset += ret;
}
+ if (fdst->length)
+ fdst->offset += ret;
+
+ cleanup:
virObjectUnlock(fdst);
return ret;
}
st->privateData = fdst;
if (threadData) {
+ fdst->threadDoRead = threadData->doRead;
+
/* Create the thread after fdst and st were initialized.
* The thread worker expects them to be that way. */
if (VIR_ALLOC(fdst->thread) < 0)
goto error;
+ if (virCondInit(&fdst->threadCond) < 0) {
+ virReportSystemError(errno, "%s",
+ _("cannot initialize condition variable"));
+ goto error;
+ }
+
if (virThreadCreate(fdst->thread,
true,
virFDStreamThread,
VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
goto error;
tmpfd = pipefds[0];
+ threadData->doRead = true;
} else {
threadData->fdin = pipefds[0];
threadData->fdout = fd;
VIR_STRDUP(threadData->fdoutname, path) < 0)
goto error;
tmpfd = pipefds[1];
+ threadData->doRead = false;
}
}