[pve-devel] [PATCH v3 guest-common 3/3] add storage tunnel module

Fabian Ebner f.ebner at proxmox.com
Wed Jan 5 11:50:28 CET 2022


Am 22.12.21 um 14:52 schrieb Fabian Grünbichler:
> encapsulating storage-related tunnel methods, currently
> - source-side storage-migrate helper
> - target-side disk-import handler
> - target-side query-disk-import handler
> 
> to be extended further with replication-related handlers and helpers.
> 
> Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
> ---
> 
> Notes:
>      new in v3, includes code previously in qemu-server
> 
>   src/Makefile             |   1 +
>   src/PVE/StorageTunnel.pm | 231 +++++++++++++++++++++++++++++++++++++++
>   2 files changed, 232 insertions(+)
>   create mode 100644 src/PVE/StorageTunnel.pm
> 
> diff --git a/src/Makefile b/src/Makefile
> index d82162c..baa2688 100644
> --- a/src/Makefile
> +++ b/src/Makefile
> @@ -12,6 +12,7 @@ install: PVE
>   	install -m 0644 PVE/ReplicationConfig.pm ${PERL5DIR}/PVE/
>   	install -m 0644 PVE/ReplicationState.pm ${PERL5DIR}/PVE/
>   	install -m 0644 PVE/Replication.pm ${PERL5DIR}/PVE/
> +	install -m 0644 PVE/StorageTunnel.pm ${PERL5DIR}/PVE/
>   	install -m 0644 PVE/Tunnel.pm ${PERL5DIR}/PVE/
>   	install -d ${PERL5DIR}/PVE/VZDump
>   	install -m 0644 PVE/VZDump/Plugin.pm ${PERL5DIR}/PVE/VZDump/
> diff --git a/src/PVE/StorageTunnel.pm b/src/PVE/StorageTunnel.pm
> new file mode 100644
> index 0000000..06902ef
> --- /dev/null
> +++ b/src/PVE/StorageTunnel.pm
> @@ -0,0 +1,231 @@
> +package PVE::StorageTunnel;
> +
> +use strict;
> +use warnings;
> +
> +use IO::Socket::UNIX;
> +use POSIX qw(WNOHANG);
> +use Socket qw(SOCK_STREAM);
> +
> +use PVE::Tools;
> +use PVE::Tunnel;
> +use PVE::Storage;
> +
> +sub storage_migrate {
> +    my ($tunnel, $storecfg, $volid, $local_vmid, $remote_vmid, $opts, $log) = @_;
> +
> +    my $bwlimit = $opts->{bwlimit};
> +    my $targetsid = $opts->{targetsid};
> +
> +    # use 'migrate' limit for transfer to other node
> +    my $bwlimit_opts = {
> +	storage => $targetsid,
> +	bwlimit => $bwlimit,
> +    };
> +    my $remote_bwlimit = PVE::Tunnel::write_tunnel($tunnel, 10, 'bwlimit', $bwlimit_opts);
> +    $remote_bwlimit = $remote_bwlimit->{bwlimit};
> +    if (defined($remote_bwlimit)) {
> +	$bwlimit = $remote_bwlimit if !defined($bwlimit);
> +	$bwlimit = $remote_bwlimit if $remote_bwlimit < $bwlimit;
> +    }
> +
> +    # JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
> +    $bwlimit = $bwlimit * 1024 if defined($bwlimit);
> +
> +    # adapt volume name for import call
> +    my ($sid, undef) = PVE::Storage::parse_volume_id($volid);
> +    my (undef, $name, undef, undef, undef, undef, $format) = PVE::Storage::parse_volname($storecfg, $volid);
> +    my $scfg = PVE::Storage::storage_config($storecfg, $sid);
> +    PVE::Storage::activate_volumes($storecfg, [$volid]);
> +    if ($local_vmid != $remote_vmid) {
> +	$name =~ s/-$local_vmid-/-$remote_vmid-/g;
> +	$name =~ s/^$local_vmid\///; # re-added on target if dir-based storage
> +    }
> +
> +    my $with_snapshots = $opts->{snapshots} ? 1 : 0;
> +    my $snapshot;
> +    my $migration_snapshot = PVE::Storage::storage_migrate_snapshot($storecfg, $sid);
> +    if ($migration_snapshot) {
> +	$snapshot = '__migration__';
> +	$with_snapshots = 1;
> +    }
> +
> +    my @export_formats = PVE::Storage::volume_export_formats($storecfg, $volid, undef, undef, $with_snapshots);
> +
> +    my $disk_import_opts = {
> +	format => $format,
> +	storage => $targetsid,
> +	snapshot => $snapshot,
> +	migration_snapshot => $migration_snapshot,

Noticed while looking at how volume_import_start is used:
snapshot and migration_snapshot are not defined in the schema for 
'disk-import' below

> +	'with-snapshots' => $with_snapshots,
> +	'allow-rename' => !$opts->{is_vmstate},
> +	'export-formats' => join(",", @export_formats),
> +	volname => $name,
> +    };
> +    my $res = PVE::Tunnel::write_tunnel($tunnel, 600, 'disk-import', $disk_import_opts);
> +    my $local = "/run/qemu-server/$local_vmid.storage";
> +    if (!$tunnel->{forwarded}->{$local}) {
> +	PVE::Tunnel::forward_unix_socket($tunnel, $local, $res->{socket});
> +    }
> +    my $socket = IO::Socket::UNIX->new(Peer => $local, Type => SOCK_STREAM())
> +	or die "failed to connect to websocket tunnel at $local\n";
> +    # we won't be reading from the socket
> +    shutdown($socket, 0);
> +
> +    my $disk_export_opts = {
> +	snapshot => $snapshot,
> +	migration_snapshot => $migration_snapshot,
> +	'with-snapshots' => $with_snapshots,
> +	ratelimit_bps => $bwlimit,
> +	cmd => {
> +	    output => '>&'.fileno($socket),
> +	},
> +    };
> +
> +    eval {
> +	PVE::Storage::volume_export_start(
> +	    $storecfg,
> +	    $volid,
> +	    $res->{format},
> +	    $disk_export_opts,
> +	);
> +    };
> +    my $send_error = $@;
> +    warn "$send_error\n" if $send_error;
> +
> +    # don't close the connection entirely otherwise the
> +    # receiving end might not get all buffered data (and
> +    # fails with 'connection reset by peer')
> +    shutdown($socket, 1);
> +
> +    # wait for the remote process to finish
> +    my $new_volid;
> +    while ($res = PVE::Tunnel::write_tunnel($tunnel, 10, 'query-disk-import')) {
> +	if ($res->{status} eq 'pending') {
> +	    if (my $msg = $res->{msg}) {
> +		$log->("disk-import: $msg\n");
> +	    } else {
> +		$log->("waiting for disk import to finish..\n");
> +	    }
> +	    sleep(1)
> +	} elsif ($res->{status} eq 'complete') {
> +	    $new_volid = $res->{volid};
> +	    last;
> +	} else {
> +	    warn "unknown query-disk-import result: $res->{status}\n";
> +	    last;
> +	}
> +    }
> +
> +    # now close the socket
> +    close($socket);
> +    if ($snapshot) {
> +	eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapshot, 0) };
> +	warn "could not remove source snapshot: $@\n" if $@;
> +    }
> +    die $send_error if $send_error;
> +    die "disk import failed - see log above\n" if !$new_volid;
> +
> +    return $new_volid;
> +}
> +
> +our $cmd_schema = {
> +    'disk-import' => {
> +	volname => {
> +	    type => 'string',
> +	    description => 'volume name to use as preferred target volume name',
> +	},
> +	format => PVE::JSONSchema::get_standard_option('pve-qm-image-format'),
> +	'export-formats' => {
> +	    type => 'string',
> +	    description => 'list of supported export formats',
> +	},
> +	storage => {
> +	    type => 'string',
> +	    format => 'pve-storage-id',
> +	},
> +	'with-snapshots' => {
> +	    description =>
> +	        "Whether the stream includes intermediate snapshots",
> +	    type => 'boolean',
> +	    optional => 1,
> +	    default => 0,
> +	},
> +	'allow-rename' => {
> +	    description => "Choose a new volume ID if the requested " .
> +		"volume ID already exists, instead of throwing an error.",
> +	    type => 'boolean',
> +	    optional => 1,
> +	    default => 0,
> +	},
> +    },
> +};
> +
> +sub handle_disk_import {
> +    my ($state, $params) = @_;
> +
> +    die "disk import already running as PID '$state->{disk_import}->{pid}'\n"
> +	if $state->{disk_import}->{pid};
> +
> +    my $storage = delete $params->{storage};
> +    my $format = delete $params->{format};
> +    my $volname = delete $params->{volname};
> +
> +    my $import = PVE::Storage::volume_import_start($state->{storecfg}, $storage, $volname, $format, $state->{vmid}, $params);
> +
> +    my $socket = $import->{socket};
> +    $format = delete $import->{format};
> +
> +    $state->{sockets}->{$socket} = 1;
> +    $state->{disk_import} = $import;
> +
> +    chown $state->{socket_uid}, -1, $socket;
> +
> +    return {
> +	socket => $socket,
> +	format => $format,
> +    };
> +}
> +
> +sub handle_query_disk_import {
> +    my ($state, $params) = @_;
> +
> +    die "no disk import running\n"
> +	if !$state->{disk_import}->{pid};
> +
> +    my $pattern = PVE::Storage::volume_imported_message(undef, 1);
> +    my $result;
> +    eval {
> +	my $fh = $state->{disk_import}->{fh};
> +	PVE::Tools::run_with_timeout(5, sub { $result = <$fh>; });
> +	print "disk-import: $result\n" if $result;
> +    };
> +    if ($result && $result =~ $pattern) {
> +	my $volid = $1;
> +	waitpid($state->{disk_import}->{pid}, 0);
> +
> +	my $unix = $state->{disk_import}->{socket};
> +	unlink $unix;
> +	delete $state->{sockets}->{$unix};
> +	delete $state->{disk_import};
> +	$state->{cleanup}->{volumes}->{$volid} = 1;
> +	return {
> +	    status => "complete",
> +	    volid => $volid,
> +	};
> +    } elsif (!$result && waitpid($state->{disk_import}->{pid}, WNOHANG)) {
> +	my $unix = $state->{disk_import}->{socket};
> +	unlink $unix;
> +	delete $state->{sockets}->{$unix};
> +	delete $state->{disk_import};
> +
> +	return {
> +	    status => "error",
> +	};
> +    } else {
> +	return {
> +	    status => "pending",
> +	    msg => $result,
> +	};
> +    }
> +}





More information about the pve-devel mailing list