[pve-devel] [PATCH v2 qemu-server 5/8] mtunnel: add API endpoints

Fabian Grünbichler f.gruenbichler at proxmox.com
Thu Nov 11 15:07:18 CET 2021


the following two endpoints are used for migration on the remote side

POST /nodes/NODE/qemu/VMID/mtunnel

which creates and locks an empty VM config, and spawns the main qmtunnel
worker which binds to a VM-specific UNIX socket.

this worker handles JSON-encoded migration commands coming in via this
UNIX socket:
- config (set target VM config)
-- checks permissions for updating config
-- strips pending changes and snapshots
-- sets (optional) firewall config
- disk (allocate disk for NBD migration)
-- checks permission for target storage
-- returns drive string for allocated volume
- disk-import (import 'pvesm export' stream for offline migration)
-- checks permission for target storage
-- forks a child running 'pvesm import' reading from a UNIX socket
-- only one import allowed to run at any given moment
- query-disk-import
-- checks output of 'pvesm import' for volume ID message
-- returns volid + status 'complete' on success, or 'pending', or 'error'
- start (returning migration info)
- fstrim (via agent)
- bwlimit (query bwlimit for storage)
- ticket (creates a ticket for a WS connection to a specific socket)
- resume
- stop
- nbdstop
- unlock
- quit (+ cleanup)

this worker serves as a replacement for both 'qm mtunnel' and various
manual calls via SSH. the API call will return a ticket valid for
connecting to the worker's UNIX socket via a websocket connection.

GET+WebSocket upgrade /nodes/NODE/qemu/VMID/mtunnelwebsocket

gets called for connecting to a UNIX socket via websocket forwarding,
i.e. once for the main mtunnel command connection, and once each for the
memory migration and each NBD drive-mirror/storage migration.

access is guarded by a short-lived ticket binding the authenticated user
to the socket path. such tickets can be requested over the main mtunnel,
which keeps track of socket paths currently used by that
mtunnel/migration instance.

each command handler should check privileges for the requested action if
necessary.

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---

Notes:
    v2: incorporated Fabian Ebner's feedback, mainly:
    - use modified nbd alloc helper instead of duplicating
    - fix disk cleanup, also cleanup imported disks
    - fix firewall-conf vs firewall-config mismatch
    
    requires
    - pve-storage with UNIX import support
    - pve-access-control with tunnel ticket support
    - pve-http-server with websocket fixes

 PVE/API2/Qemu.pm | 629 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 629 insertions(+)

diff --git a/PVE/API2/Qemu.pm b/PVE/API2/Qemu.pm
index 185fcc8..97f6588 100644
--- a/PVE/API2/Qemu.pm
+++ b/PVE/API2/Qemu.pm
@@ -6,8 +6,12 @@ use Cwd 'abs_path';
 use Net::SSLeay;
 use POSIX;
 use IO::Socket::IP;
+use IO::Socket::UNIX;
+use IPC::Open3;
+use JSON;
 use URI::Escape;
 use Crypt::OpenSSL::Random;
+use Socket qw(SOCK_STREAM);
 
 use PVE::Cluster qw (cfs_read_file cfs_write_file);;
 use PVE::RRD;
@@ -857,6 +861,7 @@ __PACKAGE__->register_method({
 	    { subdir => 'spiceproxy' },
 	    { subdir => 'sendkey' },
 	    { subdir => 'firewall' },
+	    { subdir => 'mtunnel' },
 	    ];
 
 	return $res;
@@ -4645,4 +4650,628 @@ __PACKAGE__->register_method({
 	return PVE::QemuServer::Cloudinit::dump_cloudinit_config($conf, $param->{vmid}, $param->{type});
     }});
 
+__PACKAGE__->register_method({
+    name => 'mtunnel',
+    path => '{vmid}/mtunnel',
+    method => 'POST',
+    protected => 1,
+    proxyto => 'node',
+    description => 'Migration tunnel endpoint - only for internal use by VM migration.',
+    permissions => {
+	check => ['perm', '/vms/{vmid}', [ 'VM.Allocate' ]],
+	description => "You need 'VM.Allocate' permissions on /vms/{vmid}. Further permission checks happen during the actual migration.",
+    },
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    node => get_standard_option('pve-node'),
+	    vmid => get_standard_option('pve-vmid'),
+	    storages => {
+		type => 'string',
+		format => 'pve-storage-id-list',
+		optional => 1,
+		description => 'List of storages to check permission and availability. Will be checked again for all actually used storages during migration.',
+	    },
+	},
+    },
+    returns => {
+	additionalProperties => 0,
+	properties => {
+	    upid => { type => 'string' },
+	    ticket => { type => 'string' },
+	    socket => { type => 'string' },
+	},
+    },
+    code => sub {
+	my ($param) = @_;
+
+	my $rpcenv = PVE::RPCEnvironment::get();
+	my $authuser = $rpcenv->get_user();
+
+	my $node = extract_param($param, 'node');
+	my $vmid = extract_param($param, 'vmid');
+
+	my $storages = extract_param($param, 'storages');
+
+	my $storecfg = PVE::Storage::config();
+	foreach my $storeid (PVE::Tools::split_list($storages)) {
+	    $check_storage_access_migrate->($rpcenv, $authuser, $storecfg, $storeid, $node);
+	}
+
+	PVE::Cluster::check_cfs_quorum();
+
+	my $socket_addr = "/run/qemu-server/$vmid.mtunnel";
+
+	my $lock = 'create';
+	eval { PVE::QemuConfig->create_and_lock_config($vmid, 0, $lock); };
+
+	raise_param_exc({ vmid => "unable to create empty VM config - $@"})
+	    if $@;
+
+	my $realcmd = sub {
+	    my $pveproxy_uid;
+
+	    my $state = {
+		storecfg => PVE::Storage::config(),
+		lock => $lock,
+	    };
+
+	    my $run_locked = sub {
+		my ($code, $params) = @_;
+		return PVE::QemuConfig->lock_config($vmid, sub {
+		    my $conf = PVE::QemuConfig->load_config($vmid);
+
+		    $state->{conf} = $conf;
+
+		    die "Encountered wrong lock - aborting mtunnel command handling.\n"
+			if $state->{lock} && !PVE::QemuConfig->has_lock($conf, $state->{lock});
+
+		    return $code->($params);
+		});
+	    };
+
+	    my $cmd_desc = {
+		bwlimit => {
+		    storage => {
+			type => 'string',
+			format => 'pve-storage-id',
+			description => "Storage for which bwlimit is queried",
+		    },
+		    bwlimit => {
+			description => "Override I/O bandwidth limit (in KiB/s).",
+			optional => 1,
+			type => 'integer',
+			minimum => '0',
+		    },
+		},
+		config => {
+		    conf => {
+			type => 'string',
+			description => 'Full VM config, adapted for target cluster/node',
+		    },
+		    'firewall-config' => {
+			type => 'string',
+			description => 'VM firewall config',
+			optional => 1,
+		    },
+		},
+		disk => {
+		    format => PVE::JSONSchema::get_standard_option('pve-qm-image-format'),
+		    storage => {
+			type => 'string',
+			format => 'pve-storage-id',
+		    },
+		    drive => {
+			type => 'object',
+			description => 'parsed drive information without volid and format',
+		    },
+		},
+		'disk-import' => {
+		    volname => {
+			type => 'string',
+			description => 'volume name to use as preferred target volume name',
+		    },
+		    format => PVE::JSONSchema::get_standard_option('pve-qm-image-format'),
+		    'export-formats' => {
+			type => 'string',
+			description => 'list of supported export formats',
+		    },
+		    storage => {
+			type => 'string',
+			format => 'pve-storage-id',
+		    },
+		    'with-snapshots' => {
+			description =>
+			    "Whether the stream includes intermediate snapshots",
+			type => 'boolean',
+			optional => 1,
+			default => 0,
+		    },
+		    'allow-rename' => {
+			description => "Choose a new volume ID if the requested " .
+			  "volume ID already exists, instead of throwing an error.",
+			type => 'boolean',
+			optional => 1,
+			default => 0,
+		    },
+		},
+		start => {
+		    start_params => {
+			type => 'object',
+			description => 'params passed to vm_start_nolock',
+		    },
+		    migrate_opts => {
+			type => 'object',
+			description => 'migrate_opts passed to vm_start_nolock',
+		    },
+		},
+		ticket => {
+		    path => {
+			type => 'string',
+			description => 'socket path for which the ticket should be valid. must be known to current mtunnel instance.',
+		    },
+		},
+		quit => {
+		    cleanup => {
+			type => 'boolean',
+			description => 'remove VM config and disks, aborting migration',
+			default => 0,
+		    },
+		},
+	    };
+
+	    my $cmd_handlers = {
+		'version' => sub {
+		    # compared against other end's version
+		    # bump/reset both for breaking changes
+		    # bump tunnel only for opt-in changes
+		    return {
+			api => 2,
+			age => 0,
+		    };
+		},
+		'config' => sub {
+		    my ($params) = @_;
+
+		    # parse and write out VM FW config if given
+		    if (my $fw_conf = $params->{'firewall-config'}) {
+			my ($path, $fh) = PVE::Tools::tempfile_contents($fw_conf, 700);
+
+			my $empty_conf = {
+			    rules => [],
+			    options => {},
+			    aliases => {},
+			    ipset => {} ,
+			    ipset_comments => {},
+			};
+			my $cluster_fw_conf = PVE::Firewall::load_clusterfw_conf();
+
+			# TODO: add flag for strict parsing?
+			# TODO: add import sub that does all this given raw content?
+			my $vmfw_conf = PVE::Firewall::generic_fw_config_parser($path, $cluster_fw_conf, $empty_conf, 'vm');
+			$vmfw_conf->{vmid} = $vmid;
+			PVE::Firewall::save_vmfw_conf($vmid, $vmfw_conf);
+
+			$state->{cleanup}->{fw} = 1;
+		    }
+
+		    PVE::QemuConfig->remove_lock($vmid, 'create');
+
+		    # TODO add flag for strict parsing?
+		    my $new_conf = PVE::QemuServer::parse_vm_config("incoming/qemu-server/$vmid.conf", $params->{conf});
+		    delete $new_conf->{lock};
+		    delete $new_conf->{digest};
+
+		    # TODO handle properly?
+		    delete $new_conf->{snapshots};
+		    delete $new_conf->{parent};
+		    delete $new_conf->{pending};
+
+		    # not handled by update_vm_api
+		    my $vmgenid = delete $new_conf->{vmgenid};
+		    my $meta = delete $new_conf->{meta};
+
+		    $new_conf->{vmid} = $vmid;
+		    $new_conf->{node} = $node;
+
+		    $update_vm_api->($new_conf, 1);
+
+		    my $conf = PVE::QemuConfig->load_config($vmid);
+		    $conf->{lock} = 'migrate';
+		    $conf->{vmgenid} = $vmgenid;
+		    $conf->{meta} = $meta;
+		    PVE::QemuConfig->write_config($vmid, $conf);
+
+		    $state->{lock} = 'migrate';
+
+		    return;
+		},
+		'bwlimit' => sub {
+		    my ($params) = @_;
+
+		    my $bwlimit = PVE::Storage::get_bandwidth_limit('migration', [$params->{storage}], $params->{bwlimit});
+		    return { bwlimit => $bwlimit };
+
+		},
+		'disk' => sub {
+		    my ($params) = @_;
+
+		    my $format = $params->{format};
+		    my $storeid = $params->{storage};
+		    my $drive = $params->{drive};
+
+		    $check_storage_access_migrate->($rpcenv, $authuser, $state->{storecfg}, $storeid, $node);
+
+		    my $storagemap = {
+			default => $storeid,
+		    };
+
+		    my $source_volumes = {
+			'disk' => [
+			    undef,
+			    $storeid,
+			    undef,
+			    $drive,
+			    0,
+			    $format,
+			],
+		    };
+
+		    my $res = PVE::QemuServer::vm_migrate_alloc_nbd_disks($state->{storecfg}, $vmid, $source_volumes, $storagemap);
+		    if (defined($res->{disk})) {
+			$state->{cleanup}->{volumes}->{$res->{disk}->{volid}} = 1;
+			return $res->{disk};
+		    } else {
+			die "failed to allocate NBD disk..\n";
+		    }
+		},
+		'disk-import' => sub {
+		    my ($params) = @_;
+
+		    die "disk import already running as PID '$state->{disk_import}->{pid}'\n"
+			if $state->{disk_import}->{pid};
+
+		    my $format = $params->{format};
+		    my $storeid = $params->{storage};
+		    $check_storage_access_migrate->($rpcenv, $authuser, $state->{storecfg}, $storeid, $node);
+
+		    my $with_snapshots = $params->{'with-snapshots'} ? 1 : 0;
+
+		    my ($default_format, $valid_formats) = PVE::Storage::storage_default_format($state->{storecfg}, $storeid);
+		    my $scfg = PVE::Storage::storage_config($storecfg, $storeid);
+		    die "unsupported format '$format' for storage '$storeid'\n"
+			if !grep {$format eq $_} @{$valid_formats};
+
+		    my $volname = $params->{volname};
+
+		    # get target volname, taken from PVE::Storage
+		    (my $name_without_extension = $volname) =~ s/\.$format$//;
+		    if ($scfg->{path}) {
+			$volname = "$vmid/$name_without_extension.$format";
+		    } else {
+			$volname = "$name_without_extension";
+		    }
+
+		    my $migration_snapshot;
+		    if ($scfg->{type} eq 'zfspool' || $scfg->{type} eq 'btrfs') {
+			$migration_snapshot = '__migration__';
+		    }
+
+		    my $volid = "$storeid:$volname";
+
+		    # find common import/export format, taken from PVE::Storage
+		    my @import_formats = PVE::Storage::volume_import_formats($state->{storecfg}, $volid, $migration_snapshot, undef, $with_snapshots);
+		    my @export_formats = PVE::Tools::split_list($params->{'export-formats'});
+		    my %import_hash = map { $_ => 1 } @import_formats;
+		    my @common = grep { $import_hash{$_} } @export_formats;
+		    die "no matching import/export format found for storage '$storeid'\n"
+			if !@common;
+		    $format = $common[0];
+
+		    my $input = IO::File->new();
+		    my $info = IO::File->new();
+		    my $unix = "/run/qemu-server/$vmid.storage";
+
+		    my $import_cmd = ['pvesm', 'import', $volid, $format, "unix://$unix", '-with-snapshots', $with_snapshots];
+		    if ($params->{'allow-rename'}) {
+			push @$import_cmd, '-allow-rename', $params->{'allow-rename'};
+		    }
+		    if ($migration_snapshot) {
+			push @$import_cmd, '-delete-snapshot', $migration_snapshot;
+			push @$import_cmd, '-snapshot', $migration_snapshot;
+		    }
+
+		    unlink $unix;
+		    my $cpid = open3($input, $info, $info, @{$import_cmd})
+			or die "failed to spawn disk-import child - $!\n";
+
+		    $state->{disk_import}->{pid} = $cpid;
+		    my $ready;
+		    eval {
+			PVE::Tools::run_with_timeout(5, sub { $ready = <$info>; });
+		    };
+		    die "failed to read readyness from disk import child: $@\n" if $@;
+		    print "$ready\n";
+
+		    chown $pveproxy_uid, -1, $unix;
+
+		    $state->{disk_import}->{fh} = $info;
+		    $state->{disk_import}->{socket} = $unix;
+
+		    $state->{sockets}->{$unix} = 1;
+
+		    return {
+			socket => $unix,
+			format => $format,
+		    };
+		},
+		'query-disk-import' => sub {
+		    my ($params) = @_;
+
+		    die "no disk import running\n"
+			if !$state->{disk_import}->{pid};
+
+		    my $pattern = PVE::Storage::volume_imported_message(undef, 1);
+		    my $result;
+		    eval {
+			my $fh = $state->{disk_import}->{fh};
+			PVE::Tools::run_with_timeout(5, sub { $result = <$fh>; });
+			print "disk-import: $result\n" if $result;
+		    };
+		    if ($result && $result =~ $pattern) {
+			my $volid = $1;
+			waitpid($state->{disk_import}->{pid}, 0);
+
+			my $unix = $state->{disk_import}->{socket};
+			unlink $unix;
+			delete $state->{sockets}->{$unix};
+			delete $state->{disk_import};
+			$state->{cleanup}->{volumes}->{$volid} = 1;
+			return {
+			    status => "complete",
+			    volid => $volid,
+			};
+		    } elsif (!$result && waitpid($state->{disk_import}->{pid}, WNOHANG)) {
+			my $unix = $state->{disk_import}->{socket};
+			unlink $unix;
+			delete $state->{sockets}->{$unix};
+			delete $state->{disk_import};
+
+			return {
+			    status => "error",
+			};
+		    } else {
+			return {
+			    status => "pending",
+			};
+		    }
+		},
+		'start' => sub {
+		    my ($params) = @_;
+
+		    my $info = PVE::QemuServer::vm_start_nolock(
+			$state->{storecfg},
+			$vmid,
+			$state->{conf},
+			$params->{start_params},
+			$params->{migrate_opts},
+		    );
+
+
+		    if ($info->{migrate}->{proto} ne 'unix') {
+			PVE::QemuServer::vm_stop(undef, $vmid, 1, 1);
+			die "migration over non-UNIX sockets not possible\n";
+		    }
+
+		    my $socket = $info->{migrate}->{addr};
+		    chown $pveproxy_uid, -1, $socket;
+		    $state->{sockets}->{$socket} = 1;
+
+		    my $unix_sockets = $info->{migrate}->{unix_sockets};
+		    foreach my $socket (@$unix_sockets) {
+			chown $pveproxy_uid, -1, $socket;
+			$state->{sockets}->{$socket} = 1;
+		    }
+		    return $info;
+		},
+		'fstrim' => sub {
+		    if (PVE::QemuServer::qga_check_running($vmid)) {
+			eval { mon_cmd($vmid, "guest-fstrim") };
+			warn "fstrim failed: $@\n" if $@;
+		    }
+		    return;
+		},
+		'stop' => sub {
+		    PVE::QemuServer::vm_stop(undef, $vmid, 1, 1);
+		    return;
+		},
+		'nbdstop' => sub {
+		    PVE::QemuServer::nbd_stop($vmid);
+		    return;
+		},
+		'resume' => sub {
+		    if (PVE::QemuServer::check_running($vmid, 1)) {
+			PVE::QemuServer::vm_resume($vmid, 1, 1);
+		    } else {
+			die "VM $vmid not running\n";
+		    }
+		    return;
+		},
+		'unlock' => sub {
+		    PVE::QemuConfig->remove_lock($vmid, $state->{lock});
+		    delete $state->{lock};
+		    return;
+		},
+		'ticket' => sub {
+		    my ($params) = @_;
+
+		    my $path = $params->{path};
+
+		    die "Not allowed to generate ticket for unknown socket '$path'\n"
+			if !defined($state->{sockets}->{$path});
+
+		    return { ticket => PVE::AccessControl::assemble_tunnel_ticket($authuser, "/socket/$path") };
+		},
+		'quit' => sub {
+		    my ($params) = @_;
+
+		    if ($params->{cleanup}) {
+			if ($state->{cleanup}->{fw}) {
+			    PVE::Firewall::remove_vmfw_conf($vmid);
+			}
+
+			for my $volid (keys $state->{cleanup}->{volumes}->%*) {
+			    print "freeing volume '$volid' as part of cleanup\n";
+			    eval { PVE::Storage::vdisk_free($state->{storecfg}, $volid) };
+			    warn $@ if $@;
+			}
+
+			PVE::QemuServer::destroy_vm($state->{storecfg}, $vmid, 1);
+		    }
+
+		    $state->{exit} = 1;
+		    return;
+		},
+	    };
+
+	    $run_locked->(sub {
+		my $socket_addr = "/run/qemu-server/$vmid.mtunnel";
+		unlink $socket_addr;
+
+		$state->{socket} = IO::Socket::UNIX->new(
+	            Type => SOCK_STREAM(),
+		    Local => $socket_addr,
+		    Listen => 1,
+		);
+
+		$pveproxy_uid = getpwnam('www-data')
+		    or die "Failed to resolve user 'www-data' to numeric UID\n";
+		chown $pveproxy_uid, -1, $socket_addr;
+	    });
+
+	    print "mtunnel started\n";
+
+	    my $conn = $state->{socket}->accept();
+
+	    $state->{conn} = $conn;
+
+	    my $reply_err = sub {
+		my ($msg) = @_;
+
+		my $reply = JSON::encode_json({
+		    success => JSON::false,
+		    msg => $msg,
+		});
+		$conn->print("$reply\n");
+		$conn->flush();
+	    };
+
+	    my $reply_ok = sub {
+		my ($res) = @_;
+
+		$res->{success} = JSON::true;
+		my $reply = JSON::encode_json($res);
+		$conn->print("$reply\n");
+		$conn->flush();
+	    };
+
+	    while (my $line = <$conn>) {
+		chomp $line;
+
+		# untaint, we validate below if needed
+		($line) = $line =~ /^(.*)$/;
+		my $parsed = eval { JSON::decode_json($line) };
+		if ($@) {
+		    $reply_err->("failed to parse command - $@");
+		    next;
+		}
+
+		my $cmd = delete $parsed->{cmd};
+		if (!defined($cmd)) {
+		    $reply_err->("'cmd' missing");
+		} elsif (my $handler = $cmd_handlers->{$cmd}) {
+		    print "received command '$cmd'\n";
+		    eval {
+			if ($cmd_desc->{$cmd}) {
+			    PVE::JSONSchema::validate($cmd_desc->{$cmd}, $parsed);
+			} else {
+			    $parsed = {};
+			}
+			my $res = $run_locked->($handler, $parsed);
+			$reply_ok->($res);
+		    };
+		    $reply_err->("failed to handle '$cmd' command - $@")
+			if $@;
+		} else {
+		    $reply_err->("unknown command '$cmd' given");
+		}
+
+		if ($state->{exit}) {
+		    $state->{conn}->close();
+		    $state->{socket}->close();
+		    last;
+		}
+	    }
+
+	    print "mtunnel exited\n";
+	};
+
+	my $ticket = PVE::AccessControl::assemble_tunnel_ticket($authuser, "/socket/$socket_addr");
+	my $upid = $rpcenv->fork_worker('qmtunnel', $vmid, $authuser, $realcmd);
+
+	return {
+	    ticket => $ticket,
+	    upid => $upid,
+	    socket => $socket_addr,
+	};
+    }});
+
+__PACKAGE__->register_method({
+    name => 'mtunnelwebsocket',
+    path => '{vmid}/mtunnelwebsocket',
+    method => 'GET',
+    proxyto => 'node',
+    permissions => {
+	description => "You need to pass a ticket valid for the selected socket. Tickets can be created via the mtunnel API call, which will check permissions accordingly.",
+        user => 'all', # check inside
+    },
+    description => 'Migration tunnel endpoint for websocket upgrade - only for internal use by VM migration.',
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    node => get_standard_option('pve-node'),
+	    vmid => get_standard_option('pve-vmid'),
+	    socket => {
+		type => "string",
+		description => "unix socket to forward to",
+	    },
+	    ticket => {
+		type => "string",
+		description => "ticket return by initial 'mtunnel' API call, or retrieved via 'ticket' tunnel command",
+	    },
+	},
+    },
+    returns => {
+	type => "object",
+	properties => {
+	    port => { type => 'string', optional => 1 },
+	    socket => { type => 'string', optional => 1 },
+	},
+    },
+    code => sub {
+	my ($param) = @_;
+
+	my $rpcenv = PVE::RPCEnvironment::get();
+	my $authuser = $rpcenv->get_user();
+
+	my $vmid = $param->{vmid};
+	# check VM exists
+	PVE::QemuConfig->load_config($vmid);
+
+	my $socket = $param->{socket};
+	PVE::AccessControl::verify_tunnel_ticket($param->{ticket}, $authuser, "/socket/$socket");
+
+	return { socket => $socket };
+    }});
+
 1;
-- 
2.30.2





More information about the pve-devel mailing list