typedef enum {
VIR_FDSTREAM_MSG_TYPE_DATA,
+ VIR_FDSTREAM_MSG_TYPE_HOLE,
} virFDStreamMsgType;
typedef struct _virFDStreamMsg virFDStreamMsg;
size_t len;
size_t offset;
} data;
+ struct {
+ long long len;
+ } hole;
} stream;
};
case VIR_FDSTREAM_MSG_TYPE_DATA:
VIR_FREE(msg->stream.data.buf);
break;
+ case VIR_FDSTREAM_MSG_TYPE_HOLE:
+ /* nada */
+ break;
}
VIR_FREE(msg);
virStreamPtr st;
size_t length;
bool doRead;
+ bool sparse;
int fdin;
char *fdinname;
int fdout;
static ssize_t
virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
+ bool sparse,
const int fdin,
const int fdout,
const char *fdinname,
const char *fdoutname,
+ size_t *dataLen,
size_t buflen)
{
virFDStreamMsgPtr msg = NULL;
+ int inData = 0;
+ long long sectionLen = 0;
char *buf = NULL;
ssize_t got;
+ if (sparse && *dataLen == 0) {
+ if (virFileInData(fdin, &inData, §ionLen) < 0)
+ goto error;
+
+ if (inData)
+ *dataLen = sectionLen;
+ }
+
if (VIR_ALLOC(msg) < 0)
goto error;
- if (VIR_ALLOC_N(buf, buflen) < 0)
- goto error;
+ if (sparse && *dataLen == 0) {
+ msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE;
+ msg->stream.hole.len = sectionLen;
+ got = sectionLen;
- if ((got = saferead(fdin, buf, buflen)) < 0) {
- virReportSystemError(errno,
- _("Unable to read %s"),
- fdinname);
- goto error;
- }
+ /* HACK: The message queue is one directional. So caller
+ * cannot make us skip the hole. Do that for them instead. */
+ if (sectionLen &&
+ lseek(fdin, sectionLen, SEEK_CUR) == (off_t) -1) {
+ virReportSystemError(errno,
+ _("unable to seek in %s"),
+ fdinname);
+ goto error;
+ }
+ } else {
+ if (sparse &&
+ buflen > *dataLen)
+ buflen = *dataLen;
- msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
- msg->stream.data.buf = buf;
- msg->stream.data.len = got;
- buf = NULL;
+ if (VIR_ALLOC_N(buf, buflen) < 0)
+ goto error;
+
+ if ((got = saferead(fdin, buf, buflen)) < 0) {
+ virReportSystemError(errno,
+ _("Unable to read %s"),
+ fdinname);
+ goto error;
+ }
+
+ msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
+ msg->stream.data.buf = buf;
+ msg->stream.data.len = got;
+ buf = NULL;
+ if (sparse)
+ *dataLen -= got;
+ }
virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
msg = NULL;
static ssize_t
virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
+ bool sparse,
const int fdin,
const int fdout,
const char *fdinname,
{
ssize_t got = 0;
virFDStreamMsgPtr msg = fdst->msg;
+ off_t off;
bool pop = false;
switch (msg->type) {
pop = msg->stream.data.offset == msg->stream.data.len;
break;
+
+ case VIR_FDSTREAM_MSG_TYPE_HOLE:
+ if (!sparse) {
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("unexpected stream hole"));
+ return -1;
+ }
+
+ got = msg->stream.hole.len;
+ off = lseek(fdout, got, SEEK_CUR);
+ if (off == (off_t) -1) {
+ virReportSystemError(errno,
+ _("unable to seek in %s"),
+ fdoutname);
+ return -1;
+ }
+
+ if (ftruncate(fdout, off) < 0) {
+ virReportSystemError(errno,
+ _("unable to truncate %s"),
+ fdoutname);
+ return -1;
+ }
+
+ pop = true;
+ break;
}
if (pop) {
virFDStreamThreadDataPtr data = opaque;
virStreamPtr st = data->st;
size_t length = data->length;
+ bool sparse = data->sparse;
int fdin = data->fdin;
char *fdinname = data->fdinname;
int fdout = data->fdout;
bool doRead = fdst->threadDoRead;
size_t buflen = 256 * 1024;
size_t total = 0;
+ size_t dataLen = 0;
virObjectRef(fdst);
virObjectLock(fdst);
}
if (doRead)
- got = virFDStreamThreadDoRead(fdst,
+ got = virFDStreamThreadDoRead(fdst, sparse,
fdin, fdout,
fdinname, fdoutname,
- buflen);
+ &dataLen, buflen);
else
- got = virFDStreamThreadDoWrite(fdst,
+ got = virFDStreamThreadDoWrite(fdst, sparse,
fdin, fdout,
fdinname, fdoutname);
}
}
+ /* Shortcut, if the stream is in the trailing hole,
+ * return 0 immediately. */
+ if (msg->type == VIR_FDSTREAM_MSG_TYPE_HOLE &&
+ msg->stream.hole.len == 0) {
+ ret = 0;
+ goto cleanup;
+ }
+
if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
/* Nope, nope, I'm outta here */
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
}
+static int
+virFDStreamSendHole(virStreamPtr st,
+ long long length,
+ unsigned int flags)
+{
+ virFDStreamDataPtr fdst = st->privateData;
+ virFDStreamMsgPtr msg = NULL;
+ off_t off;
+ int ret = -1;
+
+ virCheckFlags(0, -1);
+
+ virObjectLock(fdst);
+ if (fdst->length) {
+ if (length > fdst->length - fdst->offset)
+ length = fdst->length - fdst->offset;
+ fdst->offset += length;
+ }
+
+ if (fdst->thread) {
+ /* Things are a bit complicated here. If FDStream is in a
+ * read mode, then if the message at the queue head is
+ * HOLE, just pop it. The thread has lseek()-ed anyway.
+ * However, if the FDStream is in write mode, then tell
+ * the thread to do the lseek() for us. Under no
+ * circumstances we can do the lseek() ourselves here. We
+ * might mess up file position for the thread. */
+ if (fdst->threadDoRead) {
+ msg = fdst->msg;
+ if (msg->type != VIR_FDSTREAM_MSG_TYPE_HOLE) {
+ virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
+ _("Invalid stream hole"));
+ goto cleanup;
+ }
+
+ virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");
+ } else {
+ if (VIR_ALLOC(msg) < 0)
+ goto cleanup;
+
+ msg->type = VIR_FDSTREAM_MSG_TYPE_HOLE;
+ msg->stream.hole.len = length;
+ virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe");
+ msg = NULL;
+ }
+ } else {
+ off = lseek(fdst->fd, length, SEEK_CUR);
+ if (off == (off_t) -1) {
+ virReportSystemError(errno, "%s",
+ _("unable to seek"));
+ goto cleanup;
+ }
+
+ if (ftruncate(fdst->fd, off) < 0) {
+ virReportSystemError(errno, "%s",
+ _("unable to truncate"));
+ goto cleanup;
+ }
+ }
+
+ ret = 0;
+ cleanup:
+ virObjectUnlock(fdst);
+ virFDStreamMsgFree(msg);
+ return ret;
+}
+
+
+static int
+virFDStreamInData(virStreamPtr st,
+ int *inData,
+ long long *length)
+{
+ virFDStreamDataPtr fdst = st->privateData;
+ int ret = -1;
+
+ virObjectLock(fdst);
+
+ if (fdst->thread) {
+ virFDStreamMsgPtr msg;
+
+ while (!(msg = fdst->msg)) {
+ if (fdst->threadQuit) {
+ *inData = *length = 0;
+ ret = 0;
+ goto cleanup;
+ } else {
+ virObjectUnlock(fdst);
+ virCondSignal(&fdst->threadCond);
+ virObjectLock(fdst);
+ }
+ }
+
+ if (msg->type == VIR_FDSTREAM_MSG_TYPE_DATA) {
+ *inData = 1;
+ *length = msg->stream.data.len - msg->stream.data.offset;
+ } else {
+ *inData = 0;
+ *length = msg->stream.hole.len;
+ }
+ ret = 0;
+ } else {
+ ret = virFileInData(fdst->fd, inData, length);
+ }
+
+ cleanup:
+ virObjectUnlock(fdst);
+ return ret;
+}
+
+
static virStreamDriver virFDStreamDrv = {
.streamSend = virFDStreamWrite,
.streamRecv = virFDStreamRead,
.streamFinish = virFDStreamClose,
.streamAbort = virFDStreamAbort,
+ .streamSendHole = virFDStreamSendHole,
+ .streamInData = virFDStreamInData,
.streamEventAddCallback = virFDStreamAddCallback,
.streamEventUpdateCallback = virFDStreamUpdateCallback,
.streamEventRemoveCallback = virFDStreamRemoveCallback
unsigned long long length,
int oflags,
int mode,
- bool forceIOHelper)
+ bool forceIOHelper,
+ bool sparse)
{
int fd = -1;
int pipefds[2] = { -1, -1 };
threadData->st = virObjectRef(st);
threadData->length = length;
+ threadData->sparse = sparse;
if ((oflags & O_ACCMODE) == O_RDONLY) {
threadData->fdin = fd;
}
return virFDStreamOpenFileInternal(st, path,
offset, length,
- oflags, 0, false);
+ oflags, 0, false, false);
}
int virFDStreamCreateFile(virStreamPtr st,
return virFDStreamOpenFileInternal(st, path,
offset, length,
oflags | O_CREAT, mode,
- false);
+ false, false);
}
#ifdef HAVE_CFMAKERAW
if (virFDStreamOpenFileInternal(st, path,
offset, length,
oflags | O_CREAT, 0,
- false) < 0)
+ false, false) < 0)
return -1;
fdst = st->privateData;
return virFDStreamOpenFileInternal(st, path,
offset, length,
oflags | O_CREAT, 0,
- false);
+ false, false);
}
#endif /* !HAVE_CFMAKERAW */
const char *path,
unsigned long long offset,
unsigned long long length,
+ bool sparse,
int oflags)
{
return virFDStreamOpenFileInternal(st, path,
offset, length,
- oflags, 0, true);
+ oflags, 0, true, sparse);
}
int virFDStreamSetInternalCloseCb(virStreamPtr st,