[pve-devel] [PATCH pve-manager 2/2] PVE::Replication - use new PVE::ReplicationState class
Dietmar Maurer
dietmar at proxmox.com
Fri Jun 2 10:20:23 CEST 2017
Signed-off-by: Dietmar Maurer <dietmar at proxmox.com>
---
PVE/Replication.pm | 165 ++++++++++++++++-------------------------
bin/test/ReplicationTestEnv.pm | 23 +-----
2 files changed, 66 insertions(+), 122 deletions(-)
diff --git a/PVE/Replication.pm b/PVE/Replication.pm
index 55fda1f0..fad5061c 100644
--- a/PVE/Replication.pm
+++ b/PVE/Replication.pm
@@ -19,66 +19,17 @@ use PVE::LXC;
use PVE::Storage;
use PVE::GuestHelpers;
use PVE::ReplicationConfig;
+use PVE::ReplicationState;
-# Note: regression tests can overwrite $state_path for testing
-our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
our $pvesr_lock_path = "/var/lock/pvesr.lck";
-my $update_job_state = sub {
- my ($stateobj, $jobcfg, $state) = @_;
-
- my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
-
- my $vmid = $jobcfg->{guest};
- my $tid = $plugin->get_unique_target_id($jobcfg);
-
- # Note: tuple ($vmid, $tid) is unique
- $stateobj->{$vmid}->{$tid} = $state;
-
- PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
-};
-
-my $get_job_state = sub {
- my ($stateobj, $jobcfg) = @_;
-
- my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
-
- my $vmid = $jobcfg->{guest};
- my $tid = $plugin->get_unique_target_id($jobcfg);
- my $state = $stateobj->{$vmid}->{$tid};
-
- $state = {} if !$state;
-
- $state->{last_iteration} //= 0;
- $state->{last_try} //= 0; # last sync start time
- $state->{last_sync} //= 0; # last successful sync start time
- $state->{fail_count} //= 0;
-
- return $state;
-};
-
-my $read_state = sub {
-
- return {} if ! -e $state_path;
-
- my $raw = PVE::Tools::file_get_contents($state_path);
-
- return {} if $raw eq '';
-
- # untaint $raw
- $raw =~ m/^({.*})$/;
-
- return decode_json($1);
-};
-
sub job_status {
- my ($stateobj) = @_;
my $local_node = PVE::INotify::nodename();
my $jobs = {};
- $stateobj = $read_state->() if !$stateobj;
+ my $stateobj = PVE::ReplicationState::read_state();
my $cfg = PVE::ReplicationConfig->new();
@@ -103,7 +54,7 @@ sub job_status {
next if $jobcfg->{disable};
}
- my $state = $get_job_state->($stateobj, $jobcfg);
+ my $state = PVE::ReplicationState::extract_job_state($stateobj, $jobcfg);
$jobcfg->{state} = $state;
$jobcfg->{id} = $jobid;
$jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
@@ -134,9 +85,9 @@ sub job_status {
}
my $get_next_job = sub {
- my ($stateobj, $iteration, $start_time) = @_;
+ my ($iteration, $start_time) = @_;
- my $jobs = job_status($stateobj);
+ my $jobs = job_status();
my $sort_func = sub {
my $joba = $jobs->{$a};
@@ -404,59 +355,71 @@ sub replicate {
die $err if $err;
}
-my $run_replication = sub {
- my ($stateobj, $jobcfg, $iteration, $start_time, $logfunc) = @_;
-
- my $state = $get_job_state->($stateobj, $jobcfg);
+my $run_replication_nolock = sub {
+ my ($jobcfg, $iteration, $start_time, $logfunc) = @_;
- my $t0 = [gettimeofday];
+ # we normally write errors into the state file,
+ # but we also catch unexpected errors and log them to syslog
+ # (for example when there are problems writing the state file)
+ eval {
+ my $state = PVE::ReplicationState::read_job_state($jobcfg);
- # cleanup stale pid/ptime state
- foreach my $vmid (keys %$stateobj) {
- foreach my $tid (keys %{$stateobj->{$vmid}}) {
- my $state = $stateobj->{$vmid}->{$tid};
- delete $state->{pid};
- delete $state->{ptime};
- }
- }
+ my $t0 = [gettimeofday];
- $state->{pid} = $$;
- $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
- $state->{last_try} = $start_time;
- $state->{last_iteration} = $iteration;
+ $state->{pid} = $$;
+ $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
+ $state->{last_try} = $start_time;
+ $state->{last_iteration} = $iteration;
- $update_job_state->($stateobj, $jobcfg, $state);
+ PVE::ReplicationState::write_job_state($jobcfg, $state);
- $logfunc->($start_time, "$jobcfg->{id}: start replication job") if $logfunc;
+ $logfunc->($start_time, "$jobcfg->{id}: start replication job") if $logfunc;
- eval {
- my $timeout = 2; # do not wait too long - we repeat periodically anyways
- PVE::GuestHelpers::guest_migration_lock(
- $jobcfg->{guest}, $timeout, \&replicate,
- $jobcfg, $state->{last_sync}, $start_time, $logfunc);
- };
- my $err = $@;
+ eval {
+ replicate($jobcfg, $state->{last_sync}, $start_time, $logfunc); # guest migration lock is taken by the caller ($run_replication)
+ };
+ my $err = $@;
- $state->{duration} = tv_interval($t0);
- delete $state->{pid};
- delete $state->{ptime};
+ $state->{duration} = tv_interval($t0);
+ delete $state->{pid};
+ delete $state->{ptime};
- if ($err) {
- $state->{fail_count}++;
- $state->{error} = "$err";
- $update_job_state->($stateobj, $jobcfg, $state);
- if ($logfunc) {
+ if ($err) {
chomp $err;
- $logfunc->($start_time, "$jobcfg->{id}: end replication job with error: $err");
+ $state->{fail_count}++;
+ $state->{error} = "$err";
+ PVE::ReplicationState::write_job_state($jobcfg, $state);
+ my $msg = "$jobcfg->{id}: end replication job with error: $err";
+ if ($logfunc) {
+ $logfunc->($start_time, $msg);
+ } else {
+ warn "$msg\n"; # $err was chomped above - the trailing newline stops warn from appending file/line
+ }
} else {
- warn $err;
+ $logfunc->($start_time, "$jobcfg->{id}: end replication job") if $logfunc;
+ $state->{last_sync} = $start_time;
+ $state->{fail_count} = 0;
+ delete $state->{error};
+ PVE::ReplicationState::write_job_state($jobcfg, $state);
}
- } else {
- $logfunc->($start_time, "$jobcfg->{id}: end replication job") if $logfunc;
- $state->{last_sync} = $start_time;
- $state->{fail_count} = 0;
- delete $state->{error};
- $update_job_state->($stateobj, $jobcfg, $state);
+ };
+ if (my $err = $@) {
+ warn "$jobcfg->{id}: got unexpected error - $err";
+ }
+};
+
+my $run_replication = sub { # run $run_replication_nolock while holding the guest migration lock
+ my ($jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_; # $noerr: return undef instead of dying (run_jobs passes 1)
+
+ eval {
+ my $timeout = 2; # do not wait too long - we repeat periodically anyways
+ PVE::GuestHelpers::guest_migration_lock(
+ $jobcfg->{guest}, $timeout, $run_replication_nolock,
+ $jobcfg, $iteration, $start_time, $logfunc);
+ };
+ if (my $err = $@) {
+ return undef if $noerr; # NOTE(review): if the lock was not acquired, last_iteration is never written, so $get_next_job may return this job again right away - confirm
+ die $err;
+ }
};
@@ -468,8 +431,6 @@ sub run_single_job {
my $code = sub {
$now //= time();
- my $stateobj = $read_state->();
-
my $cfg = PVE::ReplicationConfig->new();
my $jobcfg = $cfg->{ids}->{$jobid};
@@ -489,11 +450,10 @@ sub run_single_job {
die "unable to sync to local node\n" if $jobcfg->{target} eq $local_node;
- $jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
$jobcfg->{id} = $jobid;
$jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
- $run_replication->($stateobj, $jobcfg, $now, $now, $logfunc);
+ $run_replication->($jobcfg, $now, $now, $logfunc);
};
my $res = PVE::Tools::lock_file($pvesr_lock_path, 60, $code);
@@ -506,11 +466,10 @@ sub run_jobs {
my $iteration = $now // time();
my $code = sub {
- my $stateobj = $read_state->();
my $start_time = $now // time();
- while (my $jobcfg = $get_next_job->($stateobj, $iteration, $start_time)) {
- $run_replication->($stateobj, $jobcfg, $iteration, $start_time, $logfunc);
+ while (my $jobcfg = $get_next_job->($iteration, $start_time)) {
+ $run_replication->($jobcfg, $iteration, $start_time, $logfunc, 1);
$start_time = $now // time();
}
};
diff --git a/bin/test/ReplicationTestEnv.pm b/bin/test/ReplicationTestEnv.pm
index 79d826a1..a5605c3b 100755
--- a/bin/test/ReplicationTestEnv.pm
+++ b/bin/test/ReplicationTestEnv.pm
@@ -13,6 +13,8 @@ use Data::Dumper;
use PVE::INotify;
use PVE::Cluster;
use PVE::Storage;
+use PVE::ReplicationConfig;
+use PVE::ReplicationState;
use PVE::Replication;
use PVE::QemuConfig;
use PVE::LXC::Config;
@@ -59,27 +61,10 @@ my $mocked_ssh_info_to_command = sub {
my $statefile = ".mocked_repl_state";
unlink $statefile;
-$PVE::Replication::state_path = $statefile;
+$PVE::ReplicationState::state_path = $statefile; # state file now lives in PVE::ReplicationState; point it at the local test file
+$PVE::ReplicationState::state_lock = ".mocked_repl_state_lock"; # use a local lock file for the state as well
$PVE::Replication::pvesr_lock_path = ".mocked_pvesr_lock";
-my $mocked_write_state = sub {
- my ($state) = @_;
-
- PVE::Tools::file_set_contents($statefile, encode_json($state));
-};
-
-my $mocked_read_state = sub {
-
- return {} if ! -e $statefile;
-
- my $raw = PVE::Tools::file_get_contents($statefile);
-
- return {} if $raw eq '';
-
- return decode_json($raw);
-};
-
-
my $pve_cluster_module = Test::MockModule->new('PVE::Cluster');
my $pve_inotify_module = Test::MockModule->new('PVE::INotify');
--
2.11.0
More information about the pve-devel
mailing list