typedef struct virFDStreamData virFDStreamData;
typedef virFDStreamData *virFDStreamDataPtr;
struct virFDStreamData {
+ virObjectLockable parent;
+
int fd;
int errfd;
virCommandPtr cmd;
virFDStreamInternalCloseCb icbCb;
virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque;
void *icbOpaque;
-
- virMutex lock;
};
+static virClassPtr virFDStreamDataClass;
+
+static void
+virFDStreamDataDispose(void *obj)
+{
+ virFDStreamDataPtr fdst = obj;
+
+ VIR_DEBUG("obj=%p", fdst);
+}
+
+static int virFDStreamDataOnceInit(void)
+{
+ if (!(virFDStreamDataClass = virClassNew(virClassForObjectLockable(),
+ "virFDStreamData",
+ sizeof(virFDStreamData),
+ virFDStreamDataDispose)))
+ return -1;
+
+ return 0;
+}
+
+VIR_ONCE_GLOBAL_INIT(virFDStreamData)
+
static int virFDStreamRemoveCallback(virStreamPtr stream)
{
return -1;
}
- virMutexLock(&fdst->lock);
+ virObjectLock(fdst);
if (fdst->watch == 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream does not have a callback registered"));
ret = 0;
cleanup:
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
return ret;
}
return -1;
}
- virMutexLock(&fdst->lock);
+ virObjectLock(fdst);
if (fdst->watch == 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream does not have a callback registered"));
ret = 0;
cleanup:
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
return ret;
}
if (!fdst)
return;
- virMutexLock(&fdst->lock);
+ virObjectLock(fdst);
if (!fdst->cb) {
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
return;
}
cbopaque = fdst->opaque;
ff = fdst->ff;
fdst->dispatching = true;
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
cb(stream, events, cbopaque);
- virMutexLock(&fdst->lock);
+ virObjectLock(fdst);
fdst->dispatching = false;
if (fdst->cbRemoved && ff)
(ff)(cbopaque);
closed = fdst->closed;
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
- if (closed) {
- virMutexDestroy(&fdst->lock);
- VIR_FREE(fdst);
- }
+ if (closed)
+ virObjectUnref(fdst);
}
static void virFDStreamCallbackFree(void *opaque)
return -1;
}
- virMutexLock(&fdst->lock);
+ virObjectLock(fdst);
if (fdst->watch != 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream already has a callback registered"));
ret = 0;
cleanup:
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
return ret;
}
if (!st || !(fdst = st->privateData) || fdst->abortCallbackDispatching)
return 0;
- virMutexLock(&fdst->lock);
+ virObjectLock(fdst);
/* aborting the stream, ensure the callback is called if it's
* registered for stream error event */
VIR_STREAM_EVENT_WRITABLE))) {
/* don't enter this function accidentally from the callback again */
if (fdst->abortCallbackCalled) {
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
return 0;
}
/* cache the pointers */
cb = fdst->cb;
opaque = fdst->opaque;
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
/* call failure callback, poll reports nothing on closed fd */
(cb)(st, VIR_STREAM_EVENT_ERROR, opaque);
- virMutexLock(&fdst->lock);
+ virObjectLock(fdst);
fdst->abortCallbackDispatching = false;
}
if (fdst->dispatching) {
fdst->closed = true;
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
} else {
- virMutexUnlock(&fdst->lock);
- virMutexDestroy(&fdst->lock);
- VIR_FREE(fdst);
+ virObjectUnlock(fdst);
+ virObjectUnref(fdst);
}
return ret;
return -1;
}
- virMutexLock(&fdst->lock);
+ virObjectLock(fdst);
if (fdst->length) {
if (fdst->length == fdst->offset) {
virReportSystemError(ENOSPC, "%s",
_("cannot write to stream"));
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
return -1;
}
fdst->offset += ret;
}
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
return ret;
}
return -1;
}
- virMutexLock(&fdst->lock);
+ virObjectLock(fdst);
if (fdst->length) {
if (fdst->length == fdst->offset) {
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
return 0;
}
fdst->offset += ret;
}
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
return ret;
}
VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
st, fd, cmd, errfd, length);
+ if (virFDStreamDataInitialize() < 0)
+ return -1;
+
if ((st->flags & VIR_STREAM_NONBLOCK) &&
virSetNonBlock(fd) < 0) {
virReportSystemError(errno, "%s", _("Unable to set non-blocking mode"));
return -1;
}
- if (VIR_ALLOC(fdst) < 0)
+ if (!(fdst = virObjectLockableNew(virFDStreamDataClass)))
return -1;
fdst->fd = fd;
fdst->cmd = cmd;
fdst->errfd = errfd;
fdst->length = length;
- if (virMutexInit(&fdst->lock) < 0) {
- VIR_FREE(fdst);
- virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
- _("Unable to initialize mutex"));
- return -1;
- }
st->driver = &virFDStreamDrv;
st->privateData = fdst;
{
virFDStreamDataPtr fdst = st->privateData;
- virMutexLock(&fdst->lock);
+ virObjectLock(fdst);
if (fdst->icbFreeOpaque)
(fdst->icbFreeOpaque)(fdst->icbOpaque);
fdst->icbOpaque = opaque;
fdst->icbFreeOpaque = fcb;
- virMutexUnlock(&fdst->lock);
+ virObjectUnlock(fdst);
return 0;
}