]> git.ipfire.org Git - thirdparty/libvirt.git/commitdiff
Make event loop thread safe & re-entrant safe
authorDaniel P. Berrange <berrange@redhat.com>
Thu, 4 Dec 2008 22:14:15 +0000 (22:14 +0000)
committerDaniel P. Berrange <berrange@redhat.com>
Thu, 4 Dec 2008 22:14:15 +0000 (22:14 +0000)
ChangeLog
qemud/event.c
qemud/event.h
qemud/qemud.c
qemud/qemud.h

index a9452b3460556f0e94815df39a8a6103f77b0bbd..57dc3774b75a2eff5ed56ada09de143e9400c7e6 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,10 @@
+Thu Dec  4 22:14:41 GMT 2008 Daniel P. Berrange <berrange@redhat.com>
+
+       * src/event.c, src/event.h: Make all event handling thread
+       safe and re-entrant safe, and allow poll() to be woken up
+       by other threads.
+       * src/qemud.c, src/qemud.h: Initialize event loop explicitly
+
 Thu Dec  4 22:12:41 GMT 2008 Daniel P. Berrange <berrange@redhat.com>
 
        * qemud/qemud.c, qemud/qemud.h, qemud/remote.c: Make all
index e8eed0c42de8f83a7e959a55a2dceb3d52b91792..fcaa62d5d6441fea71eb059b97f2a4ae48e53e16 100644 (file)
 #include <poll.h>
 #include <sys/time.h>
 #include <errno.h>
+#include <unistd.h>
 
 #include "qemud.h"
 #include "event.h"
 #include "memory.h"
+#include "util.h"
 
 #define EVENT_DEBUG(fmt, ...) qemudDebug("EVENT: " fmt, __VA_ARGS__)
 
+static int virEventInterruptLocked(void);
+
 /* State for a single file handle being monitored */
 struct virEventHandle {
     int watch;
@@ -63,6 +67,9 @@ struct virEventTimeout {
 
 /* State for the main event loop */
 struct virEventLoop {
+    pthread_mutex_t lock;
+    pthread_t leader;
+    int wakeupfd[2];
     int handlesCount;
     int handlesAlloc;
     struct virEventHandle *handles;
@@ -80,6 +87,16 @@ static int nextWatch = 0;
 /* Unique ID for the next timer to be registered */
 static int nextTimer = 0;
 
+static void virEventLock(void)
+{
+    pthread_mutex_lock(&eventLoop.lock);
+}
+
+static void virEventUnlock(void)
+{
+    pthread_mutex_unlock(&eventLoop.lock);
+}
+
 /*
  * Register a callback for monitoring file handle events.
  * NB, it *must* be safe to call this from within a callback
@@ -89,17 +106,23 @@ int virEventAddHandleImpl(int fd, int events,
                           virEventHandleCallback cb,
                           void *opaque,
                           virFreeCallback ff) {
+    int watch;
     EVENT_DEBUG("Add handle %d %d %p %p", fd, events, cb, opaque);
+    virEventLock();
     if (eventLoop.handlesCount == eventLoop.handlesAlloc) {
         EVENT_DEBUG("Used %d handle slots, adding %d more",
                     eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT);
         if (VIR_REALLOC_N(eventLoop.handles,
-                          (eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0)
+                          (eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0) {
+            virEventUnlock();
             return -1;
+        }
         eventLoop.handlesAlloc += EVENT_ALLOC_EXTENT;
     }
 
-    eventLoop.handles[eventLoop.handlesCount].watch = nextWatch++;
+    watch = nextWatch++;
+
+    eventLoop.handles[eventLoop.handlesCount].watch = watch;
     eventLoop.handles[eventLoop.handlesCount].fd = fd;
     eventLoop.handles[eventLoop.handlesCount].events =
                                          virEventHandleTypeToPollEvent(events);
@@ -110,11 +133,15 @@ int virEventAddHandleImpl(int fd, int events,
 
     eventLoop.handlesCount++;
 
-    return nextWatch-1;
+    virEventInterruptLocked();
+    virEventUnlock();
+
+    return watch;
 }
 
 void virEventUpdateHandleImpl(int watch, int events) {
     int i;
+    virEventLock();
     for (i = 0 ; i < eventLoop.handlesCount ; i++) {
         if (eventLoop.handles[i].watch == watch) {
             eventLoop.handles[i].events =
@@ -122,6 +149,8 @@ void virEventUpdateHandleImpl(int watch, int events) {
             break;
         }
     }
+    virEventInterruptLocked();
+    virEventUnlock();
 }
 
 /*
@@ -133,6 +162,7 @@ void virEventUpdateHandleImpl(int watch, int events) {
 int virEventRemoveHandleImpl(int watch) {
     int i;
     EVENT_DEBUG("Remove handle %d", watch);
+    virEventLock();
     for (i = 0 ; i < eventLoop.handlesCount ; i++) {
         if (eventLoop.handles[i].deleted)
             continue;
@@ -140,9 +170,12 @@ int virEventRemoveHandleImpl(int watch) {
         if (eventLoop.handles[i].watch == watch) {
             EVENT_DEBUG("mark delete %d %d", i, eventLoop.handles[i].fd);
             eventLoop.handles[i].deleted = 1;
+            virEventUnlock();
             return 0;
         }
     }
+    virEventInterruptLocked();
+    virEventUnlock();
     return -1;
 }
 
@@ -157,17 +190,21 @@ int virEventAddTimeoutImpl(int frequency,
                            void *opaque,
                            virFreeCallback ff) {
     struct timeval now;
+    int ret;
     EVENT_DEBUG("Adding timer %d with %d ms freq", nextTimer, frequency);
     if (gettimeofday(&now, NULL) < 0) {
         return -1;
     }
 
+    virEventLock();
     if (eventLoop.timeoutsCount == eventLoop.timeoutsAlloc) {
         EVENT_DEBUG("Used %d timeout slots, adding %d more",
                     eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT);
         if (VIR_REALLOC_N(eventLoop.timeouts,
-                          (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0)
+                          (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0) {
+            virEventUnlock();
             return -1;
+        }
         eventLoop.timeoutsAlloc += EVENT_ALLOC_EXTENT;
     }
 
@@ -183,8 +220,10 @@ int virEventAddTimeoutImpl(int frequency,
         (((unsigned long long)now.tv_usec)/1000) : 0;
 
     eventLoop.timeoutsCount++;
-
-    return nextTimer-1;
+    ret = nextTimer-1;
+    virEventInterruptLocked();
+    virEventUnlock();
+    return ret;
 }
 
 void virEventUpdateTimeoutImpl(int timer, int frequency) {
@@ -195,6 +234,7 @@ void virEventUpdateTimeoutImpl(int timer, int frequency) {
         return;
     }
 
+    virEventLock();
     for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
         if (eventLoop.timeouts[i].timer == timer) {
             eventLoop.timeouts[i].frequency = frequency;
@@ -205,6 +245,8 @@ void virEventUpdateTimeoutImpl(int timer, int frequency) {
             break;
         }
     }
+    virEventInterruptLocked();
+    virEventUnlock();
 }
 
 /*
@@ -216,15 +258,19 @@ void virEventUpdateTimeoutImpl(int timer, int frequency) {
 int virEventRemoveTimeoutImpl(int timer) {
     int i;
     EVENT_DEBUG("Remove timer %d", timer);
+    virEventLock();
     for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
         if (eventLoop.timeouts[i].deleted)
             continue;
 
         if (eventLoop.timeouts[i].timer == timer) {
             eventLoop.timeouts[i].deleted = 1;
+            virEventUnlock();
             return 0;
         }
     }
+    virEventInterruptLocked();
+    virEventUnlock();
     return -1;
 }
 
@@ -336,10 +382,15 @@ static int virEventDispatchTimeouts(void) {
             continue;
 
         if (eventLoop.timeouts[i].expiresAt <= now) {
-            (eventLoop.timeouts[i].cb)(eventLoop.timeouts[i].timer,
-                                       eventLoop.timeouts[i].opaque);
+            virEventTimeoutCallback cb = eventLoop.timeouts[i].cb;
+            int timer = eventLoop.timeouts[i].timer;
+            void *opaque = eventLoop.timeouts[i].opaque;
             eventLoop.timeouts[i].expiresAt =
                 now + eventLoop.timeouts[i].frequency;
+
+            virEventUnlock();
+            (cb)(timer, opaque);
+            virEventLock();
         }
     }
     return 0;
@@ -356,28 +407,25 @@ static int virEventDispatchTimeouts(void) {
  *
  * Returns 0 upon success, -1 if an error occurred
  */
-static int virEventDispatchHandles(struct pollfd *fds) {
+static int virEventDispatchHandles(int nfds, struct pollfd *fds) {
     int i;
-    virEventHandleType hEvents;
-    /* Save this now - it may be changed during dispatch */
-    int nhandles = eventLoop.handlesCount;
 
-    for (i = 0 ; i < nhandles ; i++) {
+    for (i = 0 ; i < nfds ; i++) {
         if (eventLoop.handles[i].deleted) {
             EVENT_DEBUG("Skip deleted %d", eventLoop.handles[i].fd);
             continue;
         }
 
         if (fds[i].revents) {
-            hEvents = virPollEventToEventHandleType(fds[i].revents);
-            EVENT_DEBUG("Dispatch %d %d %d %p",
-                        eventLoop.handles[i].watch,
-                        fds[i].fd, fds[i].revents,
-                        eventLoop.handles[i].opaque);
-            (eventLoop.handles[i].cb)(eventLoop.handles[i].watch,
-                                      fds[i].fd,
-                                      hEvents,
-                                      eventLoop.handles[i].opaque);
+            virEventHandleCallback cb = eventLoop.handles[i].cb;
+            void *opaque = eventLoop.handles[i].opaque;
+            int hEvents = virPollEventToEventHandleType(fds[i].revents);
+            EVENT_DEBUG("Dispatch %d %d %p", fds[i].fd,
+                        fds[i].revents, eventLoop.handles[i].opaque);
+            virEventUnlock();
+            (cb)(eventLoop.handles[i].watch,
+                 fds[i].fd, hEvents, opaque);
+            virEventLock();
         }
     }
 
@@ -472,14 +520,21 @@ int virEventRunOnce(void) {
     struct pollfd *fds;
     int ret, timeout, nfds;
 
-    if ((nfds = virEventMakePollFDs(&fds)) < 0)
+    virEventLock();
+    eventLoop.leader = pthread_self();
+    if ((nfds = virEventMakePollFDs(&fds)) < 0) {
+        virEventUnlock();
         return -1;
+    }
 
     if (virEventCalculateTimeout(&timeout) < 0) {
         VIR_FREE(fds);
+        virEventUnlock();
         return -1;
     }
 
+    virEventUnlock();
+
  retry:
     EVENT_DEBUG("Poll on %d handles %p timeout %d", nfds, fds, timeout);
     ret = poll(fds, nfds, timeout);
@@ -491,27 +546,88 @@ int virEventRunOnce(void) {
         VIR_FREE(fds);
         return -1;
     }
+
+    virEventLock();
     if (virEventDispatchTimeouts() < 0) {
         VIR_FREE(fds);
+        virEventUnlock();
         return -1;
     }
 
     if (ret > 0 &&
-        virEventDispatchHandles(fds) < 0) {
+        virEventDispatchHandles(nfds, fds) < 0) {
         VIR_FREE(fds);
+        virEventUnlock();
         return -1;
     }
     VIR_FREE(fds);
 
-    if (virEventCleanupTimeouts() < 0)
+    if (virEventCleanupTimeouts() < 0) {
+        virEventUnlock();
+        return -1;
+    }
+
+    if (virEventCleanupHandles() < 0) {
+        virEventUnlock();
+        return -1;
+    }
+
+    eventLoop.leader = 0;
+    virEventUnlock();
+    return 0;
+}
+
+static void virEventHandleWakeup(int watch ATTRIBUTE_UNUSED,
+                                 int fd,
+                                 int events ATTRIBUTE_UNUSED,
+                                 void *opaque ATTRIBUTE_UNUSED)
+{
+    char c;
+    virEventLock();
+    saferead(fd, &c, sizeof(c));
+    virEventUnlock();
+}
+
+int virEventInit(void)
+{
+    if (pthread_mutex_init(&eventLoop.lock, NULL) != 0)
+        return -1;
+
+    if (pipe(eventLoop.wakeupfd) < 0 ||
+        qemudSetNonBlock(eventLoop.wakeupfd[0]) < 0 ||
+        qemudSetNonBlock(eventLoop.wakeupfd[1]) < 0 ||
+        qemudSetCloseExec(eventLoop.wakeupfd[0]) < 0 ||
+        qemudSetCloseExec(eventLoop.wakeupfd[1]) < 0)
         return -1;
 
-    if (virEventCleanupHandles() < 0)
+    if (virEventAddHandleImpl(eventLoop.wakeupfd[0],
+                              VIR_EVENT_HANDLE_READABLE,
+                              virEventHandleWakeup, NULL, NULL) < 0)
         return -1;
 
     return 0;
 }
 
+static int virEventInterruptLocked(void)
+{
+    char c = '\0';
+    if (pthread_self() == eventLoop.leader)
+        return 0;
+
+    if (safewrite(eventLoop.wakeupfd[1], &c, sizeof(c)) != sizeof(c))
+        return -1;
+    return 0;
+}
+
+int virEventInterrupt(void)
+{
+    int ret;
+    virEventLock();
+    ret = virEventInterruptLocked();
+    virEventUnlock();
+    return ret;
+}
+
 int
 virEventHandleTypeToPollEvent(int events)
 {
index 615dd69a6733188641842e6ff5fb7db419d5121b..0992f1cbc84dc618e59950433a6255537c620852 100644 (file)
@@ -100,6 +100,13 @@ void virEventUpdateTimeoutImpl(int timer, int frequency);
  */
 int virEventRemoveTimeoutImpl(int timer);
 
+/**
+ * virEventInit: Initialize the event loop
+ *
+ * returns -1 if initialization failed
+ */
+int virEventInit(void);
+
 /**
  * virEventRunOnce: run a single iteration of the event loop.
  *
@@ -116,5 +123,12 @@ int
 virPollEventToEventHandleType(int events);
 
 
+/**
+ * virEventInterrupt: wakeup any thread waiting in poll()
+ *
+ * return -1 if wakup failed
+ */
+int virEventInterrupt(void);
+
 
 #endif /* __VIRTD_EVENT_H__ */
index bd35756f3001bcac3059b49fa4d7976fb6c6a841..fac6d8890e254a46bb09b0fe02bab65ecc916961 100644 (file)
@@ -296,7 +296,7 @@ qemudDispatchSignalEvent(int watch ATTRIBUTE_UNUSED,
         server->shutdown = 1;
 }
 
-static int qemudSetCloseExec(int fd) {
+int qemudSetCloseExec(int fd) {
     int flags;
     if ((flags = fcntl(fd, F_GETFD)) < 0)
         goto error;
@@ -311,7 +311,7 @@ static int qemudSetCloseExec(int fd) {
 }
 
 
-static int qemudSetNonBlock(int fd) {
+int qemudSetNonBlock(int fd) {
     int flags;
     if ((flags = fcntl(fd, F_GETFL)) < 0)
         goto error;
@@ -753,6 +753,12 @@ static struct qemud_server *qemudInitialize(int sigread) {
 
     server->sigread = sigread;
 
+    if (virEventInit() < 0) {
+        qemudLog(QEMUD_ERR, "%s", _("Failed to initialize event system"));
+        VIR_FREE(server);
+        return NULL;
+    }
+
     virInitialize();
 
     /*
index f889c72a02cfb5f6ae1e3cce458fa5de652f9765..464d942ccbcea7d577c39dd7590da619b08c1418 100644 (file)
@@ -177,6 +177,9 @@ void qemudLog(int priority, const char *fmt, ...)
 #define qemudDebug(fmt, ...) do {} while(0)
 #endif
 
+int qemudSetCloseExec(int fd);
+int qemudSetNonBlock(int fd);
+
 unsigned int
 remoteDispatchClientRequest (struct qemud_server *server,
                              struct qemud_client *client);