[pve-devel] [PATCH v3 pve-manager 01/23] pvesr: add pve storage replication tool
Dietmar Maurer
dietmar at proxmox.com
Tue May 30 15:19:58 CEST 2017
Just added code to configure jobs. Replication itself is not
implemented.
Signed-off-by: Dietmar Maurer <dietmar at proxmox.com>
---
PVE/API2/Cluster.pm | 7 ++
PVE/API2/Makefile | 2 +
PVE/API2/Nodes.pm | 7 ++
PVE/API2/Replication.pm | 93 ++++++++++++++++
PVE/API2/ReplicationConfig.pm | 217 ++++++++++++++++++++++++++++++++++++
PVE/CLI/Makefile | 2 +-
PVE/CLI/pvesr.pm | 150 +++++++++++++++++++++++++
PVE/Makefile | 1 +
PVE/Replication.pm | 249 ++++++++++++++++++++++++++++++++++++++++++
bin/Makefile | 2 +-
bin/pvesr | 8 ++
11 files changed, 736 insertions(+), 2 deletions(-)
create mode 100644 PVE/API2/Replication.pm
create mode 100644 PVE/API2/ReplicationConfig.pm
create mode 100644 PVE/CLI/pvesr.pm
create mode 100644 PVE/Replication.pm
create mode 100644 bin/pvesr
diff --git a/PVE/API2/Cluster.pm b/PVE/API2/Cluster.pm
index 6ce86de4..0e94e9ff 100644
--- a/PVE/API2/Cluster.pm
+++ b/PVE/API2/Cluster.pm
@@ -22,10 +22,16 @@ use PVE::RPCEnvironment;
use PVE::JSONSchema qw(get_standard_option);
use PVE::Firewall;
use PVE::API2::Firewall::Cluster;
+use PVE::API2::ReplicationConfig;
use base qw(PVE::RESTHandler);
__PACKAGE__->register_method ({
+ subclass => "PVE::API2::ReplicationConfig",
+ path => 'replication',
+});
+
+__PACKAGE__->register_method ({
subclass => "PVE::API2::ClusterConfig",
path => 'config',
});
@@ -82,6 +88,7 @@ __PACKAGE__->register_method ({
{ name => 'log' },
{ name => 'options' },
{ name => 'resources' },
+ { name => 'replication' },
{ name => 'tasks' },
{ name => 'backup' },
{ name => 'ha' },
diff --git a/PVE/API2/Makefile b/PVE/API2/Makefile
index 4dfcbb56..86d75d36 100644
--- a/PVE/API2/Makefile
+++ b/PVE/API2/Makefile
@@ -1,6 +1,8 @@
include ../../defines.mk
PERLSOURCE = \
+ Replication.pm \
+ ReplicationConfig.pm \
Ceph.pm \
APT.pm \
Subscription.pm \
diff --git a/PVE/API2/Nodes.pm b/PVE/API2/Nodes.pm
index a45ca6db..885cf116 100644
--- a/PVE/API2/Nodes.pm
+++ b/PVE/API2/Nodes.pm
@@ -40,6 +40,7 @@ use PVE::API2::VZDump;
use PVE::API2::APT;
use PVE::API2::Ceph;
use PVE::API2::Firewall::Host;
+use PVE::API2::Replication;
use Digest::MD5;
use Digest::SHA;
use PVE::API2::Disks;
@@ -113,6 +114,11 @@ __PACKAGE__->register_method ({
});
__PACKAGE__->register_method ({
+ subclass => "PVE::API2::Replication",
+ path => 'replication',
+});
+
+__PACKAGE__->register_method ({
name => 'index',
path => '',
method => 'GET',
@@ -147,6 +153,7 @@ __PACKAGE__->register_method ({
{ name => 'tasks' },
{ name => 'rrd' }, # fixme: remove?
{ name => 'rrddata' },# fixme: remove?
+ { name => 'replication' },
{ name => 'vncshell' },
{ name => 'spiceshell' },
{ name => 'time' },
diff --git a/PVE/API2/Replication.pm b/PVE/API2/Replication.pm
new file mode 100644
index 00000000..eaa7e571
--- /dev/null
+++ b/PVE/API2/Replication.pm
@@ -0,0 +1,93 @@
+package PVE::API2::Replication;
+
+use warnings;
+use strict;
+
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::RPCEnvironment;
+use PVE::ReplicationConfig;
+use PVE::Replication;
+
+use PVE::RESTHandler;
+
+use base qw(PVE::RESTHandler);
+
+__PACKAGE__->register_method ({
+ name => 'index',
+ path => '',
+ method => 'GET',
+ permissions => { user => 'all' },
+ description => "Directory index.",
+ parameters => {
+ additionalProperties => 0,
+ properties => {
+ node => get_standard_option('pve-node'),
+ },
+ },
+ returns => {
+ type => 'array',
+ items => {
+ type => "object",
+ properties => {},
+ },
+ links => [ { rel => 'child', href => "{name}" } ],
+ },
+ code => sub {
+ my ($param) = @_;
+
+ return [
+ { name => 'status' },
+ ];
+ }});
+
+
+__PACKAGE__->register_method ({
+ name => 'status',
+ path => 'status',
+ method => 'GET',
+ description => "List replication job status.",
+ permissions => {
+ description => "Requires the VM.Audit permission on /vms/<vmid>.",
+ user => 'all',
+ },
+ protected => 1,
+ proxyto => 'node',
+ parameters => {
+ additionalProperties => 0,
+ properties => {
+ node => get_standard_option('pve-node'),
+ },
+ },
+ returns => {
+ type => 'array',
+ items => {
+ type => "object",
+ properties => {},
+ },
+ links => [ { rel => 'child', href => "{vmid}" } ],
+ },
+ code => sub {
+ my ($param) = @_;
+
+ my $rpcenv = PVE::RPCEnvironment::get();
+ my $authuser = $rpcenv->get_user();
+
+ my $jobs = PVE::Replication::job_status();
+
+ my $res = [];
+ foreach my $id (sort keys %$jobs) {
+ my $d = $jobs->{$id};
+ my $state = delete $d->{state};
+ my $vmid = $d->{guest};
+ next if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
+ $d->{id} = $id;
+ foreach my $k (qw(last_sync fail_count error duration)) {
+ $d->{$k} = $state->{$k} if defined($state->{$k});
+ }
+ push @$res, $d;
+ }
+
+ return $res;
+ }});
+
+1;
diff --git a/PVE/API2/ReplicationConfig.pm b/PVE/API2/ReplicationConfig.pm
new file mode 100644
index 00000000..6f4de385
--- /dev/null
+++ b/PVE/API2/ReplicationConfig.pm
@@ -0,0 +1,217 @@
+package PVE::API2::ReplicationConfig;
+
+use warnings;
+use strict;
+
+use PVE::Tools qw(extract_param);
+use PVE::Exception qw(raise_perm_exc);
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::RPCEnvironment;
+use PVE::ReplicationConfig;
+
+use PVE::RESTHandler;
+
+use base qw(PVE::RESTHandler);
+
+__PACKAGE__->register_method ({
+ name => 'index',
+ path => '',
+ method => 'GET',
+ description => "List replication jobs.",
+ permissions => {
+ description => "Requires the VM.Audit permission on /vms/<vmid>.",
+ user => 'all',
+ },
+ parameters => {
+ additionalProperties => 0,
+ properties => {},
+ },
+ returns => {
+ type => 'array',
+ items => {
+ type => "object",
+ properties => {},
+ },
+ links => [ { rel => 'child', href => "{vmid}" } ],
+ },
+ code => sub {
+ my ($param) = @_;
+
+ my $rpcenv = PVE::RPCEnvironment::get();
+ my $authuser = $rpcenv->get_user();
+
+ my $cfg = PVE::ReplicationConfig->new();
+
+ my $res = [];
+ foreach my $id (sort keys %{$cfg->{ids}}) {
+ my $d = $cfg->{ids}->{$id};
+ my $vmid = $d->{guest};
+ next if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
+ $d->{id} = $id;
+ push @$res, $d;
+ }
+
+ return $res;
+ }});
+
+__PACKAGE__->register_method ({
+ name => 'read',
+ path => '{id}',
+ method => 'GET',
+ description => "Read replication job configuration.",
+ permissions => {
+ description => "Requires the VM.Audit permission on /vms/<vmid>.",
+ user => 'all',
+ },
+ parameters => {
+ additionalProperties => 0,
+ properties => {
+ id => get_standard_option('pve-replication-id'),
+ },
+ },
+ returns => { type => 'object' },
+ code => sub {
+ my ($param) = @_;
+
+ my $rpcenv = PVE::RPCEnvironment::get();
+ my $authuser = $rpcenv->get_user();
+
+ my $cfg = PVE::ReplicationConfig->new();
+
+ my $data = $cfg->{ids}->{$param->{id}};
+
+ die "no such replication job '$param->{id}'\n" if !defined($data);
+
+ my $vmid = $data->{guest};
+
+ raise_perm_exc() if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
+
+ $data->{id} = $param->{id};
+
+ return $data;
+ }});
+
+__PACKAGE__->register_method ({
+ name => 'create',
+ path => '',
+ protected => 1,
+ method => 'POST',
+ description => "Create a new replication job",
+ permissions => {
+ check => ['perm', '/storage', ['Datastore.Allocate']],
+ },
+ parameters => PVE::ReplicationConfig->createSchema(),
+ returns => { type => 'null' },
+ code => sub {
+ my ($param) = @_;
+
+ my $type = extract_param($param, 'type');
+ my $plugin = PVE::ReplicationConfig->lookup($type);
+ my $id = extract_param($param, 'id');
+
+ my $code = sub {
+ my $cfg = PVE::ReplicationConfig->new();
+
+ #die "replication job for guest '$param->{guest}' to target '$param->{target}' already exists\n"
+ die "replication job '$id' already exists\n"
+ if $cfg->{ids}->{$id};
+
+ my $opts = $plugin->check_config($id, $param, 1, 1);
+
+ $cfg->{ids}->{$id} = $opts;
+
+ $cfg->write();
+ };
+
+ PVE::ReplicationConfig::lock($code);
+
+ return undef;
+ }});
+
+
+__PACKAGE__->register_method ({
+ name => 'update',
+ protected => 1,
+ path => '{id}',
+ method => 'PUT',
+ description => "Update replication job configuration.",
+ permissions => {
+ check => ['perm', '/storage', ['Datastore.Allocate']],
+ },
+ parameters => PVE::ReplicationConfig->updateSchema(),
+ returns => { type => 'null' },
+ code => sub {
+ my ($param) = @_;
+
+ my $id = extract_param($param, 'id');
+
+ my $code = sub {
+ my $cfg = PVE::ReplicationConfig->new();
+
+ my $data = $cfg->{ids}->{$id};
+ die "no such job '$id'\n" if !$data;
+
+ my $plugin = PVE::ReplicationConfig->lookup($data->{type});
+ my $opts = $plugin->check_config($id, $param, 0, 1);
+
+ foreach my $k (%$opts) {
+ $data->{$k} = $opts->{$k};
+ }
+
+ $cfg->write();
+ };
+
+ PVE::ReplicationConfig::lock($code);
+
+ return undef;
+ }});
+
+__PACKAGE__->register_method ({
+ name => 'delete',
+ protected => 1,
+ path => '{id}',
+ method => 'DELETE',
+ description => "Delete replication job",
+ permissions => {
+ check => ['perm', '/storage', ['Datastore.Allocate']],
+ },
+ parameters => {
+ additionalProperties => 0,
+ properties => {
+ id => get_standard_option('pve-replication-id'),
+ keep => {
+ description => "Keep replicated data at target (do not remove).",
+ type => 'boolean',
+ optional => 1,
+ default => 0,
+ },
+ }
+ },
+ returns => { type => 'null' },
+ code => sub {
+ my ($param) = @_;
+
+ my $id = extract_param($param, 'id');
+
+ my $code = sub {
+ my $cfg = PVE::ReplicationConfig->new();
+
+ my $data = $cfg->{ids}->{$id};
+ die "no such job '$id'\n" if !$data;
+
+ if (!$param->{keep}) {
+ # fixme: cleanup data at target
+
+ }
+ # fixme: cleanup snapshots
+
+ delete $cfg->{ids}->{$id};
+
+ $cfg->write();
+ };
+
+ PVE::ReplicationConfig::lock($code);
+
+ return undef;
+ }});
+1;
diff --git a/PVE/CLI/Makefile b/PVE/CLI/Makefile
index b005a8f1..3a27503d 100644
--- a/PVE/CLI/Makefile
+++ b/PVE/CLI/Makefile
@@ -1,6 +1,6 @@
include ../../defines.mk
-SOURCES=vzdump.pm pvesubscription.pm pveceph.pm pveam.pm
+SOURCES=vzdump.pm pvesubscription.pm pveceph.pm pveam.pm pvesr.pm
all:
diff --git a/PVE/CLI/pvesr.pm b/PVE/CLI/pvesr.pm
new file mode 100644
index 00000000..43359604
--- /dev/null
+++ b/PVE/CLI/pvesr.pm
@@ -0,0 +1,150 @@
+package PVE::CLI::pvesr;
+
+use strict;
+use warnings;
+use POSIX qw(strftime);
+use JSON;
+
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::INotify;
+use PVE::RPCEnvironment;
+use PVE::Tools qw(extract_param);
+use PVE::SafeSyslog;
+use PVE::CLIHandler;
+
+use PVE::Replication;
+use PVE::API2::ReplicationConfig;
+use PVE::API2::Replication;
+
+use base qw(PVE::CLIHandler);
+
+my $nodename = PVE::INotify::nodename();
+
+sub setup_environment {
+ PVE::RPCEnvironment->setup_default_cli_env();
+}
+
+__PACKAGE__->register_method ({
+ name => 'run',
+ path => 'run',
+ method => 'POST',
+ description => "This method is called by the systemd-timer and executes all (or a specific) sync jobs.",
+ parameters => {
+ additionalProperties => 0,
+ properties => {
+ id => get_standard_option('pve-replication-id', { optional => 1 }),
+ },
+ },
+ returns => { type => 'null' },
+ code => sub {
+ my ($param) = @_;
+
+ if (my $id = extract_param($param, 'id')) {
+
+ PVE::Replication::run_single_job($id);
+
+ } else {
+
+ PVE::Replication::run_jobs();
+ }
+
+ return undef;
+ }});
+
+__PACKAGE__->register_method ({
+ name => 'enable',
+ path => 'enable',
+ method => 'POST',
+ description => "Enable a replication job.",
+ parameters => {
+ additionalProperties => 0,
+ properties => {
+ id => get_standard_option('pve-replication-id'),
+ },
+ },
+ returns => { type => 'null' },
+ code => sub {
+ my ($param) = @_;
+
+ $param->{disable} = 0;
+
+ return PVE::API2::ReplicationConfig->update($param);
+ }});
+
+__PACKAGE__->register_method ({
+ name => 'disable',
+ path => 'disable',
+ method => 'POST',
+ description => "Disable a replication job.",
+ parameters => {
+ additionalProperties => 0,
+ properties => {
+ id => get_standard_option('pve-replication-id'),
+ },
+ },
+ returns => { type => 'null' },
+ code => sub {
+ my ($param) = @_;
+
+ $param->{disable} = 1;
+
+ return PVE::API2::ReplicationConfig->update($param);
+ }});
+
+my $print_job_list = sub {
+ my ($list) = @_;
+
+ my $format = "%-20s %10s %-20s %10s %5s %8s\n";
+
+ printf($format, "JobID", "GuestID", "Target", "Interval", "Rate", "Enabled");
+
+ foreach my $job (sort { $a->{guest} <=> $b->{guest} } @$list) {
+ my $plugin = PVE::ReplicationConfig->lookup($job->{type});
+ my $tid = $plugin->get_unique_target_id($job);
+
+ printf($format, $job->{id}, $job->{guest}, $tid,
+ defined($job->{interval}) ? $job->{interval} : '-',
+ defined($job->{rate}) ? $job->{rate} : '-',
+ $job->{disable} ? 'no' : 'yes'
+ );
+ }
+};
+
+my $print_job_status = sub {
+ my ($list) = @_;
+
+ my $format = "%-20s %10s %-20s %20s %10s %10s %s\n";
+
+ printf($format, "JobID", "GuestID", "Target", "LastSync", "Duration", "FailCount", "State");
+
+ foreach my $job (sort { $a->{guest} <=> $b->{guest} } @$list) {
+ my $plugin = PVE::ReplicationConfig->lookup($job->{type});
+ my $tid = $plugin->get_unique_target_id($job);
+
+ my $timestr = $job->{last_sync} ?
+ strftime("%Y-%m-%d_%H:%M:%S", localtime($job->{last_sync})) : '-';
+
+ printf($format, $job->{id}, $job->{guest}, $tid,
+ $timestr, $job->{duration} // '-',
+ $job->{fail_count}, , $job->{error} // 'OK');
+ }
+};
+
+our $cmddef = {
+ status => [ 'PVE::API2::Replication', 'status', [], { node => $nodename }, $print_job_status ],
+
+ jobs => [ 'PVE::API2::ReplicationConfig', 'index' , [], {}, $print_job_list ],
+ read => [ 'PVE::API2::ReplicationConfig', 'read' , ['id'], {},
+ sub { my $res = shift; print to_json($res, { utf8 => 1, pretty => 1, canonical => 1}); }],
+ update => [ 'PVE::API2::ReplicationConfig', 'update' , ['id'], {} ],
+ delete => [ 'PVE::API2::ReplicationConfig', 'delete' , ['id'], {} ],
+ 'create-local-job' => [ 'PVE::API2::ReplicationConfig', 'create' , ['id', 'guest', 'target'],
+ { type => 'local' } ],
+
+ enable => [ __PACKAGE__, 'enable', ['id'], {}],
+ disable => [ __PACKAGE__, 'disable', ['id'], {}],
+
+ run => [ __PACKAGE__ , 'run'],
+};
+
+1;
diff --git a/PVE/Makefile b/PVE/Makefile
index ac230fbf..05086629 100644
--- a/PVE/Makefile
+++ b/PVE/Makefile
@@ -3,6 +3,7 @@ include ../defines.mk
SUBDIRS=API2 Status CLI Service
PERLSOURCE = \
+ Replication.pm \
API2.pm \
API2Tools.pm \
HTTPServer.pm \
diff --git a/PVE/Replication.pm b/PVE/Replication.pm
new file mode 100644
index 00000000..220d8f6e
--- /dev/null
+++ b/PVE/Replication.pm
@@ -0,0 +1,249 @@
+package PVE::Replication;
+
+use warnings;
+use strict;
+use Data::Dumper;
+use JSON;
+use Time::HiRes qw(gettimeofday tv_interval);
+
+use PVE::INotify;
+use PVE::Tools;
+use PVE::Cluster;
+use PVE::QemuConfig;
+use PVE::QemuServer;
+use PVE::LXC::Config;
+use PVE::LXC;
+use PVE::Storage;
+use PVE::ReplicationConfig;
+
+# Note: regression tests can overwrite $state_path for testing
+our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
+
+my $update_job_state = sub {
+ my ($stateobj, $jobcfg, $state) = @_;
+
+ my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
+
+ my $vmid = $jobcfg->{guest};
+ my $tid = $plugin->get_unique_target_id($jobcfg);
+
+ # Note: tuple ($vmid, $tid) is unique
+ $stateobj->{$vmid}->{$tid} = $state;
+
+ PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
+};
+
+my $get_job_state = sub {
+ my ($stateobj, $jobcfg) = @_;
+
+ my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
+
+ my $vmid = $jobcfg->{guest};
+ my $tid = $plugin->get_unique_target_id($jobcfg);
+ my $state = $stateobj->{$vmid}->{$tid};
+
+ $state = {} if !$state;
+
+ $state->{last_iteration} //= 0;
+ $state->{last_sync} //= 0;
+ $state->{fail_count} //= 0;
+
+ return $state;
+};
+
+my $read_state = sub {
+
+ return {} if ! -e $state_path;
+
+ my $raw = PVE::Tools::file_get_contents($state_path);
+
+ return {} if $raw eq '';
+
+ return decode_json($raw);
+};
+
+sub job_status {
+ my ($stateobj) = @_;
+
+ my $local_node = PVE::INotify::nodename();
+
+ my $jobs = {};
+
+ $stateobj = $read_state->() if !$stateobj;
+
+ my $cfg = PVE::ReplicationConfig->new();
+
+ my $vms = PVE::Cluster::get_vmlist();
+
+ foreach my $jobid (sort keys %{$cfg->{ids}}) {
+ my $jobcfg = $cfg->{ids}->{$jobid};
+ my $vmid = $jobcfg->{guest};
+
+ die "internal error - not implemented" if $jobcfg->{type} ne 'local';
+
+ # skip non existing vms
+ next if !$vms->{ids}->{$vmid};
+
+ # only consider guest on local node
+ next if $vms->{ids}->{$vmid}->{node} ne $local_node;
+
+ # never sync to local node
+ next if $jobcfg->{target} eq $local_node;
+
+ next if $jobcfg->{disable};
+
+ $jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
+ $jobcfg->{id} = $jobid;
+ $jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
+
+ $jobs->{$jobid} = $jobcfg;
+ }
+
+ return $jobs;
+}
+
+my $get_next_job = sub {
+ my ($stateobj, $iteration, $start_time) = @_;
+
+ my $next_jobid;
+
+ my $jobs = job_status($stateobj);
+
+ # compute next_sync here to make it easy to sort jobs
+ my $next_sync_hash = {};
+ foreach my $jobid (keys %$jobs) {
+ my $jobcfg = $jobs->{$jobid};
+ my $interval = $jobcfg->{interval} || 15;
+ my $last_sync = $jobcfg->{state}->{last_sync};
+ $next_sync_hash->{$jobid} = $last_sync + $interval * 60;
+ }
+
+ my $sort_func = sub {
+ my $joba = $jobs->{$a};
+ my $jobb = $jobs->{$b};
+ my $sa = $joba->{state};
+ my $sb = $jobb->{state};
+ my $res = $sa->{last_iteration} cmp $sb->{last_iteration};
+ return $res if $res != 0;
+ $res = $next_sync_hash->{$a} <=> $next_sync_hash->{$b};
+ return $res if $res != 0;
+ return $joba->{guest} <=> $jobb->{guest};
+ };
+
+ foreach my $jobid (sort $sort_func keys %$jobs) {
+ my $jobcfg = $jobs->{$jobid};
+<<<<<<< HEAD
+ next if $jobcfg->{state}->{last_iteration} >= $now;
+ if ($now >= $next_sync_hash->{$jobid}) {
+=======
+ next if $jobcfg->{state}->{last_iteration} >= $iteration;
+ if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
+>>>>>>> c2eac19e... fixup iteration marker
+ $next_jobid = $jobid;
+ last;
+ }
+ }
+
+ return undef if !$next_jobid;
+
+ my $jobcfg = $jobs->{$next_jobid};
+
+ $jobcfg->{state}->{last_iteration} = $iteration;
+ $update_job_state->($stateobj, $jobcfg, $jobcfg->{state});
+
+ return $jobcfg;
+};
+
+sub replicate {
+ my ($jobcfg, $start_time) = @_;
+
+ die "implement me";
+}
+
+my $run_replication = sub {
+ my ($stateobj, $jobcfg, $start_time) = @_;
+
+ my $state = delete $jobcfg->{state};
+
+ my $t0 = [gettimeofday];
+
+ eval { replicate($jobcfg, $start_time); };
+ my $err = $@;
+
+ $state->{duration} = tv_interval($t0);
+
+ if ($err) {
+ $state->{fail_count}++;
+ $state->{error} = "$err";
+ $update_job_state->($stateobj, $jobcfg, $state);
+ } else {
+ $state->{last_sync} = $start_time;
+ $state->{fail_count} = 0;
+ delete $state->{error};
+ $update_job_state->($stateobj, $jobcfg, $state);
+ }
+};
+
+sub run_single_job {
+ my ($jobid, $now) = @_; # passing $now useful for regression testing
+
+ my $local_node = PVE::INotify::nodename();
+
+ my $code = sub {
+ $now //= time();
+
+ my $stateobj = $read_state->();
+
+ my $cfg = PVE::ReplicationConfig->new();
+
+ my $jobcfg = $cfg->{ids}->{$jobid};
+ die "no such job '$jobid'\n" if !$jobcfg;
+
+ die "internal error - not implemented" if $jobcfg->{type} ne 'local';
+
+ die "job '$jobid' is disabled\n" if $jobcfg->{disable};
+
+ my $vms = PVE::Cluster::get_vmlist();
+ my $vmid = $jobcfg->{guest};
+
+ die "no such guest '$vmid'\n" if !$vms->{ids}->{$vmid};
+
+ die "guest '$vmid' is not on local node\n"
+ if $vms->{ids}->{$vmid}->{node} ne $local_node;
+
+ die "unable to sync to local node\n" if $jobcfg->{target} eq $local_node;
+
+ $jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
+ $jobcfg->{id} = $jobid;
+ $jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
+
+ $jobcfg->{state}->{last_iteration} = $now;
+ $update_job_state->($stateobj, $jobcfg, $jobcfg->{state});
+
+ $run_replication->($stateobj, $jobcfg, $now);
+ };
+
+ my $res = PVE::Tools::lock_file($state_path, 60, $code);
+ die $@ if $@;
+}
+
+sub run_jobs {
+ my ($now) = @_; # passing $now useful for regression testing
+
+ my $iteration = $now // time();
+
+ my $code = sub {
+ my $stateobj = $read_state->();
+ my $start_time = $now // time();
+
+ while (my $jobcfg = $get_next_job->($stateobj, $iteration, $start_time)) {
+ $run_replication->($stateobj, $jobcfg, $start_time);
+ $start_time = $now // time();
+ }
+ };
+
+ my $res = PVE::Tools::lock_file($state_path, 60, $code);
+ die $@ if $@;
+}
+
+1;
diff --git a/bin/Makefile b/bin/Makefile
index f9143eab..c7fca9f8 100644
--- a/bin/Makefile
+++ b/bin/Makefile
@@ -9,7 +9,7 @@ export PERLLIB=..
SUBDIRS = init.d ocf test
SERVICES = pvestatd pveproxy pvedaemon spiceproxy
-CLITOOLS = vzdump pvesubscription pveceph pveam
+CLITOOLS = vzdump pvesubscription pveceph pveam pvesr
SCRIPTS = \
${SERVICES} \
diff --git a/bin/pvesr b/bin/pvesr
new file mode 100644
index 00000000..762bb356
--- /dev/null
+++ b/bin/pvesr
@@ -0,0 +1,8 @@
+#!/usr/bin/perl -T
+
+use strict;
+use warnings;
+
+use PVE::CLI::pvesr;
+
+PVE::CLI::pvesr->run_cli_handler();
--
2.11.0
More information about the pve-devel
mailing list