#include <poll.h>
#include <sys/time.h>
#include <errno.h>
+#include <unistd.h>
#include "qemud.h"
#include "event.h"
#include "memory.h"
+#include "util.h"
#define EVENT_DEBUG(fmt, ...) qemudDebug("EVENT: " fmt, __VA_ARGS__)
+static int virEventInterruptLocked(void);
+
/* State for a single file handle being monitored */
struct virEventHandle {
int watch;
/* State for the main event loop */
struct virEventLoop {
+ pthread_mutex_t lock;
+ pthread_t leader;
+ int wakeupfd[2];
int handlesCount;
int handlesAlloc;
struct virEventHandle *handles;
/* Unique ID for the next timer to be registered */
static int nextTimer = 0;
+static void virEventLock(void)
+{
+ pthread_mutex_lock(&eventLoop.lock);
+}
+
+static void virEventUnlock(void)
+{
+ pthread_mutex_unlock(&eventLoop.lock);
+}
+
/*
* Register a callback for monitoring file handle events.
* NB, it *must* be safe to call this from within a callback
virEventHandleCallback cb,
void *opaque,
virFreeCallback ff) {
+ int watch;
EVENT_DEBUG("Add handle %d %d %p %p", fd, events, cb, opaque);
+ virEventLock();
if (eventLoop.handlesCount == eventLoop.handlesAlloc) {
EVENT_DEBUG("Used %d handle slots, adding %d more",
eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT);
if (VIR_REALLOC_N(eventLoop.handles,
- (eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0)
+ (eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0) {
+ virEventUnlock();
return -1;
+ }
eventLoop.handlesAlloc += EVENT_ALLOC_EXTENT;
}
- eventLoop.handles[eventLoop.handlesCount].watch = nextWatch++;
+ watch = nextWatch++;
+
+ eventLoop.handles[eventLoop.handlesCount].watch = watch;
eventLoop.handles[eventLoop.handlesCount].fd = fd;
eventLoop.handles[eventLoop.handlesCount].events =
virEventHandleTypeToPollEvent(events);
eventLoop.handlesCount++;
- return nextWatch-1;
+ virEventInterruptLocked();
+ virEventUnlock();
+
+ return watch;
}
void virEventUpdateHandleImpl(int watch, int events) {
int i;
+ virEventLock();
for (i = 0 ; i < eventLoop.handlesCount ; i++) {
if (eventLoop.handles[i].watch == watch) {
eventLoop.handles[i].events =
break;
}
}
+ virEventInterruptLocked();
+ virEventUnlock();
}
/*
int virEventRemoveHandleImpl(int watch) {
int i;
EVENT_DEBUG("Remove handle %d", watch);
+ virEventLock();
for (i = 0 ; i < eventLoop.handlesCount ; i++) {
if (eventLoop.handles[i].deleted)
continue;
if (eventLoop.handles[i].watch == watch) {
EVENT_DEBUG("mark delete %d %d", i, eventLoop.handles[i].fd);
eventLoop.handles[i].deleted = 1;
+ virEventUnlock();
return 0;
}
}
+ virEventInterruptLocked();
+ virEventUnlock();
return -1;
}
void *opaque,
virFreeCallback ff) {
struct timeval now;
+ int ret;
EVENT_DEBUG("Adding timer %d with %d ms freq", nextTimer, frequency);
if (gettimeofday(&now, NULL) < 0) {
return -1;
}
+ virEventLock();
if (eventLoop.timeoutsCount == eventLoop.timeoutsAlloc) {
EVENT_DEBUG("Used %d timeout slots, adding %d more",
eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT);
if (VIR_REALLOC_N(eventLoop.timeouts,
- (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0)
+ (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0) {
+ virEventUnlock();
return -1;
+ }
eventLoop.timeoutsAlloc += EVENT_ALLOC_EXTENT;
}
(((unsigned long long)now.tv_usec)/1000) : 0;
eventLoop.timeoutsCount++;
-
- return nextTimer-1;
+ ret = nextTimer-1;
+ virEventInterruptLocked();
+ virEventUnlock();
+ return ret;
}
void virEventUpdateTimeoutImpl(int timer, int frequency) {
return;
}
+ virEventLock();
for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
if (eventLoop.timeouts[i].timer == timer) {
eventLoop.timeouts[i].frequency = frequency;
break;
}
}
+ virEventInterruptLocked();
+ virEventUnlock();
}
/*
int virEventRemoveTimeoutImpl(int timer) {
int i;
EVENT_DEBUG("Remove timer %d", timer);
+ virEventLock();
for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
if (eventLoop.timeouts[i].deleted)
continue;
if (eventLoop.timeouts[i].timer == timer) {
eventLoop.timeouts[i].deleted = 1;
+ virEventUnlock();
return 0;
}
}
+ virEventInterruptLocked();
+ virEventUnlock();
return -1;
}
continue;
if (eventLoop.timeouts[i].expiresAt <= now) {
- (eventLoop.timeouts[i].cb)(eventLoop.timeouts[i].timer,
- eventLoop.timeouts[i].opaque);
+ virEventTimeoutCallback cb = eventLoop.timeouts[i].cb;
+ int timer = eventLoop.timeouts[i].timer;
+ void *opaque = eventLoop.timeouts[i].opaque;
eventLoop.timeouts[i].expiresAt =
now + eventLoop.timeouts[i].frequency;
+
+ virEventUnlock();
+ (cb)(timer, opaque);
+ virEventLock();
}
}
return 0;
*
* Returns 0 upon success, -1 if an error occurred
*/
-static int virEventDispatchHandles(struct pollfd *fds) {
+static int virEventDispatchHandles(int nfds, struct pollfd *fds) {
int i;
- virEventHandleType hEvents;
- /* Save this now - it may be changed during dispatch */
- int nhandles = eventLoop.handlesCount;
- for (i = 0 ; i < nhandles ; i++) {
+ for (i = 0 ; i < nfds ; i++) {
if (eventLoop.handles[i].deleted) {
EVENT_DEBUG("Skip deleted %d", eventLoop.handles[i].fd);
continue;
}
if (fds[i].revents) {
- hEvents = virPollEventToEventHandleType(fds[i].revents);
- EVENT_DEBUG("Dispatch %d %d %d %p",
- eventLoop.handles[i].watch,
- fds[i].fd, fds[i].revents,
- eventLoop.handles[i].opaque);
- (eventLoop.handles[i].cb)(eventLoop.handles[i].watch,
- fds[i].fd,
- hEvents,
- eventLoop.handles[i].opaque);
+ virEventHandleCallback cb = eventLoop.handles[i].cb;
+ void *opaque = eventLoop.handles[i].opaque;
+ int hEvents = virPollEventToEventHandleType(fds[i].revents);
+ EVENT_DEBUG("Dispatch %d %d %p", fds[i].fd,
+ fds[i].revents, eventLoop.handles[i].opaque);
+ virEventUnlock();
+ (cb)(eventLoop.handles[i].watch,
+ fds[i].fd, hEvents, opaque);
+ virEventLock();
}
}
struct pollfd *fds;
int ret, timeout, nfds;
- if ((nfds = virEventMakePollFDs(&fds)) < 0)
+ virEventLock();
+ eventLoop.leader = pthread_self();
+ if ((nfds = virEventMakePollFDs(&fds)) < 0) {
+ virEventUnlock();
return -1;
+ }
if (virEventCalculateTimeout(&timeout) < 0) {
VIR_FREE(fds);
+ virEventUnlock();
return -1;
}
+ virEventUnlock();
+
retry:
EVENT_DEBUG("Poll on %d handles %p timeout %d", nfds, fds, timeout);
ret = poll(fds, nfds, timeout);
VIR_FREE(fds);
return -1;
}
+
+ virEventLock();
if (virEventDispatchTimeouts() < 0) {
VIR_FREE(fds);
+ virEventUnlock();
return -1;
}
if (ret > 0 &&
- virEventDispatchHandles(fds) < 0) {
+ virEventDispatchHandles(nfds, fds) < 0) {
VIR_FREE(fds);
+ virEventUnlock();
return -1;
}
VIR_FREE(fds);
- if (virEventCleanupTimeouts() < 0)
+ if (virEventCleanupTimeouts() < 0) {
+ virEventUnlock();
+ return -1;
+ }
+
+ if (virEventCleanupHandles() < 0) {
+ virEventUnlock();
+ return -1;
+ }
+
+ eventLoop.leader = 0;
+ virEventUnlock();
+ return 0;
+}
+
+static void virEventHandleWakeup(int watch ATTRIBUTE_UNUSED,
+ int fd,
+ int events ATTRIBUTE_UNUSED,
+ void *opaque ATTRIBUTE_UNUSED)
+{
+ char c;
+ virEventLock();
+ saferead(fd, &c, sizeof(c));
+ virEventUnlock();
+}
+
+int virEventInit(void)
+{
+ if (pthread_mutex_init(&eventLoop.lock, NULL) != 0)
+ return -1;
+
+ if (pipe(eventLoop.wakeupfd) < 0 ||
+ qemudSetNonBlock(eventLoop.wakeupfd[0]) < 0 ||
+ qemudSetNonBlock(eventLoop.wakeupfd[1]) < 0 ||
+ qemudSetCloseExec(eventLoop.wakeupfd[0]) < 0 ||
+ qemudSetCloseExec(eventLoop.wakeupfd[1]) < 0)
return -1;
- if (virEventCleanupHandles() < 0)
+ if (virEventAddHandleImpl(eventLoop.wakeupfd[0],
+ VIR_EVENT_HANDLE_READABLE,
+ virEventHandleWakeup, NULL, NULL) < 0)
return -1;
return 0;
}
+static int virEventInterruptLocked(void)
+{
+ char c = '\0';
+ if (pthread_self() == eventLoop.leader)
+ return 0;
+
+ if (safewrite(eventLoop.wakeupfd[1], &c, sizeof(c)) != sizeof(c))
+ return -1;
+ return 0;
+}
+
+int virEventInterrupt(void)
+{
+ int ret;
+ virEventLock();
+ ret = virEventInterruptLocked();
+ virEventUnlock();
+ return ret;
+}
+
int
virEventHandleTypeToPollEvent(int events)
{