[pve-devel] [RFC PATCH manager] WIP: api: implement node-independent bulk actions

Dominik Csapak d.csapak at proxmox.com
Tue Mar 18 11:39:27 CET 2025


To implement node-independent bulk actions, we start a worker task and
use our generic API client to start the tasks on the relevant nodes.
The client always points to 'localhost', so we let pveproxy worry about
the proxying etc.
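
In short, the worker builds a client roughly like this and sends all
requests through the local proxy (condensed from create_client() in the
patch below; $authuser and $fingerprint are set up as in the patch, the
node name and VMID are just placeholders):

    my $api_client = PVE::APIClient::LWP->new(
        protocol => 'https',
        host => 'localhost', # pveproxy forwards to the target node
        port => 8006,
        ticket => PVE::AccessControl::assemble_ticket($authuser),
        cached_fingerprints => { $fingerprint => 1 },
    );
    # e.g. start guest 100 residing on 'node2' via the local proxy:
    my $upid = $api_client->post("/nodes/node2/qemu/100/status/start");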

We reuse some logic from the startall/stopall/etc. calls, like getting
the ordered guest info list.
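
For reference, get_start_stop_list() returns the guests grouped by
startup order, roughly:

    {
        0 => { 100 => { type => 'qemu', node => 'node1', ... } },
        1 => { 102 => { type => 'lxc',  node => 'node2', ... } },
    }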

Not yet implemented are:
* filters
* failure mode resolution (we could implement this later too)
* token handling (not sure if we need this at all if we check the
  permissions correctly upfront?)
* suspend
* some call specific parameters

Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
this is a prerequisite for having bulk actions on PDM.
Since we normally don't do such things 'cluster-wide', I wanted to
send the patch early to get feedback on my design decisions, like:
* using the api client this way
* api naming/location
* handling parallel requests
* etc.

There are alternative methods to achieve similar results:
* using some kind of queuing system on the cluster (e.g. via pmxcfs)
* using the 'startall'/'stopall' calls from PVE in PDM
* surely some other approaches I didn't think of

We can of course start with this, and change the underlying mechanism
later too.

If we go this route, I could also rewrite the code in Rust if desired,
since there is nothing particularly dependent on Perl here
(besides getting the vmlist, but that could stay in Perl).
The bulk of the logic is starting tasks + handling their completion +
filtering + concurrency.
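
At its core that is a bounded worker pool polling the task status over
the API, roughly like this (simplified from foreach_guest() in Bulk.pm
below):

    while (scalar(keys $workers->%*) >= $max_workers) {
        for my $upid (keys $workers->%*) {
            my $node = $workers->{$upid}->{guest}->{node};
            my $task = $api_client->get("/nodes/$node/tasks/$upid/status");
            # reap finished tasks to free up a worker slot
            delete $workers->{$upid} if $task->{status} ne 'running';
        }
        sleep(1);
    }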

 PVE/API2/Cluster.pm       |   7 +
 PVE/API2/Cluster/Bulk.pm  | 476 ++++++++++++++++++++++++++++++++++++++
 PVE/API2/Cluster/Makefile |   1 +
 PVE/API2/Nodes.pm         |  24 +-
 4 files changed, 497 insertions(+), 11 deletions(-)
 create mode 100644 PVE/API2/Cluster/Bulk.pm

diff --git a/PVE/API2/Cluster.pm b/PVE/API2/Cluster.pm
index a0e5c11b..478610e6 100644
--- a/PVE/API2/Cluster.pm
+++ b/PVE/API2/Cluster.pm
@@ -25,6 +25,7 @@ use PVE::API2::ACMEAccount;
 use PVE::API2::ACMEPlugin;
 use PVE::API2::Backup;
 use PVE::API2::Cluster::BackupInfo;
+use PVE::API2::Cluster::Bulk;
 use PVE::API2::Cluster::Ceph;
 use PVE::API2::Cluster::Mapping;
 use PVE::API2::Cluster::Jobs;
@@ -103,6 +104,11 @@ __PACKAGE__->register_method ({
     path => 'mapping',
 });
 
+__PACKAGE__->register_method ({
+    subclass => "PVE::API2::Cluster::Bulk",
+    path => 'bulk-actions',
+});
+
 if ($have_sdn) {
     __PACKAGE__->register_method ({
        subclass => "PVE::API2::Network::SDN",
@@ -162,6 +168,7 @@ __PACKAGE__->register_method ({
 	    { name => 'resources' },
 	    { name => 'status' },
 	    { name => 'tasks' },
+	    { name => 'bulk-actions' },
 	];
 
 	if ($have_sdn) {
diff --git a/PVE/API2/Cluster/Bulk.pm b/PVE/API2/Cluster/Bulk.pm
new file mode 100644
index 00000000..05a79155
--- /dev/null
+++ b/PVE/API2/Cluster/Bulk.pm
@@ -0,0 +1,476 @@
+package PVE::API2::Cluster::Bulk;
+
+use strict;
+use warnings;
+
+use PVE::APIClient::LWP;
+use PVE::AccessControl;
+use PVE::Cluster;
+use PVE::Exception qw(raise raise_perm_exc raise_param_exc);
+use PVE::INotify;
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::RESTHandler;
+use PVE::RPCEnvironment;
+use PVE::Tools qw();
+
+use PVE::API2::Nodes;
+
+use base qw(PVE::RESTHandler);
+
+
+__PACKAGE__->register_method ({
+    name => 'index',
+    path => '',
+    method => 'GET',
+    description => "Bulk action index.",
+    permissions => { user => 'all' },
+    parameters => {
+	additionalProperties => 0,
+	properties => {},
+    },
+    returns => {
+	type => 'array',
+	items => {
+	    type => "object",
+	    properties => {},
+	},
+	links => [ { rel => 'child', href => "{name}" } ],
+    },
+    code => sub {
+	my ($param) = @_;
+
+	return [
+	    { name => 'start' },
+	    { name => 'shutdown' },
+	    { name => 'migrate' },
+	];
+    }});
+
+sub create_client {
+    my ($authuser, $request_timeout) = @_;
+    my ($user, undef) = PVE::AccessControl::split_tokenid($authuser, 1);
+
+    # TODO: How to handle Tokens?
+    my $ticket = PVE::AccessControl::assemble_ticket($user || $authuser);
+    my $csrf_token = PVE::AccessControl::assemble_csrf_prevention_token($user || $authuser);
+
+    my $node = PVE::INotify::nodename();
+    my $fingerprint = PVE::Cluster::get_node_fingerprint($node);
+
+    my $conn_args = {
+	protocol => 'https',
+	host => 'localhost', # always call the api locally, let pveproxy handle the proxying
+	port => 8006,
+	ticket => $ticket,
+	timeout => $request_timeout // 25, # default slightly shorter than the proxy->daemon timeout
+	cached_fingerprints => {
+	    $fingerprint => 1,
+	}
+    };
+
+    my $api_client = PVE::APIClient::LWP->new($conn_args->%*);
+    if (defined($csrf_token)) {
+	$api_client->update_csrftoken($csrf_token);
+    }
+
+    return $api_client;
+}
+
+# takes a vm list in the form of
+# {
+#     0 => {
+#         100 => { .. guest info ..},
+#         101 => { .. guest info ..},
+#     },
+#     1 => {
+#         102 => { .. guest info ..},
+#         103 => { .. guest info ..},
+#     },
+# }
+#
+# max_workers: how many tasks should run in parallel at most.
+# start_task: a sub that returns either a UPID or 1 (undef means failure)
+# check_task: if start_task returned a UPID, foreach_guest waits for it to
+#    finish and calls check_task with the resulting task status
+sub foreach_guest {
+    my ($startlist, $max_workers, $start_task, $check_task) = @_;
+
+    my $rpcenv = PVE::RPCEnvironment::get();
+    my $authuser = $rpcenv->get_user();
+    my $api_client = create_client($authuser);
+
+    my $failed = [];
+    for my $order (sort {$a <=> $b} keys $startlist->%*) {
+	my $vmlist = $startlist->{$order};
+	my $workers = {};
+
+	for my $vmid (sort {$a <=> $b} keys $vmlist->%*) {
+
+	    # wait until at least one slot is free
+	    while (scalar(keys($workers->%*)) >= $max_workers) {
+		for my $upid (keys($workers->%*)) {
+		    my $worker = $workers->{$upid};
+		    my $node = $worker->{guest}->{node};
+
+		    my $task = $api_client->get("/nodes/$node/tasks/$upid/status");
+		    if ($task->{status} ne 'running') {
+			my $is_error = PVE::Tools::upid_status_is_error($task->{exitstatus});
+			push $failed->@*, $worker->{vmid} if $is_error;
+
+			$check_task->($api_client, $worker->{vmid}, $worker->{guest}, $is_error, $task);
+
+			delete $workers->{$upid};
+		    }
+		}
+		sleep(1); # How much?
+	    }
+
+	    my $guest = $vmlist->{$vmid};
+	    my $upid = eval { $start_task->($api_client, $vmid, $guest) };
+	    warn $@ if $@;
+
+	    if (!defined($upid)) {
+		push $failed->@*, $vmid;
+		next;
+	    }
+
+	    # success, but no task was necessary
+	    next if "$upid" eq "1";
+
+	    $workers->{$upid} = {
+		vmid => $vmid,
+		guest => $guest,
+	    };
+	}
+
+	# wait until current order is finished
+	for my $upid (keys($workers->%*)) {
+	    my $worker = $workers->{$upid};
+	    my $node = $worker->{guest}->{node};
+
+	    my $task = wait_for_task_finished($api_client, $node, $upid);
+	    my $is_error = PVE::Tools::upid_status_is_error($task->{exitstatus});
+	    push $failed->@*, $worker->{vmid} if $is_error;
+
+	    $check_task->($api_client, $worker->{vmid}, $worker->{guest}, $is_error, $task);
+
+	    delete $workers->{$upid};
+	}
+    }
+
+    return $failed;
+}
+
+sub get_type_text {
+    my ($type) = @_;
+
+    if ($type eq 'lxc') {
+	return 'CT';
+    } elsif ($type eq 'qemu') {
+	return 'VM';
+    } else {
+	die "unknown guest type $type\n";
+    }
+}
+
+sub wait_for_task_finished {
+    my ($client, $node, $upid) = @_;
+
+    while (1) {
+	my $task = $client->get("/nodes/$node/tasks/$upid/status");
+	return $task if $task->{status} ne 'running';
+	sleep(1); # How much time?
+    }
+}
+
+sub check_guest_permissions {
+    my ($rpcenv, $authuser, $vmlist, $priv_list) = @_;
+
+    my @vms = PVE::Tools::split_list($vmlist);
+    if (scalar(@vms) > 0) {
+	$rpcenv->check($authuser, "/vms/$_", $priv_list) for @vms;
+    } elsif (!$rpcenv->check($authuser, "/", $priv_list, 1)) {
+	raise_perm_exc("/, " . join(', ', $priv_list->@*));
+    }
+}
+
+__PACKAGE__->register_method({
+    name => 'start',
+    path => 'start',
+    method => 'POST',
+    description => "Bulk start all guests on the cluster.",
+    permissions => { user => 'all' },
+    protected => 1,
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    vms => {
+		description => "Only consider guests from this comma-separated list of VMIDs.",
+		type => 'string',  format => 'pve-vmid-list',
+		optional => 1,
+	    },
+	    'max-workers' => {
+		description => "Maximum number of tasks to start in parallel.",
+		optional => 1,
+		default => 1,
+		type => 'integer',
+	    },
+	    # TODO:
+	    # Failure resolution mode (fail, warn, retry?)
+	    # mode-limits (offline only, suspend only, ?)
+	    # filter (tags, name, ?)
+	},
+    },
+    returns => {
+	type => 'string',
+	description => "UPID of the worker",
+    },
+    code => sub {
+	my ($param) = @_;
+
+	my $rpcenv = PVE::RPCEnvironment::get();
+	my $authuser = $rpcenv->get_user();
+
+	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.PowerMgmt' ]);
+
+	my $code = sub {
+	    my $startlist = PVE::API2::Nodes::Nodeinfo::get_start_stop_list(undef, undef, $param->{vms});
+
+	    my $start_task = sub {
+		my ($api_client, $vmid, $guest) = @_;
+		my $node = $guest->{node};
+
+		my $type = $guest->{type};
+		my $type_text = get_type_text($type);
+
+		my $url = "/nodes/$node/$type/$vmid/status/start";
+		print STDERR "Starting $type_text $vmid\n";
+		return $api_client->post($url);
+	    };
+
+	    my $check_task = sub {
+		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
+		my $node = $guest->{node};
+
+		my $default_delay = 0;
+
+		if (!$is_error) {
+		    my $delay = defined($guest->{up}) ? int($guest->{up}) : $default_delay;
+		    if ($delay > 0) {
+			print STDERR "Waiting for $delay seconds (startup delay)\n";
+			for (my $i = 0; $i < $delay; $i++) {
+			    sleep(1);
+			}
+		    }
+		} else {
+		    my $type_text = get_type_text($guest->{type});
+		    print STDERR "Starting $type_text $vmid failed: $task->{exitstatus}\n";
+		}
+	    };
+
+	    my $max_workers = $param->{'max-workers'} // 1;
+	    my $failed = foreach_guest($startlist, $max_workers, $start_task, $check_task);
+
+	    if (scalar($failed->@*)) {
+		die "Some guests failed to start: " . join(', ', $failed->@*) . "\n";
+	    }
+	};
+
+	return $rpcenv->fork_worker('bulkstart', undef, $authuser, $code);
+    }});
+
+__PACKAGE__->register_method({
+    name => 'shutdown',
+    path => 'shutdown',
+    method => 'POST',
+    description => "Bulk shutdown all guests on the cluster.",
+    permissions => { user => 'all' },
+    protected => 1,
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    vms => {
+		description => "Only consider guests from this comma-separated list of VMIDs.",
+		type => 'string',  format => 'pve-vmid-list',
+		optional => 1,
+	    },
+	    timeout => {
+		description => "Default shutdown timeout in seconds if none is configured for the guest.",
+		type => 'integer',
+		default => 180,
+		optional => 1,
+	    },
+	    'max-workers' => {
+		description => "Maximum number of tasks to start in parallel.",
+		optional => 1,
+		default => 1,
+		type => 'integer',
+	    },
+	    # TODO:
+	    # Failure resolution mode (fail, warn, retry?)
+	    # mode-limits (offline only, suspend only, ?)
+	    # filter (tags, name, ?)
+	},
+    },
+    returns => {
+	type => 'string',
+	description => "UPID of the worker",
+    },
+    code => sub {
+	my ($param) = @_;
+
+	my $rpcenv = PVE::RPCEnvironment::get();
+	my $authuser = $rpcenv->get_user();
+
+	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.PowerMgmt' ]);
+
+	my $code = sub {
+	    my $startlist = PVE::API2::Nodes::Nodeinfo::get_start_stop_list(undef, undef, $param->{vms});
+
+	    # reverse order for shutdown
+	    for my $order (keys $startlist->%*) {
+		my $list = delete $startlist->{$order};
+		$order = $order * -1;
+		$startlist->{$order} = $list;
+	    }
+
+	    my $start_task = sub {
+		my ($api_client, $vmid, $guest) = @_;
+		my $node = $guest->{node};
+
+		my $type = $guest->{type};
+		my $type_text = get_type_text($type);
+
+		my $timeout = int($guest->{down} // $param->{timeout} // 180);
+
+		my $params = {
+		    forceStop => 1,
+		    timeout => $timeout,
+		};
+
+		my $url = "/nodes/$node/$type/$vmid/status/shutdown";
+		print STDERR "Shutting down $type_text $vmid (Timeout = $timeout seconds)\n";
+		return $api_client->post($url, $params);
+	    };
+
+	    my $check_task = sub {
+		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
+		my $node = $guest->{node};
+		if ($is_error) {
+		    my $type_text = get_type_text($guest->{type});
+		    print STDERR "Shutting down $type_text $vmid failed: $task->{exitstatus}\n";
+		}
+	    };
+
+	    my $max_workers = $param->{'max-workers'} // 1;
+	    my $failed = foreach_guest($startlist, $max_workers, $start_task, $check_task);
+
+	    if (scalar($failed->@*)) {
+		die "Some guests failed to shut down: " . join(', ', $failed->@*) . "\n";
+	    }
+	};
+
+	return $rpcenv->fork_worker('bulkshutdown', undef, $authuser, $code);
+    }});
+
+__PACKAGE__->register_method({
+    name => 'migrate',
+    path => 'migrate',
+    method => 'POST',
+    description => "Bulk migrate all guests on the cluster.",
+    permissions => { user => 'all' },
+    protected => 1,
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    vms => {
+		description => "Only consider guests from this comma-separated list of VMIDs.",
+		type => 'string',  format => 'pve-vmid-list',
+		optional => 1,
+	    },
+	    'target-node' => get_standard_option('pve-node', { description => "Target node." }),
+	    online => {
+		type => 'boolean',
+		description => "Enable live migration for VMs and restart migration for CTs.",
+		optional => 1,
+	    },
+	    "with-local-disks" => {
+		type => 'boolean',
+		description => "Enable live storage migration for local disks.",
+		optional => 1,
+	    },
+	    'max-workers' => {
+		description => "Maximum number of tasks to start in parallel.",
+		optional => 1,
+		default => 1,
+		type => 'integer',
+	    },
+	    # TODO:
+	    # Failure resolution mode (fail, warn, retry?)
+	    # mode-limits (offline only, suspend only, ?)
+	    # filter (tags, name, ?)
+	},
+    },
+    returns => {
+	type => 'string',
+	description => "UPID of the worker",
+    },
+    code => sub {
+	my ($param) = @_;
+
+	my $rpcenv = PVE::RPCEnvironment::get();
+	my $authuser = $rpcenv->get_user();
+
+	check_guest_permissions($rpcenv, $authuser, $param->{vms}, [ 'VM.Migrate' ]);
+
+	my $code = sub {
+	    my $list = PVE::API2::Nodes::Nodeinfo::get_filtered_vmlist(undef, $param->{vms}, 1, 1);
+
+	    my $start_task = sub {
+		my ($api_client, $vmid, $guest) = @_;
+		my $node = $guest->{node};
+
+		my $type = $guest->{type};
+		my $type_text = get_type_text($type);
+
+		if ($node eq $param->{'target-node'}) {
+		    print STDERR "$type_text $vmid already on $node, skipping.\n";
+		    return 1;
+		}
+
+		my $params = {
+		    target => $param->{'target-node'},
+		};
+
+		if ($type eq 'lxc') {
+		    $params->{restart} = $param->{online} if defined($param->{online});
+		} elsif ($type eq 'qemu') {
+		    $params->{online} = $param->{online} if defined($param->{online});
+		    $params->{'with-local-disks'} = $param->{'with-local-disks'} if defined($param->{'with-local-disks'});
+		}
+
+		my $url = "/nodes/$node/$type/$vmid/migrate";
+		print STDERR "Migrating $type_text $vmid\n";
+		return $api_client->post($url, $params);
+	    };
+
+	    my $check_task = sub {
+		my ($api_client, $vmid, $guest, $is_error, $task) = @_;
+		if ($is_error) {
+		    my $type_text = get_type_text($guest->{type});
+		    print STDERR "Migrating $type_text $vmid failed: $task->{exitstatus}\n";
+		}
+	    };
+
+	    my $max_workers = $param->{'max-workers'} // 1;
+	    my $failed = foreach_guest({ '0' => $list }, $max_workers, $start_task, $check_task);
+
+	    if (scalar($failed->@*)) {
+		die "Some guests failed to migrate: " . join(', ', $failed->@*) . "\n";
+	    }
+	};
+
+	return $rpcenv->fork_worker('bulkmigrate', undef, $authuser, $code);
+    }});
+
+1;
diff --git a/PVE/API2/Cluster/Makefile b/PVE/API2/Cluster/Makefile
index b109e5cb..ed02b4be 100644
--- a/PVE/API2/Cluster/Makefile
+++ b/PVE/API2/Cluster/Makefile
@@ -6,6 +6,7 @@ SUBDIRS=Mapping
 # ensure we do not conflict with files shipped by pve-cluster!!
 PERLSOURCE= 			\
 	BackupInfo.pm		\
+	Bulk.pm			\
 	MetricServer.pm		\
 	Mapping.pm		\
 	Notifications.pm		\
diff --git a/PVE/API2/Nodes.pm b/PVE/API2/Nodes.pm
index 9cdf19db..02bc7299 100644
--- a/PVE/API2/Nodes.pm
+++ b/PVE/API2/Nodes.pm
@@ -1829,7 +1829,7 @@ __PACKAGE__->register_method({
 # * vmid whitelist
 # * guest is a template (default: skip)
 # * guest is HA manged (default: skip)
-my $get_filtered_vmlist = sub {
+sub get_filtered_vmlist {
     my ($nodename, $vmfilter, $templates, $ha_managed) = @_;
 
     my $vmlist = PVE::Cluster::get_vmlist();
@@ -1856,13 +1856,14 @@ my $get_filtered_vmlist = sub {
 		die "unknown virtual guest type '$d->{type}'\n";
 	    }
 
-	    my $conf = $class->load_config($vmid);
+	    my $conf = $class->load_config($vmid, $d->{node});
 	    return if !$templates && $class->is_template($conf);
 	    return if !$ha_managed && PVE::HA::Config::vm_is_ha_managed($vmid);
 
 	    $res->{$vmid}->{conf} = $conf;
 	    $res->{$vmid}->{type} = $d->{type};
 	    $res->{$vmid}->{class} = $class;
+	    $res->{$vmid}->{node} = $d->{node};
 	};
 	warn $@ if $@;
     }
@@ -1871,13 +1872,13 @@ my $get_filtered_vmlist = sub {
 };
 
 # return all VMs which should get started/stopped on power up/down
-my $get_start_stop_list = sub {
+sub get_start_stop_list {
     my ($nodename, $autostart, $vmfilter) = @_;
 
     # do not skip HA vms on force or if a specific VMID set is wanted
     my $include_ha_managed = defined($vmfilter) ? 1 : 0;
 
-    my $vmlist = $get_filtered_vmlist->($nodename, $vmfilter, undef, $include_ha_managed);
+    my $vmlist = get_filtered_vmlist($nodename, $vmfilter, undef, $include_ha_managed);
 
     my $resList = {};
     foreach my $vmid (keys %$vmlist) {
@@ -1889,15 +1890,16 @@ my $get_start_stop_list = sub {
 
 	$resList->{$order}->{$vmid} = $startup;
 	$resList->{$order}->{$vmid}->{type} = $vmlist->{$vmid}->{type};
+	$resList->{$order}->{$vmid}->{node} = $vmlist->{$vmid}->{node};
     }
 
     return $resList;
-};
+}
 
 my $remove_locks_on_startup = sub {
     my ($nodename) = @_;
 
-    my $vmlist = &$get_filtered_vmlist($nodename, undef, undef, 1);
+    my $vmlist = get_filtered_vmlist($nodename, undef, undef, 1);
 
     foreach my $vmid (keys %$vmlist) {
 	my $conf = $vmlist->{$vmid}->{conf};
@@ -1983,7 +1985,7 @@ __PACKAGE__->register_method ({
 	    warn $@ if $@;
 
 	    my $autostart = $force ? undef : 1;
-	    my $startList = $get_start_stop_list->($nodename, $autostart, $param->{vms});
+	    my $startList = get_start_stop_list($nodename, $autostart, $param->{vms});
 
 	    # Note: use numeric sorting with <=>
 	    for my $order (sort {$a <=> $b} keys %$startList) {
@@ -2096,7 +2098,7 @@ __PACKAGE__->register_method ({
 		minimum => 0,
 		maximum => 2 * 3600, # mostly arbitrary, but we do not want to high timeouts
 	    },
-	},
+	}
     },
     returns => {
 	type => 'string',
@@ -2123,7 +2125,7 @@ __PACKAGE__->register_method ({
 
 	    $rpcenv->{type} = 'priv'; # to start tasks in background
 
-	    my $stopList = $get_start_stop_list->($nodename, undef, $param->{vms});
+	    my $stopList = get_start_stop_list($nodename, undef, $param->{vms});
 
 	    my $cpuinfo = PVE::ProcFSTools::read_cpuinfo();
 	    my $datacenterconfig = cfs_read_file('datacenter.cfg');
@@ -2246,7 +2248,7 @@ __PACKAGE__->register_method ({
 
 	    $rpcenv->{type} = 'priv'; # to start tasks in background
 
-	    my $toSuspendList = $get_start_stop_list->($nodename, undef, $param->{vms});
+	    my $toSuspendList = get_start_stop_list($nodename, undef, $param->{vms});
 
 	    my $cpuinfo = PVE::ProcFSTools::read_cpuinfo();
 	    my $datacenterconfig = cfs_read_file('datacenter.cfg');
@@ -2434,7 +2436,7 @@ __PACKAGE__->register_method ({
 	my $code = sub {
 	    $rpcenv->{type} = 'priv'; # to start tasks in background
 
-	    my $vmlist = &$get_filtered_vmlist($nodename, $param->{vms}, 1, 1);
+	    my $vmlist = get_filtered_vmlist($nodename, $param->{vms}, 1, 1);
 	    if (!scalar(keys %$vmlist)) {
 		warn "no virtual guests matched, nothing to do..\n";
 		return;
-- 
2.39.5