[pve-devel] [PATCH v5 guest-common 3/3] add storage tunnel module

Fabian Grünbichler f.gruenbichler at proxmox.com
Wed Feb 9 14:07:36 CET 2022

encapsulating storage-related tunnel methods, currently
- source-side storage-migrate helper
- target-side disk-import handler
- target-side query-disk-import handler
- target-side bwlimit handler

to be extended further with replication-related handlers and helpers.

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>

    - fix socket path to no longer reference qemu-server
    - fix accidental setting of $base_snapshot when determining export formats
    - pass $migration_snapshot to storage_migrate_snapshot
    - delete snapshot based on $migration_snapshot, not $snapshot
    - add 'bwlimit' command, extended to support multiple storages and operation override
    - fix option/parameter names ('-' vs '_')
    - storage_migrate: move bwlimit decision to caller
    - storage_migrate: handle volids owned by third VMID
    - storage_migrate: pass snapshot to volume_export_formats
    - storage_migrate: pass log function to export helper
    - disk-import: add missing parameters to schema
    - query-disk-import: fix race when querying import result
    new in v3, includes code previously in qemu-server
    requires bumped pve-storage with new export/import helpers

 src/Makefile             |   1 +
 src/PVE/StorageTunnel.pm | 296 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 297 insertions(+)
 create mode 100644 src/PVE/StorageTunnel.pm

diff --git a/src/Makefile b/src/Makefile
index d82162c..baa2688 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -12,6 +12,7 @@ install: PVE
 	install -m 0644 PVE/ReplicationConfig.pm ${PERL5DIR}/PVE/
 	install -m 0644 PVE/ReplicationState.pm ${PERL5DIR}/PVE/
 	install -m 0644 PVE/Replication.pm ${PERL5DIR}/PVE/
+	install -m 0644 PVE/StorageTunnel.pm ${PERL5DIR}/PVE/
 	install -m 0644 PVE/Tunnel.pm ${PERL5DIR}/PVE/
 	install -d ${PERL5DIR}/PVE/VZDump
 	install -m 0644 PVE/VZDump/Plugin.pm ${PERL5DIR}/PVE/VZDump/
diff --git a/src/PVE/StorageTunnel.pm b/src/PVE/StorageTunnel.pm
new file mode 100644
index 0000000..41147dd
--- /dev/null
+++ b/src/PVE/StorageTunnel.pm
@@ -0,0 +1,296 @@
+package PVE::StorageTunnel;
+use strict;
+use warnings;
+use IO::Socket::UNIX;
+use POSIX qw(WNOHANG);
+use Socket qw(SOCK_STREAM);
+use PVE::Storage;
+use PVE::Tools;
+use PVE::Tunnel;
+# Source-side helper: export volume $volid and stream it through $tunnel to
+# the target side, which receives it via the 'disk-import' tunnel command.
+#
+# Parameters:
+#   $tunnel      - tunnel handle handed to the PVE::Tunnel helpers
+#   $storecfg    - storage configuration handed to the PVE::Storage helpers
+#   $volid       - ID of the source volume
+#   $local_vmid  - guest ID on the source side (also names the local socket)
+#   $remote_vmid - guest ID on the target side (used to rewrite the volume name)
+#   $opts        - hash ref; keys read here: targetsid, bwlimit, snapshots,
+#                  is_vmstate
+#   $log         - code ref, called as $log->($level, $message)
+#
+# Returns the volume ID reported by the target. Dies if the export failed or
+# the target did not report a completed import.
+sub storage_migrate {
+    my ($tunnel, $storecfg, $volid, $local_vmid, $remote_vmid, $opts, $log) = @_;
+
+    my $targetsid = $opts->{targetsid};
+    my $bwlimit = $opts->{bwlimit};
+    # JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
+    $bwlimit = $bwlimit * 1024 if defined($bwlimit);
+
+    # adapt volume name for import call
+    my ($sid, undef) = PVE::Storage::parse_volume_id($volid);
+    my (undef, $name, $owner, undef, undef, undef, $format) = PVE::Storage::parse_volname($storecfg, $volid);
+    my $scfg = PVE::Storage::storage_config($storecfg, $sid);
+    PVE::Storage::activate_volumes($storecfg, [$volid]);
+
+    die "failed to determine owner of volume '$volid'\n" if !defined($owner);
+    $log->('warn', "volume '$volid' owned by VM/CT '$owner', not '$local_vmid'\n")
+	if $owner != $local_vmid;
+
+    if ($owner != $remote_vmid) {
+	$name =~ s/-$owner-/-$remote_vmid-/g;
+	$name =~ s/^$owner\///; # re-added on target if dir-based storage
+    }
+
+    my $with_snapshots = $opts->{snapshots} ? 1 : 0;
+    my $snapshot;
+    my $migration_snapshot = PVE::Storage::storage_migrate_snapshot($storecfg, $sid, $with_snapshots);
+    if ($migration_snapshot) {
+	$snapshot = '__migration__';
+	$with_snapshots = 1;
+    }
+
+    my @export_formats = PVE::Storage::volume_export_formats($storecfg, $volid, $snapshot, undef, $with_snapshots);
+    die "no export formats for '$volid' - check storage plugin support!\n"
+	if !@export_formats;
+
+    # ask the target to start its import process; the reply carries the
+    # socket to stream into and the format to export in
+    my $disk_import_opts = {
+	format => $format,
+	storage => $targetsid,
+	snapshot => $snapshot,
+	migration_snapshot => $migration_snapshot,
+	with_snapshots => $with_snapshots,
+	allow_rename => !$opts->{is_vmstate},
+	export_formats => join(",", @export_formats),
+	volname => $name,
+    };
+    my $res = PVE::Tunnel::write_tunnel($tunnel, 600, 'disk-import', $disk_import_opts);
+
+    # the forward is set up only once per local socket path
+    my $local = "/run/pve/$local_vmid.storage";
+    if (!$tunnel->{forwarded}->{$local}) {
+	PVE::Tunnel::forward_unix_socket($tunnel, $local, $res->{socket});
+    }
+
+    my $socket = IO::Socket::UNIX->new(Peer => $local, Type => SOCK_STREAM())
+	or die "failed to connect to websocket tunnel at $local\n";
+    # we won't be reading from the socket
+    shutdown($socket, 0);
+
+    my $disk_export_opts = {
+	snapshot => $snapshot,
+	migration_snapshot => $migration_snapshot,
+	with_snapshots => $with_snapshots,
+	ratelimit_bps => $bwlimit,
+	cmd => {
+	    output => '>&'.fileno($socket),
+	},
+    };
+
+    # an export error is remembered and re-raised only after the cleanup below
+    eval {
+	PVE::Storage::volume_export_start(
+	    $storecfg,
+	    $volid,
+	    $res->{format},
+	    sub { $log->('info', shift) },
+	    $disk_export_opts,
+	);
+    };
+    my $send_error = $@;
+    warn "$send_error\n" if $send_error;
+
+    # don't close the connection entirely otherwise the
+    # receiving end might not get all buffered data (and
+    # fails with 'connection reset by peer')
+    shutdown($socket, 1);
+
+    # wait for the remote process to finish
+    my $new_volid;
+    while ($res = PVE::Tunnel::write_tunnel($tunnel, 10, 'query-disk-import')) {
+	if ($res->{status} eq 'pending') {
+	    if (my $msg = $res->{msg}) {
+		$log->('info', "disk-import: $msg\n");
+	    } else {
+		$log->('info', "waiting for disk import to finish..\n");
+	    }
+	    sleep(1);
+	} elsif ($res->{status} eq 'complete') {
+	    $new_volid = $res->{volid};
+	    last;
+	} elsif ($res->{status} eq 'error') {
+	    # the target reported a failed import - log its message instead of
+	    # treating the status as unknown
+	    my $msg = $res->{msg} // 'import process failed';
+	    $msg =~ s/\s+\z//;
+	    $log->('err', "disk-import: $msg\n");
+	    last;
+	} else {
+	    $log->('err', "unknown query-disk-import result: $res->{status}\n");
+	    last;
+	}
+    }
+
+    # now close the socket
+    close($socket);
+
+    if ($migration_snapshot) {
+	eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapshot, 0) };
+	warn "could not remove source snapshot: $@\n" if $@;
+    }
+
+    die $send_error if $send_error;
+    die "disk import failed - see log above\n" if !$new_volid;
+
+    return $new_volid;
+}
+# Parameter schemas of the storage-related tunnel commands, keyed by command
+# name. Each entry maps parameter names to their property definitions.
+our $cmd_schema = {
+    # query the bandwidth limit for one or more storages
+    bwlimit => {
+	storages => {
+	    type => 'string',
+	    format => 'pve-storage-id-list',
+	    description => "Storage for which bwlimit is queried",
+	},
+	bwlimit => {
+	    description => "Override I/O bandwidth limit (in KiB/s).",
+	    optional => 1,
+	    type => 'integer',
+	    minimum => '0',
+	},
+	operation => {
+	    description => 'Operation for which bwlimit is queried ("restore", "migration", "clone", "move")',
+	    type => 'string',
+	    default => 'migration',
+	    optional => 1,
+	},
+    },
+    # start receiving a volume on the target side
+    'disk-import' => {
+	volname => {
+	    type => 'string',
+	    description => 'volume name to use as preferred target volume name',
+	},
+	format => PVE::JSONSchema::get_standard_option('pve-qm-image-format'),
+	export_formats => {
+	    type => 'string',
+	    description => 'list of supported export formats',
+	},
+	storage => {
+	    type => 'string',
+	    format => 'pve-storage-id',
+	},
+	snapshot => {
+	    description => "The current-state snapshot if the stream contains snapshots",
+	    type => 'string',
+	    pattern => qr/[a-z0-9_\-]{1,40}/i,
+	    optional => 1,
+	},
+	migration_snapshot => {
+	    type => 'boolean',
+	    optional => 1,
+	    description => '`snapshot` was created for migration and will be removed after import',
+	},
+	with_snapshots => {
+	    description => 'Whether the stream includes intermediate snapshots',
+	    type => 'boolean',
+	    optional => 1,
+	    default => 0,
+	},
+	allow_rename => {
+	    description => "Choose a new volume ID if the requested " .
+		"volume ID already exists, instead of throwing an error.",
+	    type => 'boolean',
+	    optional => 1,
+	    default => 0,
+	},
+    },
+    # poll a running import; takes no parameters
+    'query-disk-import' => {},
+};
+# Target-side handler for 'disk-import': start an import process for the
+# requested volume and record it in $state.
+#
+# 'storage', 'format' and 'volname' are removed from $params; whatever is
+# left in $params is handed to volume_import_start as its last argument.
+#
+# Returns a hash ref with the import's 'socket' and 'format'.
+# Dies if $state already holds a running import.
+sub handle_disk_import {
+    my ($state, $params) = @_;
+
+    die "disk import already running as PID '$state->{disk_import}->{pid}'\n"
+	if $state->{disk_import}->{pid};
+
+    my $storage = delete $params->{storage};
+    my $format = delete $params->{format};
+    my $volname = delete $params->{volname};
+
+    my $import = PVE::Storage::volume_import_start($state->{storecfg}, $storage, $volname, $format, $state->{vmid}, $params);
+
+    my $socket = $import->{socket};
+    $format = delete $import->{format};
+
+    # remember the socket and the import itself for later queries and cleanup
+    $state->{sockets}->{$socket} = 1;
+    $state->{disk_import} = $import;
+
+    # hand the socket to $state->{socket_uid}; a failure is reported but not
+    # fatal, matching the previous best-effort behaviour
+    chown($state->{socket_uid}, -1, $socket)
+	or warn "failed to change owner of socket '$socket': $!\n";
+
+    return {
+	socket => $socket,
+	format => $format,
+    };
+}
+# Target-side handler for 'query-disk-import': poll the import recorded in
+# $state->{disk_import}.
+#
+# Returns a hash ref with one of
+#   status => "complete", volid => ...  the import printed its result line
+#   status => "error", msg => ...       the process exited without one
+#   status => "pending", msg => ...     otherwise; msg is the line read, if any
+#
+# Dies if no import is recorded in $state.
+sub handle_query_disk_import {
+    my ($state, $params) = @_;
+
+    my $pid = $state->{disk_import}->{pid};
+    die "no disk import running\n" if !$pid;
+
+    my $pattern = PVE::Storage::volume_imported_message(undef, 1);
+
+    # read one line of import output, giving up after $timeout seconds;
+    # read errors and timeouts simply yield no line
+    my $read_output = sub {
+	my ($timeout) = @_;
+	my $line;
+	eval {
+	    my $fh = $state->{disk_import}->{fh};
+	    PVE::Tools::run_with_timeout($timeout, sub { $line = <$fh>; });
+	    print "disk-import: $line\n" if $line;
+	};
+	return $line;
+    };
+
+    # forget the import and remove its socket - must run only once, since
+    # it drops $state->{disk_import}
+    my $cleanup = sub {
+	my $unix = $state->{disk_import}->{socket};
+	unlink $unix;
+	delete $state->{sockets}->{$unix};
+	delete $state->{disk_import};
+    };
+
+    my $result = $read_output->(5);
+    my $reaped = 0;
+
+    # attempted read empty or timeout, and process has exited already
+    if (!$result && waitpid($pid, WNOHANG)) {
+	$reaped = 1;
+	my $msg = '';
+	# read any missed output
+	while (my $line = $read_output->(1)) {
+	    if ($line =~ $pattern) {
+		$result = $line;
+	    } else {
+		$msg .= "$line\n";
+	    }
+	}
+
+	if (!$result) {
+	    $cleanup->();
+	    $msg = "import process failed\n" if !$msg;
+	    return {
+		status => "error",
+		msg => $msg,
+	    };
+	}
+	# otherwise fall through to the "complete" handling below, which
+	# performs the cleanup
+    }
+
+    if ($result && $result =~ $pattern) {
+	my $volid = $1;
+	# nothing left to wait for if the process was already collected above
+	waitpid($pid, 0) if !$reaped;
+	$cleanup->();
+	$state->{cleanup}->{volumes}->{$volid} = 1;
+	return {
+	    status => "complete",
+	    volid => $volid,
+	};
+    }
+
+    return {
+	status => "pending",
+	msg => $result,
+    };
+}
+# Target-side handler for 'bwlimit': look up the bandwidth limit for the
+# given storages.
+#
+# Keys read from $params: 'operation' (defaults to "migration"), 'storages'
+# and 'bwlimit' (passed on as override).
+#
+# Returns a hash ref { bwlimit => ... } holding whatever
+# PVE::Storage::get_bandwidth_limit returns.
+sub handle_bwlimit {
+    my ($params) = @_;
+
+    my $op = $params->{operation} // "migration";
+    my $storages = $params->{storages};
+    my $override = $params->{bwlimit};
+
+    return { bwlimit => PVE::Storage::get_bandwidth_limit($op, $storages, $override) };
+}

More information about the pve-devel mailing list