[pve-devel] [PATCH v2 pve-zsync] Improve locking and state handling

Fabian Ebner f.ebner at proxmox.com
Thu Oct 3 13:06:43 CEST 2019


On the one hand, this introduces a new locked() mechanism that allows
enclosing locked sections in a cleaner way. There are only two types of
locks, namely one for state and cron (they are always read together and
almost always written together) and one for sync.
On the other hand, it improves the checks regarding states. A
'waiting' state is introduced and other 'waiting' and 'syncing'
instances of the same job are now detected. Also, there are
two new checks that allow disabling a job while it is 'syncing' or
'waiting'. Previously, sync would involuntarily re-enable such a job.
Disabling a 'waiting' job causes it to not sync anymore.

Signed-off-by: Fabian Ebner <f.ebner at proxmox.com>
---

Changes from v1:
    * Refactored locking as Thomas and Fabian suggested
    * Added 2 additional checks to not involuntarily re-enable a disabled job

 pve-zsync | 257 +++++++++++++++++++++++++++++-------------------------
 1 file changed, 138 insertions(+), 119 deletions(-)

diff --git a/pve-zsync b/pve-zsync
index 425ffa2..8425650 100755
--- a/pve-zsync
+++ b/pve-zsync
@@ -18,7 +18,6 @@ my $PATH = "/usr/sbin";
 my $PVE_DIR = "/etc/pve/local";
 my $QEMU_CONF = "${PVE_DIR}/qemu-server";
 my $LXC_CONF = "${PVE_DIR}/lxc";
-my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
 my $PROG_PATH = "$PATH/${PROGNAME}";
 my $INTERVAL = 15;
 my $DEBUG;
@@ -110,14 +109,20 @@ sub cut_target_width {
     return "$head/" . $path . "/$tail";
 }
 
-sub lock {
-    my ($fh) = @_;
-    flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
-}
+sub locked {
+    my ($lock_fn, $code) = @_;
+
+    my $lock_fh = IO::File->new("> $lock_fn");
+
+    flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
+    my $res = eval { $code->() };
+    my $err = $@;
 
-sub unlock {
-    my ($fh) = @_;
-    flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
+    flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
+    die "$err" if $err;
+
+    close($lock_fh);
+    return $res;
 }
 
 sub get_status {
@@ -338,13 +343,11 @@ sub update_state {
     my $text;
     my $in_fh;
 
-    eval {
-
+    if (-e $STATE) {
 	$in_fh = IO::File->new("< $STATE");
 	die "Could not open file $STATE: $!\n" if !$in_fh;
-	lock($in_fh);
 	$text = <$in_fh>;
-    };
+    }
 
     my $out_fh = IO::File->new("> $STATE.new");
     die "Could not open file ${STATE}.new: $!\n" if !$out_fh;
@@ -376,9 +379,7 @@ sub update_state {
 
     close($out_fh);
     rename "$STATE.new", $STATE;
-    eval {
-	close($in_fh);
-    };
+    close($in_fh);
 
     return $states;
 }
@@ -395,7 +396,6 @@ sub update_cron {
 
     my $fh = IO::File->new("< $CRONJOBS");
     die "Could not open file $CRONJOBS: $!\n" if !$fh;
-    lock($fh);
 
     my @test = <$fh>;
 
@@ -502,43 +502,45 @@ sub vm_exists {
 sub init {
     my ($param) = @_;
 
-    my $cfg = read_cron();
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+	my $cfg = read_cron();
 
-    my $job = param_to_job($param);
+	my $job = param_to_job($param);
 
-    $job->{state} = "ok";
-    $job->{lsync} = 0;
+	$job->{state} = "ok";
+	$job->{lsync} = 0;
 
-    my $source = parse_target($param->{source});
-    my $dest = parse_target($param->{dest});
+	my $source = parse_target($param->{source});
+	my $dest = parse_target($param->{dest});
 
-    if (my $ip =  $dest->{ip}) {
-	run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
-    }
+	if (my $ip =  $dest->{ip}) {
+	    run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
+	}
 
-    if (my $ip =  $source->{ip}) {
-	run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
-    }
+	if (my $ip =  $source->{ip}) {
+	    run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
+	}
 
-    die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
+	die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
 
-    if (!defined($source->{vmid})) {
-	die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
-    }
+	if (!defined($source->{vmid})) {
+	    die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
+	}
 
-    my $vm_type = vm_exists($source, $param->{source_user});
-    $job->{vm_type} = $vm_type;
-    $source->{vm_type} = $vm_type;
+	my $vm_type = vm_exists($source, $param->{source_user});
+	$job->{vm_type} = $vm_type;
+	$source->{vm_type} = $vm_type;
 
-    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
+	die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
 
-    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
+	die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
 
-    #check if vm has zfs disks if not die;
-    get_disks($source, $param->{source_user}) if $source->{vmid};
+	#check if vm has zfs disks if not die;
+	get_disks($source, $param->{source_user}) if $source->{vmid};
 
-    update_cron($job);
-    update_state($job);
+	update_cron($job);
+	update_state($job);
+    }); #cron and state lock
 
     eval {
 	sync($param) if !$param->{skip};
@@ -568,96 +570,109 @@ sub get_job {
 sub destroy_job {
     my ($param) = @_;
 
-    my $job = get_job($param);
-    $job->{state} = "del";
-
-    update_cron($job);
-    update_state($job);
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+	my $job = get_job($param);
+	$job->{state} = "del";
+	update_cron($job);
+	update_state($job);
+    });
 }
 
 sub sync {
     my ($param) = @_;
 
-    my $lock_fh = IO::File->new("> $LOCKFILE");
-    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
-    lock($lock_fh);
-
-    my $date = get_date();
     my $job;
-    eval {
-	$job = get_job($param);
-    };
-
-    if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
-	die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
-    }
-
-    my $dest = parse_target($param->{dest});
-    my $source = parse_target($param->{source});
-
-    my $sync_path = sub {
-	my ($source, $dest, $job, $param, $date) = @_;
+    my $dest;
+    my $source;
+    my $vm_type;
+    my $date;
 
-	($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+	$job = eval { get_job($param); };
 
-	snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
+	if ($job) {
+	    if (defined($job->{state}) && ($job->{state} eq "syncing" || $job->{state} eq "waiting")) {
+		die "Job --source $param->{source} --name $param->{name} is already scheduled to sync\n";
+	    }
+	    $job->{state} = "waiting";
+	    update_state($job);
+	}
+    });
 
-	send_image($source, $dest, $param);
+    locked("$CONFIG_PATH/sync.lock", sub {
+	locked("$CONFIG_PATH/cron_and_state.lock", sub {
+	    #job might've changed while we waited for the sync lock, but we can be sure it's not syncing
+	    $date = get_date();
+	    eval { $job = get_job($param); };
 
-	snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
+	    if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
+		die "Job --source $param->{source} --name $param->{name} has been disabled\n";
+	    }
 
-    };
+	    $dest = parse_target($param->{dest});
+	    $source = parse_target($param->{source});
 
-    my $vm_type = vm_exists($source, $param->{source_user});
-    $source->{vm_type} = $vm_type;
+	    $vm_type = vm_exists($source, $param->{source_user});
+	    $source->{vm_type} = $vm_type;
 
-    if ($job) {
-	$job->{state} = "syncing";
-	$job->{vm_type} = $vm_type if !$job->{vm_type};
-	update_state($job);
-    }
+	    if ($job) {
+		$job->{state} = "syncing";
+		$job->{vm_type} = $vm_type if !$job->{vm_type};
+		update_state($job);
+	    }
+	}); #cron and state lock
+
+	my $sync_path = sub {
+	    my ($source, $dest, $job, $param, $date) = @_;
+	    ($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
+	    snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
+	    send_image($source, $dest, $param);
+	    snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
+	};
 
-    eval{
-	if ($source->{vmid}) {
-	    die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
-	    die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
-	    my $disks = get_disks($source, $param->{source_user});
-
-	    foreach my $disk (sort keys %{$disks}) {
-		$source->{all} = $disks->{$disk}->{all};
-		$source->{pool} = $disks->{$disk}->{pool};
-		$source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
-		$source->{last_part} = $disks->{$disk}->{last_part};
+	eval{
+	    if ($source->{vmid}) {
+		die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
+		die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
+		my $disks = get_disks($source, $param->{source_user});
+
+		foreach my $disk (sort keys %{$disks}) {
+		    $source->{all} = $disks->{$disk}->{all};
+		    $source->{pool} = $disks->{$disk}->{pool};
+		    $source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
+		    $source->{last_part} = $disks->{$disk}->{last_part};
+		    &$sync_path($source, $dest, $job, $param, $date);
+		}
+		if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
+		    send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
+		} else {
+		    send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
+		}
+	    } else {
 		&$sync_path($source, $dest, $job, $param, $date);
 	    }
-	    if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
-		send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
-	    } else {
-		send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
+	};
+	if (my $err = $@) {
+	    if ($job) {
+		$job->{state} = "error";
+		locked("$CONFIG_PATH/cron_and_state.lock", sub { update_state($job); });
+		print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
 	    }
-	} else {
-	    &$sync_path($source, $dest, $job, $param, $date);
-	}
-    };
-    if(my $err = $@) {
-	if ($job) {
-	    $job->{state} = "error";
-	    update_state($job);
-	    unlock($lock_fh);
-	    close($lock_fh);
-	    print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
+	    die "$err\n";
 	}
-	die "$err\n";
-    }
 
-    if ($job) {
-	$job->{state} = "ok";
-	$job->{lsync} = $date;
-	update_state($job);
-    }
 
-    unlock($lock_fh);
-    close($lock_fh);
+	locked("$CONFIG_PATH/cron_and_state.lock", sub {
+	    $job = get_job($param);
+	    if ($job && defined($job->{state}) && $job->{state} eq "stopped") {
+		$job->{state} = "stopped";
+	    } else {
+		$job->{state} = "ok";
+	    }
+	    $job->{lsync} = $date;
+	    update_state($job);
+	});
+    }); #sync lock
 }
 
 sub snapshot_get{
@@ -1031,19 +1046,23 @@ sub status {
 sub enable_job {
     my ($param) = @_;
 
-    my $job = get_job($param);
-    $job->{state} = "ok";
-    update_state($job);
-    update_cron($job);
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+	my $job = get_job($param);
+	$job->{state} = "ok";
+	update_state($job);
+	update_cron($job);
+    });
 }
 
 sub disable_job {
     my ($param) = @_;
 
-    my $job = get_job($param);
-    $job->{state} = "stopped";
-    update_state($job);
-    update_cron($job);
+    locked("$CONFIG_PATH/cron_and_state.lock", sub {
+	my $job = get_job($param);
+	$job->{state} = "stopped";
+	update_state($job);
+	update_cron($job);
+    });
 }
 
 my $cmd_help = {
-- 
2.20.1





More information about the pve-devel mailing list