]> git.ipfire.org Git - thirdparty/libvirt.git/commitdiff
qemu: convert agent to use the per-VM event loop
authorDaniel P. Berrangé <berrange@redhat.com>
Wed, 12 Feb 2020 14:54:19 +0000 (14:54 +0000)
committerDaniel P. Berrangé <berrange@redhat.com>
Wed, 11 Mar 2020 14:45:01 +0000 (14:45 +0000)
This converts the QEMU agent APIs to use the per-VM
event loop, which involves switching from virEvent APIs
to GMainContext / GSource APIs.

A GSocket is used as a convenient way to create a GSource
for a socket, but is not yet used for actual I/O.

Reviewed-by: Michal Privoznik <mprivozn@redhat.com>
Signed-off-by: Daniel P. Berrangé <berrange@redhat.com>
src/qemu/qemu_agent.c
src/qemu/qemu_agent.h
src/qemu/qemu_process.c
tests/qemumonitortestutils.c

index 72ea159a9ccb5da90028f60bac26f6f0d13e3968..d183b310cb4366e08ab36abf21f74d145deb9dc9 100644 (file)
@@ -25,6 +25,7 @@
 #include <unistd.h>
 #include <fcntl.h>
 #include <sys/time.h>
+#include <gio/gio.h>
 
 #include "qemu_agent.h"
 #include "qemu_domain.h"
@@ -101,7 +102,10 @@ struct _qemuAgent {
     virCond notify;
 
     int fd;
-    int watch;
+
+    GMainContext *context;
+    GSocket *socket;
+    GSource *watch;
 
     bool running;
 
@@ -172,6 +176,7 @@ static void qemuAgentDispose(void *obj)
         (agent->cb->destroy)(agent, agent->vm);
     virCondDestroy(&agent->notify);
     VIR_FREE(agent->buffer);
+    g_main_context_unref(agent->context);
     virResetError(&agent->lastError);
 }
 
@@ -188,13 +193,6 @@ qemuAgentOpenUnix(const char *socketpath)
         return -1;
     }
 
-    if (virSetNonBlock(agentfd) < 0) {
-        virReportSystemError(errno, "%s",
-                             _("Unable to put monitor "
-                               "into non-blocking mode"));
-        goto error;
-    }
-
     if (virSetCloseExec(agentfd) < 0) {
         virReportSystemError(errno, "%s",
                              _("Unable to set agent "
@@ -498,28 +496,62 @@ qemuAgentIORead(qemuAgentPtr agent)
 }
 
 
-static void qemuAgentUpdateWatch(qemuAgentPtr agent)
-{
-    int events =
-        VIR_EVENT_HANDLE_HANGUP |
-        VIR_EVENT_HANDLE_ERROR;
+static gboolean
+qemuAgentIO(GSocket *socket,
+            GIOCondition cond,
+            gpointer opaque);
 
-    if (!agent->watch)
-        return;
+
+static void
+qemuAgentRegister(qemuAgentPtr agent)
+{
+    GIOCondition cond = 0;
 
     if (agent->lastError.code == VIR_ERR_OK) {
-        events |= VIR_EVENT_HANDLE_READABLE;
+        cond |= G_IO_IN;
 
         if (agent->msg && agent->msg->txOffset < agent->msg->txLength)
-            events |= VIR_EVENT_HANDLE_WRITABLE;
+            cond |= G_IO_OUT;
     }
 
-    virEventUpdateHandle(agent->watch, events);
+    agent->watch = g_socket_create_source(agent->socket,
+                                        cond,
+                                        NULL);
+
+    virObjectRef(agent);
+    g_source_set_callback(agent->watch,
+                          (GSourceFunc)qemuAgentIO,
+                          agent,
+                          (GDestroyNotify)virObjectUnref);
+
+    g_source_attach(agent->watch,
+                    agent->context);
 }
 
 
 static void
-qemuAgentIO(int watch, int fd, int events, void *opaque)
+qemuAgentUnregister(qemuAgentPtr agent)
+{
+    if (agent->watch) {
+        g_source_destroy(agent->watch);
+        g_source_unref(agent->watch);
+        agent->watch = NULL;
+    }
+}
+
+
+static void qemuAgentUpdateWatch(qemuAgentPtr agent)
+{
+    qemuAgentUnregister(agent);
+    if (agent->socket)
+        qemuAgentRegister(agent);
+}
+
+
+static gboolean
+qemuAgentIO(GSocket *socket G_GNUC_UNUSED,
+            GIOCondition cond,
+            gpointer opaque)
 {
     qemuAgentPtr agent = opaque;
     bool error = false;
@@ -529,45 +561,36 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
     /* lock access to the agent and protect fd */
     virObjectLock(agent);
 #if DEBUG_IO
-    VIR_DEBUG("Agent %p I/O on watch %d fd %d events %d", agent, watch, fd, events);
+    VIR_DEBUG("Agent %p I/O on watch %d socket %p cond %d", agent, agent->socket, cond);
 #endif
 
-    if (agent->fd == -1 || agent->watch == 0) {
+    if (agent->fd == -1 || !agent->watch) {
         virObjectUnlock(agent);
         virObjectUnref(agent);
-        return;
+        return G_SOURCE_REMOVE;
     }
 
-    if (agent->fd != fd || agent->watch != watch) {
-        if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
-            eof = true;
-        virReportError(VIR_ERR_INTERNAL_ERROR,
-                       _("event from unexpected fd %d!=%d / watch %d!=%d"),
-                       agent->fd, fd, agent->watch, watch);
-        error = true;
-    } else if (agent->lastError.code != VIR_ERR_OK) {
-        if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
+    if (agent->lastError.code != VIR_ERR_OK) {
+        if (cond & (G_IO_HUP | G_IO_ERR))
             eof = true;
         error = true;
     } else {
-        if (events & VIR_EVENT_HANDLE_WRITABLE) {
+        if (cond & G_IO_OUT) {
             if (qemuAgentIOWrite(agent) < 0)
                 error = true;
-            events &= ~VIR_EVENT_HANDLE_WRITABLE;
         }
 
         if (!error &&
-            events & VIR_EVENT_HANDLE_READABLE) {
+            cond & G_IO_IN) {
             int got = qemuAgentIORead(agent);
-            events &= ~VIR_EVENT_HANDLE_READABLE;
             if (got < 0) {
                 error = true;
             } else if (got == 0) {
                 eof = true;
             } else {
-                /* Ignore hangup/error events if we read some data, to
+                /* Ignore hangup/error cond if we read some data, to
                  * give time for that data to be consumed */
-                events = 0;
+                cond = 0;
 
                 if (qemuAgentIOProcess(agent) < 0)
                     error = true;
@@ -575,25 +598,17 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
         }
 
         if (!error &&
-            events & VIR_EVENT_HANDLE_HANGUP) {
+            cond & G_IO_HUP) {
             virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                            _("End of file from agent socket"));
             eof = true;
-            events &= ~VIR_EVENT_HANDLE_HANGUP;
         }
 
         if (!error && !eof &&
-            events & VIR_EVENT_HANDLE_ERROR) {
+            cond & G_IO_ERR) {
             virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
                            _("Invalid file descriptor while waiting for agent"));
             eof = true;
-            events &= ~VIR_EVENT_HANDLE_ERROR;
-        }
-        if (!error && events) {
-            virReportError(VIR_ERR_INTERNAL_ERROR,
-                           _("Unhandled event %d for agent fd %d"),
-                           events, agent->fd);
-            error = true;
         }
     }
 
@@ -649,15 +664,19 @@ qemuAgentIO(int watch, int fd, int events, void *opaque)
         virObjectUnlock(agent);
         virObjectUnref(agent);
     }
+
+    return G_SOURCE_REMOVE;
 }
 
 
 qemuAgentPtr
 qemuAgentOpen(virDomainObjPtr vm,
               const virDomainChrSourceDef *config,
+              GMainContext *context,
               qemuAgentCallbacksPtr cb)
 {
     qemuAgentPtr agent;
+    g_autoptr(GError) gerr = NULL;
 
     if (!cb || !cb->eofNotify) {
         virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
@@ -693,22 +712,20 @@ qemuAgentOpen(virDomainObjPtr vm,
     if (agent->fd == -1)
         goto cleanup;
 
-    virObjectRef(agent);
-    if ((agent->watch = virEventAddHandle(agent->fd,
-                                        VIR_EVENT_HANDLE_HANGUP |
-                                        VIR_EVENT_HANDLE_ERROR |
-                                        VIR_EVENT_HANDLE_READABLE,
-                                        qemuAgentIO,
-                                        agent,
-                                        virObjectFreeCallback)) < 0) {
-        virObjectUnref(agent);
-        virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
-                       _("unable to register agent events"));
+    agent->context = g_main_context_ref(context);
+
+    agent->socket = g_socket_new_from_fd(agent->fd, &gerr);
+    if (!agent->socket) {
+        virReportError(VIR_ERR_INTERNAL_ERROR,
+                       _("Unable to create socket object: %s"),
+                       gerr->message);
         goto cleanup;
     }
 
+    qemuAgentRegister(agent);
+
     agent->running = true;
-    VIR_DEBUG("New agent %p fd =%d watch=%d", agent, agent->fd, agent->watch);
+    VIR_DEBUG("New agent %p fd=%d", agent, agent->fd);
 
     return agent;
 
@@ -763,12 +780,11 @@ void qemuAgentClose(qemuAgentPtr agent)
 
     virObjectLock(agent);
 
-    if (agent->fd >= 0) {
-        if (agent->watch) {
-            virEventRemoveHandle(agent->watch);
-            agent->watch = 0;
-        }
-        VIR_FORCE_CLOSE(agent->fd);
+    if (agent->socket) {
+        qemuAgentUnregister(agent);
+        g_object_unref(agent->socket);
+        agent->socket = NULL;
+        agent->fd = -1;
     }
 
     qemuAgentNotifyCloseLocked(agent);
index 5656fe60ff4f951430f6d4d108b6c03f52f0a1e5..d4d861532375de3b311e94b8c10b5a47621ad7d2 100644 (file)
@@ -41,6 +41,7 @@ struct _qemuAgentCallbacks {
 
 qemuAgentPtr qemuAgentOpen(virDomainObjPtr vm,
                            const virDomainChrSourceDef *config,
+                           GMainContext *context,
                            qemuAgentCallbacksPtr cb);
 
 void qemuAgentClose(qemuAgentPtr mon);
index 541a526cf737de5cf48a3f6da7ba415c236bfa70..67dad9841ade22ca70c03e880f69a59767531c10 100644 (file)
@@ -237,6 +237,7 @@ qemuConnectAgent(virQEMUDriverPtr driver, virDomainObjPtr vm)
 
     agent = qemuAgentOpen(vm,
                           config->source,
+                          virEventThreadGetContext(priv->eventThread),
                           &agentCallbacks);
 
     virObjectLock(vm);
index df93aae758b2fe3f2bf38143f9ca3c33ab278d7e..328bfb85253932d47717de0431cbad441a11d904 100644 (file)
@@ -1406,6 +1406,7 @@ qemuMonitorTestNewAgent(virDomainXMLOptionPtr xmlopt)
 
     if (!(test->agent = qemuAgentOpen(test->vm,
                                       &src,
+                                      virEventThreadGetContext(test->eventThread),
                                       &qemuMonitorTestAgentCallbacks)))
         goto error;