virObjectLockable parent;
int fd;
- int errfd;
- virCommandPtr cmd;
unsigned long long offset;
unsigned long long length;
virFDStreamInternalCloseCb icbCb;
virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque;
void *icbOpaque;
+
+ /* Thread data */
+ virThreadPtr thread;
+ int threadErr;
+ bool threadQuit;
};
static virClassPtr virFDStreamDataClass;
return ret;
}
-static int
-virFDStreamCloseCommand(virFDStreamDataPtr fdst, bool streamAbort)
+
+typedef struct _virFDStreamThreadData virFDStreamThreadData;
+typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
+struct _virFDStreamThreadData {
+ virStreamPtr st;
+ size_t length;
+ int fdin;
+ char *fdinname;
+ int fdout;
+ char *fdoutname;
+};
+
+
+static void
+virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
{
- char buf[1024];
- ssize_t len;
- int status;
- int ret = -1;
+ if (!data)
+ return;
- if (!fdst->cmd)
- return 0;
+ virObjectUnref(data->st);
+ VIR_FREE(data->fdinname);
+ VIR_FREE(data->fdoutname);
+ VIR_FREE(data);
+}
- if ((len = saferead(fdst->errfd, buf, sizeof(buf)-1)) < 0)
- buf[0] = '\0';
- else
- buf[len] = '\0';
- virCommandRawStatus(fdst->cmd);
- if (virCommandWait(fdst->cmd, &status) < 0)
- goto cleanup;
+static void
+virFDStreamThread(void *opaque)
+{
+ virFDStreamThreadDataPtr data = opaque;
+ virStreamPtr st = data->st;
+ size_t length = data->length;
+ int fdin = data->fdin;
+ char *fdinname = data->fdinname;
+ int fdout = data->fdout;
+ char *fdoutname = data->fdoutname;
+ virFDStreamDataPtr fdst = st->privateData;
+ char *buf = NULL;
+ size_t buflen = 256 * 1024;
+ size_t total = 0;
- if (status != 0) {
- if (buf[0] != '\0') {
- virReportError(VIR_ERR_INTERNAL_ERROR, "%s", buf);
- } else if (WIFSIGNALED(status) && WTERMSIG(status) == SIGPIPE) {
- if (streamAbort) {
- /* Explicit abort request means the caller doesn't care
- if there's data left over, so skip the error */
- goto out;
- }
-
- virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
- _("I/O helper exited "
- "before all data was processed"));
- } else {
- char *str = virProcessTranslateStatus(status);
- virReportError(VIR_ERR_INTERNAL_ERROR,
- _("I/O helper exited with %s"),
- NULLSTR(str));
- VIR_FREE(str);
+ virObjectRef(fdst);
+
+ if (VIR_ALLOC_N(buf, buflen) < 0)
+ goto error;
+
+ while (1) {
+ ssize_t got;
+
+ if (length &&
+ (length - total) < buflen)
+ buflen = length - total;
+
+ if (buflen == 0)
+ break; /* End of requested data from client */
+
+ if ((got = saferead(fdin, buf, buflen)) < 0) {
+ virReportSystemError(errno,
+ _("Unable to read %s"),
+ fdinname);
+ goto error;
}
+
+ if (got == 0)
+ break;
+
+ total += got;
+
+ if (safewrite(fdout, buf, got) < 0) {
+ virReportSystemError(errno,
+ _("Unable to write %s"),
+ fdoutname);
+ goto error;
+ }
+ }
+
+ cleanup:
+ if (!virObjectUnref(fdst))
+ st->privateData = NULL;
+ VIR_FORCE_CLOSE(fdin);
+ VIR_FORCE_CLOSE(fdout);
+ virFDStreamThreadDataFree(data);
+ VIR_FREE(buf);
+ return;
+
+ error:
+ virObjectLock(fdst);
+ fdst->threadErr = errno;
+ virObjectUnlock(fdst);
+ goto cleanup;
+}
+
+
+static int
+virFDStreamJoinWorker(virFDStreamDataPtr fdst,
+ bool streamAbort)
+{
+ int ret = -1;
+ if (!fdst->thread)
+ return 0;
+
+ /* Give the thread a chance to lock the FD stream object. */
+ virObjectUnlock(fdst);
+ virThreadJoin(fdst->thread);
+ virObjectLock(fdst);
+
+ if (fdst->threadErr && !streamAbort) {
+ /* errors are expected on streamAbort */
goto cleanup;
}
- out:
ret = 0;
cleanup:
- virCommandFree(fdst->cmd);
- fdst->cmd = NULL;
+ VIR_FREE(fdst->thread);
return ret;
}
+
static int
virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
{
/* mutex locked */
ret = VIR_CLOSE(fdst->fd);
- if (virFDStreamCloseCommand(fdst, streamAbort) < 0)
+ if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
ret = -1;
- if (VIR_CLOSE(fdst->errfd) < 0)
- VIR_DEBUG("ignoring failed close on fd %d", fdst->errfd);
-
st->privateData = NULL;
/* call the internal stream closing callback */
static int virFDStreamOpenInternal(virStreamPtr st,
int fd,
- virCommandPtr cmd,
- int errfd,
+ virFDStreamThreadDataPtr threadData,
unsigned long long length)
{
virFDStreamDataPtr fdst;
- VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
- st, fd, cmd, errfd, length);
+ VIR_DEBUG("st=%p fd=%d threadData=%p length=%llu",
+ st, fd, threadData, length);
if (virFDStreamDataInitialize() < 0)
return -1;
return -1;
fdst->fd = fd;
- fdst->cmd = cmd;
- fdst->errfd = errfd;
fdst->length = length;
st->driver = &virFDStreamDrv;
st->privateData = fdst;
+ if (threadData) {
+ /* Create the thread after fdst and st were initialized.
+ * The thread worker expects them to be that way. */
+ if (VIR_ALLOC(fdst->thread) < 0)
+ goto error;
+
+ if (virThreadCreate(fdst->thread,
+ true,
+ virFDStreamThread,
+ threadData) < 0)
+ goto error;
+ }
+
return 0;
+
+ error:
+ VIR_FREE(fdst->thread);
+ st->driver = NULL;
+ st->privateData = NULL;
+ virObjectUnref(fdst);
+ return -1;
}
int virFDStreamOpen(virStreamPtr st,
int fd)
{
- return virFDStreamOpenInternal(st, fd, NULL, -1, 0);
+ return virFDStreamOpenInternal(st, fd, NULL, 0);
}
goto error;
}
- if (virFDStreamOpenInternal(st, fd, NULL, -1, 0) < 0)
+ if (virFDStreamOpenInternal(st, fd, NULL, 0) < 0)
goto error;
return 0;
bool forceIOHelper)
{
int fd = -1;
- int childfd = -1;
+ int pipefds[2] = { -1, -1 };
+ int tmpfd = -1;
struct stat sb;
- virCommandPtr cmd = NULL;
- int errfd = -1;
- char *iohelper_path = NULL;
+ virFDStreamThreadDataPtr threadData = NULL;
VIR_DEBUG("st=%p path=%s oflags=%x offset=%llu length=%llu mode=%o",
st, path, oflags, offset, length, mode);
path);
return -1;
}
+ tmpfd = fd;
if (fstat(fd, &sb) < 0) {
virReportSystemError(errno,
/* Thanks to the POSIX i/o model, we can't reliably get
* non-blocking I/O on block devs/regular files. To
- * support those we need to fork a helper process to do
+ * support those we need to create a helper thread to do
* the I/O so we just have a fifo. Or use AIO :-(
*/
if ((st->flags & VIR_STREAM_NONBLOCK) &&
((!S_ISCHR(sb.st_mode) &&
!S_ISFIFO(sb.st_mode)) || forceIOHelper)) {
- int fds[2] = { -1, -1 };
if ((oflags & O_ACCMODE) == O_RDWR) {
virReportError(VIR_ERR_INTERNAL_ERROR,
goto error;
}
- if (pipe(fds) < 0) {
+ if (pipe(pipefds) < 0) {
virReportSystemError(errno, "%s",
_("Unable to create pipe"));
goto error;
}
- if (!(iohelper_path = virFileFindResource("libvirt_iohelper",
- abs_topbuilddir "/src",
- LIBEXECDIR)))
+ if (VIR_ALLOC(threadData) < 0)
goto error;
- cmd = virCommandNewArgList(iohelper_path,
- path,
- NULL);
-
- VIR_FREE(iohelper_path);
-
- virCommandAddArgFormat(cmd, "%llu", length);
- virCommandPassFD(cmd, fd,
- VIR_COMMAND_PASS_FD_CLOSE_PARENT);
- virCommandAddArgFormat(cmd, "%d", fd);
+ threadData->st = virObjectRef(st);
+ threadData->length = length;
if ((oflags & O_ACCMODE) == O_RDONLY) {
- childfd = fds[1];
- fd = fds[0];
- virCommandSetOutputFD(cmd, &childfd);
+ threadData->fdin = fd;
+ threadData->fdout = pipefds[1];
+ if (VIR_STRDUP(threadData->fdinname, path) < 0 ||
+ VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
+ goto error;
+ tmpfd = pipefds[0];
} else {
- childfd = fds[0];
- fd = fds[1];
- virCommandSetInputFD(cmd, childfd);
+ threadData->fdin = pipefds[0];
+ threadData->fdout = fd;
+ if (VIR_STRDUP(threadData->fdinname, "pipe") < 0 ||
+ VIR_STRDUP(threadData->fdoutname, path) < 0)
+ goto error;
+ tmpfd = pipefds[1];
}
- virCommandSetErrorFD(cmd, &errfd);
-
- if (virCommandRunAsync(cmd, NULL) < 0)
- goto error;
-
- VIR_FORCE_CLOSE(childfd);
}
- if (virFDStreamOpenInternal(st, fd, cmd, errfd, length) < 0)
+ if (virFDStreamOpenInternal(st, tmpfd, threadData, length) < 0)
goto error;
return 0;
error:
- virCommandFree(cmd);
VIR_FORCE_CLOSE(fd);
- VIR_FORCE_CLOSE(childfd);
- VIR_FORCE_CLOSE(errfd);
- VIR_FREE(iohelper_path);
+ VIR_FORCE_CLOSE(pipefds[0]);
+ VIR_FORCE_CLOSE(pipefds[1]);
if (oflags & O_CREAT)
unlink(path);
+ virFDStreamThreadDataFree(threadData);
return -1;
}