#include <unistd.h>
#include <fcntl.h>
#include <sys/time.h>
+#include <gio/gio.h>
#include "qemu_agent.h"
#include "qemu_domain.h"
virCond notify;
int fd;
- int watch;
+
+ GMainContext *context;
+ GSocket *socket;
+ GSource *watch;
bool running;
(agent->cb->destroy)(agent, agent->vm);
virCondDestroy(&agent->notify);
VIR_FREE(agent->buffer);
+ g_main_context_unref(agent->context);
virResetError(&agent->lastError);
}
return -1;
}
- if (virSetNonBlock(agentfd) < 0) {
- virReportSystemError(errno, "%s",
- _("Unable to put monitor "
- "into non-blocking mode"));
- goto error;
- }
-
if (virSetCloseExec(agentfd) < 0) {
virReportSystemError(errno, "%s",
_("Unable to set agent "
}
-static void qemuAgentUpdateWatch(qemuAgentPtr agent)
-{
- int events =
- VIR_EVENT_HANDLE_HANGUP |
- VIR_EVENT_HANDLE_ERROR;
+static gboolean
+qemuAgentIO(GSocket *socket,
+ GIOCondition cond,
+ gpointer opaque);
- if (!agent->watch)
- return;
+
+static void
+qemuAgentRegister(qemuAgentPtr agent)
+{
+ GIOCondition cond = 0;
if (agent->lastError.code == VIR_ERR_OK) {
- events |= VIR_EVENT_HANDLE_READABLE;
+ cond |= G_IO_IN;
if (agent->msg && agent->msg->txOffset < agent->msg->txLength)
- events |= VIR_EVENT_HANDLE_WRITABLE;
+ cond |= G_IO_OUT;
}
- virEventUpdateHandle(agent->watch, events);
+ agent->watch = g_socket_create_source(agent->socket,
+ cond,
+ NULL);
+
+ virObjectRef(agent);
+ g_source_set_callback(agent->watch,
+ (GSourceFunc)qemuAgentIO,
+ agent,
+ (GDestroyNotify)virObjectUnref);
+
+ g_source_attach(agent->watch,
+ agent->context);
}
static void
-qemuAgentIO(int watch, int fd, int events, void *opaque)
+qemuAgentUnregister(qemuAgentPtr agent)
+{
+ if (agent->watch) {
+ g_source_destroy(agent->watch);
+ g_source_unref(agent->watch);
+ agent->watch = NULL;
+ }
+}
+
+
+static void qemuAgentUpdateWatch(qemuAgentPtr agent)
+{
+ qemuAgentUnregister(agent);
+ if (agent->socket)
+ qemuAgentRegister(agent);
+}
+
+
+static gboolean
+qemuAgentIO(GSocket *socket G_GNUC_UNUSED,
+ GIOCondition cond,
+ gpointer opaque)
{
qemuAgentPtr agent = opaque;
bool error = false;
/* lock access to the agent and protect fd */
virObjectLock(agent);
#if DEBUG_IO
- VIR_DEBUG("Agent %p I/O on watch %d fd %d events %d", agent, watch, fd, events);
+ VIR_DEBUG("Agent %p I/O on watch %d socket %p cond %d", agent, agent->socket, cond);
#endif
- if (agent->fd == -1 || agent->watch == 0) {
+ if (agent->fd == -1 || !agent->watch) {
virObjectUnlock(agent);
virObjectUnref(agent);
- return;
+ return G_SOURCE_REMOVE;
}
- if (agent->fd != fd || agent->watch != watch) {
- if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
- eof = true;
- virReportError(VIR_ERR_INTERNAL_ERROR,
- _("event from unexpected fd %d!=%d / watch %d!=%d"),
- agent->fd, fd, agent->watch, watch);
- error = true;
- } else if (agent->lastError.code != VIR_ERR_OK) {
- if (events & (VIR_EVENT_HANDLE_HANGUP | VIR_EVENT_HANDLE_ERROR))
+ if (agent->lastError.code != VIR_ERR_OK) {
+ if (cond & (G_IO_HUP | G_IO_ERR))
eof = true;
error = true;
} else {
- if (events & VIR_EVENT_HANDLE_WRITABLE) {
+ if (cond & G_IO_OUT) {
if (qemuAgentIOWrite(agent) < 0)
error = true;
- events &= ~VIR_EVENT_HANDLE_WRITABLE;
}
if (!error &&
- events & VIR_EVENT_HANDLE_READABLE) {
+ cond & G_IO_IN) {
int got = qemuAgentIORead(agent);
- events &= ~VIR_EVENT_HANDLE_READABLE;
if (got < 0) {
error = true;
} else if (got == 0) {
eof = true;
} else {
- /* Ignore hangup/error events if we read some data, to
+ /* Ignore hangup/error cond if we read some data, to
* give time for that data to be consumed */
- events = 0;
+ cond = 0;
if (qemuAgentIOProcess(agent) < 0)
error = true;
}
if (!error &&
- events & VIR_EVENT_HANDLE_HANGUP) {
+ cond & G_IO_HUP) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("End of file from agent socket"));
eof = true;
- events &= ~VIR_EVENT_HANDLE_HANGUP;
}
if (!error && !eof &&
- events & VIR_EVENT_HANDLE_ERROR) {
+ cond & G_IO_ERR) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Invalid file descriptor while waiting for agent"));
eof = true;
- events &= ~VIR_EVENT_HANDLE_ERROR;
- }
- if (!error && events) {
- virReportError(VIR_ERR_INTERNAL_ERROR,
- _("Unhandled event %d for agent fd %d"),
- events, agent->fd);
- error = true;
}
}
virObjectUnlock(agent);
virObjectUnref(agent);
}
+
+ return G_SOURCE_REMOVE;
}
qemuAgentPtr
qemuAgentOpen(virDomainObjPtr vm,
const virDomainChrSourceDef *config,
+ GMainContext *context,
qemuAgentCallbacksPtr cb)
{
qemuAgentPtr agent;
+ g_autoptr(GError) gerr = NULL;
if (!cb || !cb->eofNotify) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
if (agent->fd == -1)
goto cleanup;
- virObjectRef(agent);
- if ((agent->watch = virEventAddHandle(agent->fd,
- VIR_EVENT_HANDLE_HANGUP |
- VIR_EVENT_HANDLE_ERROR |
- VIR_EVENT_HANDLE_READABLE,
- qemuAgentIO,
- agent,
- virObjectFreeCallback)) < 0) {
- virObjectUnref(agent);
- virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
- _("unable to register agent events"));
+ agent->context = g_main_context_ref(context);
+
+ agent->socket = g_socket_new_from_fd(agent->fd, &gerr);
+ if (!agent->socket) {
+ virReportError(VIR_ERR_INTERNAL_ERROR,
+ _("Unable to create socket object: %s"),
+ gerr->message);
goto cleanup;
}
+ qemuAgentRegister(agent);
+
agent->running = true;
- VIR_DEBUG("New agent %p fd =%d watch=%d", agent, agent->fd, agent->watch);
+ VIR_DEBUG("New agent %p fd=%d", agent, agent->fd);
return agent;
virObjectLock(agent);
- if (agent->fd >= 0) {
- if (agent->watch) {
- virEventRemoveHandle(agent->watch);
- agent->watch = 0;
- }
- VIR_FORCE_CLOSE(agent->fd);
+ if (agent->socket) {
+ qemuAgentUnregister(agent);
+ g_object_unref(agent->socket);
+ agent->socket = NULL;
+ agent->fd = -1;
}
qemuAgentNotifyCloseLocked(agent);