[pve-devel] [PATCH v3 pve-manager 01/23] pvesr: add pve storage replication tool

Wolfgang Link w.link at proxmox.com
Wed May 31 10:47:42 CEST 2017



On 05/30/2017 03:19 PM, Dietmar Maurer wrote:
> Just added code to configure jobs. Replication itself is not
> implemented.
>
> Signed-off-by: Dietmar Maurer <dietmar at proxmox.com>
> ---
>  PVE/API2/Cluster.pm           |   7 ++
>  PVE/API2/Makefile             |   2 +
>  PVE/API2/Nodes.pm             |   7 ++
>  PVE/API2/Replication.pm       |  93 ++++++++++++++++
>  PVE/API2/ReplicationConfig.pm | 217 ++++++++++++++++++++++++++++++++++++
>  PVE/CLI/Makefile              |   2 +-
>  PVE/CLI/pvesr.pm              | 150 +++++++++++++++++++++++++
>  PVE/Makefile                  |   1 +
>  PVE/Replication.pm            | 249 ++++++++++++++++++++++++++++++++++++++++++
>  bin/Makefile                  |   2 +-
>  bin/pvesr                     |   8 ++
>  11 files changed, 736 insertions(+), 2 deletions(-)
>  create mode 100644 PVE/API2/Replication.pm
>  create mode 100644 PVE/API2/ReplicationConfig.pm
>  create mode 100644 PVE/CLI/pvesr.pm
>  create mode 100644 PVE/Replication.pm
>  create mode 100644 bin/pvesr
>
> diff --git a/PVE/API2/Cluster.pm b/PVE/API2/Cluster.pm
> index 6ce86de4..0e94e9ff 100644
> --- a/PVE/API2/Cluster.pm
> +++ b/PVE/API2/Cluster.pm
> @@ -22,10 +22,16 @@ use PVE::RPCEnvironment;
>  use PVE::JSONSchema qw(get_standard_option);
>  use PVE::Firewall;
>  use PVE::API2::Firewall::Cluster;
> +use PVE::API2::ReplicationConfig;
>
>  use base qw(PVE::RESTHandler);
>
>  __PACKAGE__->register_method ({
> +    subclass => "PVE::API2::ReplicationConfig",
> +    path => 'replication',
> +});
> +
> +__PACKAGE__->register_method ({
>      subclass => "PVE::API2::ClusterConfig",
>      path => 'config',
>  });
> @@ -82,6 +88,7 @@ __PACKAGE__->register_method ({
>  	    { name => 'log' },
>  	    { name => 'options' },
>  	    { name => 'resources' },
> +	    { name => 'replication' },
>  	    { name => 'tasks' },
>  	    { name => 'backup' },
>  	    { name => 'ha' },
> diff --git a/PVE/API2/Makefile b/PVE/API2/Makefile
> index 4dfcbb56..86d75d36 100644
> --- a/PVE/API2/Makefile
> +++ b/PVE/API2/Makefile
> @@ -1,6 +1,8 @@
>  include ../../defines.mk
>
>  PERLSOURCE = 			\
> +	Replication.pm		\
> +	ReplicationConfig.pm	\
>  	Ceph.pm			\
>  	APT.pm			\
>  	Subscription.pm		\
> diff --git a/PVE/API2/Nodes.pm b/PVE/API2/Nodes.pm
> index a45ca6db..885cf116 100644
> --- a/PVE/API2/Nodes.pm
> +++ b/PVE/API2/Nodes.pm
> @@ -40,6 +40,7 @@ use PVE::API2::VZDump;
>  use PVE::API2::APT;
>  use PVE::API2::Ceph;
>  use PVE::API2::Firewall::Host;
> +use PVE::API2::Replication;
>  use Digest::MD5;
>  use Digest::SHA;
>  use PVE::API2::Disks;
> @@ -113,6 +114,11 @@ __PACKAGE__->register_method ({
>  });
>
>  __PACKAGE__->register_method ({
> +    subclass => "PVE::API2::Replication",
> +    path => 'replication',
> +});
> +
> +__PACKAGE__->register_method ({
>      name => 'index',
>      path => '',
>      method => 'GET',
> @@ -147,6 +153,7 @@ __PACKAGE__->register_method ({
>  	    { name => 'tasks' },
>  	    { name => 'rrd' }, # fixme: remove?
>  	    { name => 'rrddata' },# fixme: remove?
> +	    { name => 'replication' },
>  	    { name => 'vncshell' },
>  	    { name => 'spiceshell' },
>  	    { name => 'time' },
> diff --git a/PVE/API2/Replication.pm b/PVE/API2/Replication.pm
> new file mode 100644
> index 00000000..eaa7e571
> --- /dev/null
> +++ b/PVE/API2/Replication.pm
> @@ -0,0 +1,93 @@
> +package PVE::API2::Replication;
> +
> +use warnings;
> +use strict;
> +
> +use PVE::JSONSchema qw(get_standard_option);
> +use PVE::RPCEnvironment;
> +use PVE::ReplicationConfig;
> +use PVE::Replication;
> +
> +use PVE::RESTHandler;
> +
> +use base qw(PVE::RESTHandler);
> +
> +__PACKAGE__->register_method ({
> +    name => 'index',
> +    path => '',
> +    method => 'GET',
> +    permissions => { user => 'all' },
> +    description => "Directory index.",
> +    parameters => {
> +	additionalProperties => 0,
> +	properties => {
> +	    node => get_standard_option('pve-node'),
> +	},
> +    },
> +    returns => {
> +	type => 'array',
> +	items => {
> +	    type => "object",
> +	    properties => {},
> +	},
> +	links => [ { rel => 'child', href => "{name}" } ],
> +    },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	return [
> +	    { name => 'status' },
> +	];
> +    }});
> +
> +
> +__PACKAGE__->register_method ({
> +    name => 'status',
> +    path => 'status',
> +    method => 'GET',
> +    description => "List replication job status.",
> +    permissions => {
> +	description => "Requires the VM.Audit permission on /vms/<vmid>.",
> +	user => 'all',
> +    },
> +    protected => 1,
> +    proxyto => 'node',
> +    parameters => {
> +	additionalProperties => 0,
> +	properties => {
> +	    node => get_standard_option('pve-node'),
> +	},
> +    },
> +    returns => {
> +	type => 'array',
> +	items => {
> +	    type => "object",
> +	    properties => {},
> +	},
> +	links => [ { rel => 'child', href => "{vmid}" } ],
> +    },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	my $rpcenv = PVE::RPCEnvironment::get();
> +	my $authuser = $rpcenv->get_user();
> +
> +	my $jobs = PVE::Replication::job_status();
> +
> +	my $res = [];
> +	foreach my $id (sort keys %$jobs) {
> +	    my $d = $jobs->{$id};
> +	    my $state = delete $d->{state};
> +	    my $vmid = $d->{guest};
> +	    next if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
> +	    $d->{id} = $id;
> +	    foreach my $k (qw(last_sync fail_count error duration)) {
> +		$d->{$k} = $state->{$k} if defined($state->{$k});
> +	    }
> +	    push @$res, $d;
> +	}
> +
> +	return $res;
> +    }});
> +
> +1;
> diff --git a/PVE/API2/ReplicationConfig.pm b/PVE/API2/ReplicationConfig.pm
> new file mode 100644
> index 00000000..6f4de385
> --- /dev/null
> +++ b/PVE/API2/ReplicationConfig.pm
> @@ -0,0 +1,217 @@
> +package PVE::API2::ReplicationConfig;
> +
> +use warnings;
> +use strict;
> +
> +use PVE::Tools qw(extract_param);
> +use PVE::Exception qw(raise_perm_exc);
> +use PVE::JSONSchema qw(get_standard_option);
> +use PVE::RPCEnvironment;
> +use PVE::ReplicationConfig;
> +
> +use PVE::RESTHandler;
> +
> +use base qw(PVE::RESTHandler);
> +
> +__PACKAGE__->register_method ({
> +    name => 'index',
> +    path => '',
> +    method => 'GET',
> +    description => "List replication jobs.",
> +    permissions => {
> +	description => "Requires the VM.Audit permission on /vms/<vmid>.",
> +	user => 'all',
> +    },
> +    parameters => {
> +	additionalProperties => 0,
> +	properties => {},
> +    },
> +    returns => {
> +	type => 'array',
> +	items => {
> +	    type => "object",
> +	    properties => {},
> +	},
> +	links => [ { rel => 'child', href => "{vmid}" } ],
> +    },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	my $rpcenv = PVE::RPCEnvironment::get();
> +	my $authuser = $rpcenv->get_user();
> +
> +	my $cfg = PVE::ReplicationConfig->new();
> +
> +	my $res = [];
> +	foreach my $id (sort keys %{$cfg->{ids}}) {
> +	    my $d = $cfg->{ids}->{$id};
> +	    my $vmid = $d->{guest};
> +	    next if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
> +	    $d->{id} = $id;
> +	    push @$res, $d;
> +	}
> +
> +	return $res;
> +    }});
> +
> +__PACKAGE__->register_method ({
> +    name => 'read',
> +    path => '{id}',
> +    method => 'GET',
> +    description => "Read replication job configuration.",
> +    permissions => {
> +	description => "Requires the VM.Audit permission on /vms/<vmid>.",
> +	user => 'all',
> +    },
> +    parameters => {
> +    	additionalProperties => 0,
> +	properties => {
> +	    id => get_standard_option('pve-replication-id'),
> +	},
> +    },
> +    returns => { type => 'object' },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	my $rpcenv = PVE::RPCEnvironment::get();
> +	my $authuser = $rpcenv->get_user();
> +
> +	my $cfg = PVE::ReplicationConfig->new();
> +
> +	my $data = $cfg->{ids}->{$param->{id}};
> +
> +	die "no such replication job '$param->{id}'\n" if !defined($data);
> +
> +	my $vmid = $data->{guest};
> +
> +	raise_perm_exc() if !$rpcenv->check($authuser, "/vms/$vmid", [ 'VM.Audit' ]);
> +
> +	$data->{id} = $param->{id};
> +
> +	return $data;
> +    }});
> +
> +__PACKAGE__->register_method ({
> +    name => 'create',
> +    path => '',
> +    protected => 1,
> +    method => 'POST',
> +    description => "Create a new replication job",
> +    permissions => {
> +	check => ['perm', '/storage', ['Datastore.Allocate']],
> +    },
> +    parameters => PVE::ReplicationConfig->createSchema(),
> +    returns => { type => 'null' },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	my $type = extract_param($param, 'type');
> +	my $plugin = PVE::ReplicationConfig->lookup($type);
> +	my $id = extract_param($param, 'id');
> +
> +	my $code = sub {
> +	    my $cfg = PVE::ReplicationConfig->new();
> +
> +	    #die "replication job for guest '$param->{guest}' to target '$param->{target}' already exists\n"
> +	    die "replication job '$id' already exists\n"
> +		if $cfg->{ids}->{$id};
> +
> +	    my $opts = $plugin->check_config($id, $param, 1, 1);
> +
> +	    $cfg->{ids}->{$id} = $opts;
> +
> +	    $cfg->write();
> +	};
> +
> +	PVE::ReplicationConfig::lock($code);
> +
> +	return undef;
> +    }});
> +
> +
> +__PACKAGE__->register_method ({
> +    name => 'update',
> +    protected => 1,
> +    path => '{id}',
> +    method => 'PUT',
> +    description => "Update replication job configuration.",
> +    permissions => {
> +	check => ['perm', '/storage', ['Datastore.Allocate']],
> +    },
> +    parameters => PVE::ReplicationConfig->updateSchema(),
> +    returns => { type => 'null' },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	my $id = extract_param($param, 'id');
> +
> +	my $code = sub {
> +	    my $cfg = PVE::ReplicationConfig->new();
> +
> +	    my $data = $cfg->{ids}->{$id};
> +	    die "no such job '$id'\n" if !$data;
> +
> +	    my $plugin = PVE::ReplicationConfig->lookup($data->{type});
> +	    my $opts = $plugin->check_config($id, $param, 0, 1);
> +
> +	    foreach my $k (%$opts) {
> +		$data->{$k} = $opts->{$k};
> +	    }
> +
> +	    $cfg->write();
> +	};
> +
> +	PVE::ReplicationConfig::lock($code);
> +
> +	return undef;
> +    }});
> +
> +__PACKAGE__->register_method ({
> +    name => 'delete',
> +    protected => 1,
> +    path => '{id}',
> +    method => 'DELETE',
> +    description => "Delete replication job",
> +    permissions => {
> +	check => ['perm', '/storage', ['Datastore.Allocate']],
> +    },
> +    parameters => {
> +    	additionalProperties => 0,
> +	properties => {
> +	    id => get_standard_option('pve-replication-id'),
> +	    keep => {
> +		description => "Keep replicated data at target (do not remove).",
> +		type => 'boolean',
> +		optional => 1,
> +		default => 0,
> +	    },
> +	}
> +    },
> +    returns => { type => 'null' },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	my $id = extract_param($param, 'id');
> +
> +	my $code = sub {
> +	    my $cfg = PVE::ReplicationConfig->new();
> +
> +	    my $data = $cfg->{ids}->{$id};
> +	    die "no such job '$id'\n" if !$data;
> +
> +	    if (!$param->{keep}) {
> +		# fixme: cleanup data at target
> +
> +	    }
> +	    # fixme: cleanup snapshots
> +
> +	    delete $cfg->{ids}->{$id};
> +
> +	    $cfg->write();
> +	};
> +
> +	PVE::ReplicationConfig::lock($code);
> +
> +	return undef;
> +    }});
> +1;
> diff --git a/PVE/CLI/Makefile b/PVE/CLI/Makefile
> index b005a8f1..3a27503d 100644
> --- a/PVE/CLI/Makefile
> +++ b/PVE/CLI/Makefile
> @@ -1,6 +1,6 @@
>  include ../../defines.mk
>
> -SOURCES=vzdump.pm pvesubscription.pm pveceph.pm pveam.pm
> +SOURCES=vzdump.pm pvesubscription.pm pveceph.pm pveam.pm pvesr.pm
>
>  all:
>
> diff --git a/PVE/CLI/pvesr.pm b/PVE/CLI/pvesr.pm
> new file mode 100644
> index 00000000..43359604
> --- /dev/null
> +++ b/PVE/CLI/pvesr.pm
> @@ -0,0 +1,150 @@
> +package PVE::CLI::pvesr;
> +
> +use strict;
> +use warnings;
> +use POSIX qw(strftime);
> +use JSON;
> +
> +use PVE::JSONSchema qw(get_standard_option);
> +use PVE::INotify;
> +use PVE::RPCEnvironment;
> +use PVE::Tools qw(extract_param);
> +use PVE::SafeSyslog;
> +use PVE::CLIHandler;
> +
> +use PVE::Replication;
> +use PVE::API2::ReplicationConfig;
> +use PVE::API2::Replication;
> +
> +use base qw(PVE::CLIHandler);
> +
> +my $nodename = PVE::INotify::nodename();
> +
> +sub setup_environment {
> +    PVE::RPCEnvironment->setup_default_cli_env();
> +}
> +
> +__PACKAGE__->register_method ({
> +    name => 'run',
> +    path => 'run',
> +    method => 'POST',
> +    description => "This method is called by the systemd-timer and executes all (or a specific) sync jobs.",
> +    parameters => {
> +	additionalProperties => 0,
> +	properties => {
> +	    id => get_standard_option('pve-replication-id', { optional => 1 }),
> +	},
> +    },
> +    returns => { type => 'null' },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	if (my $id = extract_param($param, 'id')) {
> +
> +	    PVE::Replication::run_single_job($id);
> +
> +	} else {
> +
> +	    PVE::Replication::run_jobs();
> +	}
> +
> +	return undef;
> +    }});
> +
> +__PACKAGE__->register_method ({
> +    name => 'enable',
> +    path => 'enable',
> +    method => 'POST',
> +    description => "Enable a replication job.",
> +    parameters => {
> +	additionalProperties => 0,
> +	properties => {
> +	    id => get_standard_option('pve-replication-id'),
> +	},
> +    },
> +    returns => { type => 'null' },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	$param->{disable} = 0;
> +
> +	return PVE::API2::ReplicationConfig->update($param);
> +    }});
> +
> +__PACKAGE__->register_method ({
> +    name => 'disable',
> +    path => 'disable',
> +    method => 'POST',
> +    description => "Disable a replication job.",
> +    parameters => {
> +	additionalProperties => 0,
> +	properties => {
> +	    id => get_standard_option('pve-replication-id'),
> +	},
> +    },
> +    returns => { type => 'null' },
> +    code => sub {
> +	my ($param) = @_;
> +
> +	$param->{disable} = 1;
> +
> +	return PVE::API2::ReplicationConfig->update($param);
> +    }});
> +
> +my $print_job_list = sub {
> +    my ($list) = @_;
> +
> +    my $format = "%-20s %10s %-20s %10s %5s %8s\n";
> +
> +    printf($format, "JobID", "GuestID", "Target", "Interval", "Rate", "Enabled");
> +
> +    foreach my $job (sort { $a->{guest} <=> $b->{guest} } @$list) {
> +	my $plugin = PVE::ReplicationConfig->lookup($job->{type});
> +	my $tid = $plugin->get_unique_target_id($job);
> +
> +	printf($format, $job->{id}, $job->{guest}, $tid,
> +	       defined($job->{interval}) ? $job->{interval} : '-',
> +	       defined($job->{rate}) ? $job->{rate} : '-',
> +	       $job->{disable} ? 'no' : 'yes'
> +	    );
> +    }
> +};
> +
> +my $print_job_status = sub {
> +    my ($list) = @_;
> +
> +    my $format = "%-20s %10s %-20s %20s %10s %10s %s\n";
> +
> +    printf($format, "JobID", "GuestID", "Target", "LastSync", "Duration", "FailCount", "State");
> +
> +    foreach my $job (sort { $a->{guest} <=> $b->{guest} } @$list) {
> +	my $plugin = PVE::ReplicationConfig->lookup($job->{type});
> +	my $tid = $plugin->get_unique_target_id($job);
> +
> +	my $timestr = $job->{last_sync} ?
> +	    strftime("%Y-%m-%d_%H:%M:%S", localtime($job->{last_sync})) : '-';
> +
> +	printf($format, $job->{id}, $job->{guest}, $tid,
> +	       $timestr, $job->{duration} // '-',
> +	       $job->{fail_count}, , $job->{error} // 'OK');
> +    }
> +};
> +
> +our $cmddef = {
> +    status => [ 'PVE::API2::Replication', 'status', [], { node => $nodename }, $print_job_status ],
> +
> +    jobs => [ 'PVE::API2::ReplicationConfig', 'index' , [], {}, $print_job_list ],
> +    read => [ 'PVE::API2::ReplicationConfig', 'read' , ['id'], {},
> +	     sub { my $res = shift; print to_json($res, { utf8 => 1, pretty => 1, canonical => 1}); }],
> +    update => [ 'PVE::API2::ReplicationConfig', 'update' , ['id'], {} ],
> +    delete => [ 'PVE::API2::ReplicationConfig', 'delete' , ['id'], {} ],
> +    'create-local-job' => [ 'PVE::API2::ReplicationConfig', 'create' , ['id', 'guest', 'target'],
> +			    { type => 'local' } ],
> +
> +    enable => [ __PACKAGE__, 'enable', ['id'], {}],
> +    disable => [ __PACKAGE__, 'disable', ['id'], {}],
> +
> +    run => [ __PACKAGE__ , 'run'],
> +};
> +
> +1;
> diff --git a/PVE/Makefile b/PVE/Makefile
> index ac230fbf..05086629 100644
> --- a/PVE/Makefile
> +++ b/PVE/Makefile
> @@ -3,6 +3,7 @@ include ../defines.mk
>  SUBDIRS=API2 Status CLI Service
>
>  PERLSOURCE = 			\
> +	Replication.pm		\
>  	API2.pm			\
>  	API2Tools.pm		\
>  	HTTPServer.pm		\
> diff --git a/PVE/Replication.pm b/PVE/Replication.pm
> new file mode 100644
> index 00000000..220d8f6e
> --- /dev/null
> +++ b/PVE/Replication.pm
> @@ -0,0 +1,249 @@
> +package PVE::Replication;
> +
> +use warnings;
> +use strict;
> +use Data::Dumper;
> +use JSON;
> +use Time::HiRes qw(gettimeofday tv_interval);
> +
> +use PVE::INotify;
> +use PVE::Tools;
> +use PVE::Cluster;
> +use PVE::QemuConfig;
> +use PVE::QemuServer;
> +use PVE::LXC::Config;
> +use PVE::LXC;
> +use PVE::Storage;
> +use PVE::ReplicationConfig;
> +
> +# Note: regression tests can overwrite $state_path for testing
> +our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
> +
> +my $update_job_state = sub {
> +    my ($stateobj, $jobcfg, $state) = @_;
> +
> +    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
> +
> +    my $vmid = $jobcfg->{guest};
> +    my $tid = $plugin->get_unique_target_id($jobcfg);
> +
> +    # Note: tuple ($vmid, $tid) is unique
> +    $stateobj->{$vmid}->{$tid} = $state;
> +
> +    PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
> +};
> +
> +my $get_job_state = sub {
> +    my ($stateobj, $jobcfg) = @_;
> +
> +    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
> +
> +    my $vmid = $jobcfg->{guest};
> +    my $tid = $plugin->get_unique_target_id($jobcfg);
> +    my $state = $stateobj->{$vmid}->{$tid};
> +
> +    $state = {} if !$state;
> +
> +    $state->{last_iteration} //= 0;
> +    $state->{last_sync} //= 0;
> +    $state->{fail_count} //= 0;
> +
> +    return $state;
> +};
> +
> +my $read_state = sub {
> +
> +    return {} if ! -e $state_path;
> +
> +    my $raw = PVE::Tools::file_get_contents($state_path);
> +
> +    return {} if $raw eq '';
> +
> +    return decode_json($raw);
JSON::decode will not untaint the raw text, so we get problems with the 
last_sync, which we use many times in the code with open3.
I would use a regex to untaint the $raw here, because it is much easier 
then doing this all over the code.
> +};
> +
> +sub job_status {
> +    my ($stateobj) = @_;
> +
> +    my $local_node = PVE::INotify::nodename();
> +
> +    my $jobs = {};
> +
> +    $stateobj = $read_state->() if !$stateobj;
> +
> +    my $cfg = PVE::ReplicationConfig->new();
> +
> +    my $vms = PVE::Cluster::get_vmlist();
> +
> +    foreach my $jobid (sort keys %{$cfg->{ids}}) {
> +	my $jobcfg = $cfg->{ids}->{$jobid};
> +	my $vmid = $jobcfg->{guest};
> +
> +	die "internal error - not implemented" if $jobcfg->{type} ne 'local';
> +
> +	# skip non existing vms
> +	next if !$vms->{ids}->{$vmid};
> +
> +	# only consider guest on local node
> +	next if $vms->{ids}->{$vmid}->{node} ne $local_node;
> +
> +	# never sync to local node
> +	next if $jobcfg->{target} eq $local_node;
> +
> +	next if $jobcfg->{disable};
> +
> +	$jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
> +	$jobcfg->{id} = $jobid;
> +	$jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
> +
> +	$jobs->{$jobid} = $jobcfg;
> +    }
> +
> +    return $jobs;
> +}
> +
> +my $get_next_job = sub {
> +    my ($stateobj, $iteration, $start_time) = @_;
> +
> +    my $next_jobid;
> +
> +    my $jobs = job_status($stateobj);
> +
> +    # compute next_sync here to make it easy to sort jobs
> +    my $next_sync_hash = {};
> +    foreach my $jobid (keys %$jobs) {
> +	my $jobcfg = $jobs->{$jobid};
> +	my $interval = $jobcfg->{interval} || 15;
> +	my $last_sync = $jobcfg->{state}->{last_sync};
> +	$next_sync_hash->{$jobid} = $last_sync + $interval * 60;
> +    }
> +
> +    my $sort_func = sub {
> +	my $joba = $jobs->{$a};
> +	my $jobb = $jobs->{$b};
> +	my $sa =  $joba->{state};
> +	my $sb =  $jobb->{state};
> +	my $res = $sa->{last_iteration} cmp $sb->{last_iteration};
> +	return $res if $res != 0;
> +	$res = $next_sync_hash->{$a} <=> $next_sync_hash->{$b};
> +	return $res if $res != 0;
> +	return  $joba->{guest} <=> $jobb->{guest};
> +    };
> +
> +    foreach my $jobid (sort $sort_func keys %$jobs) {
> +	my $jobcfg = $jobs->{$jobid};
> +<<<<<<< HEAD
> +	next if $jobcfg->{state}->{last_iteration} >= $now;
> +	if ($now >= $next_sync_hash->{$jobid}) {
> +=======
> +	next if $jobcfg->{state}->{last_iteration} >= $iteration;
> +	if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
> +>>>>>>> c2eac19e... fixup iteration marker
> +	    $next_jobid = $jobid;
> +	    last;
> +	}
> +    }
> +
> +    return undef if !$next_jobid;
> +
> +    my $jobcfg = $jobs->{$next_jobid};
> +
> +    $jobcfg->{state}->{last_iteration} = $iteration;
> +    $update_job_state->($stateobj, $jobcfg,  $jobcfg->{state});
> +
> +    return $jobcfg;
> +};
> +
> +sub replicate {
> +    my ($jobcfg, $start_time) = @_;
> +
> +    die "implement me";
> +}
> +
> +my $run_replication = sub {
> +    my ($stateobj, $jobcfg, $start_time) = @_;
> +
> +    my $state = delete $jobcfg->{state};
> +
> +    my $t0 = [gettimeofday];
> +
> +    eval { replicate($jobcfg, $start_time); };
> +    my $err = $@;
> +
> +    $state->{duration} = tv_interval($t0);
> +
> +    if ($err) {
> +	$state->{fail_count}++;
> +	$state->{error} = "$err";
> +	$update_job_state->($stateobj, $jobcfg,  $state);
> +   } else {
> +	$state->{last_sync} = $start_time;
> +	$state->{fail_count} = 0;
> +	delete $state->{error};
> +	$update_job_state->($stateobj, $jobcfg,  $state);
> +    }
> +};
> +
> +sub run_single_job {
> +    my ($jobid, $now) = @_; # passing $now useful for regression testing
> +
> +    my $local_node = PVE::INotify::nodename();
> +
> +    my $code = sub {
> +	$now //= time();
> +
> +	my $stateobj = $read_state->();
> +
> +	my $cfg = PVE::ReplicationConfig->new();
> +
> +	my $jobcfg = $cfg->{ids}->{$jobid};
> +	die "no such job '$jobid'\n" if !$jobcfg;
> +
> +	die "internal error - not implemented" if $jobcfg->{type} ne 'local';
> +
> +	die "job '$jobid' is disabled\n" if $jobcfg->{disable};
> +
> +	my $vms = PVE::Cluster::get_vmlist();
> +	my $vmid = $jobcfg->{guest};
> +
> +	die "no such guest '$vmid'\n" if !$vms->{ids}->{$vmid};
> +
> +	die "guest '$vmid' is not on local node\n"
> +	    if $vms->{ids}->{$vmid}->{node} ne $local_node;
> +
> +	die "unable to sync to local node\n" if $jobcfg->{target} eq $local_node;
> +
> +	$jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
> +	$jobcfg->{id} = $jobid;
> +	$jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
> +
> +	$jobcfg->{state}->{last_iteration} = $now;
> +	$update_job_state->($stateobj, $jobcfg,  $jobcfg->{state});
> +
> +	$run_replication->($stateobj, $jobcfg, $now);
> +    };
> +
> +    my $res = PVE::Tools::lock_file($state_path, 60, $code);
> +    die $@ if $@;
> +}
> +
> +sub run_jobs {
> +    my ($now) = @_; # passing $now useful for regression testing
> +
> +    my $iteration = $now // time();
> +
> +    my $code = sub {
> +	my $stateobj = $read_state->();
> +	my $start_time = $now // time();
> +
> +	while (my $jobcfg = $get_next_job->($stateobj, $iteration, $start_time)) {
> +	    $run_replication->($stateobj, $jobcfg, $start_time);
> +	    $start_time = $now // time();
> +	}
> +    };
> +
> +    my $res = PVE::Tools::lock_file($state_path, 60, $code);
> +    die $@ if $@;
> +}
> +
> +1;
> diff --git a/bin/Makefile b/bin/Makefile
> index f9143eab..c7fca9f8 100644
> --- a/bin/Makefile
> +++ b/bin/Makefile
> @@ -9,7 +9,7 @@ export PERLLIB=..
>  SUBDIRS = init.d ocf test
>
>  SERVICES = pvestatd pveproxy pvedaemon spiceproxy
> -CLITOOLS = vzdump pvesubscription pveceph pveam
> +CLITOOLS = vzdump pvesubscription pveceph pveam pvesr
>
>  SCRIPTS =  			\
>  	${SERVICES}		\
> diff --git a/bin/pvesr b/bin/pvesr
> new file mode 100644
> index 00000000..762bb356
> --- /dev/null
> +++ b/bin/pvesr
> @@ -0,0 +1,8 @@
> +#!/usr/bin/perl -T
> +
> +use strict;
> +use warnings;
> +
> +use PVE::CLI::pvesr;
> +
> +PVE::CLI::pvesr->run_cli_handler();
>




More information about the pve-devel mailing list