[pve-devel] [PATCH pve-manager 2/2] PVE::Replication - use new PVE::ReplicationState class

Dietmar Maurer dietmar at proxmox.com
Fri Jun 2 10:20:23 CEST 2017


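Move the replication state file handling ($state_path, reading the state
and looking up/updating per-job state) out of PVE::Replication into the
new PVE::ReplicationState module.

The guest migration lock is now held around the whole job run, including
the state updates, and unexpected errors (for example while writing the
state file) are caught and reported with warn. run_jobs ignores a job run
that fails with an error at that level (e.g. when the guest migration lock
cannot be acquired within the 2 second timeout); run_single_job still dies.

The regression test environment now points $PVE::ReplicationState::state_path
and $PVE::ReplicationState::state_lock at local files instead of mocking
the state read/write helpers.

Signed-off-by: Dietmar Maurer <dietmar at proxmox.com>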
---
 PVE/Replication.pm             | 165 ++++++++++++++++-------------------------
 bin/test/ReplicationTestEnv.pm |  23 +-----
 2 files changed, 66 insertions(+), 122 deletions(-)

diff --git a/PVE/Replication.pm b/PVE/Replication.pm
index 55fda1f0..fad5061c 100644
--- a/PVE/Replication.pm
+++ b/PVE/Replication.pm
@@ -19,66 +19,17 @@ use PVE::LXC;
 use PVE::Storage;
 use PVE::GuestHelpers;
 use PVE::ReplicationConfig;
+use PVE::ReplicationState;
 
-# Note: regression tests can overwrite $state_path for testing
-our $state_path = "/var/lib/pve-manager/pve-replication-state.json";
 our $pvesr_lock_path = "/var/lock/pvesr.lck";
 
-my $update_job_state = sub {
-    my ($stateobj, $jobcfg, $state) = @_;
-
-    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
-
-    my $vmid = $jobcfg->{guest};
-    my $tid = $plugin->get_unique_target_id($jobcfg);
-
-    # Note: tuple ($vmid, $tid) is unique
-    $stateobj->{$vmid}->{$tid} = $state;
-
-    PVE::Tools::file_set_contents($state_path, encode_json($stateobj));
-};
-
-my $get_job_state = sub {
-    my ($stateobj, $jobcfg) = @_;
-
-    my $plugin = PVE::ReplicationConfig->lookup($jobcfg->{type});
-
-    my $vmid = $jobcfg->{guest};
-    my $tid = $plugin->get_unique_target_id($jobcfg);
-    my $state = $stateobj->{$vmid}->{$tid};
-
-    $state = {} if !$state;
-
-    $state->{last_iteration} //= 0;
-    $state->{last_try} //= 0; # last sync start time
-    $state->{last_sync} //= 0; # last successful sync start time
-    $state->{fail_count} //= 0;
-
-    return $state;
-};
-
-my $read_state = sub {
-
-    return {} if ! -e $state_path;
-
-    my $raw = PVE::Tools::file_get_contents($state_path);
-
-    return {} if $raw eq '';
-
-    # untaint $raw
-    $raw =~ m/^({.*})$/;
-
-    return decode_json($1);
-};
-
 sub job_status {
-    my ($stateobj) = @_;
 
     my $local_node = PVE::INotify::nodename();
 
     my $jobs = {};
 
-    $stateobj = $read_state->() if !$stateobj;
+    my $stateobj = PVE::ReplicationState::read_state();
 
     my $cfg = PVE::ReplicationConfig->new();
 
@@ -103,7 +54,7 @@ sub job_status {
 	    next if $jobcfg->{disable};
 	}
 
-	my $state = $get_job_state->($stateobj, $jobcfg);
+	my $state = PVE::ReplicationState::extract_job_state($stateobj, $jobcfg);
 	$jobcfg->{state} = $state;
 	$jobcfg->{id} = $jobid;
 	$jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
@@ -134,9 +85,9 @@ sub job_status {
 }
 
 my $get_next_job = sub {
-    my ($stateobj, $iteration, $start_time) = @_;
+    my ($iteration, $start_time) = @_;
 
-    my $jobs = job_status($stateobj);
+    my $jobs = job_status();
 
     my $sort_func = sub {
 	my $joba = $jobs->{$a};
@@ -404,59 +355,71 @@ sub replicate {
     die $err if $err;
 }
 
-my $run_replication = sub {
-    my ($stateobj, $jobcfg, $iteration, $start_time, $logfunc) = @_;
-
-    my $state = $get_job_state->($stateobj, $jobcfg);
+my $run_replication_nolock = sub {
+    my ($jobcfg, $iteration, $start_time, $logfunc) = @_;
 
-    my $t0 = [gettimeofday];
+    # we normally record errors in the state file,
+    # but we also catch unexpected errors and report them with warn
+    # (for example when there are problems writing the state file)
+    eval {
+	my $state = PVE::ReplicationState::read_job_state($jobcfg);
 
-    # cleanup stale pid/ptime state
-    foreach my $vmid (keys %$stateobj) {
-	foreach my $tid (keys %{$stateobj->{$vmid}}) {
-	    my $state = $stateobj->{$vmid}->{$tid};
-	    delete $state->{pid};
-	    delete $state->{ptime};
-	}
-    }
+	my $t0 = [gettimeofday];
 
-    $state->{pid} = $$;
-    $state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
-    $state->{last_try} = $start_time;
-    $state->{last_iteration} = $iteration;
+	$state->{pid} = $$;
+	$state->{ptime} = PVE::ProcFSTools::read_proc_starttime($state->{pid});
+	$state->{last_try} = $start_time;
+	$state->{last_iteration} = $iteration;
 
-    $update_job_state->($stateobj, $jobcfg,  $state);
+	PVE::ReplicationState::write_job_state($jobcfg, $state);
 
-    $logfunc->($start_time, "$jobcfg->{id}: start replication job") if $logfunc;
+	$logfunc->($start_time, "$jobcfg->{id}: start replication job") if $logfunc;
 
-    eval {
-	my $timeout = 2; # do not wait too long - we repeat periodically anyways
-	PVE::GuestHelpers::guest_migration_lock(
-	    $jobcfg->{guest}, $timeout, \&replicate,
-	    $jobcfg, $state->{last_sync}, $start_time, $logfunc);
-    };
-    my $err = $@;
+	eval {
+	    replicate($jobcfg, $state->{last_sync}, $start_time, $logfunc);
+	};
+	my $err = $@;
 
-    $state->{duration} = tv_interval($t0);
-    delete $state->{pid};
-    delete $state->{ptime};
+	$state->{duration} = tv_interval($t0);
+	delete $state->{pid};
+	delete $state->{ptime};
 
-    if ($err) {
-	$state->{fail_count}++;
-	$state->{error} = "$err";
-	$update_job_state->($stateobj, $jobcfg,  $state);
-	if ($logfunc) {
+	if ($err) {
 	    chomp $err;
-	    $logfunc->($start_time, "$jobcfg->{id}: end replication job with error: $err");
+	    $state->{fail_count}++;
+	    $state->{error} = "$err";
+	    PVE::ReplicationState::write_job_state($jobcfg,  $state);
+	    my $msg = "$jobcfg->{id}: end replication job with error: $err";
+	    if ($logfunc) {
+		$logfunc->($start_time, $msg);
+	    } else {
+		warn $msg;
+	    }
 	} else {
-	    warn $err;
+	    $logfunc->($start_time, "$jobcfg->{id}: end replication job") if $logfunc;
+	    $state->{last_sync} = $start_time;
+	    $state->{fail_count} = 0;
+	    delete $state->{error};
+	    PVE::ReplicationState::write_job_state($jobcfg,  $state);
 	}
-    } else {
-	$logfunc->($start_time, "$jobcfg->{id}: end replication job") if $logfunc;
-	$state->{last_sync} = $start_time;
-	$state->{fail_count} = 0;
-	delete $state->{error};
-	$update_job_state->($stateobj, $jobcfg,  $state);
+    };
+    if (my $err = $@) {
+	warn "$jobcfg->{id}: got unexpected error - $err";
+    }
+};
+
+my $run_replication = sub {
+    my ($jobcfg, $iteration, $start_time, $logfunc, $noerr) = @_;
+
+    eval {
+	my $timeout = 2; # do not wait too long - we repeat periodically anyway (errors are ignored if $noerr is set)
+	PVE::GuestHelpers::guest_migration_lock(
+	    $jobcfg->{guest}, $timeout, $run_replication_nolock,
+	    $jobcfg, $iteration, $start_time, $logfunc);
+    };
+    if (my $err = $@) {
+	return undef if $noerr;
+	die $err;
     }
 };
 
@@ -468,8 +431,6 @@ sub run_single_job {
     my $code = sub {
 	$now //= time();
 
-	my $stateobj = $read_state->();
-
 	my $cfg = PVE::ReplicationConfig->new();
 
 	my $jobcfg = $cfg->{ids}->{$jobid};
@@ -489,11 +450,10 @@ sub run_single_job {
 
 	die "unable to sync to local node\n" if $jobcfg->{target} eq $local_node;
 
-	$jobcfg->{state} = $get_job_state->($stateobj, $jobcfg);
 	$jobcfg->{id} = $jobid;
 	$jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
 
-	$run_replication->($stateobj, $jobcfg, $now, $now, $logfunc);
+	$run_replication->($jobcfg, $now, $now, $logfunc);
     };
 
     my $res = PVE::Tools::lock_file($pvesr_lock_path, 60, $code);
@@ -506,11 +466,10 @@ sub run_jobs {
     my $iteration = $now // time();
 
     my $code = sub {
-	my $stateobj = $read_state->();
 	my $start_time = $now // time();
 
-	while (my $jobcfg = $get_next_job->($stateobj, $iteration, $start_time)) {
-	    $run_replication->($stateobj, $jobcfg, $iteration, $start_time, $logfunc);
+	while (my $jobcfg = $get_next_job->($iteration, $start_time)) {
+	    $run_replication->($jobcfg, $iteration, $start_time, $logfunc, 1);
 	    $start_time = $now // time();
 	}
     };
diff --git a/bin/test/ReplicationTestEnv.pm b/bin/test/ReplicationTestEnv.pm
index 79d826a1..a5605c3b 100755
--- a/bin/test/ReplicationTestEnv.pm
+++ b/bin/test/ReplicationTestEnv.pm
@@ -13,6 +13,8 @@ use Data::Dumper;
 use PVE::INotify;
 use PVE::Cluster;
 use PVE::Storage;
+use PVE::ReplicationConfig;
+use PVE::ReplicationState;
 use PVE::Replication;
 use PVE::QemuConfig;
 use PVE::LXC::Config;
@@ -59,27 +61,10 @@ my $mocked_ssh_info_to_command = sub {
 my $statefile = ".mocked_repl_state";
 
 unlink $statefile;
-$PVE::Replication::state_path = $statefile;
+$PVE::ReplicationState::state_path = $statefile;
+$PVE::ReplicationState::state_lock = ".mocked_repl_state_lock";
 $PVE::Replication::pvesr_lock_path = ".mocked_pvesr_lock";
 
-my $mocked_write_state = sub {
-    my ($state) = @_;
-
-    PVE::Tools::file_set_contents($statefile, encode_json($state));
-};
-
-my $mocked_read_state = sub {
-
-    return {} if ! -e $statefile;
-
-    my $raw = PVE::Tools::file_get_contents($statefile);
-
-    return {} if $raw eq '';
-
-    return decode_json($raw);
-};
-
-
 my $pve_cluster_module = Test::MockModule->new('PVE::Cluster');
 
 my $pve_inotify_module = Test::MockModule->new('PVE::INotify');
-- 
2.11.0




More information about the pve-devel mailing list