[pve-devel] [PATCH qemu-server 09/10] migrate: add remote migration handling

Fabian Grünbichler f.gruenbichler at proxmox.com
Thu Nov 11 13:25:46 CET 2021


On November 10, 2021 12:17 pm, Fabian Ebner wrote:
> Am 05.11.21 um 14:03 schrieb Fabian Grünbichler:
>> remote migration uses a websocket connection to a task worker running on
>> the target node instead of commands via SSH to control the migration.
>> this websocket tunnel is started earlier than the SSH tunnel, and allows
>> adding UNIX-socket forwarding over additional websocket connections
>> on-demand.
>> 
>> the main differences to regular intra-cluster migration are:
>> - source VM config and disks are only removed upon request via --delete
>> - shared storages are treated like local storages, since we can't
>> assume they are shared across clusters (with potential to extend this
>> by marking storages as shared)
>> - NBD migrated disks are explicitly pre-allocated on the target node via
>> tunnel command before starting the target VM instance
>> - in addition to storages, network bridges and the VMID itself are
>> transformed via a user defined mapping
>> - all commands and migration data streams are sent via a WS tunnel proxy
>> 
>> Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
>> ---
>> 
>> Notes:
>>      requires proxmox-websocket-tunnel
>> 
>>   PVE/API2/Qemu.pm   |   4 +-
>>   PVE/QemuMigrate.pm | 647 +++++++++++++++++++++++++++++++++++++++------
>>   PVE/QemuServer.pm  |   8 +-
>>   3 files changed, 575 insertions(+), 84 deletions(-)
>> 
>> diff --git a/PVE/API2/Qemu.pm b/PVE/API2/Qemu.pm
>> index a1a1813..24f5b98 100644
>> --- a/PVE/API2/Qemu.pm
>> +++ b/PVE/API2/Qemu.pm
>> @@ -4610,7 +4610,7 @@ __PACKAGE__->register_method({
>>   		    # bump/reset both for breaking changes
>>   		    # bump tunnel only for opt-in changes
> 
> Sorry for asking about this on this patch: shouldn't opt-in changes bump 
> both?

yes. this was initially version and min version, then changed to version 
and age like with the storage plugins..

> 
>>   		    return {
>> -			api => 2,
>> +			api => $PVE::QemuMigrate::WS_TUNNEL_VERSION,
>>   			age => 0,
>>   		    };
>>   		},
>> @@ -4897,7 +4897,7 @@ __PACKAGE__->register_method({
>>   			    PVE::Firewall::remove_vmfw_conf($vmid);
>>   			}
>>   
>> -			if (my @volumes = keys $state->{cleanup}->{volumes}->$%) {
>> +			if (my @volumes = keys $state->{cleanup}->{volumes}->%*) {
>>   			    PVE::Storage::foreach_volid(@volumes, sub {
>>   				my ($volid, $sid, $volname, $d) = @_;
>>   
>> diff --git a/PVE/QemuMigrate.pm b/PVE/QemuMigrate.pm
>> index 07b56eb..7378551 100644
>> --- a/PVE/QemuMigrate.pm
>> +++ b/PVE/QemuMigrate.pm
>> @@ -7,9 +7,15 @@ use IO::File;
>>   use IPC::Open2;
>>   use POSIX qw( WNOHANG );
>>   use Time::HiRes qw( usleep );
>> +use JSON qw(encode_json decode_json);
>> +use IO::Socket::UNIX;
>> +use Socket qw(SOCK_STREAM);
>> +use Storable qw(dclone);
>> +use URI::Escape;
>>   
>> -use PVE::Format qw(render_bytes);
>> +use PVE::APIClient::LWP;
>>   use PVE::Cluster;
>> +use PVE::Format qw(render_bytes);
>>   use PVE::GuestHelpers qw(safe_boolean_ne safe_string_ne);
>>   use PVE::INotify;
>>   use PVE::RPCEnvironment;
>> @@ -30,6 +36,9 @@ use PVE::QemuServer;
>>   use PVE::AbstractMigrate;
>>   use base qw(PVE::AbstractMigrate);
>>   
>> +# compared against remote end's minimum version
>> +our $WS_TUNNEL_VERSION = 2;
>> +
>>   sub fork_command_pipe {
>>       my ($self, $cmd) = @_;
>>   
>> @@ -85,7 +94,7 @@ sub finish_command_pipe {
>>   	}
>>       }
>>   
>> -    $self->log('info', "ssh tunnel still running - terminating now with SIGTERM\n");
>> +    $self->log('info', "tunnel still running - terminating now with SIGTERM\n");
>>       kill(15, $cpid);
>>   
>>       # wait again
>> @@ -94,11 +103,11 @@ sub finish_command_pipe {
>>   	sleep(1);
>>       }
>>   
>> -    $self->log('info', "ssh tunnel still running - terminating now with SIGKILL\n");
>> +    $self->log('info', "tunnel still running - terminating now with SIGKILL\n");
>>       kill 9, $cpid;
>>       sleep 1;
>>   
>> -    $self->log('err', "ssh tunnel child process (PID $cpid) couldn't be collected\n")
>> +    $self->log('err', "tunnel child process (PID $cpid) couldn't be collected\n")
>>   	if !&$collect_child_process();
>>   }
>>   
>> @@ -115,18 +124,28 @@ sub read_tunnel {
>>       };
>>       die "reading from tunnel failed: $@\n" if $@;
>>   
>> -    chomp $output;
>> +    chomp $output if defined($output);
>>   
>>       return $output;
>>   }
>>   
>>   sub write_tunnel {
>> -    my ($self, $tunnel, $timeout, $command) = @_;
>> +    my ($self, $tunnel, $timeout, $command, $params) = @_;
>>   
>>       $timeout = 60 if !defined($timeout);
>>   
>>       my $writer = $tunnel->{writer};
>>   
>> +    if ($tunnel->{version} && $tunnel->{version} >= 2) {
>> +	my $object = defined($params) ? dclone($params) : {};
>> +	$object->{cmd} = $command;
>> +
>> +	$command = eval { JSON::encode_json($object) };
>> +	die "failed to encode command as JSON - $@\n"
>> +	    if $@;
>> +    }
>> +
>>       eval {
>>   	PVE::Tools::run_with_timeout($timeout, sub {
>>   	    print $writer "$command\n";
>> @@ -136,13 +155,29 @@ sub write_tunnel {
>>       die "writing to tunnel failed: $@\n" if $@;
>>   
>>       if ($tunnel->{version} && $tunnel->{version} >= 1) {
>> -	my $res = eval { $self->read_tunnel($tunnel, 10); };
>> +	my $res = eval { $self->read_tunnel($tunnel, $timeout); };
>>   	die "no reply to command '$command': $@\n" if $@;
>>   
>> -	if ($res eq 'OK') {
>> -	    return;
>> +	if ($tunnel->{version} == 1) {
>> +	    if ($res eq 'OK') {
>> +		return;
>> +	    } else {
>> +		die "tunnel replied '$res' to command '$command'\n";
>> +	    }
>>   	} else {
>> -	    die "tunnel replied '$res' to command '$command'\n";
>> +	    my $parsed = eval { JSON::decode_json($res) };
>> +	    die "failed to decode tunnel reply '$res' (command '$command') - $@\n"
>> +		if $@;
>> +
>> +	    if (!$parsed->{success}) {
>> +		if (defined($parsed->{msg})) {
>> +		    die "error - tunnel command '$command' failed - $parsed->{msg}\n";
>> +		} else {
>> +		    die "error - tunnel command '$command' failed\n";
>> +		}
>> +	    }
>> +
>> +	    return $parsed;
>>   	}
>>       }
>>   }
>> @@ -185,10 +220,150 @@ sub fork_tunnel {
>>       return $tunnel;
>>   }
>>   
>> +my $forward_unix_socket = sub {
>> +    my ($self, $local, $remote) = @_;
>> +
>> +    my $params = dclone($self->{tunnel}->{params});
>> +    $params->{unix} = $local;
>> +    $params->{url} = $params->{url} ."socket=$remote&";
>> +    $params->{ticket} = { path => $remote };
>> +
>> +    my $cmd = encode_json({
>> +	control => JSON::true,
>> +	cmd => 'forward',
>> +	data => $params,
>> +    });
>> +
>> +    my $writer = $self->{tunnel}->{writer};
>> +    eval {
>> +	unlink $local;
>> +	PVE::Tools::run_with_timeout(15, sub {
>> +	    print $writer "$cmd\n";
>> +	    $writer->flush();
>> +	});
>> +    };
>> +    die "failed to write forwarding command - $@\n" if $@;
>> +
>> +    $self->read_tunnel($self->{tunnel});
>> +
>> +    $self->log('info', "Forwarded local unix socket '$local' to remote '$remote' via websocket tunnel");
>> +};
>> +
>> +sub fork_websocket_tunnel {
>> +    my ($self, $storages) = @_;
>> +
>> +    my $remote = $self->{opts}->{remote};
>> +    my $conn = $remote->{conn};
>> +
>> +    my $websocket_url = "https://$conn->{host}:$conn->{port}/api2/json/nodes/$self->{node}/qemu/$remote->{vmid}/mtunnelwebsocket";
>> +
>> +    my $params = {
>> +	url => $websocket_url,
>> +    };
>> +
>> +    if (my $apitoken = $conn->{apitoken}) {
>> +	$params->{headers} = [["Authorization", "$apitoken"]];
>> +    } else {
>> +	die "can't connect to remote host without credentials\n";
>> +    }
>> +
>> +    if (my $fps = $conn->{cached_fingerprints}) {
>> +	$params->{fingerprint} = (keys %$fps)[0];
>> +    }
>> +
>> +    my $api_client = PVE::APIClient::LWP->new(%$conn);
>> +    my $storage_list = join(',', keys %$storages);
>> +    my $res = $api_client->post("/nodes/$self->{node}/qemu/$remote->{vmid}/mtunnel", { storages => $storage_list });
>> +    $self->log('info', "remote: started migration tunnel worker '$res->{upid}'");
>> +    $params->{url} .= "?ticket=".uri_escape($res->{ticket});
>> +    $params->{url} .= "&socket=$res->{socket}";
> 
> Nit: could also be escaped.
> 

done

>> +
>> +    my $reader = IO::Pipe->new();
>> +    my $writer = IO::Pipe->new();
>> +
>> +    my $cpid = fork();
>> +    if ($cpid) {
>> +	$writer->writer();
>> +	$reader->reader();
>> +	my $tunnel = { writer => $writer, reader => $reader, pid => $cpid };
>> +
>> +	eval {
>> +	    my $writer = $tunnel->{writer};
>> +	    my $cmd = encode_json({
>> +		control => JSON::true,
>> +		cmd => 'connect',
>> +		data => $params,
>> +	    });
>> +
>> +	    eval {
>> +		PVE::Tools::run_with_timeout(15, sub {
>> +		    print {$writer} "$cmd\n";
>> +		    $writer->flush();
>> +		});
>> +	    };
>> +	    die "failed to write tunnel connect command - $@\n" if $@;
>> +	};
>> +	die "failed to connect via WS: $@\n" if $@;
>> +
>> +	my $err;
>> +        eval {
>> +	    my $writer = $tunnel->{writer};
>> +	    my $cmd = encode_json({
>> +		cmd => 'version',
>> +	    });
>> +
>> +	    eval {
>> +		PVE::Tools::run_with_timeout(15, sub {
>> +		    print {$writer} "$cmd\n";
>> +		    $writer->flush();
>> +		});
>> +	    };
>> +	    $err = "failed to write tunnel version command - $@\n" if $@;
>> +	    my $res = $self->read_tunnel($tunnel, 10);
>> +	    $res = JSON::decode_json($res);
>> +	    my $version = $res->{api};
>> +
>> +	    if ($version =~ /^(\d+)$/) {
>> +		$tunnel->{version} = $1;
>> +		$tunnel->{age} = $res->{age};
>> +		$self->log('info', "tunnel info: $version\n");
>> +	    } else {
>> +		$err = "received invalid tunnel version string '$version'\n" if !$err;
>> +	    }
>> +	};
>> +	$err = $@ if !$err;
>> +
>> +	if ($err) {
>> +	    $self->finish_command_pipe($tunnel);
>> +	    die "can't open migration tunnel - $err";
>> +	}
>> +
>> +	$params->{url} = "$websocket_url?";
>> +	$tunnel->{params} = $params; # for forwarding
>> +
>> +	return $tunnel;
>> +    } else {
>> +	eval {
>> +	    $writer->reader();
>> +	    $reader->writer();
>> +	    PVE::Tools::run_command(
>> +		['proxmox-websocket-tunnel'],
>> +		input => "<&".fileno($writer),
>> +		output => ">&".fileno($reader),
>> +		errfunc => sub { my $line = shift; print "tunnel: $line\n"; },
>> +	    );
>> +	};
>> +	warn "CMD websocket tunnel died: $@\n" if $@;
>> +	exit 0;
>> +    }
>> +}
>> +
>>   sub finish_tunnel {
>> -    my ($self, $tunnel) = @_;
>> +    my ($self, $tunnel, $cleanup) = @_;
>>   
>> -    eval { $self->write_tunnel($tunnel, 30, 'quit'); };
>> +    $cleanup = $cleanup ? 1 : 0;
>> +
>> +    eval { $self->write_tunnel($tunnel, 30, 'quit', { cleanup => $cleanup }); };
>>       my $err = $@;
>>   
>>       $self->finish_command_pipe($tunnel, 30);
> 
> Nit: below here is
>      if (my $unix_sockets = $tunnel->{unix_sockets}) {
>          my $cmd = ['rm', '-f', @$unix_sockets];
>          PVE::Tools::run_command($cmd);
> 
>          # .. and just to be sure check on remote side
>          unshift @{$cmd}, @{$self->{rem_ssh}};
>          PVE::Tools::run_command($cmd);
>      }
> and if I'm not mistaken, $self->{rem_ssh} is undef for remote migration, 
> resulting in an undef warning and $cmd being executed twice locally.

doesn't hurt, but also not optimal. conditionalized in v2

>> @@ -338,23 +513,34 @@ sub prepare {
>>       }
>>   
>>       my $vollist = PVE::QemuServer::get_vm_volumes($conf);
>> +
>> +    my $storages = {};
>>       foreach my $volid (@$vollist) {
>>   	my ($sid, $volname) = PVE::Storage::parse_volume_id($volid, 1);
>>   
>> -	# check if storage is available on both nodes
>> +	# check if storage is available on source node
>>   	my $scfg = PVE::Storage::storage_check_enabled($storecfg, $sid);
>>   
>>   	my $targetsid = $sid;
>> -	# NOTE: we currently ignore shared source storages in mappings so skip here too for now
>> -	if (!$scfg->{shared}) {
>> +	# NOTE: local ignores shared mappings, remote maps them
>> +	if (!$scfg->{shared} || $self->{opts}->{remote}) {
>>   	    $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
>>   	}
>>   
>> -	my $target_scfg = PVE::Storage::storage_check_enabled($storecfg, $targetsid, $self->{node});
>> -	my ($vtype) = PVE::Storage::parse_volname($storecfg, $volid);
>> +	$storages->{$targetsid} = 1;
>> +
>> +	if (!$self->{opts}->{remote}) {
>> +	    # check if storage is available on target node
>> +	    my $target_scfg = PVE::Storage::storage_check_enabled(
>> +		$storecfg,
>> +		$targetsid,
>> +		$self->{node},
>> +	    );
>> +	    my ($vtype) = PVE::Storage::parse_volname($storecfg, $volid);
>>   
>> -	die "$volid: content type '$vtype' is not available on storage '$targetsid'\n"
>> -	    if !$target_scfg->{content}->{$vtype};
>> +	    die "$volid: content type '$vtype' is not available on storage '$targetsid'\n"
>> +		if !$target_scfg->{content}->{$vtype};
>> +	}
>>   
>>   	if ($scfg->{shared}) {
>>   	    # PVE::Storage::activate_storage checks this for non-shared storages
>> @@ -364,10 +550,23 @@ sub prepare {
>>   	}
>>       }
>>   
>> -    # test ssh connection
>> -    my $cmd = [ @{$self->{rem_ssh}}, '/bin/true' ];
>> -    eval { $self->cmd_quiet($cmd); };
>> -    die "Can't connect to destination address using public key\n" if $@;
>> +    if ($self->{opts}->{remote}) {
>> +	# test & establish websocket connection
>> +	my $tunnel = $self->fork_websocket_tunnel($storages);
>> +	my $min_version = $tunnel->{version} - $tunnel->{age};
>> +	die "Remote tunnel endpoint not compatible, upgrade required (current: $WS_TUNNEL_VERSION, required: $min_version)\n"
>> +	    if $WS_TUNNEL_VERSION < $min_version;
>> +	 die "Remote tunnel endpoint too old, upgrade required (local: $WS_TUNNEL_VERSION, remote: $tunnel->{version})"
> 
> Nit: missing '\n' in error, and while we're at it: style nit for >100 
> character lines (these are not the only instances in the series).
> 

fixed by switching to three info log statements, and shorter error 
messages without the versions

>> +	    if $WS_TUNNEL_VERSION > $tunnel->{version};
>> +
>> +	print "websocket tunnel started\n";
>> +	$self->{tunnel} = $tunnel;
>> +    } else {
>> +	# test ssh connection
>> +	my $cmd = [ @{$self->{rem_ssh}}, '/bin/true' ];
>> +	eval { $self->cmd_quiet($cmd); };
>> +	die "Can't connect to destination address using public key\n" if $@;
>> +    }
>>   
>>       return $running;
>>   }
>> @@ -405,7 +604,7 @@ sub scan_local_volumes {
>>   	my @sids = PVE::Storage::storage_ids($storecfg);
>>   	foreach my $storeid (@sids) {
>>   	    my $scfg = PVE::Storage::storage_config($storecfg, $storeid);
>> -	    next if $scfg->{shared};
>> +	    next if $scfg->{shared} && !$self->{opts}->{remote};
>>   	    next if !PVE::Storage::storage_check_enabled($storecfg, $storeid, undef, 1);
>>   
>>   	    # get list from PVE::Storage (for unused volumes)
>> @@ -414,19 +613,24 @@ sub scan_local_volumes {
>>   	    next if @{$dl->{$storeid}} == 0;
>>   
>>   	    my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $storeid);
>> -	    # check if storage is available on target node
>> -	    my $target_scfg = PVE::Storage::storage_check_enabled(
>> -		$storecfg,
>> -		$targetsid,
>> -		$self->{node},
>> -	    );
>> -
>> -	    die "content type 'images' is not available on storage '$targetsid'\n"
>> -		if !$target_scfg->{content}->{images};
>> +	    my $bwlimit_sids = [$storeid];
>> +	    if (!$self->{opts}->{remote}) {
>> +		# check if storage is available on target node
>> +		my $target_scfg = PVE::Storage::storage_check_enabled(
>> +		    $storecfg,
>> +		    $targetsid,
>> +		    $self->{node},
>> +		);
>> +
>> +		die "content type 'images' is not available on storage '$targetsid'\n"
>> +		    if !$target_scfg->{content}->{images};
>> +
>> +		push @$bwlimit_sids, $targetsid;
>> +	    }
>>   
>>   	    my $bwlimit = PVE::Storage::get_bandwidth_limit(
>>   		'migration',
>> -		[$targetsid, $storeid],
>> +		$bwlimit_sids,
>>   		$self->{opts}->{bwlimit},
>>   	    );
>>   
>> @@ -482,14 +686,17 @@ sub scan_local_volumes {
>>   	    my $scfg = PVE::Storage::storage_check_enabled($storecfg, $sid);
>>   
>>   	    my $targetsid = $sid;
>> -	    # NOTE: we currently ignore shared source storages in mappings so skip here too for now
>> -	    if (!$scfg->{shared}) {
>> +	    # NOTE: local ignores shared mappings, remote maps them
>> +	    if (!$scfg->{shared} || $self->{opts}->{remote}) {
>>   		$targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
>>   	    }
>>   
>> -	    PVE::Storage::storage_check_enabled($storecfg, $targetsid, $self->{node});
>> +	    # check target storage on target node if intra-cluster migration
>> +	    if (!$self->{opts}->{remote}) {
>> +		PVE::Storage::storage_check_enabled($storecfg, $targetsid, $self->{node});
>>   
>> -	    return if $scfg->{shared};
>> +		return if $scfg->{shared};
>> +	    }
>>   
>>   	    $local_volumes->{$volid}->{ref} = $attr->{referenced_in_config} ? 'config' : 'snapshot';
>>   	    $local_volumes->{$volid}->{ref} = 'storage' if $attr->{is_unused};
>> @@ -578,6 +785,9 @@ sub scan_local_volumes {
>>   
>>   	    my $migratable = $scfg->{type} =~ /^(?:dir|btrfs|zfspool|lvmthin|lvm)$/;
>>   
>> +	    # TODO: what is this even here for?
>> +	    $migratable = 1 if $self->{opts}->{remote};
>> +
>>   	    die "can't migrate '$volid' - storage type '$scfg->{type}' not supported\n"
>>   		if !$migratable;
>>   
>> @@ -612,6 +822,10 @@ sub handle_replication {
>>       my $local_volumes = $self->{local_volumes};
>>   
>>       return if !$self->{replication_jobcfg};
>> +
>> +    die "can't migrate VM with replicated volumes to remote cluster/node\n"
>> +	if $self->{opts}->{remote};
> 
> We can add that later, asserting that no local removal will happen ;)
> Same for being a base VM referenced by a linked clone.
> 
>> +
>>       if ($self->{running}) {
>>   
>>   	my $version = PVE::QemuServer::kvm_user_version();
>> @@ -709,26 +923,133 @@ sub sync_offline_local_volumes {
>>       my $opts = $self->{opts};
>>   
>>       $self->log('info', "copying local disk images") if scalar(@volids);
>> -
>> +    my $forwarded = 0;
>>       foreach my $volid (@volids) {
>>   	my $targetsid = $local_volumes->{$volid}->{targetsid};
>> -	my $bwlimit = $local_volumes->{$volid}->{bwlimit};
>> -	$bwlimit = $bwlimit * 1024 if defined($bwlimit); # storage_migrate uses bps
>> -
>> -	my $storage_migrate_opts = {
>> -	    'ratelimit_bps' => $bwlimit,
>> -	    'insecure' => $opts->{migration_type} eq 'insecure',
>> -	    'with_snapshots' => $local_volumes->{$volid}->{snapshots},
>> -	    'allow_rename' => !$local_volumes->{$volid}->{is_vmstate},
>> -	};
>>   
>> -	my $logfunc = sub { $self->log('info', $_[0]); };
>> -	my $new_volid = eval {
>> -	    PVE::Storage::storage_migrate($storecfg, $volid, $self->{ssh_info},
>> -					  $targetsid, $storage_migrate_opts, $logfunc);
>> -	};
>> -	if (my $err = $@) {
>> -	    die "storage migration for '$volid' to storage '$targetsid' failed - $err\n";
>> +	my $new_volid;
>> +	
> 
> Style nit: whitespace error
> 
>> +	my $opts = $self->{opts};
>> +	if (my $remote = $opts->{remote}) {
>> +	    my $remote_vmid = $remote->{vmid};
>> +	    my ($sid, undef) = PVE::Storage::parse_volume_id($volid);
>> +	    my (undef, $name, undef, undef, undef, undef, $format) = PVE::Storage::parse_volname($storecfg, $volid);
>> +	    my $scfg = PVE::Storage::storage_config($storecfg, $sid);
>> +	    PVE::Storage::activate_volumes($storecfg, [$volid]);
>> +
>> +	    # use 'migrate' limit for transfer to other node
>> +	    my $bwlimit_opts = {
>> +		storage => $targetsid,
>> +		bwlimit => $opts->{bwlimit},
>> +	    };
>> +	    my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', [$sid], $opts->{bwlimit});
> 
> Nit: could use
>      my $bwlimit = $local_volumes->{$volid}->{bwlimit};
> 
>> +	    my $remote_bwlimit = $self->write_tunnel($self->{tunnel}, 10, 'bwlimit', $bwlimit_opts);
>> +	    $remote_bwlimit = $remote_bwlimit->{bwlimit};
>> +	    if (defined($remote_bwlimit)) {
>> +		$bwlimit = $remote_bwlimit if !defined($bwlimit);
>> +		$bwlimit = $remote_bwlimit if $remote_bwlimit < $bwlimit;
>> +	    }
>> +
>> +	    # JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
>> +	    $bwlimit = $bwlimit * 1024 if defined($bwlimit);
>> +
>> +	    my $with_snapshots = $local_volumes->{$volid}->{snapshots} ? 1 : 0;
>> +	    my $snapshot;
>> +	    if ($scfg->{type} eq 'zfspool') {
>> +		$snapshot = '__migration__';
>> +		$with_snapshots = 1;
>> +		PVE::Storage::volume_snapshot($storecfg, $volid, $snapshot);
>> +	    }
>> +
>> +	    if ($self->{vmid} != $remote_vmid) {
>> +		$name =~ s/-$self->{vmid}-/-$remote_vmid-/g;
>> +		$name =~ s/^$self->{vmid}\//$remote_vmid\//;
>> +	    }
>> +
>> +	    my @export_formats = PVE::Storage::volume_export_formats($storecfg, $volid, undef, undef, $with_snapshots);
>> +
>> +	    my $storage_migrate_opts = {
> 
> Nit: maybe call it disk_import_opts
> 
>> +		format => $format,
>> +		storage => $targetsid,
>> +		'with-snapshots' => $with_snapshots,
>> +		'allow-rename' => !$local_volumes->{$volid}->{is_vmstate},
>> +		'export-formats' => @export_formats,
> 
> Doesn't this need to be converted to a string?
> 

yes, but it seems it works without when there's just a single entry, 
which is true for everything but btrfs :)

>> +		volname => $name,
>> +	    };
>> +	    my $res = $self->write_tunnel($self->{tunnel}, 600, 'disk-import', $storage_migrate_opts);
>> +	    my $local = "/run/qemu-server/$self->{vmid}.storage";
>> +	    if (!$forwarded) {
>> +		$forward_unix_socket->($self, $local, $res->{socket});
>> +		$forwarded = 1;
>> +	    }
>> +	    my $socket = IO::Socket::UNIX->new(Peer => $local, Type => SOCK_STREAM())
>> +		or die "failed to connect to websocket tunnel at $local\n";
>> +	    # we won't be reading from the socket
>> +	    shutdown($socket, 0);
>> +	    my $send = ['pvesm', 'export', $volid, $res->{format}, '-', '-with-snapshots', $with_snapshots];
>> +	    push @$send, '-snapshot', $snapshot if $snapshot;
>> +
>> +	    my @cstream;
>> +	    if (defined($bwlimit)) {
>> +		@cstream = ([ '/usr/bin/cstream', '-t', $bwlimit ]);
>> +		$self->log('info', "using a bandwidth limit of $bwlimit bps for transferring '$volid'");
>> +	    }
>> +
>> +	    eval {
>> +		PVE::Tools::run_command(
>> +		    [$send, @cstream],
>> +		    output => '>&'.fileno($socket),
>> +		    errfunc => sub { my $line = shift; $self->log('warn', $line); },
>> +		);
>> +	    };
>> +	    my $send_error = $@;
>> +
>> +	    # don't close the connection entirely otherwise the
>> +	    # receiving end might not get all buffered data (and
>> +	    # fails with 'connection reset by peer')
>> +	    shutdown($socket, 1);
>> +
>> +	    # wait for the remote process to finish
>> +	    while ($res = $self->write_tunnel($self->{tunnel}, 10, 'query-disk-import')) {
>> +		if ($res->{status} eq 'pending') {
>> +		    $self->log('info', "waiting for disk import to finish..\n");
>> +		    sleep(1)
>> +		} elsif ($res->{status} eq 'complete') {
>> +		    $new_volid = $res->{volid};
>> +		    last;
>> +		} else {
>> +		    die "unknown query-disk-import result: $res->{status}\n";
>> +		}
>> +	    }
>> +
>> +	    # now close the socket
>> +	    close($socket);
>> +	    die $send_error if $send_error;
>> +	} else {
>> +	    my $bwlimit = $local_volumes->{$volid}->{bwlimit};
>> +	    $bwlimit = $bwlimit * 1024 if defined($bwlimit); # storage_migrate uses bps
>> +
>> +	    my $storage_migrate_opts = {
>> +		'ratelimit_bps' => $bwlimit,
>> +		'insecure' => $opts->{migration_type} eq 'insecure',
>> +		'with_snapshots' => $local_volumes->{$volid}->{snapshots},
>> +		'allow_rename' => !$local_volumes->{$volid}->{is_vmstate},
>> +	    };
>> +
>> +	    my $logfunc = sub { $self->log('info', $_[0]); };
>> +	    $new_volid = eval {
>> +		PVE::Storage::storage_migrate(
>> +		    $storecfg,
>> +		    $volid,
>> +		    $self->{ssh_info},
>> +		    $targetsid,
>> +		    $storage_migrate_opts,
>> +		    $logfunc,
>> +		);
>> +	    };
>> +	    if (my $err = $@) {
>> +		die "storage migration for '$volid' to storage '$targetsid' failed - $err\n";
>> +	    }
>>   	}
>>   
>>   	$self->{volume_map}->{$volid} = $new_volid;
>> @@ -744,6 +1065,12 @@ sub sync_offline_local_volumes {
>>   sub cleanup_remotedisks {
>>       my ($self) = @_;
>>   
> 
> Nit, not to be taken seriously: cleanup_remotedisks_and_maybe_tunnel ;)
> 
>> +    if ($self->{opts}->{remote}) {
>> +	$self->finish_tunnel($self->{tunnel}, 1);
>> +	delete $self->{tunnel};
>> +	return;
>> +    }
>> +
>>       my $local_volumes = $self->{local_volumes};
>>   
>>       foreach my $volid (values %{$self->{volume_map}}) {
>> @@ -793,8 +1120,84 @@ sub phase1 {
>>       $self->handle_replication($vmid);
>>   
>>       $self->sync_offline_local_volumes();
>> +    $self->phase1_remote($vmid) if $self->{opts}->{remote};
>>   };
>>   
>> +sub phase1_remote {
>> +    my ($self, $vmid) = @_;
>> +
>> +    my $remote_conf = PVE::QemuConfig->load_config($vmid);
>> +    PVE::QemuConfig->update_volume_ids($remote_conf, $self->{volume_map});
>> +
>> +    # TODO: check bridge availability earlier?
>> +    my $bridgemap = $self->{opts}->{bridgemap};
>> +    foreach my $opt (keys %$remote_conf) {
>> +	next if $opt !~ m/^net\d+$/;
>> +
>> +	next if !$remote_conf->{$opt};
>> +	my $d = PVE::QemuServer::parse_net($remote_conf->{$opt});
>> +	next if !$d || !$d->{bridge};
>> +
>> +	my $target_bridge = PVE::QemuServer::map_id($bridgemap, $d->{bridge});
>> +	$self->log('info', "mapped: $opt from $d->{bridge} to $target_bridge");
>> +	$d->{bridge} = $target_bridge;
>> +	$remote_conf->{$opt} = PVE::QemuServer::print_net($d);
>> +    }
>> +
>> +    my @online_local_volumes = $self->filter_local_volumes('online');
>> +
>> +    my $storage_map = $self->{opts}->{storagemap};
>> +    $self->{nbd} = {};
>> +    PVE::QemuConfig->foreach_volume($remote_conf, sub {
>> +	my ($ds, $drive) = @_;
>> +
>> +	# TODO eject CDROM?
>> +	return if PVE::QemuServer::drive_is_cdrom($drive);
>> +
>> +	my $volid = $drive->{file};
>> +	return if !$volid;
>> +
>> +	return if !grep { $_ eq $volid} @online_local_volumes;
>> +
>> +	my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);
>> +	my $scfg = PVE::Storage::storage_config($self->{storecfg}, $storeid);
>> +	my $source_format = PVE::QemuServer::qemu_img_format($scfg, $volname);
>> +
>> +	# set by target cluster
>> +	my $oldvolid = delete $drive->{file};
>> +	delete $drive->{format};
>> +
>> +	my $targetsid = PVE::QemuServer::map_id($storage_map, $storeid);
>> +
>> +	my $params = {
>> +	    format => $source_format,
>> +	    storage => $targetsid,
>> +	    drive => $drive,
>> +	};
>> +
>> +	$self->log('info', "Allocating volume for drive '$ds' on remote storage '$targetsid'..");
>> +	my $res = $self->write_tunnel($self->{tunnel}, 600, 'disk', $params);
>> +
>> +	$self->log('info', "volume '$oldvolid' os '$res->{volid}' on the target\n");
>> +	$remote_conf->{$ds} = $res->{drivestr};
>> +	$self->{nbd}->{$ds} = $res;
>> +    });
>> +
>> +    my $conf_str = PVE::QemuServer::write_vm_config("remote", $remote_conf);
>> +
>> +    # TODO expose in PVE::Firewall?
>> +    my $vm_fw_conf_path = "/etc/pve/firewall/$vmid.fw";
>> +    my $fw_conf_str;
>> +    $fw_conf_str = PVE::Tools::file_get_contents($vm_fw_conf_path)
>> +	if -e $vm_fw_conf_path;
>> +    my $params = {
>> +	conf => $conf_str,
>> +	'firewall-config' => $fw_conf_str,
>> +    };
>> +
>> +    $self->write_tunnel($self->{tunnel}, 10, 'config', $params);
>> +}
>> +
>>   sub phase1_cleanup {
>>       my ($self, $vmid, $err) = @_;
>>   
>> @@ -825,7 +1228,6 @@ sub phase2_start_local_cluster {
>>       my $local_volumes = $self->{local_volumes};
>>       my @online_local_volumes = $self->filter_local_volumes('online');
>>   
>> -    $self->{storage_migration} = 1 if scalar(@online_local_volumes);
>>       my $start = $params->{start_params};
>>       my $migrate = $params->{migrate_opts};
>>   
>> @@ -948,10 +1350,34 @@ sub phase2_start_local_cluster {
>>       return ($tunnel_info, $spice_port);
>>   }
>>   
>> +sub phase2_start_remote_cluster {
>> +    my ($self, $vmid, $params) = @_;
>> +
>> +    die "insecure migration to remote cluster not implemented\n"
>> +	if $params->{migrate_opts}->{type} ne 'websocket';
>> +
>> +    my $remote_vmid = $self->{opts}->{remote}->{vmid};
>> +
>> +    my $res = $self->write_tunnel($self->{tunnel}, 10, "start", $params);
>> +
>> +    foreach my $drive (keys %{$res->{drives}}) {
>> +	$self->{stopnbd} = 1;
>> +	$self->{target_drive}->{$drive}->{drivestr} = $res->{drives}->{$drive}->{drivestr};
>> +	my $nbd_uri = $res->{drives}->{$drive}->{nbd_uri};
>> +	die "unexpected NBD uri for '$drive': $nbd_uri\n"
>> +	    if $nbd_uri !~ s!/run/qemu-server/$remote_vmid\_!/run/qemu-server/$vmid\_!;
>> +
>> +	$self->{target_drive}->{$drive}->{nbd_uri} = $nbd_uri;
>> +    }
>> +
>> +    return ($res->{migrate}, $res->{spice_port});
>> +}
>> +
>>   sub phase2 {
>>       my ($self, $vmid) = @_;
>>   
>>       my $conf = $self->{vmconf};
>> +    my $local_volumes = $self->{local_volumes};
>>   
>>       # version > 0 for unix socket support
>>       my $nbd_protocol_version = 1;
>> @@ -983,10 +1409,42 @@ sub phase2 {
>>   	},
>>       };
>>   
>> -    my ($tunnel_info, $spice_port) = $self->phase2_start_local_cluster($vmid, $params);
>> +    my ($tunnel_info, $spice_port);
>> +
>> +    my @online_local_volumes = $self->filter_local_volumes('online');
>> +    $self->{storage_migration} = 1 if scalar(@online_local_volumes);
>> +
>> +    if (my $remote = $self->{opts}->{remote}) {
>> +	my $remote_vmid = $remote->{vmid};
>> +	$params->{migrate_opts}->{remote_node} = $self->{node};
>> +	($tunnel_info, $spice_port) = $self->phase2_start_remote_cluster($vmid, $params);
>> +	die "only UNIX sockets are supported for remote migration\n"
>> +	    if $tunnel_info->{proto} ne 'unix';
>> +
>> +	my $forwarded = {};
>> +	my $remote_socket = $tunnel_info->{addr};
>> +	my $local_socket = $remote_socket;
>> +	$local_socket =~ s/$remote_vmid/$vmid/g;
>> +	$tunnel_info->{addr} = $local_socket;
>> +
>> +	$self->log('info', "Setting up tunnel for '$local_socket'");
>> +	$forward_unix_socket->($self, $local_socket, $remote_socket);
>> +	$forwarded->{$local_socket} = 1;
>> +
>> +	foreach my $remote_socket (@{$tunnel_info->{unix_sockets}}) {
>> +	    my $local_socket = $remote_socket;
>> +	    $local_socket =~ s/$remote_vmid/$vmid/g;
>> +	    next if $forwarded->{$local_socket};
>> +	    $self->log('info', "Setting up tunnel for '$local_socket'");
>> +	    $forward_unix_socket->($self, $local_socket, $remote_socket);
>> +	    $forwarded->{$local_socket} = 1;
>> +	}
>> +    } else {
>> +	($tunnel_info, $spice_port) = $self->phase2_start_local_cluster($vmid, $params);
>>   
>> -    $self->log('info', "start remote tunnel");
>> -    $self->start_remote_tunnel($tunnel_info);
>> +	$self->log('info', "start remote tunnel");
>> +	$self->start_remote_tunnel($tunnel_info);
>> +    }
>>   
>>       my $migrate_uri = "$tunnel_info->{proto}:$tunnel_info->{addr}";
>>       $migrate_uri .= ":$tunnel_info->{port}"
>> @@ -996,8 +1454,6 @@ sub phase2 {
>>   	$self->{storage_migration_jobs} = {};
>>   	$self->log('info', "starting storage migration");
>>   
>> -	my @online_local_volumes = $self->filter_local_volumes('online');
>> -
>>   	die "The number of local disks does not match between the source and the destination.\n"
>>   	    if (scalar(keys %{$self->{target_drive}}) != scalar(@online_local_volumes));
>>   	foreach my $drive (keys %{$self->{target_drive}}){
>> @@ -1070,7 +1526,7 @@ sub phase2 {
>>       };
>>       $self->log('info', "migrate-set-parameters error: $@") if $@;
>>   
>> -    if (PVE::QemuServer::vga_conf_has_spice($conf->{vga})) {
>> +    if (PVE::QemuServer::vga_conf_has_spice($conf->{vga} && !$self->{opts}->{remote})) {
>>   	my $rpcenv = PVE::RPCEnvironment::get();
>>   	my $authuser = $rpcenv->get_user();
>>   
>> @@ -1267,11 +1723,15 @@ sub phase2_cleanup {
>>   
>>       my $nodename = PVE::INotify::nodename();
>>   
>> -    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'stop', $vmid, '--skiplock', '--migratedfrom', $nodename];
>> -    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
>> -    if (my $err = $@) {
>> -        $self->log('err', $err);
>> -        $self->{errors} = 1;
>> +    if ($self->{tunnel} && $self->{tunnel}->{version} >= 2) {
>> +	$self->write_tunnel($self->{tunnel}, 10, 'stop');
>> +    } else {
>> +	my $cmd = [@{$self->{rem_ssh}}, 'qm', 'stop', $vmid, '--skiplock', '--migratedfrom', $nodename];
>> +	eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
>> +	if (my $err = $@) {
>> +	    $self->log('err', $err);
>> +	    $self->{errors} = 1;
>> +	}
>>       }
>>   
>>       # cleanup after stopping, otherwise disks might be in-use by target VM!
>> @@ -1304,7 +1764,7 @@ sub phase3_cleanup {
>>   
>>       my $tunnel = $self->{tunnel};
>>   
>> -    if ($self->{volume_map}) {
>> +    if ($self->{volume_map} && !$self->{opts}->{remote}) {
>>   	my $target_drives = $self->{target_drive};
>>   
>>   	# FIXME: for NBD storage migration we now only update the volid, and
>> @@ -1321,26 +1781,33 @@ sub phase3_cleanup {
>>   
>>       # transfer replication state before move config
>>       $self->transfer_replication_state() if $self->{is_replicated};
>> -    PVE::QemuConfig->move_config_to_node($vmid, $self->{node});
>> +    if (!$self->{opts}->{remote}) {
>> +	PVE::QemuConfig->move_config_to_node($vmid, $self->{node});
>> +    }
>>       $self->switch_replication_job_target() if $self->{is_replicated};
> 
> All three lines could/should be guarded by the if.
> 

true (doesn't matter now since we error out on replicated VMs anyway, 
but makes this more obvious when we change that fact ;))

>>   
>>       if ($self->{livemigration}) {
>>   	if ($self->{stopnbd}) {
>>   	    $self->log('info', "stopping NBD storage migration server on target.");
>>   	    # stop nbd server on remote vm - requirement for resume since 2.9
>> -	    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'nbdstop', $vmid];
>> +	    if ($tunnel && $tunnel->{version} && $tunnel->{version} >= 2) {
>> +		$self->write_tunnel($tunnel, 30, 'nbdstop');
>> +	    } else {
>> +		my $cmd = [@{$self->{rem_ssh}}, 'qm', 'nbdstop', $vmid];
>>   
>> -	    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
>> -	    if (my $err = $@) {
>> -		$self->log('err', $err);
>> -		$self->{errors} = 1;
>> +		eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
>> +		if (my $err = $@) {
>> +		    $self->log('err', $err);
>> +		    $self->{errors} = 1;
>> +		}
>>   	    }
>>   	}
>>   
>>   	# config moved and nbd server stopped - now we can resume vm on target
>>   	if ($tunnel && $tunnel->{version} && $tunnel->{version} >= 1) {
>> +	    my $cmd = $tunnel->{version} == 1 ? "resume $vmid" : "resume";
>>   	    eval {
>> -		$self->write_tunnel($tunnel, 30, "resume $vmid");
>> +		$self->write_tunnel($tunnel, 30, $cmd);
>>   	    };
>>   	    if (my $err = $@) {
>>   		$self->log('err', $err);
>> @@ -1360,18 +1827,24 @@ sub phase3_cleanup {
>>   	}
>>   
>>   	if ($self->{storage_migration} && PVE::QemuServer::parse_guest_agent($conf)->{fstrim_cloned_disks} && $self->{running}) {
>> -	    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'guest', 'cmd', $vmid, 'fstrim'];
>> -	    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
>> +	    if ($self->{opts}->{remote}) {
>> +		$self->write_tunnel($self->{tunnel}, 600, 'fstrim');
>> +	    } else {
>> +		my $cmd = [@{$self->{rem_ssh}}, 'qm', 'guest', 'cmd', $vmid, 'fstrim'];
>> +		eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
>> +	    }
>>   	}
>>       }
>>   
>>       # close tunnel on successful migration, on error phase2_cleanup closed it
>> -    if ($tunnel) {
>> +    if ($tunnel && $tunnel->{version} == 1) {
>>   	eval { finish_tunnel($self, $tunnel);  };
>>   	if (my $err = $@) {
>>   	    $self->log('err', $err);
>>   	    $self->{errors} = 1;
>>   	}
>> +	$tunnel = undef;
>> +	delete $self->{tunnel};
>>       }
>>   
>>       eval {
>> @@ -1409,6 +1882,9 @@ sub phase3_cleanup {
>>   
>>       # destroy local copies
>>       foreach my $volid (@not_replicated_volumes) {
>> +	# remote is cleaned up below
>> +	next if $self->{opts}->{remote};
>> +
>>   	eval { PVE::Storage::vdisk_free($self->{storecfg}, $volid); };
>>   	if (my $err = $@) {
>>   	    $self->log('err', "removing local copy of '$volid' failed - $err");
>> @@ -1418,8 +1894,19 @@ sub phase3_cleanup {
>>       }
>>   
>>       # clear migrate lock
>> -    my $cmd = [ @{$self->{rem_ssh}}, 'qm', 'unlock', $vmid ];
>> -    $self->cmd_logerr($cmd, errmsg => "failed to clear migrate lock");
>> +    if ($tunnel && $tunnel->{version} >= 2) {
>> +	$self->write_tunnel($tunnel, 10, "unlock");
>> +
>> +	$self->finish_tunnel($tunnel);
>> +    } else {
>> +	my $cmd = [ @{$self->{rem_ssh}}, 'qm', 'unlock', $vmid ];
>> +	$self->cmd_logerr($cmd, errmsg => "failed to clear migrate lock");
>> +    }
>> +
>> +    if ($self->{opts}->{remote} && $self->{opts}->{delete}) {
>> +	eval { PVE::QemuServer::destroy_vm($self->{storecfg}, $vmid, 1, undef, 0) };
>> +	warn "Failed to remove source VM - $@\n" if $@;
>> +    }
>>   }
>>   
>>   sub final_cleanup {
>> diff --git a/PVE/QemuServer.pm b/PVE/QemuServer.pm
>> index d494cc0..bf05da2 100644
>> --- a/PVE/QemuServer.pm
>> +++ b/PVE/QemuServer.pm
>> @@ -5384,7 +5384,11 @@ sub vm_start_nolock {
>>       my $defaults = load_defaults();
>>   
>>       # set environment variable useful inside network script
>> -    $ENV{PVE_MIGRATED_FROM} = $migratedfrom if $migratedfrom;
>> +    if ($migrate_opts->{remote_node}) {
>> +	$ENV{PVE_MIGRATED_FROM} = $migrate_opts->{remote_node};
>> +    } elsif ($migratedfrom) {
>> +	$ENV{PVE_MIGRATED_FROM} = $migratedfrom;
>> +    }
> 
> But the network script tries to load the config from that node and if 
> it's not in the cluster that doesn't work?
> 

this is a bit confusing, yeah.

$migratedfrom contains the source node, which is unusable on the remote 
cluster.
remote_node contains the target node, which actually has the full config 
when we start the VM there over the tunnel (in contrast to a local 
migration, where the target node doesn't yet have the config!).

so this should be correct? but even easier would be to just not set it 
(for remote migrations), since the start MUST happen on the node where 
mtunnel is running/the config is located.

>>   
>>       PVE::GuestHelpers::exec_hookscript($conf, $vmid, 'pre-start', 1);
>>   
>> @@ -5621,7 +5625,7 @@ sub vm_start_nolock {
>>   
>>   	my $migrate_storage_uri;
>>   	# nbd_protocol_version > 0 for unix socket support
>> -	if ($nbd_protocol_version > 0 && $migration_type eq 'secure') {
>> +	if ($nbd_protocol_version > 0 && ($migration_type eq 'secure' || $migration_type eq 'websocket')) {
>>   	    my $socket_path = "/run/qemu-server/$vmid\_nbd.migrate";
>>   	    mon_cmd($vmid, "nbd-server-start", addr => { type => 'unix', data => { path => $socket_path } } );
>>   	    $migrate_storage_uri = "nbd:unix:$socket_path";
>> 
> 




More information about the pve-devel mailing list