[pve-devel] [PATCH v5 guest-common 3/3] add storage tunnel module
Fabian Grünbichler
f.gruenbichler at proxmox.com
Wed Feb 9 14:07:36 CET 2022
encapsulating storage-related tunnel methods, currently
- source-side storage-migrate helper
- target-side disk-import handler
- target-side query-disk-import handler
- target-side bwlimit handler
to be extended further with replication-related handlers and helpers.
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
Notes:
v5:
- fix socket path to no longer reference qemu-server
- fix accidental setting of $base_snapshot when determining export formats
- pass $migration_snapshot to storage_migrate_snapshot
- delete snapshot based on $migration_snapshot, not $snapshot
v4:
- add 'bwlimit' command, extended to support multiple storages and operation override
- fix option/parameter names ('-' vs '_')
- storage_migrate: move bwlimit decision to caller
- storage_migrate: handle volids owned by third VMID
- storage_migrate: pass snapshot to volume_export_formats
- storage_migrate: pass log function to export helper
- disk-import: add missing parameters to schema
- query-disk-import: fix race when querying import result
new in v3, includes code previously in qemu-server
requires bumped pve-storage with new export/import helpers
src/Makefile | 1 +
src/PVE/StorageTunnel.pm | 296 +++++++++++++++++++++++++++++++++++++++
2 files changed, 297 insertions(+)
create mode 100644 src/PVE/StorageTunnel.pm
diff --git a/src/Makefile b/src/Makefile
index d82162c..baa2688 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -12,6 +12,7 @@ install: PVE
install -m 0644 PVE/ReplicationConfig.pm ${PERL5DIR}/PVE/
install -m 0644 PVE/ReplicationState.pm ${PERL5DIR}/PVE/
install -m 0644 PVE/Replication.pm ${PERL5DIR}/PVE/
+ install -m 0644 PVE/StorageTunnel.pm ${PERL5DIR}/PVE/
install -m 0644 PVE/Tunnel.pm ${PERL5DIR}/PVE/
install -d ${PERL5DIR}/PVE/VZDump
install -m 0644 PVE/VZDump/Plugin.pm ${PERL5DIR}/PVE/VZDump/
diff --git a/src/PVE/StorageTunnel.pm b/src/PVE/StorageTunnel.pm
new file mode 100644
index 0000000..41147dd
--- /dev/null
+++ b/src/PVE/StorageTunnel.pm
@@ -0,0 +1,296 @@
+package PVE::StorageTunnel;
+
+use strict;
+use warnings;
+
+use IO::Socket::UNIX;
+use POSIX qw(WNOHANG);
+use Socket qw(SOCK_STREAM);
+
+use PVE::Storage;
+use PVE::Tools;
+use PVE::Tunnel;
+
+# Export volume $volid on the source side and stream it through $tunnel to a
+# 'disk-import' started on the remote side, then poll 'query-disk-import'
+# until the remote import finished.
+#
+# Parameters:
+#   $tunnel      - tunnel handle used with PVE::Tunnel::write_tunnel and
+#                  PVE::Tunnel::forward_unix_socket
+#   $storecfg    - storage configuration handed to the PVE::Storage helpers
+#   $volid       - ID of the volume to export
+#   $local_vmid  - guest ID on this side, also used for the local socket path
+#   $remote_vmid - guest ID on the remote side; the volume name sent to the
+#                  remote is rewritten if the current owner differs from it
+#   $opts        - hash ref; the keys read here are 'targetsid', 'bwlimit',
+#                  'snapshots' and 'is_vmstate'
+#   $log         - code ref invoked as $log->($level, $message)
+#
+# Returns the volume ID reported by the remote side once the import completed.
+# Dies if the volume owner cannot be determined, if no export format is
+# available, if the forwarded socket cannot be connected, if exporting or
+# polling failed, or if the remote side did not report a new volume ID.
+sub storage_migrate {
+    my ($tunnel, $storecfg, $volid, $local_vmid, $remote_vmid, $opts, $log) = @_;
+
+    my $targetsid = $opts->{targetsid};
+    my $bwlimit = $opts->{bwlimit};
+
+    # JSONSchema and get_bandwidth_limit use kbps - storage_migrate bps
+    $bwlimit = $bwlimit * 1024 if defined($bwlimit);
+
+    # adapt volume name for import call
+    my ($sid, undef) = PVE::Storage::parse_volume_id($volid);
+    my (undef, $name, $owner, undef, undef, undef, $format) = PVE::Storage::parse_volname($storecfg, $volid);
+    my $scfg = PVE::Storage::storage_config($storecfg, $sid);
+    PVE::Storage::activate_volumes($storecfg, [$volid]);
+
+    die "failed to determine owner of volume '$volid'\n" if !defined($owner);
+    $log->('warn', "volume '$volid' owned by VM/CT '$owner', not '$local_vmid'\n")
+        if $owner != $local_vmid;
+
+    if ($owner != $remote_vmid) {
+        $name =~ s/-$owner-/-$remote_vmid-/g;
+        $name =~ s/^$owner\///; # re-added on target if dir-based storage
+    }
+
+    my $with_snapshots = $opts->{snapshots} ? 1 : 0;
+    my $snapshot;
+    my $migration_snapshot = PVE::Storage::storage_migrate_snapshot($storecfg, $sid, $with_snapshots);
+    if ($migration_snapshot) {
+        $snapshot = '__migration__';
+        $with_snapshots = 1;
+    }
+
+    my @export_formats = PVE::Storage::volume_export_formats($storecfg, $volid, $snapshot, undef, $with_snapshots);
+    die "no export formats for '$volid' - check storage plugin support!\n"
+        if !@export_formats;
+
+    my $disk_import_opts = {
+        format => $format,
+        storage => $targetsid,
+        snapshot => $snapshot,
+        migration_snapshot => $migration_snapshot,
+        with_snapshots => $with_snapshots,
+        allow_rename => !$opts->{is_vmstate},
+        export_formats => join(",", @export_formats),
+        volname => $name,
+    };
+    my $res = PVE::Tunnel::write_tunnel($tunnel, 600, 'disk-import', $disk_import_opts);
+    my $local = "/run/pve/$local_vmid.storage";
+    if (!$tunnel->{forwarded}->{$local}) {
+        PVE::Tunnel::forward_unix_socket($tunnel, $local, $res->{socket});
+    }
+    my $socket = IO::Socket::UNIX->new(Peer => $local, Type => SOCK_STREAM())
+        or die "failed to connect to websocket tunnel at $local - $!\n";
+    # we won't be reading from the socket
+    shutdown($socket, 0);
+
+    my $disk_export_opts = {
+        snapshot => $snapshot,
+        migration_snapshot => $migration_snapshot,
+        with_snapshots => $with_snapshots,
+        ratelimit_bps => $bwlimit,
+        cmd => {
+            output => '>&'.fileno($socket),
+        },
+    };
+
+    eval {
+        PVE::Storage::volume_export_start(
+            $storecfg,
+            $volid,
+            $res->{format},
+            sub { $log->('info', shift) },
+            $disk_export_opts,
+        );
+    };
+    my $send_error = $@;
+    warn "$send_error\n" if $send_error;
+
+    # don't close the connection entirely otherwise the
+    # receiving end might not get all buffered data (and
+    # fails with 'connection reset by peer')
+    shutdown($socket, 1);
+
+    # wait for the remote process to finish; errors raised while polling are
+    # captured so that the socket and the migration snapshot below still get
+    # cleaned up before the error is re-thrown
+    my $new_volid;
+    eval {
+        while ($res = PVE::Tunnel::write_tunnel($tunnel, 10, 'query-disk-import')) {
+            my $status = $res->{status} // '';
+            if ($status eq 'pending') {
+                if (my $msg = $res->{msg}) {
+                    $log->('info', "disk-import: $msg\n");
+                } else {
+                    $log->('info', "waiting for disk import to finish..\n");
+                }
+                sleep(1);
+            } elsif ($status eq 'complete') {
+                $new_volid = $res->{volid};
+                last;
+            } elsif ($status eq 'error') {
+                # the query handler reports a failed import with this status
+                # and a message; pass the message on instead of dropping it
+                my $msg = $res->{msg} // "import process failed";
+                $msg =~ s/\s+\z//;
+                $log->('err', "disk-import: $msg\n");
+                last;
+            } else {
+                $log->('err', "unknown query-disk-import result: $status\n");
+                last;
+            }
+        }
+    };
+    my $query_error = $@;
+
+    # now close the socket
+    close($socket);
+    if ($migration_snapshot) {
+        eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapshot, 0) };
+        warn "could not remove source snapshot: $@\n" if $@;
+    }
+    die $send_error if $send_error;
+    die $query_error if $query_error;
+    die "disk import failed - see log above\n" if !$new_volid;
+
+    return $new_volid;
+}
+
+# Parameter schemas for the tunnel commands implemented in this module, keyed
+# by command name. Each entry maps a parameter name to its property definition.
+our $cmd_schema = {
+    # parameters read by handle_bwlimit
+    bwlimit => {
+        storages => {
+            description => "Storage for which bwlimit is queried",
+            type => 'string',
+            format => 'pve-storage-id-list',
+        },
+        bwlimit => {
+            description => "Override I/O bandwidth limit (in KiB/s).",
+            type => 'integer',
+            minimum => '0',
+            optional => 1,
+        },
+        operation => {
+            description => 'Operation for which bwlimit is queried ("restore", "migration", "clone", "move")',
+            type => 'string',
+            optional => 1,
+            default => 'migration',
+        },
+    },
+    # parameters accepted by handle_disk_import
+    'disk-import' => {
+        volname => {
+            description => 'volume name to use as preferred target volume name',
+            type => 'string',
+        },
+        format => PVE::JSONSchema::get_standard_option('pve-qm-image-format'),
+        export_formats => {
+            description => 'list of supported export formats',
+            type => 'string',
+        },
+        storage => {
+            type => 'string',
+            format => 'pve-storage-id',
+        },
+        snapshot => {
+            description => "The current-state snapshot if the stream contains snapshots",
+            type => 'string',
+            pattern => qr/[a-z0-9_\-]{1,40}/i,
+            optional => 1,
+        },
+        migration_snapshot => {
+            description => '`snapshot` was created for migration and will be removed after import',
+            type => 'boolean',
+            optional => 1,
+        },
+        with_snapshots => {
+            description => 'Whether the stream includes intermediate snapshots',
+            type => 'boolean',
+            optional => 1,
+            default => 0,
+        },
+        allow_rename => {
+            description => "Choose a new volume ID if the requested volume ID already exists, instead of throwing an error.",
+            type => 'boolean',
+            optional => 1,
+            default => 0,
+        },
+    },
+    # the query command takes no parameters
+    'query-disk-import' => {},
+};
+
+# Start a disk import for the 'disk-import' tunnel command.
+#
+# $state is the tunnel handler state; 'storecfg', 'vmid' and 'socket_uid' are
+# read from it, and 'sockets' and 'disk_import' are updated. $params holds the
+# command parameters; 'storage', 'format' and 'volname' are removed from it
+# and whatever remains is passed on to PVE::Storage::volume_import_start.
+#
+# Returns a hash ref with the import's 'socket' and 'format'. Dies if an
+# import with a recorded PID is already tracked in $state.
+sub handle_disk_import {
+    my ($state, $params) = @_;
+
+    if ($state->{disk_import}->{pid}) {
+        die "disk import already running as PID '$state->{disk_import}->{pid}'\n";
+    }
+
+    my $volname = delete $params->{volname};
+    my $req_format = delete $params->{format};
+    my $target_storage = delete $params->{storage};
+
+    my $import = PVE::Storage::volume_import_start(
+        $state->{storecfg},
+        $target_storage,
+        $volname,
+        $req_format,
+        $state->{vmid},
+        $params,
+    );
+
+    # the format is only reported back to the caller, not kept in the state
+    my $import_format = delete $import->{format};
+    my $socket_path = $import->{socket};
+
+    # remember the socket and the running import for later queries
+    $state->{disk_import} = $import;
+    $state->{sockets}->{$socket_path} = 1;
+
+    # hand the socket to the configured user, leaving the group untouched
+    chown $state->{socket_uid}, -1, $socket_path;
+
+    my $reply = {
+        socket => $socket_path,
+        format => $import_format,
+    };
+    return $reply;
+}
+
+# Report the state of the disk import tracked in $state->{disk_import}.
+#
+# $state is the tunnel handler state; $params is accepted but not used.
+#
+# Returns a hash ref with one of:
+#   { status => "pending",  msg => $line }   - import still running; $line is
+#                                              the output line read, if any
+#   { status => "complete", volid => $volid } - import finished; the volume is
+#                                              also recorded under
+#                                              $state->{cleanup}->{volumes}
+#   { status => "error",    msg => $msg }     - import process exited without
+#                                              printing the imported message
+#
+# Dies if no import with a recorded PID is tracked in $state.
+sub handle_query_disk_import {
+    my ($state, $params) = @_;
+
+    die "no disk import running\n"
+        if !$state->{disk_import}->{pid};
+
+    my $pattern = PVE::Storage::volume_imported_message(undef, 1);
+
+    # keep our own references: the state entry is removed during teardown,
+    # but the pid and file handle are still needed afterwards
+    my $import = $state->{disk_import};
+    my $pid = $import->{pid};
+
+    my $read_output = sub {
+        my ($timeout) = @_;
+
+        my $line;
+
+        eval {
+            my $fh = $import->{fh};
+            PVE::Tools::run_with_timeout($timeout, sub { $line = <$fh>; });
+            print "disk-import: $line\n" if $line;
+        };
+
+        return $line;
+    };
+
+    # remove the import socket and forget the import; must run only once
+    my $teardown = sub {
+        my $unix = $import->{socket};
+        unlink $unix;
+        delete $state->{sockets}->{$unix};
+        delete $state->{disk_import};
+    };
+
+    my $result = $read_output->(5);
+    my $reaped = 0;
+
+    # attempted read empty or timeout, and process has exited already
+    if (!$result && waitpid($pid, WNOHANG)) {
+        $reaped = 1;
+        my $msg = '';
+
+        # read any missed output
+        while (my $line = $read_output->(1)) {
+            if ($line =~ $pattern) {
+                $result = $line;
+            } else {
+                $msg .= "$line\n";
+            }
+        }
+
+        if (!$result) {
+            $teardown->();
+            $msg = "import process failed\n" if !$msg;
+            return {
+                status => "error",
+                msg => $msg,
+            };
+        }
+    }
+
+    if ($result && $result =~ $pattern) {
+        my $volid = $1;
+
+        # only wait if the process was not collected above; waiting again
+        # with the state already gone would pass an undefined PID to waitpid
+        waitpid($pid, 0) if !$reaped;
+
+        $teardown->();
+        $state->{cleanup}->{volumes}->{$volid} = 1;
+        return {
+            status => "complete",
+            volid => $volid,
+        };
+    }
+
+    return {
+        status => "pending",
+        msg => $result,
+    };
+}
+
+# Answer the 'bwlimit' tunnel command.
+#
+# Reads 'operation' (falling back to "migration" when undefined), 'storages'
+# and 'bwlimit' from $params and returns a hash ref whose 'bwlimit' entry is
+# the result of PVE::Storage::get_bandwidth_limit for these values.
+sub handle_bwlimit {
+    my ($params) = @_;
+
+    my $operation = $params->{operation};
+    $operation = "migration" if !defined($operation);
+
+    my @limit_args = ($operation, $params->{storages}, $params->{bwlimit});
+
+    return { bwlimit => PVE::Storage::get_bandwidth_limit(@limit_args) };
+}
--
2.30.2
More information about the pve-devel
mailing list