#include "migration/colo.h"
#include "hw/boards.h"
#include "monitor/monitor.h"
+#include "net/announce.h"
#define MAX_THROTTLE (32 << 20) /* Migration transfer speed throttling */
/* Define default autoconverge cpu throttle migration parameters */
#define DEFAULT_MIGRATE_CPU_THROTTLE_INITIAL 20
#define DEFAULT_MIGRATE_CPU_THROTTLE_INCREMENT 10
+#define DEFAULT_MIGRATE_MAX_CPU_THROTTLE 99
/* Migration XBZRLE default cache size */
#define DEFAULT_MIGRATE_XBZRLE_CACHE_SIZE (64 * 1024 * 1024)
-/* The delay time (in ms) between two COLO checkpoints
- * Note: Please change this default value to 10000 when we support hybrid mode.
- */
-#define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY 200
+/* The delay time (in ms) between two COLO checkpoints */
+#define DEFAULT_MIGRATE_X_CHECKPOINT_DELAY (200 * 100)
#define DEFAULT_MIGRATE_MULTIFD_CHANNELS 2
#define DEFAULT_MIGRATE_MULTIFD_PAGE_COUNT 16
*/
#define DEFAULT_MIGRATE_MAX_POSTCOPY_BANDWIDTH 0
+/*
+ * Parameters for self_announce_delay giving a stream of RARP/ARP
+ * packets after migration.
+ */
+#define DEFAULT_MIGRATE_ANNOUNCE_INITIAL 50
+#define DEFAULT_MIGRATE_ANNOUNCE_MAX 550
+#define DEFAULT_MIGRATE_ANNOUNCE_ROUNDS 5
+#define DEFAULT_MIGRATE_ANNOUNCE_STEP 100
+
static NotifierList migration_state_notifiers =
NOTIFIER_LIST_INITIALIZER(migration_state_notifiers);
static void migrate_generate_event(int new_state)
{
if (migrate_use_events()) {
- qapi_event_send_migration(new_state, &error_abort);
+ qapi_event_send_migration(new_state);
}
}
return migrate_send_rp_message(mis, msg_type, msglen, bufc);
}
+static bool migration_colo_enabled;
+bool migration_incoming_colo_enabled(void)
+{
+ return migration_colo_enabled;
+}
+
+void migration_incoming_disable_colo(void)
+{
+ migration_colo_enabled = false;
+}
+
+void migration_incoming_enable_colo(void)
+{
+ migration_colo_enabled = true;
+}
+
void qemu_start_incoming_migration(const char *uri, Error **errp)
{
const char *p;
- qapi_event_send_migration(MIGRATION_STATUS_SETUP, &error_abort);
+ qapi_event_send_migration(MIGRATION_STATUS_SETUP);
if (!strcmp(uri, "defer")) {
deferred_incoming_migration(errp);
} else if (strstart(uri, "tcp:", &p)) {
* This must happen after all error conditions are dealt with and
* we're sure the VM is going to be running on this host.
*/
- qemu_announce_self();
+ qemu_announce_self(&mis->announce_timer, migrate_announce_params());
if (multifd_load_cleanup(&local_err) != 0) {
error_report_err(local_err);
MigrationIncomingState *mis = migration_incoming_get_current();
PostcopyState ps;
int ret;
+ Error *local_err = NULL;
assert(mis->from_src_file);
+ mis->migration_incoming_co = qemu_coroutine_self();
mis->largest_page_size = qemu_ram_pagesize_largest();
postcopy_state_set(POSTCOPY_INCOMING_NONE);
migrate_set_state(&mis->state, MIGRATION_STATUS_NONE,
}
/* we get COLO info, and know if we are in COLO mode */
- if (!ret && migration_incoming_enable_colo()) {
- mis->migration_incoming_co = qemu_coroutine_self();
+ if (!ret && migration_incoming_colo_enabled()) {
+ /* Make sure all file formats flush their mutable metadata */
+ bdrv_invalidate_cache_all(&local_err);
+ if (local_err) {
+ error_report_err(local_err);
+ goto fail;
+ }
+
+ if (colo_init_ram_cache() < 0) {
+ error_report("Init ram cache failed");
+ goto fail;
+ }
+
qemu_thread_create(&mis->colo_incoming_thread, "COLO incoming",
colo_process_incoming_thread, mis, QEMU_THREAD_JOINABLE);
mis->have_colo_incoming_thread = true;
/* Wait checkpoint incoming thread exit before free resource */
qemu_thread_join(&mis->colo_incoming_thread);
+ /* We hold the global iothread lock, so it is safe here */
+ colo_release_ram_cache();
}
if (ret < 0) {
- Error *local_err = NULL;
-
- migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
- MIGRATION_STATUS_FAILED);
error_report("load of migration failed: %s", strerror(-ret));
- qemu_fclose(mis->from_src_file);
- if (multifd_load_cleanup(&local_err) != 0) {
- error_report_err(local_err);
- }
- exit(EXIT_FAILURE);
+ goto fail;
}
mis->bh = qemu_bh_new(process_incoming_migration_bh, mis);
qemu_bh_schedule(mis->bh);
+ mis->migration_incoming_co = NULL;
+ return;
+fail:
+ local_err = NULL;
+ migrate_set_state(&mis->state, MIGRATION_STATUS_ACTIVE,
+ MIGRATION_STATUS_FAILED);
+ qemu_fclose(mis->from_src_file);
+ if (multifd_load_cleanup(&local_err) != 0) {
+ error_report_err(local_err);
+ }
+ exit(EXIT_FAILURE);
}
static void migration_incoming_setup(QEMUFile *f)
qemu_coroutine_enter(co);
}
-void migration_fd_process_incoming(QEMUFile *f)
+/* Returns true if recovered from a paused migration, otherwise false */
+static bool postcopy_try_recover(QEMUFile *f)
{
MigrationIncomingState *mis = migration_incoming_get_current();
* that source is ready to reply to page requests.
*/
qemu_sem_post(&mis->postcopy_pause_sem_dst);
- } else {
- /* New incoming migration */
- migration_incoming_setup(f);
- migration_incoming_process();
+ return true;
}
+
+ return false;
+}
+
+void migration_fd_process_incoming(QEMUFile *f)
+{
+ if (postcopy_try_recover(f)) {
+ return;
+ }
+
+ migration_incoming_setup(f);
+ migration_incoming_process();
}
-void migration_ioc_process_incoming(QIOChannel *ioc)
+void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
{
MigrationIncomingState *mis = migration_incoming_get_current();
+ bool start_migration;
if (!mis->from_src_file) {
+ /* The first connection (multifd may have multiple) */
QEMUFile *f = qemu_fopen_channel_input(ioc);
+
+ /* If it's a recovery, we're done */
+ if (postcopy_try_recover(f)) {
+ return;
+ }
+
migration_incoming_setup(f);
- return;
+
+ /*
+ * Common migration only needs one channel, so we can start
+ * right now. Multifd needs more than one channel, we wait.
+ */
+ start_migration = !migrate_use_multifd();
+ } else {
+ Error *local_err = NULL;
+ /* Multiple connections */
+ assert(migrate_use_multifd());
+ start_migration = multifd_recv_new_channel(ioc, &local_err);
+ if (local_err) {
+ error_propagate(errp, local_err);
+ return;
+ }
+ }
+
+ if (start_migration) {
+ migration_incoming_process();
}
- multifd_recv_new_channel(ioc);
}
/**
params->compress_level = s->parameters.compress_level;
params->has_compress_threads = true;
params->compress_threads = s->parameters.compress_threads;
+ params->has_compress_wait_thread = true;
+ params->compress_wait_thread = s->parameters.compress_wait_thread;
params->has_decompress_threads = true;
params->decompress_threads = s->parameters.decompress_threads;
params->has_cpu_throttle_initial = true;
params->xbzrle_cache_size = s->parameters.xbzrle_cache_size;
params->has_max_postcopy_bandwidth = true;
params->max_postcopy_bandwidth = s->parameters.max_postcopy_bandwidth;
+ params->has_max_cpu_throttle = true;
+ params->max_cpu_throttle = s->parameters.max_cpu_throttle;
+ params->has_announce_initial = true;
+ params->announce_initial = s->parameters.announce_initial;
+ params->has_announce_max = true;
+ params->announce_max = s->parameters.announce_max;
+ params->has_announce_rounds = true;
+ params->announce_rounds = s->parameters.announce_rounds;
+ params->has_announce_step = true;
+ params->announce_step = s->parameters.announce_step;
return params;
}
+AnnounceParameters *migrate_announce_params(void)
+{
+ static AnnounceParameters ap;
+
+ MigrationState *s = migrate_get_current();
+
+ ap.initial = s->parameters.announce_initial;
+ ap.max = s->parameters.announce_max;
+ ap.rounds = s->parameters.announce_rounds;
+ ap.step = s->parameters.announce_step;
+
+ return ≈
+}
+
/*
* Return true if we're already in the middle of a migration
* (i.e. any of the active or setup states)
*/
-static bool migration_is_setup_or_active(int state)
+bool migration_is_setup_or_active(int state)
{
switch (state) {
case MIGRATION_STATUS_ACTIVE:
info->ram->postcopy_requests = ram_counters.postcopy_requests;
info->ram->page_size = qemu_target_page_size();
info->ram->multifd_bytes = ram_counters.multifd_bytes;
+ info->ram->pages_per_second = s->pages_per_second;
if (migrate_use_xbzrle()) {
info->has_xbzrle_cache = true;
info->xbzrle_cache->overflow = xbzrle_counters.overflow;
}
+ if (migrate_use_compression()) {
+ info->has_compression = true;
+ info->compression = g_malloc0(sizeof(*info->compression));
+ info->compression->pages = compression_counters.pages;
+ info->compression->busy = compression_counters.busy;
+ info->compression->busy_rate = compression_counters.busy_rate;
+ info->compression->compressed_size =
+ compression_counters.compressed_size;
+ info->compression->compression_rate =
+ compression_counters.compression_rate;
+ }
+
if (cpu_throttle_active()) {
info->has_cpu_throttle_percentage = true;
info->cpu_throttle_percentage = cpu_throttle_get_percentage();
}
#endif
+#ifndef CONFIG_REPLICATION
+ if (cap_list[MIGRATION_CAPABILITY_X_COLO]) {
+ error_setg(errp, "QEMU compiled without replication module"
+ " can't enable COLO");
+ error_append_hint(errp, "Please enable replication before COLO.\n");
+ return false;
+ }
+#endif
+
if (cap_list[MIGRATION_CAPABILITY_POSTCOPY_RAM]) {
if (cap_list[MIGRATION_CAPABILITY_COMPRESS]) {
/* The decompression threads asynchronously write into RAM
case MIGRATION_STATUS_CANCELLED:
case MIGRATION_STATUS_ACTIVE:
case MIGRATION_STATUS_POSTCOPY_ACTIVE:
+ case MIGRATION_STATUS_POSTCOPY_PAUSED:
+ case MIGRATION_STATUS_POSTCOPY_RECOVER:
case MIGRATION_STATUS_FAILED:
case MIGRATION_STATUS_COLO:
info->has_status = true;
return false;
}
+ if (params->has_max_cpu_throttle &&
+ (params->max_cpu_throttle < params->cpu_throttle_initial ||
+ params->max_cpu_throttle > 99)) {
+ error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+ "max_cpu_throttle",
+ "an integer in the range of cpu_throttle_initial to 99");
+ return false;
+ }
+
+ if (params->has_announce_initial &&
+ params->announce_initial > 100000) {
+ error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+ "announce_initial",
+ "is invalid, it must be less than 100000 ms");
+ return false;
+ }
+ if (params->has_announce_max &&
+ params->announce_max > 100000) {
+ error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+ "announce_max",
+ "is invalid, it must be less than 100000 ms");
+ return false;
+ }
+ if (params->has_announce_rounds &&
+ params->announce_rounds > 1000) {
+ error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+ "announce_rounds",
+ "is invalid, it must be in the range of 0 to 1000");
+ return false;
+ }
+ if (params->has_announce_step &&
+ (params->announce_step < 1 ||
+ params->announce_step > 10000)) {
+ error_setg(errp, QERR_INVALID_PARAMETER_VALUE,
+ "announce_step",
+ "is invalid, it must be in the range of 1 to 10000 ms");
+ return false;
+ }
return true;
}
dest->compress_threads = params->compress_threads;
}
+ if (params->has_compress_wait_thread) {
+ dest->compress_wait_thread = params->compress_wait_thread;
+ }
+
if (params->has_decompress_threads) {
dest->decompress_threads = params->decompress_threads;
}
if (params->has_max_postcopy_bandwidth) {
dest->max_postcopy_bandwidth = params->max_postcopy_bandwidth;
}
+ if (params->has_max_cpu_throttle) {
+ dest->max_cpu_throttle = params->max_cpu_throttle;
+ }
+ if (params->has_announce_initial) {
+ dest->announce_initial = params->announce_initial;
+ }
+ if (params->has_announce_max) {
+ dest->announce_max = params->announce_max;
+ }
+ if (params->has_announce_rounds) {
+ dest->announce_rounds = params->announce_rounds;
+ }
+ if (params->has_announce_step) {
+ dest->announce_step = params->announce_step;
+ }
}
static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
s->parameters.compress_threads = params->compress_threads;
}
+ if (params->has_compress_wait_thread) {
+ s->parameters.compress_wait_thread = params->compress_wait_thread;
+ }
+
if (params->has_decompress_threads) {
s->parameters.decompress_threads = params->decompress_threads;
}
if (params->has_max_postcopy_bandwidth) {
s->parameters.max_postcopy_bandwidth = params->max_postcopy_bandwidth;
}
+ if (params->has_max_cpu_throttle) {
+ s->parameters.max_cpu_throttle = params->max_cpu_throttle;
+ }
+ if (params->has_announce_initial) {
+ s->parameters.announce_initial = params->announce_initial;
+ }
+ if (params->has_announce_max) {
+ s->parameters.announce_max = params->announce_max;
+ }
+ if (params->has_announce_rounds) {
+ s->parameters.announce_rounds = params->announce_rounds;
+ }
+ if (params->has_announce_step) {
+ s->parameters.announce_step = params->announce_step;
+ }
}
void qmp_migrate_set_parameters(MigrateSetParameters *params, Error **errp)
qemu_savevm_state_cleanup();
if (s->to_dst_file) {
- Error *local_err = NULL;
QEMUFile *tmp;
trace_migrate_fd_cleanup();
}
qemu_mutex_lock_iothread();
- if (multifd_save_cleanup(&local_err) != 0) {
- error_report_err(local_err);
- }
+ multifd_save_cleanup();
qemu_mutex_lock(&s->qemu_file_lock);
tmp = s->to_dst_file;
s->to_dst_file = NULL;
s->rp_state.from_dst_file = NULL;
s->rp_state.error = false;
s->mbps = 0.0;
+ s->pages_per_second = 0.0;
s->downtime = 0;
s->expected_downtime = 0;
s->setup_time = 0;
int migrate_add_blocker(Error *reason, Error **errp)
{
if (migrate_get_current()->only_migratable) {
- error_propagate(errp, error_copy(reason));
- error_prepend(errp, "disallowing migration blocker "
- "(--only_migratable) for: ");
+ error_propagate_prepend(errp, error_copy(reason),
+ "disallowing migration blocker "
+ "(--only_migratable) for: ");
return -EACCES;
}
return 0;
}
- error_propagate(errp, error_copy(reason));
- error_prepend(errp, "disallowing migration blocker (migration in "
- "progress) for: ");
+ error_propagate_prepend(errp, error_copy(reason),
+ "disallowing migration blocker "
+ "(migration in progress) for: ");
return -EBUSY;
}
"paused migration");
return false;
}
+
+ /*
+ * Postcopy recovery won't work well with release-ram
+ * capability since release-ram will drop the page buffer as
+ * long as the page is put into the send buffer. So if there
+ * is a network failure happened, any page buffers that have
+ * not yet reached the destination VM but have already been
+ * sent from the source VM will be lost forever. Let's refuse
+ * the client from resuming such a postcopy migration.
+ * Luckily release-ram was designed to only be used when src
+ * and destination VMs are on the same host, so it should be
+ * fine.
+ */
+ if (migrate_release_ram()) {
+ error_setg(errp, "Postcopy recovery cannot work "
+ "when release-ram capability is set");
+ return false;
+ }
+
/* This is a resume, skip init status */
return true;
}
return s->parameters.compress_threads;
}
+int migrate_compress_wait_thread(void)
+{
+ MigrationState *s;
+
+ s = migrate_get_current();
+
+ return s->parameters.compress_wait_thread;
+}
+
int migrate_decompress_threads(void)
{
MigrationState *s;
return s->parameters.max_postcopy_bandwidth;
}
-
bool migrate_use_block(void)
{
MigrationState *s;
int res;
trace_source_return_path_thread_entry();
+ rcu_register_thread();
retry:
while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
*/
if (postcopy_pause_return_path_thread(ms)) {
/* Reload rp, reset the rest */
- rp = ms->rp_state.from_dst_file;
+ if (rp != ms->rp_state.from_dst_file) {
+ qemu_fclose(rp);
+ rp = ms->rp_state.from_dst_file;
+ }
ms->rp_state.error = false;
goto retry;
}
trace_source_return_path_thread_end();
ms->rp_state.from_dst_file = NULL;
qemu_fclose(rp);
+ rcu_unregister_thread();
return NULL;
}
qemu_mutex_lock_iothread();
trace_postcopy_start_set_run();
- qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
+ qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER, NULL);
global_state_store();
ret = vm_stop_force_state(RUN_STATE_FINISH_MIGRATE);
if (ret < 0) {
if (s->state == MIGRATION_STATUS_ACTIVE) {
qemu_mutex_lock_iothread();
s->downtime_start = qemu_clock_get_ms(QEMU_CLOCK_REALTIME);
- qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER);
+ qemu_system_wakeup_request(QEMU_WAKEUP_REASON_OTHER, NULL);
s->vm_was_running = runstate_is_running();
ret = global_state_store();
static MigThrError migration_detect_error(MigrationState *s)
{
int ret;
+ int state = s->state;
+
+ if (state == MIGRATION_STATUS_CANCELLING ||
+ state == MIGRATION_STATUS_CANCELLED) {
+ /* End the migration, but don't set the state to failed */
+ return MIG_THR_ERR_FATAL;
+ }
/* Try to detect any file errors */
ret = qemu_file_get_error(s->to_dst_file);
return MIG_THR_ERR_NONE;
}
- if (s->state == MIGRATION_STATUS_POSTCOPY_ACTIVE && ret == -EIO) {
+ if (state == MIGRATION_STATUS_POSTCOPY_ACTIVE && ret == -EIO) {
/*
* For postcopy, we allow the network to be down for a
* while. After that, it can be continued by a
* For precopy (or postcopy with error outside IO), we fail
* with no time.
*/
- migrate_set_state(&s->state, s->state, MIGRATION_STATUS_FAILED);
+ migrate_set_state(&s->state, state, MIGRATION_STATUS_FAILED);
trace_migration_thread_file_err();
/* Time to stop the migration, now. */
static void migration_update_counters(MigrationState *s,
int64_t current_time)
{
- uint64_t transferred, time_spent;
+ uint64_t transferred, transferred_pages, time_spent;
uint64_t current_bytes; /* bytes transferred since the beginning */
double bandwidth;
s->mbps = (((double) transferred * 8.0) /
((double) time_spent / 1000.0)) / 1000.0 / 1000.0;
+ transferred_pages = ram_get_total_transferred_pages() -
+ s->iteration_initial_pages;
+ s->pages_per_second = (double) transferred_pages /
+ (((double) time_spent / 1000.0));
+
/*
* if we haven't sent anything, we don't want to
* recalculate. 10000 is a small enough number for our purposes
s->iteration_start_time = current_time;
s->iteration_initial_bytes = current_bytes;
+ s->iteration_initial_pages = ram_get_total_transferred_pages();
trace_migrate_transferred(transferred, time_spent,
bandwidth, s->threshold_size);
/* Fallthrough */
case MIGRATION_STATUS_FAILED:
case MIGRATION_STATUS_CANCELLED:
+ case MIGRATION_STATUS_CANCELLING:
if (s->vm_was_running) {
vm_start();
} else {
qemu_savevm_send_postcopy_advise(s->to_dst_file);
}
+ if (migrate_colo_enabled()) {
+ /* Notify migration destination that we enable COLO */
+ qemu_savevm_send_colo_enable(s->to_dst_file);
+ }
+
qemu_savevm_state_setup(s->to_dst_file);
s->setup_time = qemu_clock_get_ms(QEMU_CLOCK_HOST) - setup_start;
} else {
/* This is a fresh new migration */
rate_limit = s->parameters.max_bandwidth / XFER_LIMIT_RATIO;
- s->expected_downtime = s->parameters.downtime_limit;
- s->cleanup_bh = qemu_bh_new(migrate_fd_cleanup, s);
/* Notify before starting migration thread */
notifier_list_notify(&migration_state_notifiers, s);
DEFINE_PROP_UINT8("x-compress-threads", MigrationState,
parameters.compress_threads,
DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT),
+ DEFINE_PROP_BOOL("x-compress-wait-thread", MigrationState,
+ parameters.compress_wait_thread, true),
DEFINE_PROP_UINT8("x-decompress-threads", MigrationState,
parameters.decompress_threads,
DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT),
DEFINE_PROP_SIZE("max-postcopy-bandwidth", MigrationState,
parameters.max_postcopy_bandwidth,
DEFAULT_MIGRATE_MAX_POSTCOPY_BANDWIDTH),
+ DEFINE_PROP_UINT8("max-cpu-throttle", MigrationState,
+ parameters.max_cpu_throttle,
+ DEFAULT_MIGRATE_MAX_CPU_THROTTLE),
+ DEFINE_PROP_SIZE("announce-initial", MigrationState,
+ parameters.announce_initial,
+ DEFAULT_MIGRATE_ANNOUNCE_INITIAL),
+ DEFINE_PROP_SIZE("announce-max", MigrationState,
+ parameters.announce_max,
+ DEFAULT_MIGRATE_ANNOUNCE_MAX),
+ DEFINE_PROP_SIZE("announce-rounds", MigrationState,
+ parameters.announce_rounds,
+ DEFAULT_MIGRATE_ANNOUNCE_ROUNDS),
+ DEFINE_PROP_SIZE("announce-step", MigrationState,
+ parameters.announce_step,
+ DEFAULT_MIGRATE_ANNOUNCE_STEP),
/* Migration capabilities */
DEFINE_PROP_MIG_CAP("x-xbzrle", MIGRATION_CAPABILITY_XBZRLE),
ms->state = MIGRATION_STATUS_NONE;
ms->mbps = -1;
+ ms->pages_per_second = -1;
qemu_sem_init(&ms->pause_sem, 0);
qemu_mutex_init(&ms->error_mutex);
params->has_x_multifd_page_count = true;
params->has_xbzrle_cache_size = true;
params->has_max_postcopy_bandwidth = true;
+ params->has_max_cpu_throttle = true;
+ params->has_announce_initial = true;
+ params->has_announce_max = true;
+ params->has_announce_rounds = true;
+ params->has_announce_step = true;
qemu_sem_init(&ms->postcopy_pause_sem, 0);
qemu_sem_init(&ms->postcopy_pause_rp_sem, 0);