[pve-devel] [PATCH v3 pve-manager 01/23] pvesr: add pve storage replication tool
Wolfgang Link
w.link at proxmox.com
Wed May 31 10:47:42 CEST 2017
On 05/30/2017 03:19 PM, Dietmar Maurer wrote:
> Just added code to configure jobs. Replication itself is not
> implemented.
>
> Signed-off-by: Dietmar Maurer <dietmar at proxmox.com>
> ---
> PVE/API2/Cluster.pm | 7 ++
> PVE/API2/Makefile | 2 +
> PVE/API2/Nodes.pm | 7 ++
> PVE/API2/Replication.pm | 93 ++++++++++++++++
> PVE/API2/ReplicationConfig.pm | 217 ++++++++++++++++++++++++++++++++++++
> PVE/CLI/Makefile | 2 +-
> PVE/CLI/pvesr.pm | 150 +++++++++++++++++++++++++
> PVE/Makefile | 1 +
> PVE/Replication.pm | 249 ++++++++++++++++++++++++++++++++++++++++++
> bin/Makefile | 2 +-
> bin/pvesr | 8 ++
> 11 files changed, 736 insertions(+), 2 deletions(-)
> create mode 100644 PVE/API2/Replication.pm
> create mode 100644 PVE/API2/ReplicationConfig.pm
> create mode 100644 PVE/CLI/pvesr.pm
> create mode 100644 PVE/Replication.pm
> create mode 100644 bin/pvesr
>
> diff --git a/PVE/API2/Cluster.pm b/PVE/API2/Cluster.pm
> index 6ce86de4..0e94e9ff 100644
> --- a/PVE/API2/Cluster.pm
> +++ b/PVE/API2/Cluster.pm
> @@ -22,10 +22,16 @@ use PVE::RPCEnvironment;
> use PVE::JSONSchema qw(get_standard_option);
> use PVE::Firewall;
> use PVE::API2::Firewall::Cluster;
> +use PVE::API2::ReplicationConfig;
>
> use base qw(PVE::RESTHandler);
>
> __PACKAGE__->register_method ({
> + subclass => "PVE::API2::ReplicationConfig",
> + path => 'replication',
> +});
> +
> +__PACKAGE__->register_method ({
> subclass => "PVE::API2::ClusterConfig",
> path => 'config',
> });
> @@ -82,6 +88,7 @@ __PACKAGE__->register_method ({
> { name => 'log' },
> { name => 'options' },
> { name => 'resources' },
> + { name => 'replication' },
> { name => 'tasks' },
> { name => 'backup' },
> { name => 'ha' },
> diff --git a/PVE/API2/Makefile b/PVE/API2/Makefile
> index 4dfcbb56..86d75d36 100644
> --- a/PVE/API2/Makefile
> +++ b/PVE/API2/Makefile
> @@ -1,6 +1,8 @@
> include ../../defines.mk
>
> PERLSOURCE = \
> + Replication.pm \
> + ReplicationConfig.pm \
> Ceph.pm \
> APT.pm \
> Subscription.pm \
> diff --git a/PVE/API2/Nodes.pm b/PVE/API2/Nodes.pm
> index a45ca6db..885cf116 100644
> --- a/PVE/API2/Nodes.pm
> +++ b/PVE/API2/Nodes.pm
> @@ -40,6 +40,7 @@ use PVE::API2::VZDump;
> use PVE::API2::APT;
> use PVE::API2::Ceph;
> use PVE::API2::Firewall::Host;
> +use PVE::API2::Replication;
> use Digest::MD5;
> use Digest::SHA;
> use PVE::API2::Disks;
> @@ -113,6 +114,11 @@ __PACKAGE__->register_method ({
> });
>
> __PACKAGE__->register_method ({
> + subclass => "PVE::API2::Replication",
> + path => 'replication',
> +});
> +
> +__PACKAGE__->register_method ({
> name => 'index',
> path => '',
> method => 'GET',
> @@ -147,6 +153,7 @@ __PACKAGE__->register_method ({
> { name => 'tasks' },
> { name => 'rrd' }, # fixme: remove?
> { name => 'rrddata' },# fixme: remove?
> + { name => 'replication' },
> { name => 'vncshell' },
> { name => 'spiceshell' },
> { name => 'time' },
> diff --git a/PVE/API2/Replication.pm b/PVE/API2/Replication.pm
> new file mode 100644
> index 00000000..eaa7e571
> --- /dev/null
> +++ b/PVE/API2/Replication.pm
> @@ -0,0 +1,93 @@
> +package PVE::API2::Replication;
> +
> +use warnings;
> +use strict;
> +
> +use PVE::JSONSchema qw(get_standard_option);
> +use PVE::RPCEnvironment;
> +use PVE::ReplicationConfig;
> +use PVE::Replication;
> +
> +use PVE::RESTHandler;
> +
> +use base qw(PVE::RESTHandler);
> +
> +__PACKAGE__->register_method ({
> + name => 'index',
> + path => '',
> + method => 'GET',
> + permissions => { user => 'all' },
> + description => "Directory index.",
> + parameters => {
> + additionalProperties => 0,
> + properties => {
> + node => get_standard_option('pve-node'),
> + },
> + },
> + returns => {
> + type => 'array',
> + items => {
> + type => "object",
> + properties => {},
> + },
> + links => [ { rel => 'child', href => "{name}" } ],
> + },
> + code => sub {
> + my ($param) = @_;
> +
> + return [
> + { name => 'status' },
> + ];
> + }});
> +
> +
> +__PACKAGE__->register_method ({
> + name => 'status',
> + path => 'status',
> + method => 'GET',
> + description => "List replication job status.",
> + permissions => {
> + description => "Requires the VM.Audit permission on /vms/<vmid>.",
> + user => 'all',
> + },
> + protected => 1,
> + proxyto => 'node',
> + parameters => {
> + additionalProperties => 0,
> + properties => {
> + node => get_standard_option('pve-node'),
> + },
> + },
> + returns => {
> + type => 'array',
> + items => {
> + type => "object",
> + properties => {},
> + },
> + links => [ { rel => 'child', href => "{vmid}" } ],
> + },
> + code => sub {
> + my ($param) = @_;
> +
> + my $rpcenv = PVE::RPCEnvironment::get();
> + my $authuser = $rpcenv->get_user();
> +
> + my $jobs = PVE::Replication::job_status();
> +
> + my $res = [];
> + foreach my $id (sort keys %$jobs) {
> + my $d = $jobs->{$id};
> + my $state = delete $d->{state};
> + my $vmid = $d->{guest};
> + next if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
> + $d->{id} = $id;
> + foreach my $k (qw(last_sync fail_count error duration)) {
> + $d->{$k} = $state->{$k} if defined($state->{$k});
> + }
> + push @$res, $d;
> + }
> +
> + return $res;
> + }});
> +
> +1;
> diff --git a/PVE/API2/ReplicationConfig.pm b/PVE/API2/ReplicationConfig.pm
> new file mode 100644
> index 00000000..6f4de385
> --- /dev/null
> +++ b/PVE/API2/ReplicationConfig.pm
> @@ -0,0 +1,217 @@
> +package PVE::API2::ReplicationConfig;
> +
> +use warnings;
> +use strict;
> +
> +use PVE::Tools qw(extract_param);
> +use PVE::Exception qw(raise_perm_exc);
> +use PVE::JSONSchema qw(get_standard_option);
> +use PVE::RPCEnvironment;
> +use PVE::ReplicationConfig;
> +
> +use PVE::RESTHandler;
> +
> +use base qw(PVE::RESTHandler);
> +
> +__PACKAGE__->register_method ({
> + name => 'index',
> + path => '',
> + method => 'GET',
> + description => "List replication jobs.",
> + permissions => {
> + description => "Requires the VM.Audit permission on /vms/<vmid>.",
> + user => 'all',
> + },
> + parameters => {
> + additionalProperties => 0,
> + properties => {},
> + },
> + returns => {
> + type => 'array',
> + items => {
> + type => "object",
> + properties => {},
> + },
> + links => [ { rel => 'child', href => "{vmid}" } ],
> + },
> + code => sub {
> + my ($param) = @_;
> +
> + my $rpcenv = PVE::RPCEnvironment::get();
> + my $authuser = $rpcenv->get_user();
> +
> + my $cfg = PVE::ReplicationConfig->new();
> +
> + my $res = [];
> + foreach my $id (sort keys %{$cfg->{ids}}) {
> + my $d = $cfg->{ids}->{$id};
> + my $vmid = $d->{guest};
> + next if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
> + $d->{id} = $id;
> + push @$res, $d;
> + }
> +
> + return $res;
> + }});
> +
> +__PACKAGE__->register_method ({
> + name => 'read',
> + path => '{id}',
> + method => 'GET',
> + description => "Read replication job configuration.",
> + permissions => {
> + description => "Requires the VM.Audit permission on /vms/<vmid>.",
> + user => 'all',
> + },
> + parameters => {
> + additionalProperties => 0,
> + properties => {
> + id => get_standard_option('pve-replication-id'),
> + },
> + },
> + returns => { type => 'object' },
> + code => sub {
> + my ($param) = @_;
> +
> + my $rpcenv = PVE::RPCEnvironment::get();
> + my $authuser = $rpcenv->get_user();
> +
> + my $cfg = PVE::ReplicationConfig->new();
> +
> + my $data = $cfg->{ids}->{$param->{id}};
> +
> + die "no such replication job '$param->{id}'\n" if !defined($data);
> +
> + my $vmid = $data->{guest};
> +
> + raise_perm_exc() if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
> +
> + $data->{id} = $param->{id};
> +
> + return $data;
> + }});
> +
> +__PACKAGE__->register_method ({
> + name => 'create',
> + path => '',
> + protected => 1,
> + method => 'POST',
> + description => "Create a new replication job",
> + permissions => {
> + check => ['perm', '/storage', ['Datastore.Allocate']],
> + },
> + parameters => PVE::ReplicationConfig->createSchema(),
> + returns => { type => 'null' },
> + code => sub {
> + my ($param) = @_;
> +
> + my $type = extract_param($param, 'type');
> + my $plugin = PVE::ReplicationConfig->lookup($type);
> + my $id = extract_param($param, 'id');
> +
> + my $code = sub {
> + my $cfg = PVE::ReplicationConfig->new();
> +
> + #die "replication job for guest '$param->{guest}' to target '$param->{target}' already exists\n"
> + die "replication job '$id' already exists\n"
> + if $cfg->{ids}->{$id};
> +
> + my $opts = $plugin->check_config($id, $param, 1, 1);
> +
> + $cfg->{ids}->{$id} = $opts;
> +
> + $cfg->write();
> + };
> +
> + PVE::ReplicationConfig::lock($code);
> +
> + return undef;
> + }});
> +
> +
> +__PACKAGE__->register_method ({
> + name => 'update',
> + protected => 1,
> + path => '{id}',
> + method => 'PUT',
> + description => "Update replication job configuration.",
> + permissions => {
> + check => ['perm', '/storage', ['Datastore.Allocate']],
> + },
> + parameters => PVE::ReplicationConfig->updateSchema(),
> + returns => { type => 'null' },
> + code => sub {
> + my ($param) = @_;
> +
> + my $id = extract_param($param, 'id');
> +
> + my $code = sub {
> + my $cfg = PVE::ReplicationConfig->new();
> +
> + my $data = $cfg->{ids}->{$id};
> + die "no such job '$id'\n" if !$data;
> +
> + my $plugin = PVE::ReplicationConfig->lookup($data->{type});
> + my $opts = $plugin->check_config($id, $param, 0, 1);
> +
> + foreach my $k (%$opts) {
> + $data->{$k} = $opts->{$k};
> + }
> +
> + $cfg->write();
> + };
> +
> + PVE::ReplicationConfig::lock($code);
> +
> + return undef;
> + }});
> +
> +__PACKAGE__->register_method ({
> + name => 'delete',
> + protected => 1,
> + path => '{id}',
> + method => 'DELETE',
> + description => "Delete replication job",
> + permissions => {
> + check => ['perm', '/storage', ['Datastore.Allocate']],
> + },
> + parameters => {
> + additionalProperties => 0,
> + properties => {
> + id => get_standard_option('pve-replication-id'),
> + keep => {
> + description => "Keep replicated data at target (do not remove).",
> + type => 'boolean',
> + optional => 1,
> + default => 0,
> + },
> + }
> + },
> + returns => { type => 'null' },
> + code => sub {
> + my ($param) = @_;
> +
> + my $id = extract_param($param, 'id');
> +
> + my $code = sub {
> + my $cfg = PVE::ReplicationConfig->new();
> +
> + my $data = $cfg->{ids}->{$id};
> + die "no such job '$id'\n" if !$data;
> +
> + if (!$param->{keep}) {
> + # fixme: cleanup data at target
> +
> + }
> + # fixme: cleanup snapshots
> +
> + delete $cfg->{ids}->{$id};
> +
> + $cfg->write();
> + };
> +
> + PVE::ReplicationConfig::lock($code);
> +
> + return undef;
> + }});
> +1;
> diff --git a/PVE/CLI/Makefile b/PVE/CLI/Makefile
> index b005a8f1..3a27503d 100644
> --- a/PVE/CLI/Makefile
> +++ b/PVE/CLI/Makefile
> @@ -1,6 +1,6 @@
> include ../../defines.mk
>
> -SOURCES=vzdump.pm pvesubscription.pm pveceph.pm pveam.pm
> +SOURCES=vzdump.pm pvesubscription.pm pveceph.pm pveam.pm pvesr.pm
>
> all:
>
> diff --git a/PVE/CLI/pvesr.pm b/PVE/CLI/pvesr.pm
> new file mode 100644
> index 00000000..43359604
> --- /dev/null
> +++ b/PVE/CLI/pvesr.pm
> @@ -0,0 +1,150 @@
> +package PVE::CLI::pvesr;
> +
> +use strict;
> +use warnings;
> +use POSIX qw(strftime);
> +use JSON;
> +
> +use PVE::JSONSchema qw(get_standard_option);
> +use PVE::INotify;
> +use PVE::RPCEnvironment;
> +use PVE::Tools qw(extract_param);
> +use PVE::SafeSyslog;
> +use PVE::CLIHandler;
> +
> +use PVE::Replication;
> +use PVE::API2::ReplicationConfig;
> +use PVE::API2::Replication;
> +
> +use base qw(PVE::CLIHandler);
> +
> +my $nodename = PVE::INotify::nodename();
> +
> +sub setup_environment {
> + PVE::RPCEnvironment->setup_default_cli_env();
> +}
> +
> +__PACKAGE__->register_method ({
> + name => 'run',
> + path => 'run',
> + method => 'POST',
> + description => "This method is called by the systemd-timer and executes all (or a specific) sync jobs.",
> + parameters => {
> + additionalProperties => 0,
> + properties => {
> + id => get_standard_option('pve-replication-id', { optional => 1 }),
> + },
> + },
> + returns => { type => 'null' },
> + code => sub {
> + my ($param) = @_;
> +
> + if (my $id = extract_param($param, 'id')) {
> +
> + PVE::Replication::run_single_job($id);
> +
> + } else {
> +
> + PVE::Replication::run_jobs();
> + }
> +
> + return undef;
> + }});
> +
> +__PACKAGE__->register_method ({
> + name => 'enable',
> + path => 'enable',
> + method => 'POST',
> + description => "Enable a replication job.",
> + parameters => {
> + additionalProperties => 0,
> + properties => {
> + id => get_standard_option('pve-replication-id'),
> + },
> + },
> + returns => { type => 'null' },
> + code => sub {
> + my ($param) = @_;
> +
> + $param->{disable} = 0;
> +
> + return PVE::API2::ReplicationConfig->update($param);
> + }});
> +
> +__PACKAGE__->register_method ({
> + name => 'disable',
> + path => 'disable',
> + method => 'POST',
> + description => "Disable a replication job.",
> + parameters => {
> + additionalProperties => 0,
> + properties => {
> + id => get_standard_option('pve-replication-id'),
> + },
> + },
> + returns => { type => 'null' },
> + code => sub {
> + my ($param) = @_;
> +
> + $param->{disable} = 1;
> +
> + return PVE::API2::ReplicationConfig->update($param);
> + }});
> +
> +my $print_job_list = sub {
> + my ($list) = @_;
> +
> + my $format = "%-20s %10s %-20s %10s %5s %8s\n";
> +
> + printf($format, "JobID", "GuestID", "Target", "Interval", "Rate", "Enabled");
> +
> + foreach my $job (sort { $a->{guest} <=> $b->{guest} } @$list) {
> + my $plugin = PVE::ReplicationConfig->lookup($job->{type});
> + my $tid = $plugin->get_unique_target_id($job);
> +
> + printf($format, $job->{id}, $job->{guest}, $tid,
> + defined($job->{interval}) ? $job->{interval} : '-',
> + defined($job->{rate}) ? $job->{rate} : '-',
> + $job->{disable} ? 'no' : 'yes'
> + );
> + }
> +};
> +
> +my $print_job_status = sub {
> + my ($list) = @_;
> +
> + my $format = "%-20s %10s %-20s %20s %10s %10s %s\n";
> +
> + printf($format, "JobID", "GuestID", "Target", "LastSync", "Duration", "FailCount", "State");
> +
> + foreach my $job (sort { $a->{guest} <=> $b->{guest} } @$list) {
> + my $plugin = PVE::ReplicationConfig->lookup($job->{type});
> + my $tid = $plugin->get_unique_target_id($job);
> +
> + my $timestr = $job->{last_sync} ?
> + strftime("%Y-%m-%d_%H:%M:%S", localtime($job->{last_sync})) : '-';
> +
> + printf($format, $job->{id}, $job->{guest}, $tid,
> + $timestr, $job->{duration} // '-',
> + $job->{fail_count}, , $job->{error} // 'OK');
> + }
> +};
> +
> +our $cmddef = {
> + status => [ 'PVE::API2::Replication', 'status', [], { node => $nodename }, $print_job_status ],
> +
> + jobs => [ 'PVE::API2::ReplicationConfig', 'index' , [], {}, $print_job_list ],
> + read => [ 'PVE::API2::ReplicationConfig', 'read' , ['id'], {},
> + sub { my $res = shift; print to_json($res, { utf8 => 1, pretty => 1, canonical => 1}); }],
> + update => [ 'PVE::API2::ReplicationConfig', 'update' , ['id'], {} ],
> + delete => [ 'PVE::API2::ReplicationConfig', 'delete' , ['id'], {} ],
> + 'create-local-job' => [ 'PVE::API2::ReplicationConfig', 'create' , ['id', 'guest', 'target'],
> + { type => 'local' } ],
> +
> + enable => [ __PACKAGE__, 'enable', ['id'], {}],
> + disable => [ __PACKAGE__, 'disable', ['id'], {}],
> +
> + run => [ __PACKAGE__ , 'run'],
> +};
> +
> +1;
> diff --git a/PVE/Makefile b/PVE/Makefile
> index ac230fbf..05086629 100644
> --- a/PVE/Makefile
> +++ b/PVE/Makefile
> @@ -3,6 +3,7 @@ include ../defines.mk
> SUBDIRS=API2 Status CLI Service
>
> PERLSOURCE = \
> + Replication.pm \
> API2.pm \
> API2Tools.pm \
> HTTPServer.pm \
> diff --git a/PVE/Replication.pm b/PVE/Replication.pm
> new file mode 100644
> index 00000000..220d8f6e
> --- /dev/null
> +++ b/PVE/Replication.pm
> @@ -0,0 +1,249 @@
> +package PVE::Replication;
> +
> +use warnings;
> +use strict;
> +use Data::Dumper;
> +use JSON;
> +use Time::HiRes qw(gettimeofday tv_interval);
> +
> +use PVE::INotify;
> +use PVE::Tools;
> +use PVE::Cluster;
> +use PVE::QemuConfig;
> +use PVE::QemuServer;
> +use PVE::LXC::Config;
> +use PVE::LXC;
> +use PVE::Storage;
> +use PVE::ReplicationConfig;
> +
> +# Note: regression tests can overwrite $state_path for testing
> +our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
> +
> +my $update_job_state = sub {
> + my ($stateobj, $jobcfg, $state) = @_;
> +
> + my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
> +
> + my $vmid = $jobcfg->{guest};
> + my $tid = $plugin->get_unique_target_id($jobcfg);
> +
> + # Note: tuple ($vmid, $tid) is unique
> + $stateobj->{$vmid}->{$tid} = $state;
> +
> + PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
> +};
> +
> +my $get_job_state = sub {
> + my ($stateobj, $jobcfg) = @_;
> +
> + my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
> +
> + my $vmid = $jobcfg->{guest};
> + my $tid = $plugin->get_unique_target_id($jobcfg);
> + my $state = $stateobj->{$vmid}->{$tid};
> +
> + $state = {} if !$state;
> +
> + $state->{last_iteration} //= 0;
> + $state->{last_sync} //= 0;
> + $state->{fail_count} //= 0;
> +
> + return $state;
> +};
> +
> +my $read_state = sub {
> +
> + return {} if ! -e $state_path;
> +
> + my $raw = PVE::Tools::file_get_contents($state_path);
> +
> + return {} if $raw eq '';
> +
> + return decode_json($raw);
JSON::decode will not untaint the raw text, so we get problems with the
last_sync, which we use many times in the code with open3.
I would use a regex to untaint the $raw here, because it is much easier
then doing this all over the code.
> +};
> +
> +sub job_status {
> + my ($stateobj) = @_;
> +
> + my $local_node = PVE::INotify::nodename();
> +
> + my $jobs = {};
> +
> + $stateobj = $read_state->() if !$stateobj;
> +
> + my $cfg = PVE::ReplicationConfig->new();
> +
> + my $vms = PVE::Cluster::get_vmlist();
> +
> + foreach my $jobid (sort keys %{$cfg->{ids}}) {
> + my $jobcfg = $cfg->{ids}->{$jobid};
> + my $vmid = $jobcfg->{guest};
> +
> + die "internal error - not implemented" if $jobcfg->{type} ne 'local';
> +
> + # skip non existing vms
> + next if !$vms->{ids}->{$vmid};
> +
> + # only consider guest on local node
> + next if $vms->{ids}->{$vmid}->{node} ne $local_node;
> +
> + # never sync to local node
> + next if $jobcfg->{target} eq $local_node;
> +
> + next if $jobcfg->{disable};
> +
> + $jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
> + $jobcfg->{id} = $jobid;
> + $jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
> +
> + $jobs->{$jobid} = $jobcfg;
> + }
> +
> + return $jobs;
> +}
> +
> +my $get_next_job = sub {
> + my ($stateobj, $iteration, $start_time) = @_;
> +
> + my $next_jobid;
> +
> + my $jobs = job_status($stateobj);
> +
> + # compute next_sync here to make it easy to sort jobs
> + my $next_sync_hash = {};
> + foreach my $jobid (keys %$jobs) {
> + my $jobcfg = $jobs->{$jobid};
> + my $interval = $jobcfg->{interval} || 15;
> + my $last_sync = $jobcfg->{state}->{last_sync};
> + $next_sync_hash->{$jobid} = $last_sync + $interval * 60;
> + }
> +
> + my $sort_func = sub {
> + my $joba = $jobs->{$a};
> + my $jobb = $jobs->{$b};
> + my $sa = $joba->{state};
> + my $sb = $jobb->{state};
> + my $res = $sa->{last_iteration} cmp $sb->{last_iteration};
> + return $res if $res != 0;
> + $res = $next_sync_hash->{$a} <=> $next_sync_hash->{$b};
> + return $res if $res != 0;
> + return $joba->{guest} <=> $jobb->{guest};
> + };
> +
> + foreach my $jobid (sort $sort_func keys %$jobs) {
> + my $jobcfg = $jobs->{$jobid};
> +<<<<<<< HEAD
> + next if $jobcfg->{state}->{last_iteration} >= $now;
> + if ($now >= $next_sync_hash->{$jobid}) {
> +=======
> + next if $jobcfg->{state}->{last_iteration} >= $iteration;
> + if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
> +>>>>>>> c2eac19e... fixup iteration marker
> + $next_jobid = $jobid;
> + last;
> + }
> + }
> +
> + return undef if !$next_jobid;
> +
> + my $jobcfg = $jobs->{$next_jobid};
> +
> + $jobcfg->{state}->{last_iteration} = $iteration;
> + $update_job_state->($stateobj, $jobcfg, $jobcfg->{state});
> +
> + return $jobcfg;
> +};
> +
> +sub replicate {
> + my ($jobcfg, $start_time) = @_;
> +
> + die "implement me";
> +}
> +
> +my $run_replication = sub {
> + my ($stateobj, $jobcfg, $start_time) = @_;
> +
> + my $state = delete $jobcfg->{state};
> +
> + my $t0 = [gettimeofday];
> +
> + eval { replicate($jobcfg, $start_time); };
> + my $err = $@;
> +
> + $state->{duration} = tv_interval($t0);
> +
> + if ($err) {
> + $state->{fail_count}++;
> + $state->{error} = "$err";
> + $update_job_state->($stateobj, $jobcfg, $state);
> + } else {
> + $state->{last_sync} = $start_time;
> + $state->{fail_count} = 0;
> + delete $state->{error};
> + $update_job_state->($stateobj, $jobcfg, $state);
> + }
> +};
> +
> +sub run_single_job {
> + my ($jobid, $now) = @_; # passing $now useful for regression testing
> +
> + my $local_node = PVE::INotify::nodename();
> +
> + my $code = sub {
> + $now //= time();
> +
> + my $stateobj = $read_state->();
> +
> + my $cfg = PVE::ReplicationConfig->new();
> +
> + my $jobcfg = $cfg->{ids}->{$jobid};
> + die "no such job '$jobid'\n" if !$jobcfg;
> +
> + die "internal error - not implemented" if $jobcfg->{type} ne 'local';
> +
> + die "job '$jobid' is disabled\n" if $jobcfg->{disable};
> +
> + my $vms = PVE::Cluster::get_vmlist();
> + my $vmid = $jobcfg->{guest};
> +
> + die "no such guest '$vmid'\n" if !$vms->{ids}->{$vmid};
> +
> + die "guest '$vmid' is not on local node\n"
> + if $vms->{ids}->{$vmid}->{node} ne $local_node;
> +
> + die "unable to sync to local node\n" if $jobcfg->{target} eq $local_node;
> +
> + $jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
> + $jobcfg->{id} = $jobid;
> + $jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
> +
> + $jobcfg->{state}->{last_iteration} = $now;
> + $update_job_state->($stateobj, $jobcfg, $jobcfg->{state});
> +
> + $run_replication->($stateobj, $jobcfg, $now);
> + };
> +
> + my $res = PVE::Tools::lock_file($state_path, 60, $code);
> + die $@ if $@;
> +}
> +
> +sub run_jobs {
> + my ($now) = @_; # passing $now useful for regression testing
> +
> + my $iteration = $now // time();
> +
> + my $code = sub {
> + my $stateobj = $read_state->();
> + my $start_time = $now // time();
> +
> + while (my $jobcfg = $get_next_job->($stateobj, $iteration, $start_time)) {
> + $run_replication->($stateobj, $jobcfg, $start_time);
> + $start_time = $now // time();
> + }
> + };
> +
> + my $res = PVE::Tools::lock_file($state_path, 60, $code);
> + die $@ if $@;
> +}
> +
> +1;
> diff --git a/bin/Makefile b/bin/Makefile
> index f9143eab..c7fca9f8 100644
> --- a/bin/Makefile
> +++ b/bin/Makefile
> @@ -9,7 +9,7 @@ export PERLLIB=..
> SUBDIRS = init.d ocf test
>
> SERVICES = pvestatd pveproxy pvedaemon spiceproxy
> -CLITOOLS = vzdump pvesubscription pveceph pveam
> +CLITOOLS = vzdump pvesubscription pveceph pveam pvesr
>
> SCRIPTS = \
> ${SERVICES} \
> diff --git a/bin/pvesr b/bin/pvesr
> new file mode 100644
> index 00000000..762bb356
> --- /dev/null
> +++ b/bin/pvesr
> @@ -0,0 +1,8 @@
> +#!/usr/bin/perl -T
> +
> +use strict;
> +use warnings;
> +
> +use PVE::CLI::pvesr;
> +
> +PVE::CLI::pvesr->run_cli_handler();
>
More information about the pve-devel
mailing list