[pve-devel] [PATCH qemu-server 6/7] migrate: add remote migration handling

Fabian Grünbichler f.gruenbichler at proxmox.com
Tue Apr 13 14:16:39 CEST 2021


remote migration uses a websocket connection to a task worker running on
the target node instead of commands via SSH to control the migration.
this websocket tunnel is started earlier than the SSH tunnel, and allows
adding UNIX-socket forwarding over additional websocket connections
on-demand.

the main differences to regular intra-cluster migration are:
- source VM config and disks are not removed upon migration
- shared storages are treated like local storages, since we can't
assume they are shared across clusters
- NBD migrated disks are explicitly pre-allocated on the target node via
tunnel command before starting the target VM instance
- in addition to storages, network bridges and the VMID itself are
transformed via a user-defined mapping

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---

Notes:
    requires
    - mtunnel endpoints on remote node
    - proxmox-websocket-tunnel on local node

 PVE/QemuMigrate.pm | 592 +++++++++++++++++++++++++++++++++++++++------
 PVE/QemuServer.pm  |   8 +-
 2 files changed, 521 insertions(+), 79 deletions(-)

diff --git a/PVE/QemuMigrate.pm b/PVE/QemuMigrate.pm
index 5d44c51..e1053d0 100644
--- a/PVE/QemuMigrate.pm
+++ b/PVE/QemuMigrate.pm
@@ -7,7 +7,13 @@ use IO::File;
 use IPC::Open2;
 use POSIX qw( WNOHANG );
 use Time::HiRes qw( usleep );
+use JSON qw(encode_json decode_json);
+use IO::Socket::UNIX;
+use Socket qw(SOCK_STREAM);
+use Storable qw(dclone);
+use URI::Escape;
 
+use PVE::APIClient::LWP;
 use PVE::Cluster;
 use PVE::INotify;
 use PVE::RPCEnvironment;
@@ -83,7 +89,7 @@ sub finish_command_pipe {
 	}
     }
 
-    $self->log('info', "ssh tunnel still running - terminating now with SIGTERM\n");
+    $self->log('info', "tunnel still running - terminating now with SIGTERM\n");
     kill(15, $cpid);
 
     # wait again
@@ -92,11 +98,11 @@ sub finish_command_pipe {
 	sleep(1);
     }
 
-    $self->log('info', "ssh tunnel still running - terminating now with SIGKILL\n");
+    $self->log('info', "tunnel still running - terminating now with SIGKILL\n");
     kill 9, $cpid;
     sleep 1;
 
-    $self->log('err', "ssh tunnel child process (PID $cpid) couldn't be collected\n")
+    $self->log('err', "tunnel child process (PID $cpid) couldn't be collected\n")
 	if !&$collect_child_process();
 }
 
@@ -113,18 +119,28 @@ sub read_tunnel {
     };
     die "reading from tunnel failed: $@\n" if $@;
 
-    chomp $output;
+    chomp $output if defined($output);
 
     return $output;
 }
 
 sub write_tunnel {
-    my ($self, $tunnel, $timeout, $command) = @_;
+    my ($self, $tunnel, $timeout, $command, $params) = @_;
 
     $timeout = 60 if !defined($timeout);
 
     my $writer = $tunnel->{writer};
 
+    if ($tunnel->{version} && $tunnel->{version} >= 2) {
+	my $object = defined($params) ? dclone($params) : {};
+	$object->{cmd} = $command;
+
+	$command = eval { JSON::encode_json($object) };
+
+	die "failed to encode command as JSON - $@\n"
+	    if $@;
+    }
+
     eval {
 	PVE::Tools::run_with_timeout($timeout, sub {
 	    print $writer "$command\n";
@@ -134,13 +150,29 @@ sub write_tunnel {
     die "writing to tunnel failed: $@\n" if $@;
 
     if ($tunnel->{version} && $tunnel->{version} >= 1) {
-	my $res = eval { $self->read_tunnel($tunnel, 10); };
+	my $res = eval { $self->read_tunnel($tunnel, $timeout); };
 	die "no reply to command '$command': $@\n" if $@;
 
-	if ($res eq 'OK') {
-	    return;
+	if ($tunnel->{version} == 1) {
+	    if ($res eq 'OK') {
+		return;
+	    } else {
+		die "tunnel replied '$res' to command '$command'\n";
+	    }
 	} else {
-	    die "tunnel replied '$res' to command '$command'\n";
+	    my $parsed = eval { JSON::decode_json($res) };
+	    die "failed to decode tunnel reply '$res' (command '$command') - $@\n"
+		if $@;
+
+	    if (!$parsed->{success}) {
+		if (defined($parsed->{msg})) {
+		    die "error - tunnel command '$command' failed - $parsed->{msg}\n";
+		} else {
+		    die "error - tunnel command '$command' failed\n";
+		}
+	    }
+
+	    return $parsed;
 	}
     }
 }
@@ -183,10 +215,149 @@ sub fork_tunnel {
     return $tunnel;
 }
 
+# Forward the local UNIX socket $local to $remote through the websocket tunnel helper.
+my $forward_unix_socket = sub {
+    my ($self, $local, $remote) = @_;
+    my $tunnel = $self->{tunnel};
+
+    # work on a copy so the base parameters stored on the tunnel stay untouched
+    my $fwd = dclone($tunnel->{params});
+    $fwd->{unix} = $local;
+    $fwd->{url} = $fwd->{url} . "socket=$remote&";
+    $fwd->{ticket} = { path => $remote };
+
+    my $request = encode_json({ control => JSON::true, cmd => 'forward', data => $fwd });
+
+    my $out = $tunnel->{writer};
+    eval {
+	unlink $local; # remove a possibly stale socket file first
+	PVE::Tools::run_with_timeout(15, sub {
+	    print {$out} "$request\n";
+	    $out->flush();
+	});
+    };
+    die "failed to write forwarding command - $@\n" if $@;
+
+    # read the reply line; its content is not inspected
+    $self->read_tunnel($tunnel);
+    my $msg = "Forwarded local unix socket '$local' to remote '$remote' via websocket tunnel";
+    $self->log('info', $msg);
+};
+
+# Start a 'proxmox-websocket-tunnel' helper process and connect it to the remote mtunnel worker.
+# $storages: hash whose keys are the storage IDs passed to the remote 'mtunnel' API call.
+# Returns the tunnel hash (writer, reader, pid, version, params); dies if any step fails.
+sub fork_websocket_tunnel {
+    my ($self, $storages) = @_;
+
+    my $remote = $self->{opts}->{remote};
+    my $conn = $remote->{conn};
+
+    my $websocket_url = "https://$conn->{host}:$conn->{port}/api2/json/nodes/$self->{node}/qemu/$remote->{vmid}/mtunnelwebsocket";
+
+    my $params = {
+	url => $websocket_url,
+    };
+
+    if (my $apitoken = $conn->{apitoken}) {
+	$params->{headers} = [["Authorization", "$apitoken"]];
+    } else {
+	die "can't connect to remote host without credentials\n";
+    }
+
+    if (my $fps = $conn->{cached_fingerprints}) {
+	$params->{fingerprint} = (keys %$fps)[0];
+    }
+
+    my $api_client = PVE::APIClient::LWP->new(%$conn);
+    my $storage_list = join(',', keys %$storages);
+    my $res = $api_client->post("/nodes/$self->{node}/qemu/$remote->{vmid}/mtunnel", { storages => $storage_list });
+    $self->log('info', "remote: started migration tunnel worker '$res->{upid}'");
+    $params->{url} .= "?ticket=".uri_escape($res->{ticket});
+    $params->{url} .= "&socket=$res->{socket}";
+
+    my $reader = IO::Pipe->new();
+    my $writer = IO::Pipe->new();
+
+    my $cpid = fork();
+    die "can't open migration tunnel - fork failed: $!\n" if !defined($cpid); # undef must not run the child branch
+
+    if (!$cpid) {
+	# child: run the helper with input/output redirected to the two pipes
+	eval {
+	    $writer->reader();
+	    $reader->writer();
+	    PVE::Tools::run_command(
+		['proxmox-websocket-tunnel'],
+		input => "<&".fileno($writer),
+		output => ">&".fileno($reader),
+		errfunc => sub { my $line = shift; print "tunnel: $line\n"; },
+	    );
+	};
+	warn "CMD websocket tunnel died: $@\n" if $@;
+	exit 0;
+    }
+
+    # parent: keep the opposite pipe ends and talk to the helper through them
+    $writer->writer();
+    $reader->reader();
+    my $tunnel = { writer => $writer, reader => $reader, pid => $cpid };
+
+    # write one command line to the helper (15s timeout), dying with "$errmsg - <error>" on failure
+    my $send = sub {
+	my ($cmd, $errmsg) = @_;
+	eval {
+	    PVE::Tools::run_with_timeout(15, sub {
+		print {$writer} "$cmd\n";
+		$writer->flush();
+	    });
+	};
+	die "$errmsg - $@\n" if $@;
+    };
+
+    eval {
+	$send->(
+	    encode_json({ control => JSON::true, cmd => 'connect', data => $params }),
+	    "failed to write tunnel connect command",
+	);
+    };
+    if (my $err = $@) {
+	# the helper is already running at this point - collect it before giving up
+	$self->finish_command_pipe($tunnel);
+	die "failed to connect via WS: $err\n";
+    }
+
+    eval {
+	$send->(encode_json({ cmd => 'version' }), "failed to write tunnel version command");
+
+	my $ver = $self->read_tunnel($tunnel, 10);
+	$ver = JSON::decode_json($ver);
+	$ver = $ver->{tunnel};
+
+	if ($ver =~ /^(\d+)$/) {
+	    $tunnel->{version} = $1;
+	    $self->log('info', "tunnel info: $ver\n");
+	} else {
+	    die "received invalid tunnel version string '$ver'\n";
+	}
+    };
+    if (my $err = $@) {
+	$self->finish_command_pipe($tunnel);
+	die "can't open migration tunnel - $err";
+    }
+
+    $params->{url} = "$websocket_url?";
+    $tunnel->{params} = $params; # for forwarding
+
+    return $tunnel;
+}
+
 sub finish_tunnel {
-    my ($self, $tunnel) = @_;
+    my ($self, $tunnel, $cleanup) = @_;
 
-    eval { $self->write_tunnel($tunnel, 30, 'quit'); };
+    $cleanup = $cleanup ? 1 : 0;
+
+    eval { $self->write_tunnel($tunnel, 30, 'quit', { cleanup => $cleanup }); };
     my $err = $@;
 
     $self->finish_command_pipe($tunnel, 30);
@@ -337,14 +508,19 @@ sub prepare {
 
     my $vollist = PVE::QemuServer::get_vm_volumes($conf);
 
+    my $storages = {};
     foreach my $volid (@$vollist) {
 	my ($sid, $volname) = PVE::Storage::parse_volume_id($volid, 1);
 
-	# check if storage is available on both nodes
 	my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
+	$storages->{$targetsid} = 1;
 
+	# check if storage is available on source node
 	my $scfg = PVE::Storage::storage_check_node($self->{storecfg}, $sid);
-	PVE::Storage::storage_check_node($self->{storecfg}, $targetsid, $self->{node});
+
+	# check if storage is available on target node
+	PVE::Storage::storage_check_node($self->{storecfg}, $targetsid, $self->{node})
+	    if !$self->{opts}->{remote};
 
 	if ($scfg->{shared}) {
 	    # PVE::Storage::activate_storage checks this for non-shared storages
@@ -354,10 +530,16 @@ sub prepare {
 	}
     }
 
-    # test ssh connection
-    my $cmd = [ @{$self->{rem_ssh}}, '/bin/true' ];
-    eval { $self->cmd_quiet($cmd); };
-    die "Can't connect to destination address using public key\n" if $@;
+    if ($self->{opts}->{remote}) {
+	# test & establish websocket connection
+	$self->{tunnel} = $self->fork_websocket_tunnel($storages);
+	print "websocket tunnel started\n";
+    } else {
+	# test ssh connection
+	my $cmd = [ @{$self->{rem_ssh}}, '/bin/true' ];
+	eval { $self->cmd_quiet($cmd); };
+	die "Can't connect to destination address using public key\n" if $@;
+    }
 
     return $running;
 }
@@ -395,7 +577,7 @@ sub sync_disks {
 	my @sids = PVE::Storage::storage_ids($storecfg);
 	foreach my $storeid (@sids) {
 	    my $scfg = PVE::Storage::storage_config($storecfg, $storeid);
-	    next if $scfg->{shared};
+	    next if $scfg->{shared} && !$self->{opts}->{remote};
 	    next if !PVE::Storage::storage_check_enabled($storecfg, $storeid, undef, 1);
 
 	    # get list from PVE::Storage (for unused volumes)
@@ -403,15 +585,17 @@ sub sync_disks {
 
 	    next if @{$dl->{$storeid}} == 0;
 
-	    my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $storeid);
-	    # check if storage is available on target node
-	    PVE::Storage::storage_check_node($storecfg, $targetsid, $self->{node});
+	    if (!$self->{opts}->{remote}) {
+		my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $storeid);
+		# check if storage is available on target node
+		PVE::Storage::storage_check_node($storecfg, $targetsid, $self->{node});
 
-	    # grandfather in existing mismatches
-	    if ($targetsid ne $storeid) {
-		my $target_scfg = PVE::Storage::storage_config($storecfg, $targetsid);
-		die "content type 'images' is not available on storage '$targetsid'\n"
-		    if !$target_scfg->{content}->{images};
+		# grandfather in existing mismatches
+		if ($targetsid ne $storeid) {
+		    my $target_scfg = PVE::Storage::storage_config($storecfg, $targetsid);
+		    die "content type 'images' is not available on storage '$targetsid'\n"
+			if !$target_scfg->{content}->{images};
+		}
 	    }
 
 	    PVE::Storage::foreach_volid($dl, sub {
@@ -456,12 +640,16 @@ sub sync_disks {
 
 	    my ($sid, $volname) = PVE::Storage::parse_volume_id($volid);
 
-	    my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
-	    # check if storage is available on both nodes
+	    # check if storage is available on source node
 	    my $scfg = PVE::Storage::storage_check_node($storecfg, $sid);
-	    PVE::Storage::storage_check_node($storecfg, $targetsid, $self->{node});
 
-	    return if $scfg->{shared};
+	    # check target storage on target node if intra-cluster migration
+	    if (!$self->{opts}->{remote}) {
+		my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
+		PVE::Storage::storage_check_node($storecfg, $targetsid, $self->{node});
+
+		return if $scfg->{shared};
+	    }
 
 	    $local_volumes->{$volid}->{ref} = $attr->{referenced_in_config} ? 'config' : 'snapshot';
 	    $local_volumes->{$volid}->{ref} = 'storage' if $attr->{is_unused};
@@ -543,6 +731,9 @@ sub sync_disks {
 
 	    my $migratable = $scfg->{type} =~ /^(?:dir|zfspool|lvmthin|lvm)$/;
 
+	    # TODO: what is this even here for?
+	    $migratable = 1 if $self->{opts}->{remote};
+
 	    die "can't migrate '$volid' - storage type '$scfg->{type}' not supported\n"
 		if !$migratable;
 
@@ -553,6 +744,9 @@ sub sync_disks {
 	}
 
 	if ($self->{replication_jobcfg}) {
+	    die "can't migrate VM with replicated volumes to remote cluster/node\n"
+		if $self->{opts}->{remote};
+
 	    if ($self->{running}) {
 
 		my $version = PVE::QemuServer::kvm_user_version();
@@ -615,6 +809,7 @@ sub sync_disks {
 
 	$self->log('info', "copying local disk images") if scalar(%$local_volumes);
 
+	my $forwarded = 0;
 	foreach my $volid (sort keys %$local_volumes) {
 	    my ($sid, $volname) = PVE::Storage::parse_volume_id($volid);
 	    my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
@@ -628,27 +823,121 @@ sub sync_disks {
 		next;
 	    } else {
 		next if $self->{replicated_volumes}->{$volid};
+
 		push @{$self->{volumes}}, $volid;
+		my $new_volid;
+		
 		my $opts = $self->{opts};
-		# use 'migrate' limit for transfer to other node
-		my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', [$targetsid, $sid], $opts->{bwlimit});
-		# JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
-		$bwlimit = $bwlimit * 1024 if defined($bwlimit);
-
-		my $storage_migrate_opts = {
-		    'ratelimit_bps' => $bwlimit,
-		    'insecure' => $opts->{migration_type} eq 'insecure',
-		    'with_snapshots' => $local_volumes->{$volid}->{snapshots},
-		    'allow_rename' => !$local_volumes->{$volid}->{is_vmstate},
-		};
-
-		my $logfunc = sub { $self->log('info', $_[0]); };
-		my $new_volid = eval {
-		    PVE::Storage::storage_migrate($storecfg, $volid, $self->{ssh_info},
-						  $targetsid, $storage_migrate_opts, $logfunc);
-		};
-		if (my $err = $@) {
-		    die "storage migration for '$volid' to storage '$targetsid' failed - $err\n";
+		if (my $remote = $opts->{remote}) {
+		    my $remote_vmid = $remote->{vmid};
+		    my (undef, $name, undef, undef, undef, undef, $format) = PVE::Storage::parse_volname($storecfg, $volid);
+		    my $scfg = PVE::Storage::storage_config($storecfg, $sid);
+		    PVE::Storage::activate_volumes($storecfg, [$volid]);
+
+		    # use 'migrate' limit for transfer to other node
+		    # TODO: get limit from remote cluster as well?
+		    my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', [$sid], $opts->{bwlimit});
+		    # JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
+		    $bwlimit = $bwlimit * 1024 if defined($bwlimit);
+
+		    my $with_snapshots = $local_volumes->{$volid}->{snapshots} ? 1 : 0;
+		    my $snapshot;
+		    if ($scfg->{type} eq 'zfspool') {
+			$snapshot = '__migration__';
+			$with_snapshots = 1;
+			PVE::Storage::volume_snapshot($storecfg, $volid, $snapshot);
+		    }
+
+		    if ($self->{vmid} != $remote_vmid) {
+			$name =~ s/-$self->{vmid}-/-$remote_vmid-/g;
+			$name =~ s/^$self->{vmid}\//$remote_vmid\//;
+		    }
+
+		    my @export_formats = PVE::Storage::volume_export_formats($storecfg, $volid, undef, undef, $with_snapshots);
+
+		    my $storage_migrate_opts = {
+			format => $format,
+			storage => $targetsid,
+			'with-snapshots' => $with_snapshots,
+			'allow-rename' => !$local_volumes->{$volid}->{is_vmstate},
+			'export-formats' => join(',', @export_formats), # one scalar - a bare array would flatten into the hash
+			volname => $name,
+		    };
+		    my $res = $self->write_tunnel($self->{tunnel}, 600, 'disk-import', $storage_migrate_opts);
+		    my $local = "/run/qemu-server/$self->{vmid}.storage";
+		    if (!$forwarded) {
+			$forward_unix_socket->($self, $local, $res->{socket});
+			$forwarded = 1;
+		    }
+		    my $socket = IO::Socket::UNIX->new(Peer => $local, Type => SOCK_STREAM())
+			or die "failed to connect to websocket tunnel at $local\n";
+		    # we won't be reading from the socket
+		    shutdown($socket, 0);
+		    my $send = ['pvesm', 'export', $volid, $res->{format}, '-', '-with-snapshots', $with_snapshots];
+		    push @$send, '-snapshot', $snapshot if $snapshot;
+
+		    my @cstream;
+		    if (defined($bwlimit)) {
+			@cstream = ([ '/usr/bin/cstream', '-t', $bwlimit ]);
+			$self->log('info', "using a bandwidth limit of $bwlimit bps for transferring '$volid'");
+		    }
+
+		    eval {
+			PVE::Tools::run_command(
+			    [$send, @cstream],
+			    output => '>&'.fileno($socket),
+			    errfunc => sub { my $line = shift; $self->log('warn', $line); },
+			);
+		    };
+		    my $send_error = $@;
+
+		    # don't close the connection entirely otherwise the
+		    # receiving end might not get all buffered data (and
+		    # fails with 'connection reset by peer')
+		    shutdown($socket, 1);
+
+		    # wait for the remote process to finish
+		    while ($res = $self->write_tunnel($self->{tunnel}, 10, 'query-disk-import')) {
+			if ($res->{status} eq 'pending') {
+			    $self->log('info', "waiting for disk import to finish..\n");
+			    sleep(1)
+			} elsif ($res->{status} eq 'complete') {
+			    $new_volid = $res->{volid};
+			    last;
+			} else {
+			    die "unknown query-disk-import result: $res->{status}\n";
+			}
+		    }
+
+		    # now close the socket
+		    close($socket);
+		    die $send_error if $send_error;
+		} else {
+		    # use 'migrate' limit for transfer to other node (source and target storage, as before this change)
+		    my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', [$targetsid, $sid], $opts->{bwlimit});
+		    # JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
+		    $bwlimit = $bwlimit * 1024 if defined($bwlimit);
+		    my $storage_migrate_opts = {
+			'ratelimit_bps' => $bwlimit,
+			'insecure' => $opts->{migration_type} eq 'insecure',
+			'with_snapshots' => $local_volumes->{$volid}->{snapshots},
+			'allow_rename' => !$local_volumes->{$volid}->{is_vmstate},
+		    };
+
+		    my $logfunc = sub { $self->log('info', $_[0]); };
+		    $new_volid = eval {
+			PVE::Storage::storage_migrate(
+			    $storecfg,
+			    $volid,
+			    $self->{ssh_info},
+			    $targetsid,
+			    $storage_migrate_opts,
+			    $logfunc,
+			);
+		    };
+		    if (my $err = $@) {
+			die "storage migration for '$volid' to storage '$targetsid' failed - $err\n";
+		    }
 		}
 
 		$self->{volume_map}->{$volid} = $new_volid;
@@ -667,6 +956,12 @@ sub sync_disks {
 sub cleanup_remotedisks {
     my ($self) = @_;
 
+    if ($self->{opts}->{remote}) {
+	$self->finish_tunnel($self->{tunnel}, 1); # method call already supplies $self; 1 => quit with cleanup
+	delete $self->{tunnel};
+	return;
+    }
+
     foreach my $target_drive (keys %{$self->{target_drive}}) {
 	my $drivestr = $self->{target_drive}->{$target_drive}->{drivestr};
 	next if !defined($drivestr);
@@ -714,8 +1009,71 @@ sub phase1 {
     # sync_disks fixes disk sizes to match their actual size, write changes so
     # target allocates correct volumes
     PVE::QemuConfig->write_config($vmid, $conf);
+
+    $self->phase1_remote($vmid) if $self->{opts}->{remote};
 };
 
+sub phase1_remote {
+    my ($self, $vmid) = @_;
+    # Derive the target-side config from the local one and send it over the tunnel ('config' command).
+    my $remote_conf = PVE::QemuConfig->load_config($vmid);
+    PVE::QemuConfig->update_volume_ids($remote_conf, $self->{volume_map});
+
+    my $storage_map = $self->{opts}->{storagemap};
+    $self->{nbd} = {};
+    PVE::QemuConfig->foreach_volume($remote_conf, sub {
+	my ($ds, $drive) = @_;
+
+	# TODO eject CDROM?
+	return if PVE::QemuServer::drive_is_cdrom($drive);
+
+	my $volid = $drive->{file};
+	return if !$volid;
+	# only volumes listed in online_local_volumes are allocated via the 'disk' tunnel command
+	return if !grep { $_ eq $volid} @{$self->{online_local_volumes}};
+
+	my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);
+	my $scfg = PVE::Storage::storage_config($self->{storecfg}, $storeid);
+	my $source_format = PVE::QemuServer::qemu_img_format($scfg, $volname);
+
+	# set by target cluster
+	my $oldvolid = delete $drive->{file};
+	delete $drive->{format};
+
+	my $targetsid = PVE::QemuServer::map_id($storage_map, $storeid);
+
+	my $params = {
+	    format => $source_format,
+	    storage => $targetsid,
+	    drive => $drive,
+	};
+
+	$self->log('info', "Allocating volume for drive '$ds' on remote storage '$targetsid'..");
+	my $res = $self->write_tunnel($self->{tunnel}, 600, 'disk', $params);
+	# the returned drive string replaces the config entry; the whole reply is kept in $self->{nbd}
+	$self->log('info', "mapped: $oldvolid => $res->{volid}");
+	$remote_conf->{$ds} = $res->{drivestr};
+	$self->{nbd}->{$ds} = $res;
+    });
+
+    # TODO: check bridge availability earlier?
+    my $bridgemap = $self->{opts}->{bridgemap};
+    foreach my $opt (keys %$remote_conf) {
+	next if $opt !~ m/^net\d+$/;
+
+	next if !$remote_conf->{$opt};
+	my $d = PVE::QemuServer::parse_net($remote_conf->{$opt});
+	next if !$d || !$d->{bridge};
+	# rewrite the bridge of this netX entry according to the bridge map
+	my $target_bridge = PVE::QemuServer::map_id($bridgemap, $d->{bridge});
+	$self->log('info', "mapped: $opt from $d->{bridge} to $target_bridge");
+	$d->{bridge} = $target_bridge;
+	$remote_conf->{$opt} = PVE::QemuServer::print_net($d);
+    }
+
+    $self->write_tunnel($self->{tunnel}, 10, 'config', { conf => $remote_conf });
+}
+
 sub phase1_cleanup {
     my ($self, $vmid, $err) = @_;
 
@@ -863,6 +1221,28 @@ sub phase2_start_local_cluster {
     return ($tunnel_info, $spice_port);
 }
 
+sub phase2_start_remote_cluster {
+    my ($self, $vmid, $params) = @_;
+    # Send 'start' with $params over the tunnel; returns ($res->{migrate}, $res->{spice_port}).
+    die "insecure migration to remote cluster not implemented\n"
+	if $params->{migrate_opts}->{type} ne 'websocket';
+
+    my $remote_vmid = $self->{opts}->{remote}->{vmid};
+
+    my $res = $self->write_tunnel($self->{tunnel}, 10, "start", $params);
+    # record drive string and NBD URI per returned drive, with the socket path switched to the local VMID
+    foreach my $drive (keys %{$res->{drives}}) {
+	$self->{target_drive}->{$drive}->{drivestr} = $res->{drives}->{$drive}->{drivestr};
+	my $nbd_uri = $res->{drives}->{$drive}->{nbd_uri};
+	die "unexpected NBD uri for '$drive': $nbd_uri\n"
+	    if $nbd_uri !~ s!/run/qemu-server/$remote_vmid\_!/run/qemu-server/$vmid\_!; # substitutes in place; no match => die
+
+	$self->{target_drive}->{$drive}->{nbd_uri} = $nbd_uri;
+    }
+
+    return ($res->{migrate}, $res->{spice_port});
+}
+
 sub phase2 {
     my ($self, $vmid) = @_;
 
@@ -898,10 +1278,39 @@ sub phase2 {
 	},
     };
 
-    my ($tunnel_info, $spice_port) = $self->phase2_start_local_cluster($vmid, $params);
+    my ($tunnel_info, $spice_port);
+
+    if (my $remote = $self->{opts}->{remote}) {
+	my $remote_vmid = $remote->{vmid};
+	$params->{migrate_opts}->{remote_node} = $self->{node};
+	($tunnel_info, $spice_port) = $self->phase2_start_remote_cluster($vmid, $params);
+	die "only UNIX sockets are supported for remote migration\n"
+	    if $tunnel_info->{proto} ne 'unix';
+
+	my $forwarded = {};
+	my $remote_socket = $tunnel_info->{addr};
+	my $local_socket = $remote_socket;
+	$local_socket =~ s/$remote_vmid/$vmid/g;
+	$tunnel_info->{addr} = $local_socket;
+
+	$self->log('info', "forwarding migration socket '$local_socket' => '$remote_socket'");
+	$forward_unix_socket->($self, $local_socket, $remote_socket);
+	$forwarded->{$local_socket} = 1;
+
+	foreach my $remote_socket (@{$tunnel_info->{unix_sockets}}) {
+	    my $local_socket = $remote_socket;
+	    $local_socket =~ s/$remote_vmid/$vmid/g;
+	    next if $forwarded->{$local_socket};
+	    $self->log('info', "forwarding migration socket '$local_socket' => '$remote_socket'");
+	    $forward_unix_socket->($self, $local_socket, $remote_socket);
+	    $forwarded->{$local_socket} = 1;
+	}
+    } else {
+	($tunnel_info, $spice_port) = $self->phase2_start_local_cluster($vmid, $params);
 
-    $self->log('info', "start remote tunnel");
-    $self->start_remote_tunnel($tunnel_info);
+	$self->log('info', "start remote tunnel");
+	$self->start_remote_tunnel($tunnel_info);
+    }
 
     my $migrate_uri = "$tunnel_info->{proto}:$tunnel_info->{addr}";
     $migrate_uri .= ":$tunnel_info->{port}"
@@ -923,13 +1332,15 @@ sub phase2 {
 	    my $nbd_uri = $target->{nbd_uri};
 
 	    my $source_drive = PVE::QemuServer::parse_drive($drive, $conf->{$drive});
-	    my $target_drive = PVE::QemuServer::parse_drive($drive, $target->{drivestr});
-
 	    my $source_volid = $source_drive->{file};
-	    my $target_volid = $target_drive->{file};
-
 	    my $source_sid = PVE::Storage::Plugin::parse_volume_id($source_volid);
-	    my $target_sid = PVE::Storage::Plugin::parse_volume_id($target_volid);
+
+	    my ($target_volid, $target_sid);
+	    if (!$self->{opts}->{remote}) {
+		my $target_drive = PVE::QemuServer::parse_drive($drive, $target->{drivestr});
+		$target_volid = $target_drive->{file};
+		$target_sid = PVE::Storage::Plugin::parse_volume_id($target_volid);
+	    }
 
 	    my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', [$source_sid, $target_sid], $opt_bwlimit);
 	    my $bitmap = $target->{bitmap};
@@ -937,8 +1348,10 @@ sub phase2 {
 	    $self->log('info', "$drive: start migration to $nbd_uri");
 	    PVE::QemuServer::qemu_drive_mirror($vmid, $drive, $nbd_uri, $vmid, undef, $self->{storage_migration_jobs}, 'skip', undef, $bwlimit, $bitmap);
 
-	    $self->{volume_map}->{$source_volid} = $target_volid;
-	    $self->log('info', "volume '$source_volid' is '$target_volid' on the target\n");
+	    if ($target_volid) {
+		$self->{volume_map}->{$source_volid} = $target_volid;
+		$self->log('info', "volume '$source_volid' is '$target_volid' on the target\n");
+	    }
 	}
     }
 
@@ -995,7 +1408,7 @@ sub phase2 {
     };
     $self->log('info', "migrate-set-parameters error: $@") if $@;
 
-    if (PVE::QemuServer::vga_conf_has_spice($conf->{vga})) {
+    if (PVE::QemuServer::vga_conf_has_spice($conf->{vga}) && !$self->{opts}->{remote}) { # pass the vga config itself, not a boolean
 	my $rpcenv = PVE::RPCEnvironment::get();
 	my $authuser = $rpcenv->get_user();
 
@@ -1159,11 +1572,15 @@ sub phase2_cleanup {
 
     my $nodename = PVE::INotify::nodename();
 
-    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'stop', $vmid, '--skiplock', '--migratedfrom', $nodename];
-    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
-    if (my $err = $@) {
-        $self->log('err', $err);
-        $self->{errors} = 1;
+    if ($self->{tunnel} && $self->{tunnel}->{version} >= 2) {
+	$self->write_tunnel($self->{tunnel}, 10, 'stop');
+    } else {
+	my $cmd = [@{$self->{rem_ssh}}, 'qm', 'stop', $vmid, '--skiplock', '--migratedfrom', $nodename];
+	eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
+	if (my $err = $@) {
+	    $self->log('err', $err);
+	    $self->{errors} = 1;
+	}
     }
 
     # cleanup after stopping, otherwise disks might be in-use by target VM!
@@ -1188,6 +1605,8 @@ sub phase3 {
     my $volids = $self->{volumes};
     return if $self->{phase2errors};
 
+    return if $self->{opts}->{remote};
+
     # destroy local copies
     foreach my $volid (@$volids) {
 	eval { PVE::Storage::vdisk_free($self->{storecfg}, $volid); };
@@ -1220,7 +1639,7 @@ sub phase3_cleanup {
 	}
     }
 
-    if ($self->{volume_map}) {
+    if ($self->{volume_map} && !$self->{opts}->{remote}) {
 	my $target_drives = $self->{target_drive};
 
 	# FIXME: for NBD storage migration we now only update the volid, and
@@ -1237,26 +1656,35 @@ sub phase3_cleanup {
 
     # transfer replication state before move config
     $self->transfer_replication_state() if $self->{is_replicated};
-    PVE::QemuConfig->move_config_to_node($vmid, $self->{node});
+    if ($self->{opts}->{remote}) {
+	# TODO decide whether to remove or lock&keep here
+    } else {
+	PVE::QemuConfig->move_config_to_node($vmid, $self->{node});
+    }
     $self->switch_replication_job_target() if $self->{is_replicated};
 
     if ($self->{livemigration}) {
 	if ($self->{stopnbd}) {
 	    $self->log('info', "stopping NBD storage migration server on target.");
 	    # stop nbd server on remote vm - requirement for resume since 2.9
-	    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'nbdstop', $vmid];
+	    if ($tunnel && $tunnel->{version} && $tunnel->{version} >= 2) {
+		$self->write_tunnel($tunnel, 30, 'nbdstop');
+	    } else {
+		my $cmd = [@{$self->{rem_ssh}}, 'qm', 'nbdstop', $vmid];
 
-	    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
-	    if (my $err = $@) {
-		$self->log('err', $err);
-		$self->{errors} = 1;
+		eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
+		if (my $err = $@) {
+		    $self->log('err', $err);
+		    $self->{errors} = 1;
+		}
 	    }
 	}
 
 	# config moved and nbd server stopped - now we can resume vm on target
 	if ($tunnel && $tunnel->{version} && $tunnel->{version} >= 1) {
+	    my $cmd = $tunnel->{version} == 1 ? "resume $vmid" : "resume";
 	    eval {
-		$self->write_tunnel($tunnel, 30, "resume $vmid");
+		$self->write_tunnel($tunnel, 30, $cmd);
 	    };
 	    if (my $err = $@) {
 		$self->log('err', $err);
@@ -1276,18 +1704,21 @@ sub phase3_cleanup {
 	}
 
 	if ($self->{storage_migration} && PVE::QemuServer::parse_guest_agent($conf)->{fstrim_cloned_disks} && $self->{running}) {
+	    # TODO mtunnel command
 	    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'guest', 'cmd', $vmid, 'fstrim'];
 	    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
 	}
     }
 
     # close tunnel on successful migration, on error phase2_cleanup closed it
-    if ($tunnel) {
+    if ($tunnel && $tunnel->{version} == 1) {
 	eval { finish_tunnel($self, $tunnel);  };
 	if (my $err = $@) {
 	    $self->log('err', $err);
 	    $self->{errors} = 1;
 	}
+	$tunnel = undef;
+	delete $self->{tunnel};
     }
 
     eval {
@@ -1321,7 +1752,8 @@ sub phase3_cleanup {
 	$self->{errors} = 1;
     }
 
-    if($self->{storage_migration}) {
+    # TODO decide whether we want a "keep source VM" option for remote migration
+    if ($self->{storage_migration} && !$self->{opts}->{remote}) {
 	# destroy local copies
 	my $volids = $self->{online_local_volumes};
 
@@ -1340,8 +1772,14 @@ sub phase3_cleanup {
     }
 
     # clear migrate lock
-    my $cmd = [ @{$self->{rem_ssh}}, 'qm', 'unlock', $vmid ];
-    $self->cmd_logerr($cmd, errmsg => "failed to clear migrate lock");
+    if ($tunnel && $tunnel->{version} >= 2) {
+	$self->write_tunnel($tunnel, 10, "unlock");
+
+	$self->finish_tunnel($tunnel);
+    } else {
+	my $cmd = [ @{$self->{rem_ssh}}, 'qm', 'unlock', $vmid ];
+	$self->cmd_logerr($cmd, errmsg => "failed to clear migrate lock");
+    }
 }
 
 sub final_cleanup {
diff --git a/PVE/QemuServer.pm b/PVE/QemuServer.pm
index a131fc8..a713258 100644
--- a/PVE/QemuServer.pm
+++ b/PVE/QemuServer.pm
@@ -5094,7 +5094,11 @@ sub vm_start_nolock {
     my $defaults = load_defaults();
 
     # set environment variable useful inside network script
-    $ENV{PVE_MIGRATED_FROM} = $migratedfrom if $migratedfrom;
+    if ($migrate_opts->{remote_node}) {
+	$ENV{PVE_MIGRATED_FROM} = $migrate_opts->{remote_node};
+    } elsif ($migratedfrom) {
+	$ENV{PVE_MIGRATED_FROM} = $migratedfrom;
+    }
 
     PVE::GuestHelpers::exec_hookscript($conf, $vmid, 'pre-start', 1);
 
@@ -5310,7 +5314,7 @@ sub vm_start_nolock {
 
 	my $migrate_storage_uri;
 	# nbd_protocol_version > 0 for unix socket support
-	if ($nbd_protocol_version > 0 && $migration_type eq 'secure') {
+	if ($nbd_protocol_version > 0 && ($migration_type eq 'secure' || $migration_type eq 'websocket')) {
 	    my $socket_path = "/run/qemu-server/$vmid\_nbd.migrate";
 	    mon_cmd($vmid, "nbd-server-start", addr => { type => 'unix', data => { path => $socket_path } } );
 	    $migrate_storage_uri = "nbd:unix:$socket_path";
-- 
2.20.1






More information about the pve-devel mailing list