#include <gnutls/gnutls.h>
#include <gnutls/x509.h>
#include <fcntl.h>
+#include <poll.h>
#include "qemu_migration.h"
#include "qemu_monitor.h"
virStreamPtr st;
int sock;
virError err;
+ int wakeupRecvFD;
+ int wakeupSendFD;
};
static void qemuMigrationIOFunc(void *arg)
{
qemuMigrationIOThreadPtr data = arg;
- char *buffer;
- int nbytes = TUNNEL_SEND_BUF_SIZE;
+ char *buffer = NULL;
+ struct pollfd fds[2];
+ int timeout = -1;
+ virErrorPtr err = NULL;
+
+ VIR_DEBUG("Running migration tunnel; stream=%p, sock=%d",
+ data->st, data->sock);
if (VIR_ALLOC_N(buffer, TUNNEL_SEND_BUF_SIZE) < 0) {
virReportOOMError();
- virStreamAbort(data->st);
- goto error;
+ goto abrt;
}
+ fds[0].fd = data->sock;
+ fds[1].fd = data->wakeupRecvFD;
+
for (;;) {
- nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE);
- if (nbytes < 0) {
+ int ret;
+
+ fds[0].events = fds[1].events = POLLIN;
+ fds[0].revents = fds[1].revents = 0;
+
+ ret = poll(fds, ARRAY_CARDINALITY(fds), timeout);
+
+ if (ret < 0) {
+ if (errno == EAGAIN || errno == EINTR)
+ continue;
virReportSystemError(errno, "%s",
- _("tunnelled migration failed to read from qemu"));
- virStreamAbort(data->st);
- VIR_FREE(buffer);
- goto error;
+ _("poll failed in migration tunnel"));
+ goto abrt;
}
- else if (nbytes == 0)
- /* EOF; get out of here */
+
+ if (ret == 0) {
+ /* We were asked to gracefully stop but reading would block. This
+ * can only happen if qemu told us migration finished but didn't
+ * close the migration fd. We handle this in the same way as EOF.
+ */
+ VIR_DEBUG("QEMU forgot to close migration fd");
break;
+ }
- if (virStreamSend(data->st, buffer, nbytes) < 0) {
- VIR_FREE(buffer);
- goto error;
+ if (fds[1].revents & (POLLIN | POLLERR | POLLHUP)) {
+ char stop = 0;
+
+ if (saferead(data->wakeupRecvFD, &stop, 1) != 1) {
+ virReportSystemError(errno, "%s",
+ _("failed to read from wakeup fd"));
+ goto abrt;
+ }
+
+ VIR_DEBUG("Migration tunnel was asked to %s",
+ stop ? "abort" : "finish");
+ if (stop) {
+ goto abrt;
+ } else {
+ timeout = 0;
+ }
}
- }
- VIR_FREE(buffer);
+ if (fds[0].revents & (POLLIN | POLLERR | POLLHUP)) {
+ int nbytes;
+
+ nbytes = saferead(data->sock, buffer, TUNNEL_SEND_BUF_SIZE);
+ if (nbytes > 0) {
+ if (virStreamSend(data->st, buffer, nbytes) < 0)
+ goto error;
+ } else if (nbytes < 0) {
+ virReportSystemError(errno, "%s",
+ _("tunnelled migration failed to read from qemu"));
+ goto abrt;
+ } else {
+ /* EOF; get out of here */
+ break;
+ }
+ }
+ }
if (virStreamFinish(data->st) < 0)
goto error;
+ VIR_FREE(buffer);
+
return;
+abrt:
+ err = virSaveLastError();
+ if (err && err->code == VIR_ERR_OK) {
+ virFreeError(err);
+ err = NULL;
+ }
+ virStreamAbort(data->st);
+ if (err) {
+ virSetError(err);
+ virFreeError(err);
+ }
+
error:
virCopyLastError(&data->err);
virResetLastError();
+ VIR_FREE(buffer);
}
qemuMigrationStartTunnel(virStreamPtr st,
int sock)
{
- qemuMigrationIOThreadPtr io;
+ qemuMigrationIOThreadPtr io = NULL;
+ int wakeupFD[2] = { -1, -1 };
- if (VIR_ALLOC(io) < 0) {
- virReportOOMError();
- return NULL;
+ if (pipe2(wakeupFD, O_CLOEXEC) < 0) {
+ virReportSystemError(errno, "%s",
+ _("Unable to make pipe"));
+ goto error;
}
+ if (VIR_ALLOC(io) < 0)
+ goto no_memory;
+
io->st = st;
io->sock = sock;
+ io->wakeupRecvFD = wakeupFD[0];
+ io->wakeupSendFD = wakeupFD[1];
if (virThreadCreate(&io->thread, true,
qemuMigrationIOFunc,
io) < 0) {
virReportSystemError(errno, "%s",
_("Unable to create migration thread"));
- VIR_FREE(io);
- return NULL;
+ goto error;
}
return io;
+
+no_memory:
+ virReportOOMError();
+error:
+ VIR_FORCE_CLOSE(wakeupFD[0]);
+ VIR_FORCE_CLOSE(wakeupFD[1]);
+ VIR_FREE(io);
+ return NULL;
}
static int
-qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io)
+qemuMigrationStopTunnel(qemuMigrationIOThreadPtr io, bool error)
{
int rv = -1;
+ char stop = error ? 1 : 0;
+
+ /* make sure the thread finishes its job and is joinable */
+ if (safewrite(io->wakeupSendFD, &stop, 1) != 1) {
+ virReportSystemError(errno, "%s",
+ _("failed to wakeup migration tunnel"));
+ goto cleanup;
+ }
+
virThreadJoin(&io->thread);
/* Forward error from the IO thread, to this thread */
if (io->err.code != VIR_ERR_OK) {
- virSetError(&io->err);
+ if (error)
+ rv = 0;
+ else
+ virSetError(&io->err);
virResetError(&io->err);
goto cleanup;
}
rv = 0;
cleanup:
+ VIR_FORCE_CLOSE(io->wakeupSendFD);
+ VIR_FORCE_CLOSE(io->wakeupRecvFD);
VIR_FREE(io);
return rv;
}
orig_err = virSaveLastError();
if (spec->fwdType != MIGRATION_FWD_DIRECT) {
- /* Close now to ensure the IO thread quits & is joinable */
- VIR_FORCE_CLOSE(fd);
- if (iothread && qemuMigrationStopTunnel(iothread) < 0)
+ if (iothread && qemuMigrationStopTunnel(iothread, ret < 0) < 0)
ret = -1;
+ VIR_FORCE_CLOSE(fd);
}
if (ret == 0 &&