[pve-devel] [PATCH qemu-server 09/10] migrate: add remote migration handling
    Fabian Ebner 
    f.ebner at proxmox.com
       
    Wed Nov 10 12:17:24 CET 2021
    
    
  
Am 05.11.21 um 14:03 schrieb Fabian Grünbichler:
> remote migration uses a websocket connection to a task worker running on
> the target node instead of commands via SSH to control the migration.
> this websocket tunnel is started earlier than the SSH tunnel, and allows
> adding UNIX-socket forwarding over additional websocket connections
> on-demand.
> 
> the main differences to regular intra-cluster migration are:
> - source VM config and disks are only removed upon request via --delete
> - shared storages are treated like local storages, since we can't
> assume they are shared across clusters (with potential to extend this
> by marking storages as shared)
> - NBD migrated disks are explicitly pre-allocated on the target node via
> tunnel command before starting the target VM instance
> - in addition to storages, network bridges and the VMID itself is
> transformed via a user defined mapping
> - all commands and migration data streams are sent via a WS tunnel proxy
> 
> Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
> ---
> 
> Notes:
>      requires proxmox-websocket-tunnel
> 
>   PVE/API2/Qemu.pm   |   4 +-
>   PVE/QemuMigrate.pm | 647 +++++++++++++++++++++++++++++++++++++++------
>   PVE/QemuServer.pm  |   8 +-
>   3 files changed, 575 insertions(+), 84 deletions(-)
> 
> diff --git a/PVE/API2/Qemu.pm b/PVE/API2/Qemu.pm
> index a1a1813..24f5b98 100644
> --- a/PVE/API2/Qemu.pm
> +++ b/PVE/API2/Qemu.pm
> @@ -4610,7 +4610,7 @@ __PACKAGE__->register_method({
>   		    # bump/reset both for breaking changes
>   		    # bump tunnel only for opt-in changes
Sorry for asking about this on this patch: shouldn't opt-in changes bump 
both?
>   		    return {
> -			api => 2,
> +			api => $PVE::QemuMigrate::WS_TUNNEL_VERSION,
>   			age => 0,
>   		    };
>   		},
> @@ -4897,7 +4897,7 @@ __PACKAGE__->register_method({
>   			    PVE::Firewall::remove_vmfw_conf($vmid);
>   			}
>   
> -			if (my @volumes = keys $state->{cleanup}->{volumes}->$%) {
> +			if (my @volumes = keys $state->{cleanup}->{volumes}->%*) {
>   			    PVE::Storage::foreach_volid(@volumes, sub {
>   				my ($volid, $sid, $volname, $d) = @_;
>   
> diff --git a/PVE/QemuMigrate.pm b/PVE/QemuMigrate.pm
> index 07b56eb..7378551 100644
> --- a/PVE/QemuMigrate.pm
> +++ b/PVE/QemuMigrate.pm
> @@ -7,9 +7,15 @@ use IO::File;
>   use IPC::Open2;
>   use POSIX qw( WNOHANG );
>   use Time::HiRes qw( usleep );
> +use JSON qw(encode_json decode_json);
> +use IO::Socket::UNIX;
> +use Socket qw(SOCK_STREAM);
> +use Storable qw(dclone);
> +use URI::Escape;
>   
> -use PVE::Format qw(render_bytes);
> +use PVE::APIClient::LWP;
>   use PVE::Cluster;
> +use PVE::Format qw(render_bytes);
>   use PVE::GuestHelpers qw(safe_boolean_ne safe_string_ne);
>   use PVE::INotify;
>   use PVE::RPCEnvironment;
> @@ -30,6 +36,9 @@ use PVE::QemuServer;
>   use PVE::AbstractMigrate;
>   use base qw(PVE::AbstractMigrate);
>   
> +# compared against remote end's minimum version
> +our $WS_TUNNEL_VERSION = 2;
> +
>   sub fork_command_pipe {
>       my ($self, $cmd) = @_;
>   
> @@ -85,7 +94,7 @@ sub finish_command_pipe {
>   	}
>       }
>   
> -    $self->log('info', "ssh tunnel still running - terminating now with SIGTERM\n");
> +    $self->log('info', "tunnel still running - terminating now with SIGTERM\n");
>       kill(15, $cpid);
>   
>       # wait again
> @@ -94,11 +103,11 @@ sub finish_command_pipe {
>   	sleep(1);
>       }
>   
> -    $self->log('info', "ssh tunnel still running - terminating now with SIGKILL\n");
> +    $self->log('info', "tunnel still running - terminating now with SIGKILL\n");
>       kill 9, $cpid;
>       sleep 1;
>   
> -    $self->log('err', "ssh tunnel child process (PID $cpid) couldn't be collected\n")
> +    $self->log('err', "tunnel child process (PID $cpid) couldn't be collected\n")
>   	if !&$collect_child_process();
>   }
>   
> @@ -115,18 +124,28 @@ sub read_tunnel {
>       };
>       die "reading from tunnel failed: $@\n" if $@;
>   
> -    chomp $output;
> +    chomp $output if defined($output);
>   
>       return $output;
>   }
>   
>   sub write_tunnel {
> -    my ($self, $tunnel, $timeout, $command) = @_;
> +    my ($self, $tunnel, $timeout, $command, $params) = @_;
>   
>       $timeout = 60 if !defined($timeout);
>   
>       my $writer = $tunnel->{writer};
>   
> +    if ($tunnel->{version} && $tunnel->{version} >= 2) {
> +	my $object = defined($params) ? dclone($params) : {};
> +	$object->{cmd} = $command;
> +
> +	$command = eval { JSON::encode_json($object) };
> +
> +	die "failed to encode command as JSON - $@\n"
> +	    if $@;
> +    }
> +
>       eval {
>   	PVE::Tools::run_with_timeout($timeout, sub {
>   	    print $writer "$command\n";
> @@ -136,13 +155,29 @@ sub write_tunnel {
>       die "writing to tunnel failed: $@\n" if $@;
>   
>       if ($tunnel->{version} && $tunnel->{version} >= 1) {
> -	my $res = eval { $self->read_tunnel($tunnel, 10); };
> +	my $res = eval { $self->read_tunnel($tunnel, $timeout); };
>   	die "no reply to command '$command': $@\n" if $@;
>   
> -	if ($res eq 'OK') {
> -	    return;
> +	if ($tunnel->{version} == 1) {
> +	    if ($res eq 'OK') {
> +		return;
> +	    } else {
> +		die "tunnel replied '$res' to command '$command'\n";
> +	    }
>   	} else {
> -	    die "tunnel replied '$res' to command '$command'\n";
> +	    my $parsed = eval { JSON::decode_json($res) };
> +	    die "failed to decode tunnel reply '$res' (command '$command') - $@\n"
> +		if $@;
> +
> +	    if (!$parsed->{success}) {
> +		if (defined($parsed->{msg})) {
> +		    die "error - tunnel command '$command' failed - $parsed->{msg}\n";
> +		} else {
> +		    die "error - tunnel command '$command' failed\n";
> +		}
> +	    }
> +
> +	    return $parsed;
>   	}
>       }
>   }
> @@ -185,10 +220,150 @@ sub fork_tunnel {
>       return $tunnel;
>   }
>   
> +my $forward_unix_socket = sub {
> +    my ($self, $local, $remote) = @_;
> +
> +    my $params = dclone($self->{tunnel}->{params});
> +    $params->{unix} = $local;
> +    $params->{url} = $params->{url} ."socket=$remote&";
> +    $params->{ticket} = { path => $remote };
> +
> +    my $cmd = encode_json({
> +	control => JSON::true,
> +	cmd => 'forward',
> +	data => $params,
> +    });
> +
> +    my $writer = $self->{tunnel}->{writer};
> +    eval {
> +	unlink $local;
> +	PVE::Tools::run_with_timeout(15, sub {
> +	    print $writer "$cmd\n";
> +	    $writer->flush();
> +	});
> +    };
> +    die "failed to write forwarding command - $@\n" if $@;
> +
> +    $self->read_tunnel($self->{tunnel});
> +
> +    $self->log('info', "Forwarded local unix socket '$local' to remote '$remote' via websocket tunnel");
> +};
> +
> +sub fork_websocket_tunnel {
> +    my ($self, $storages) = @_;
> +
> +    my $remote = $self->{opts}->{remote};
> +    my $conn = $remote->{conn};
> +
> +    my $websocket_url = "https://$conn->{host}:$conn->{port}/api2/json/nodes/$self->{node}/qemu/$remote->{vmid}/mtunnelwebsocket";
> +
> +    my $params = {
> +	url => $websocket_url,
> +    };
> +
> +    if (my $apitoken = $conn->{apitoken}) {
> +	$params->{headers} = [["Authorization", "$apitoken"]];
> +    } else {
> +	die "can't connect to remote host without credentials\n";
> +    }
> +
> +    if (my $fps = $conn->{cached_fingerprints}) {
> +	$params->{fingerprint} = (keys %$fps)[0];
> +    }
> +
> +    my $api_client = PVE::APIClient::LWP->new(%$conn);
> +    my $storage_list = join(',', keys %$storages);
> +    my $res = $api_client->post("/nodes/$self->{node}/qemu/$remote->{vmid}/mtunnel", { storages => $storage_list });
> +    $self->log('info', "remote: started migration tunnel worker '$res->{upid}'");
> +    $params->{url} .= "?ticket=".uri_escape($res->{ticket});
> +    $params->{url} .= "&socket=$res->{socket}";
Nit: could also be escaped.
> +
> +    my $reader = IO::Pipe->new();
> +    my $writer = IO::Pipe->new();
> +
> +    my $cpid = fork();
> +    if ($cpid) {
> +	$writer->writer();
> +	$reader->reader();
> +	my $tunnel = { writer => $writer, reader => $reader, pid => $cpid };
> +
> +	eval {
> +	    my $writer = $tunnel->{writer};
> +	    my $cmd = encode_json({
> +		control => JSON::true,
> +		cmd => 'connect',
> +		data => $params,
> +	    });
> +
> +	    eval {
> +		PVE::Tools::run_with_timeout(15, sub {
> +		    print {$writer} "$cmd\n";
> +		    $writer->flush();
> +		});
> +	    };
> +	    die "failed to write tunnel connect command - $@\n" if $@;
> +	};
> +	die "failed to connect via WS: $@\n" if $@;
> +
> +	my $err;
> +        eval {
> +	    my $writer = $tunnel->{writer};
> +	    my $cmd = encode_json({
> +		cmd => 'version',
> +	    });
> +
> +	    eval {
> +		PVE::Tools::run_with_timeout(15, sub {
> +		    print {$writer} "$cmd\n";
> +		    $writer->flush();
> +		});
> +	    };
> +	    $err = "failed to write tunnel version command - $@\n" if $@;
> +	    my $res = $self->read_tunnel($tunnel, 10);
> +	    $res = JSON::decode_json($res);
> +	    my $version = $res->{api};
> +
> +	    if ($version =~ /^(\d+)$/) {
> +		$tunnel->{version} = $1;
> +		$tunnel->{age} = $res->{age};
> +		$self->log('info', "tunnel info: $version\n");
> +	    } else {
> +		$err = "received invalid tunnel version string '$version'\n" if !$err;
> +	    }
> +	};
> +	$err = $@ if !$err;
> +
> +	if ($err) {
> +	    $self->finish_command_pipe($tunnel);
> +	    die "can't open migration tunnel - $err";
> +	}
> +
> +	$params->{url} = "$websocket_url?";
> +	$tunnel->{params} = $params; # for forwarding
> +
> +	return $tunnel;
> +    } else {
> +	eval {
> +	    $writer->reader();
> +	    $reader->writer();
> +	    PVE::Tools::run_command(
> +		['proxmox-websocket-tunnel'],
> +		input => "<&".fileno($writer),
> +		output => ">&".fileno($reader),
> +		errfunc => sub { my $line = shift; print "tunnel: $line\n"; },
> +	    );
> +	};
> +	warn "CMD websocket tunnel died: $@\n" if $@;
> +	exit 0;
> +    }
> +}
> +
>   sub finish_tunnel {
> -    my ($self, $tunnel) = @_;
> +    my ($self, $tunnel, $cleanup) = @_;
>   
> -    eval { $self->write_tunnel($tunnel, 30, 'quit'); };
> +    $cleanup = $cleanup ? 1 : 0;
> +
> +    eval { $self->write_tunnel($tunnel, 30, 'quit', { cleanup => $cleanup }); };
>       my $err = $@;
>   
>       $self->finish_command_pipe($tunnel, 30);
Nit: below here is
     if (my $unix_sockets = $tunnel->{unix_sockets}) {
         my $cmd = ['rm', '-f', @$unix_sockets];
         PVE::Tools::run_command($cmd);
         # .. and just to be sure check on remote side
         unshift @{$cmd}, @{$self->{rem_ssh}};
         PVE::Tools::run_command($cmd);
     }
and if I'm not mistaken, $self->{rem_ssh} is undef for remote migration, 
resulting in an undef warning and $cmd being executed twice locally.
> @@ -338,23 +513,34 @@ sub prepare {
>       }
>   
>       my $vollist = PVE::QemuServer::get_vm_volumes($conf);
> +
> +    my $storages = {};
>       foreach my $volid (@$vollist) {
>   	my ($sid, $volname) = PVE::Storage::parse_volume_id($volid, 1);
>   
> -	# check if storage is available on both nodes
> +	# check if storage is available on source node
>   	my $scfg = PVE::Storage::storage_check_enabled($storecfg, $sid);
>   
>   	my $targetsid = $sid;
> -	# NOTE: we currently ignore shared source storages in mappings so skip here too for now
> -	if (!$scfg->{shared}) {
> +	# NOTE: local ignores shared mappings, remote maps them
> +	if (!$scfg->{shared} || $self->{opts}->{remote}) {
>   	    $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
>   	}
>   
> -	my $target_scfg = PVE::Storage::storage_check_enabled($storecfg, $targetsid, $self->{node});
> -	my ($vtype) = PVE::Storage::parse_volname($storecfg, $volid);
> +	$storages->{$targetsid} = 1;
> +
> +	if (!$self->{opts}->{remote}) {
> +	    # check if storage is available on target node
> +	    my $target_scfg = PVE::Storage::storage_check_enabled(
> +		$storecfg,
> +		$targetsid,
> +		$self->{node},
> +	    );
> +	    my ($vtype) = PVE::Storage::parse_volname($storecfg, $volid);
>   
> -	die "$volid: content type '$vtype' is not available on storage '$targetsid'\n"
> -	    if !$target_scfg->{content}->{$vtype};
> +	    die "$volid: content type '$vtype' is not available on storage '$targetsid'\n"
> +		if !$target_scfg->{content}->{$vtype};
> +	}
>   
>   	if ($scfg->{shared}) {
>   	    # PVE::Storage::activate_storage checks this for non-shared storages
> @@ -364,10 +550,23 @@ sub prepare {
>   	}
>       }
>   
> -    # test ssh connection
> -    my $cmd = [ @{$self->{rem_ssh}}, '/bin/true' ];
> -    eval { $self->cmd_quiet($cmd); };
> -    die "Can't connect to destination address using public key\n" if $@;
> +    if ($self->{opts}->{remote}) {
> +	# test & establish websocket connection
> +	my $tunnel = $self->fork_websocket_tunnel($storages);
> +	my $min_version = $tunnel->{version} - $tunnel->{age};
> +	die "Remote tunnel endpoint not compatible, upgrade required (current: $WS_TUNNEL_VERSION, required: $min_version)\n"
> +	    if $WS_TUNNEL_VERSION < $min_version;
> +	 die "Remote tunnel endpoint too old, upgrade required (local: $WS_TUNNEL_VERSION, remote: $tunnel->{version})"
Nit: missing '\n' in error, and while we're at it: style nit for >100 
character lines (these are not the only instances in the series).
> +	    if $WS_TUNNEL_VERSION > $tunnel->{version};
> +
> +	print "websocket tunnel started\n";
> +	$self->{tunnel} = $tunnel;
> +    } else {
> +	# test ssh connection
> +	my $cmd = [ @{$self->{rem_ssh}}, '/bin/true' ];
> +	eval { $self->cmd_quiet($cmd); };
> +	die "Can't connect to destination address using public key\n" if $@;
> +    }
>   
>       return $running;
>   }
> @@ -405,7 +604,7 @@ sub scan_local_volumes {
>   	my @sids = PVE::Storage::storage_ids($storecfg);
>   	foreach my $storeid (@sids) {
>   	    my $scfg = PVE::Storage::storage_config($storecfg, $storeid);
> -	    next if $scfg->{shared};
> +	    next if $scfg->{shared} && !$self->{opts}->{remote};
>   	    next if !PVE::Storage::storage_check_enabled($storecfg, $storeid, undef, 1);
>   
>   	    # get list from PVE::Storage (for unused volumes)
> @@ -414,19 +613,24 @@ sub scan_local_volumes {
>   	    next if @{$dl->{$storeid}} == 0;
>   
>   	    my $targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $storeid);
> -	    # check if storage is available on target node
> -	    my $target_scfg = PVE::Storage::storage_check_enabled(
> -		$storecfg,
> -		$targetsid,
> -		$self->{node},
> -	    );
> -
> -	    die "content type 'images' is not available on storage '$targetsid'\n"
> -		if !$target_scfg->{content}->{images};
> +	    my $bwlimit_sids = [$storeid];
> +	    if (!$self->{opts}->{remote}) {
> +		# check if storage is available on target node
> +		my $target_scfg = PVE::Storage::storage_check_enabled(
> +		    $storecfg,
> +		    $targetsid,
> +		    $self->{node},
> +		);
> +
> +		die "content type 'images' is not available on storage '$targetsid'\n"
> +		    if !$target_scfg->{content}->{images};
> +
> +		push @$bwlimit_sids, $targetsid;
> +	    }
>   
>   	    my $bwlimit = PVE::Storage::get_bandwidth_limit(
>   		'migration',
> -		[$targetsid, $storeid],
> +		$bwlimit_sids,
>   		$self->{opts}->{bwlimit},
>   	    );
>   
> @@ -482,14 +686,17 @@ sub scan_local_volumes {
>   	    my $scfg = PVE::Storage::storage_check_enabled($storecfg, $sid);
>   
>   	    my $targetsid = $sid;
> -	    # NOTE: we currently ignore shared source storages in mappings so skip here too for now
> -	    if (!$scfg->{shared}) {
> +	    # NOTE: local ignores shared mappings, remote maps them
> +	    if (!$scfg->{shared} || $self->{opts}->{remote}) {
>   		$targetsid = PVE::QemuServer::map_id($self->{opts}->{storagemap}, $sid);
>   	    }
>   
> -	    PVE::Storage::storage_check_enabled($storecfg, $targetsid, $self->{node});
> +	    # check target storage on target node if intra-cluster migration
> +	    if (!$self->{opts}->{remote}) {
> +		PVE::Storage::storage_check_enabled($storecfg, $targetsid, $self->{node});
>   
> -	    return if $scfg->{shared};
> +		return if $scfg->{shared};
> +	    }
>   
>   	    $local_volumes->{$volid}->{ref} = $attr->{referenced_in_config} ? 'config' : 'snapshot';
>   	    $local_volumes->{$volid}->{ref} = 'storage' if $attr->{is_unused};
> @@ -578,6 +785,9 @@ sub scan_local_volumes {
>   
>   	    my $migratable = $scfg->{type} =~ /^(?:dir|btrfs|zfspool|lvmthin|lvm)$/;
>   
> +	    # TODO: what is this even here for?
> +	    $migratable = 1 if $self->{opts}->{remote};
> +
>   	    die "can't migrate '$volid' - storage type '$scfg->{type}' not supported\n"
>   		if !$migratable;
>   
> @@ -612,6 +822,10 @@ sub handle_replication {
>       my $local_volumes = $self->{local_volumes};
>   
>       return if !$self->{replication_jobcfg};
> +
> +    die "can't migrate VM with replicated volumes to remote cluster/node\n"
> +	if $self->{opts}->{remote};
We can add that later, asserting that no local removal will happen ;)
Same for being a base VM referenced by a linked clone.
> +
>       if ($self->{running}) {
>   
>   	my $version = PVE::QemuServer::kvm_user_version();
> @@ -709,26 +923,133 @@ sub sync_offline_local_volumes {
>       my $opts = $self->{opts};
>   
>       $self->log('info', "copying local disk images") if scalar(@volids);
> -
> +    my $forwarded = 0;
>       foreach my $volid (@volids) {
>   	my $targetsid = $local_volumes->{$volid}->{targetsid};
> -	my $bwlimit = $local_volumes->{$volid}->{bwlimit};
> -	$bwlimit = $bwlimit * 1024 if defined($bwlimit); # storage_migrate uses bps
> -
> -	my $storage_migrate_opts = {
> -	    'ratelimit_bps' => $bwlimit,
> -	    'insecure' => $opts->{migration_type} eq 'insecure',
> -	    'with_snapshots' => $local_volumes->{$volid}->{snapshots},
> -	    'allow_rename' => !$local_volumes->{$volid}->{is_vmstate},
> -	};
>   
> -	my $logfunc = sub { $self->log('info', $_[0]); };
> -	my $new_volid = eval {
> -	    PVE::Storage::storage_migrate($storecfg, $volid, $self->{ssh_info},
> -					  $targetsid, $storage_migrate_opts, $logfunc);
> -	};
> -	if (my $err = $@) {
> -	    die "storage migration for '$volid' to storage '$targetsid' failed - $err\n";
> +	my $new_volid;
> +	
Style nit: whitespace error
> +	my $opts = $self->{opts};
> +	if (my $remote = $opts->{remote}) {
> +	    my $remote_vmid = $remote->{vmid};
> +	    my ($sid, undef) = PVE::Storage::parse_volume_id($volid);
> +	    my (undef, $name, undef, undef, undef, undef, $format) = PVE::Storage::parse_volname($storecfg, $volid);
> +	    my $scfg = PVE::Storage::storage_config($storecfg, $sid);
> +	    PVE::Storage::activate_volumes($storecfg, [$volid]);
> +
> +	    # use 'migrate' limit for transfer to other node
> +	    my $bwlimit_opts = {
> +		storage => $targetsid,
> +		bwlimit => $opts->{bwlimit},
> +	    };
> +	    my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', [$sid], $opts->{bwlimit});
Nit: could use
     my $bwlimit = $local_volumes->{$volid}->{bwlimit};
> +	    my $remote_bwlimit = $self->write_tunnel($self->{tunnel}, 10, 'bwlimit', $bwlimit_opts);
> +	    $remote_bwlimit = $remote_bwlimit->{bwlimit};
> +	    if (defined($remote_bwlimit)) {
> +		$bwlimit = $remote_bwlimit if !defined($bwlimit);
> +		$bwlimit = $remote_bwlimit if $remote_bwlimit < $bwlimit;
> +	    }
> +
> +	    # JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
> +	    $bwlimit = $bwlimit * 1024 if defined($bwlimit);
> +
> +	    my $with_snapshots = $local_volumes->{$volid}->{snapshots} ? 1 : 0;
> +	    my $snapshot;
> +	    if ($scfg->{type} eq 'zfspool') {
> +		$snapshot = '__migration__';
> +		$with_snapshots = 1;
> +		PVE::Storage::volume_snapshot($storecfg, $volid, $snapshot);
> +	    }
> +
> +	    if ($self->{vmid} != $remote_vmid) {
> +		$name =~ s/-$self->{vmid}-/-$remote_vmid-/g;
> +		$name =~ s/^$self->{vmid}\//$remote_vmid\//;
> +	    }
> +
> +	    my @export_formats = PVE::Storage::volume_export_formats($storecfg, $volid, undef, undef, $with_snapshots);
> +
> +	    my $storage_migrate_opts = {
Nit: maybe call it disk_import_opts
> +		format => $format,
> +		storage => $targetsid,
> +		'with-snapshots' => $with_snapshots,
> +		'allow-rename' => !$local_volumes->{$volid}->{is_vmstate},
> +		'export-formats' => @export_formats,
Doesn't this need to be converted to a string?
> +		volname => $name,
> +	    };
> +	    my $res = $self->write_tunnel($self->{tunnel}, 600, 'disk-import', $storage_migrate_opts);
> +	    my $local = "/run/qemu-server/$self->{vmid}.storage";
> +	    if (!$forwarded) {
> +		$forward_unix_socket->($self, $local, $res->{socket});
> +		$forwarded = 1;
> +	    }
> +	    my $socket = IO::Socket::UNIX->new(Peer => $local, Type => SOCK_STREAM())
> +		or die "failed to connect to websocket tunnel at $local\n";
> +	    # we won't be reading from the socket
> +	    shutdown($socket, 0);
> +	    my $send = ['pvesm', 'export', $volid, $res->{format}, '-', '-with-snapshots', $with_snapshots];
> +	    push @$send, '-snapshot', $snapshot if $snapshot;
> +
> +	    my @cstream;
> +	    if (defined($bwlimit)) {
> +		@cstream = ([ '/usr/bin/cstream', '-t', $bwlimit ]);
> +		$self->log('info', "using a bandwidth limit of $bwlimit bps for transferring '$volid'");
> +	    }
> +
> +	    eval {
> +		PVE::Tools::run_command(
> +		    [$send, @cstream],
> +		    output => '>&'.fileno($socket),
> +		    errfunc => sub { my $line = shift; $self->log('warn', $line); },
> +		);
> +	    };
> +	    my $send_error = $@;
> +
> +	    # don't close the connection entirely otherwise the
> +	    # receiving end might not get all buffered data (and
> +	    # fails with 'connection reset by peer')
> +	    shutdown($socket, 1);
> +
> +	    # wait for the remote process to finish
> +	    while ($res = $self->write_tunnel($self->{tunnel}, 10, 'query-disk-import')) {
> +		if ($res->{status} eq 'pending') {
> +		    $self->log('info', "waiting for disk import to finish..\n");
> +		    sleep(1)
> +		} elsif ($res->{status} eq 'complete') {
> +		    $new_volid = $res->{volid};
> +		    last;
> +		} else {
> +		    die "unknown query-disk-import result: $res->{status}\n";
> +		}
> +	    }
> +
> +	    # now close the socket
> +	    close($socket);
> +	    die $send_error if $send_error;
> +	} else {
> +	    my $bwlimit = $local_volumes->{$volid}->{bwlimit};
> +	    $bwlimit = $bwlimit * 1024 if defined($bwlimit); # storage_migrate uses bps
> +
> +	    my $storage_migrate_opts = {
> +		'ratelimit_bps' => $bwlimit,
> +		'insecure' => $opts->{migration_type} eq 'insecure',
> +		'with_snapshots' => $local_volumes->{$volid}->{snapshots},
> +		'allow_rename' => !$local_volumes->{$volid}->{is_vmstate},
> +	    };
> +
> +	    my $logfunc = sub { $self->log('info', $_[0]); };
> +	    $new_volid = eval {
> +		PVE::Storage::storage_migrate(
> +		    $storecfg,
> +		    $volid,
> +		    $self->{ssh_info},
> +		    $targetsid,
> +		    $storage_migrate_opts,
> +		    $logfunc,
> +		);
> +	    };
> +	    if (my $err = $@) {
> +		die "storage migration for '$volid' to storage '$targetsid' failed - $err\n";
> +	    }
>   	}
>   
>   	$self->{volume_map}->{$volid} = $new_volid;
> @@ -744,6 +1065,12 @@ sub sync_offline_local_volumes {
>   sub cleanup_remotedisks {
>       my ($self) = @_;
>   
Nit, not to be taken seriously: cleanup_remotedisks_and_maybe_tunnel ;)
> +    if ($self->{opts}->{remote}) {
> +	$self->finish_tunnel($self->{tunnel}, 1);
> +	delete $self->{tunnel};
> +	return;
> +    }
> +
>       my $local_volumes = $self->{local_volumes};
>   
>       foreach my $volid (values %{$self->{volume_map}}) {
> @@ -793,8 +1120,84 @@ sub phase1 {
>       $self->handle_replication($vmid);
>   
>       $self->sync_offline_local_volumes();
> +    $self->phase1_remote($vmid) if $self->{opts}->{remote};
>   };
>   
> +sub phase1_remote {
> +    my ($self, $vmid) = @_;
> +
> +    my $remote_conf = PVE::QemuConfig->load_config($vmid);
> +    PVE::QemuConfig->update_volume_ids($remote_conf, $self->{volume_map});
> +
> +    # TODO: check bridge availability earlier?
> +    my $bridgemap = $self->{opts}->{bridgemap};
> +    foreach my $opt (keys %$remote_conf) {
> +	next if $opt !~ m/^net\d+$/;
> +
> +	next if !$remote_conf->{$opt};
> +	my $d = PVE::QemuServer::parse_net($remote_conf->{$opt});
> +	next if !$d || !$d->{bridge};
> +
> +	my $target_bridge = PVE::QemuServer::map_id($bridgemap, $d->{bridge});
> +	$self->log('info', "mapped: $opt from $d->{bridge} to $target_bridge");
> +	$d->{bridge} = $target_bridge;
> +	$remote_conf->{$opt} = PVE::QemuServer::print_net($d);
> +    }
> +
> +    my @online_local_volumes = $self->filter_local_volumes('online');
> +
> +    my $storage_map = $self->{opts}->{storagemap};
> +    $self->{nbd} = {};
> +    PVE::QemuConfig->foreach_volume($remote_conf, sub {
> +	my ($ds, $drive) = @_;
> +
> +	# TODO eject CDROM?
> +	return if PVE::QemuServer::drive_is_cdrom($drive);
> +
> +	my $volid = $drive->{file};
> +	return if !$volid;
> +
> +	return if !grep { $_ eq $volid} @online_local_volumes;
> +
> +	my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);
> +	my $scfg = PVE::Storage::storage_config($self->{storecfg}, $storeid);
> +	my $source_format = PVE::QemuServer::qemu_img_format($scfg, $volname);
> +
> +	# set by target cluster
> +	my $oldvolid = delete $drive->{file};
> +	delete $drive->{format};
> +
> +	my $targetsid = PVE::QemuServer::map_id($storage_map, $storeid);
> +
> +	my $params = {
> +	    format => $source_format,
> +	    storage => $targetsid,
> +	    drive => $drive,
> +	};
> +
> +	$self->log('info', "Allocating volume for drive '$ds' on remote storage '$targetsid'..");
> +	my $res = $self->write_tunnel($self->{tunnel}, 600, 'disk', $params);
> +
> +	$self->log('info', "volume '$oldvolid' os '$res->{volid}' on the target\n");
> +	$remote_conf->{$ds} = $res->{drivestr};
> +	$self->{nbd}->{$ds} = $res;
> +    });
> +
> +    my $conf_str = PVE::QemuServer::write_vm_config("remote", $remote_conf);
> +
> +    # TODO expose in PVE::Firewall?
> +    my $vm_fw_conf_path = "/etc/pve/firewall/$vmid.fw";
> +    my $fw_conf_str;
> +    $fw_conf_str = PVE::Tools::file_get_contents($vm_fw_conf_path)
> +	if -e $vm_fw_conf_path;
> +    my $params = {
> +	conf => $conf_str,
> +	'firewall-config' => $fw_conf_str,
> +    };
> +
> +    $self->write_tunnel($self->{tunnel}, 10, 'config', $params);
> +}
> +
>   sub phase1_cleanup {
>       my ($self, $vmid, $err) = @_;
>   
> @@ -825,7 +1228,6 @@ sub phase2_start_local_cluster {
>       my $local_volumes = $self->{local_volumes};
>       my @online_local_volumes = $self->filter_local_volumes('online');
>   
> -    $self->{storage_migration} = 1 if scalar(@online_local_volumes);
>       my $start = $params->{start_params};
>       my $migrate = $params->{migrate_opts};
>   
> @@ -948,10 +1350,34 @@ sub phase2_start_local_cluster {
>       return ($tunnel_info, $spice_port);
>   }
>   
> +sub phase2_start_remote_cluster {
> +    my ($self, $vmid, $params) = @_;
> +
> +    die "insecure migration to remote cluster not implemented\n"
> +	if $params->{migrate_opts}->{type} ne 'websocket';
> +
> +    my $remote_vmid = $self->{opts}->{remote}->{vmid};
> +
> +    my $res = $self->write_tunnel($self->{tunnel}, 10, "start", $params);
> +
> +    foreach my $drive (keys %{$res->{drives}}) {
> +	$self->{stopnbd} = 1;
> +	$self->{target_drive}->{$drive}->{drivestr} = $res->{drives}->{$drive}->{drivestr};
> +	my $nbd_uri = $res->{drives}->{$drive}->{nbd_uri};
> +	die "unexpected NBD uri for '$drive': $nbd_uri\n"
> +	    if $nbd_uri !~ s!/run/qemu-server/$remote_vmid\_!/run/qemu-server/$vmid\_!;
> +
> +	$self->{target_drive}->{$drive}->{nbd_uri} = $nbd_uri;
> +    }
> +
> +    return ($res->{migrate}, $res->{spice_port});
> +}
> +
>   sub phase2 {
>       my ($self, $vmid) = @_;
>   
>       my $conf = $self->{vmconf};
> +    my $local_volumes = $self->{local_volumes};
>   
>       # version > 0 for unix socket support
>       my $nbd_protocol_version = 1;
> @@ -983,10 +1409,42 @@ sub phase2 {
>   	},
>       };
>   
> -    my ($tunnel_info, $spice_port) = $self->phase2_start_local_cluster($vmid, $params);
> +    my ($tunnel_info, $spice_port);
> +
> +    my @online_local_volumes = $self->filter_local_volumes('online');
> +    $self->{storage_migration} = 1 if scalar(@online_local_volumes);
> +
> +    if (my $remote = $self->{opts}->{remote}) {
> +	my $remote_vmid = $remote->{vmid};
> +	$params->{migrate_opts}->{remote_node} = $self->{node};
> +	($tunnel_info, $spice_port) = $self->phase2_start_remote_cluster($vmid, $params);
> +	die "only UNIX sockets are supported for remote migration\n"
> +	    if $tunnel_info->{proto} ne 'unix';
> +
> +	my $forwarded = {};
> +	my $remote_socket = $tunnel_info->{addr};
> +	my $local_socket = $remote_socket;
> +	$local_socket =~ s/$remote_vmid/$vmid/g;
> +	$tunnel_info->{addr} = $local_socket;
> +
> +	$self->log('info', "Setting up tunnel for '$local_socket'");
> +	$forward_unix_socket->($self, $local_socket, $remote_socket);
> +	$forwarded->{$local_socket} = 1;
> +
> +	foreach my $remote_socket (@{$tunnel_info->{unix_sockets}}) {
> +	    my $local_socket = $remote_socket;
> +	    $local_socket =~ s/$remote_vmid/$vmid/g;
> +	    next if $forwarded->{$local_socket};
> +	    $self->log('info', "Setting up tunnel for '$local_socket'");
> +	    $forward_unix_socket->($self, $local_socket, $remote_socket);
> +	    $forwarded->{$local_socket} = 1;
> +	}
> +    } else {
> +	($tunnel_info, $spice_port) = $self->phase2_start_local_cluster($vmid, $params);
>   
> -    $self->log('info', "start remote tunnel");
> -    $self->start_remote_tunnel($tunnel_info);
> +	$self->log('info', "start remote tunnel");
> +	$self->start_remote_tunnel($tunnel_info);
> +    }
>   
>       my $migrate_uri = "$tunnel_info->{proto}:$tunnel_info->{addr}";
>       $migrate_uri .= ":$tunnel_info->{port}"
> @@ -996,8 +1454,6 @@ sub phase2 {
>   	$self->{storage_migration_jobs} = {};
>   	$self->log('info', "starting storage migration");
>   
> -	my @online_local_volumes = $self->filter_local_volumes('online');
> -
>   	die "The number of local disks does not match between the source and the destination.\n"
>   	    if (scalar(keys %{$self->{target_drive}}) != scalar(@online_local_volumes));
>   	foreach my $drive (keys %{$self->{target_drive}}){
> @@ -1070,7 +1526,7 @@ sub phase2 {
>       };
>       $self->log('info', "migrate-set-parameters error: $@") if $@;
>   
> -    if (PVE::QemuServer::vga_conf_has_spice($conf->{vga})) {
> +    if (PVE::QemuServer::vga_conf_has_spice($conf->{vga} && !$self->{opts}->{remote})) {
>   	my $rpcenv = PVE::RPCEnvironment::get();
>   	my $authuser = $rpcenv->get_user();
>   
> @@ -1267,11 +1723,15 @@ sub phase2_cleanup {
>   
>       my $nodename = PVE::INotify::nodename();
>   
> -    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'stop', $vmid, '--skiplock', '--migratedfrom', $nodename];
> -    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
> -    if (my $err = $@) {
> -        $self->log('err', $err);
> -        $self->{errors} = 1;
> +    if ($self->{tunnel} && $self->{tunnel}->{version} >= 2) {
> +	$self->write_tunnel($self->{tunnel}, 10, 'stop');
> +    } else {
> +	my $cmd = [@{$self->{rem_ssh}}, 'qm', 'stop', $vmid, '--skiplock', '--migratedfrom', $nodename];
> +	eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
> +	if (my $err = $@) {
> +	    $self->log('err', $err);
> +	    $self->{errors} = 1;
> +	}
>       }
>   
>       # cleanup after stopping, otherwise disks might be in-use by target VM!
> @@ -1304,7 +1764,7 @@ sub phase3_cleanup {
>   
>       my $tunnel = $self->{tunnel};
>   
> -    if ($self->{volume_map}) {
> +    if ($self->{volume_map} && !$self->{opts}->{remote}) {
>   	my $target_drives = $self->{target_drive};
>   
>   	# FIXME: for NBD storage migration we now only update the volid, and
> @@ -1321,26 +1781,33 @@ sub phase3_cleanup {
>   
>       # transfer replication state before move config
>       $self->transfer_replication_state() if $self->{is_replicated};
> -    PVE::QemuConfig->move_config_to_node($vmid, $self->{node});
> +    if (!$self->{opts}->{remote}) {
> +	PVE::QemuConfig->move_config_to_node($vmid, $self->{node});
> +    }
>       $self->switch_replication_job_target() if $self->{is_replicated};
All three lines (transfer_replication_state, move_config_to_node and switch_replication_job_target) could/should be guarded by the if.
>   
>       if ($self->{livemigration}) {
>   	if ($self->{stopnbd}) {
>   	    $self->log('info', "stopping NBD storage migration server on target.");
>   	    # stop nbd server on remote vm - requirement for resume since 2.9
> -	    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'nbdstop', $vmid];
> +	    if ($tunnel && $tunnel->{version} && $tunnel->{version} >= 2) {
> +		$self->write_tunnel($tunnel, 30, 'nbdstop');
> +	    } else {
> +		my $cmd = [@{$self->{rem_ssh}}, 'qm', 'nbdstop', $vmid];
>   
> -	    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
> -	    if (my $err = $@) {
> -		$self->log('err', $err);
> -		$self->{errors} = 1;
> +		eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
> +		if (my $err = $@) {
> +		    $self->log('err', $err);
> +		    $self->{errors} = 1;
> +		}
>   	    }
>   	}
>   
>   	# config moved and nbd server stopped - now we can resume vm on target
>   	if ($tunnel && $tunnel->{version} && $tunnel->{version} >= 1) {
> +	    my $cmd = $tunnel->{version} == 1 ? "resume $vmid" : "resume";
>   	    eval {
> -		$self->write_tunnel($tunnel, 30, "resume $vmid");
> +		$self->write_tunnel($tunnel, 30, $cmd);
>   	    };
>   	    if (my $err = $@) {
>   		$self->log('err', $err);
> @@ -1360,18 +1827,24 @@ sub phase3_cleanup {
>   	}
>   
>   	if ($self->{storage_migration} && PVE::QemuServer::parse_guest_agent($conf)->{fstrim_cloned_disks} && $self->{running}) {
> -	    my $cmd = [@{$self->{rem_ssh}}, 'qm', 'guest', 'cmd', $vmid, 'fstrim'];
> -	    eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
> +	    if ($self->{opts}->{remote}) {
> +		$self->write_tunnel($self->{tunnel}, 600, 'fstrim');
> +	    } else {
> +		my $cmd = [@{$self->{rem_ssh}}, 'qm', 'guest', 'cmd', $vmid, 'fstrim'];
> +		eval{ PVE::Tools::run_command($cmd, outfunc => sub {}, errfunc => sub {}) };
> +	    }
>   	}
>       }
>   
>       # close tunnel on successful migration, on error phase2_cleanup closed it
> -    if ($tunnel) {
> +    if ($tunnel && $tunnel->{version} == 1) {
>   	eval { finish_tunnel($self, $tunnel);  };
>   	if (my $err = $@) {
>   	    $self->log('err', $err);
>   	    $self->{errors} = 1;
>   	}
> +	$tunnel = undef;
> +	delete $self->{tunnel};
>       }
>   
>       eval {
> @@ -1409,6 +1882,9 @@ sub phase3_cleanup {
>   
>       # destroy local copies
>       foreach my $volid (@not_replicated_volumes) {
> +	# remote is cleaned up below
> +	next if $self->{opts}->{remote};
> +
>   	eval { PVE::Storage::vdisk_free($self->{storecfg}, $volid); };
>   	if (my $err = $@) {
>   	    $self->log('err', "removing local copy of '$volid' failed - $err");
> @@ -1418,8 +1894,19 @@ sub phase3_cleanup {
>       }
>   
>       # clear migrate lock
> -    my $cmd = [ @{$self->{rem_ssh}}, 'qm', 'unlock', $vmid ];
> -    $self->cmd_logerr($cmd, errmsg => "failed to clear migrate lock");
> +    if ($tunnel && $tunnel->{version} >= 2) {
> +	$self->write_tunnel($tunnel, 10, "unlock");
> +
> +	$self->finish_tunnel($tunnel);
> +    } else {
> +	my $cmd = [ @{$self->{rem_ssh}}, 'qm', 'unlock', $vmid ];
> +	$self->cmd_logerr($cmd, errmsg => "failed to clear migrate lock");
> +    }
> +
> +    if ($self->{opts}->{remote} && $self->{opts}->{delete}) {
> +	eval { PVE::QemuServer::destroy_vm($self->{storecfg}, $vmid, 1, undef, 0) };
> +	warn "Failed to remove source VM - $@\n" if $@;
> +    }
>   }
>   
>   sub final_cleanup {
> diff --git a/PVE/QemuServer.pm b/PVE/QemuServer.pm
> index d494cc0..bf05da2 100644
> --- a/PVE/QemuServer.pm
> +++ b/PVE/QemuServer.pm
> @@ -5384,7 +5384,11 @@ sub vm_start_nolock {
>       my $defaults = load_defaults();
>   
>       # set environment variable useful inside network script
> -    $ENV{PVE_MIGRATED_FROM} = $migratedfrom if $migratedfrom;
> +    if ($migrate_opts->{remote_node}) {
> +	$ENV{PVE_MIGRATED_FROM} = $migrate_opts->{remote_node};
> +    } elsif ($migratedfrom) {
> +	$ENV{PVE_MIGRATED_FROM} = $migratedfrom;
> +    }
But the network script tries to load the config from that node, and if 
that node is not in the cluster, that doesn't work?
>   
>       PVE::GuestHelpers::exec_hookscript($conf, $vmid, 'pre-start', 1);
>   
> @@ -5621,7 +5625,7 @@ sub vm_start_nolock {
>   
>   	my $migrate_storage_uri;
>   	# nbd_protocol_version > 0 for unix socket support
> -	if ($nbd_protocol_version > 0 && $migration_type eq 'secure') {
> +	if ($nbd_protocol_version > 0 && ($migration_type eq 'secure' || $migration_type eq 'websocket')) {
>   	    my $socket_path = "/run/qemu-server/$vmid\_nbd.migrate";
>   	    mon_cmd($vmid, "nbd-server-start", addr => { type => 'unix', data => { path => $socket_path } } );
>   	    $migrate_storage_uri = "nbd:unix:$socket_path";
> 
    
    
More information about the pve-devel
mailing list