[pve-devel] [PATCH pve-manager 17/18] PVE::Replication::replicate - implement replicate

Fabian Grünbichler f.gruenbichler at proxmox.com
Mon May 29 08:22:52 CEST 2017


On Tue, May 23, 2017 at 09:08:56AM +0200, Dietmar Maurer wrote:
> The actual volume replication is done in replicate_volume(), which is just
> a stub for now.
> 
> I also added a regression test replication_test5.pl to verify basic
> functionality.
> 
> Signed-off-by: Dietmar Maurer <dietmar at proxmox.com>
> ---
>  PVE/Replication.pm             | 132 ++++++++++++++++++++++++++++++++++++++++-
>  bin/test/replication_test5.log |  50 ++++++++++++++++
>  bin/test/replication_test5.pl  | 128 +++++++++++++++++++++++++++++++++++++++
>  3 files changed, 307 insertions(+), 3 deletions(-)
>  create mode 100644 bin/test/replication_test5.log
>  create mode 100755 bin/test/replication_test5.pl
> 
> diff --git a/PVE/Replication.pm b/PVE/Replication.pm
> index a49fa8ad..727fbc0f 100644
> --- a/PVE/Replication.pm
> +++ b/PVE/Replication.pm
> @@ -219,12 +219,138 @@ sub prepare {
>      return $last_snapshots;
>  }
>  
> -sub replicate {
> -    my ($jobcfg, $start_time, $logfunc) = @_;
> +sub replicate_volume {
> +    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname) = @_;
>  
>      die "implement me";
>  }
>  
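
maybe it would help reviewers to sketch what this stub is supposed to
grow into? for the ZFS case I'd imagine something roughly like the
following - untested, and ssh_info_to_command / the send|recv pipeline
are just my assumptions about the transport we end up with:

    sub replicate_volume {
        my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname) = @_;

        my ($storeid, $volname) = PVE::Storage::parse_volume_id($volid);
        my $scfg = PVE::Storage::storage_config($storecfg, $storeid);
        my $dataset = "$scfg->{pool}/$volname";

        # full stream for the initial sync, incremental afterwards
        my $send = defined($base_snapshot)
            ? ['zfs', 'send', '-I', "$dataset\@$base_snapshot", "$dataset\@$sync_snapname"]
            : ['zfs', 'send', "$dataset\@$sync_snapname"];

        my $recv = [@{PVE::Cluster::ssh_info_to_command($ssh_info)},
                    'zfs', 'recv', '-F', $dataset];

        PVE::Tools::run_command([$send, $recv]);
    }

not asking for that in this patch, just to make sure we agree on the
interface.
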
> +sub replicate {
> +    my ($jobcfg, $last_sync, $start_time, $logfunc) = @_;
> +
> +    $logfunc = sub {} if !$logfunc; # log nothing by default
> +
> +    my $local_node = PVE::INotify::nodename();
> +
> +    die "not implemented - internal error" if $jobcfg->{type} ne 'local';
> +
> +    my $dc_conf = PVE::Cluster::cfs_read_file('datacenter.cfg');
> +    my $migration_network = $dc_conf->{migration_network};
> +    my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target}, $migration_network);
> +
> +    my $jobid = $jobcfg->{id};
> +    my $storecfg = PVE::Storage::config();
> +
> +    die "start time in past ($start_time <= $last_sync) - abort sync\n"

s/in past/before last sync/

> +	if $start_time <= $last_sync;
> +
> +    my $vmid = $jobcfg->{guest};
> +    my $vmtype = $jobcfg->{vmtype};
> +
> +    my $conf;
> +    my $running;
> +    my $qga;
> +    my $volumes;
> +
> +    if ($vmtype eq 'qemu') {
> +	$conf = PVE::QemuConfig->load_config($vmid);
> +	$running = PVE::QemuServer::check_running($vmid);
> +	$qga = PVE::QemuServer::qga_check_running($vmid)
> +	    if $running && $conf->{agent};
> +	$volumes = PVE::QemuConfig->get_replicatable_volumes($storecfg, $conf);
> +    } elsif ($vmtype eq 'lxc') {
> +	$conf = PVE::LXC::Config->load_config($vmid);
> +	$running = PVE::LXC::check_running($vmid);
> +	$volumes = PVE::LXC::Config->get_replicatable_volumes($storecfg, $conf);
> +    } else {
> +	die "internal error";
> +    }
> +
> +    my $sorted_volids = [ sort keys %$volumes ];
> +
> +    $logfunc->($start_time, "$jobid: guest => $vmid, type => $vmtype, running => $running");
> +    $logfunc->($start_time, "$jobid: volumes => " . join(',', @$sorted_volids));
> +
> +    # prepare remote side
> +    my $remote_snapshots = remote_prepare_local_job(
> +	$ssh_info, $jobid, $vmid, $volumes, $last_sync);
> +
> +    # test if we have a replication_ snapshot from last sync
> +    # and remove all other/stale replication snapshots
> +    my $last_sync_snapname = replication_snapshot_name($jobid, $last_sync);
> +    my $sync_snapname = replication_snapshot_name($jobid, $start_time);
> +
> +    my $last_snapshots = prepare(
> +	$storecfg, $sorted_volids, $jobid, $last_sync, $start_time, $logfunc);
> +
> +    # freeze filesystem for data consistency
> +    if ($qga) {
> +	$logfunc->($start_time, "$jobid: freeze guest filesystem");
> +	PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze");
> +    }
> +
> +    # make snapshot of all volumes
> +    my $replicate_snapshots = {};
> +    eval {
> +	foreach my $volid (@$sorted_volids) {
> +	    $logfunc->($start_time, "$jobid: create snapshot '${sync_snapname}' on $volid");
> +	    PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
> +	    $replicate_snapshots->{$volid} = 1;
> +	}
> +    };
> +    my $err = $@;
> +
> +    # unfreeze immediately
> +    if ($qga) {
> +	$logfunc->($start_time, "$jobid: unfreeze guest filesystem");
> +	eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
> +	warn $@ if $@; # ignore errors here, because we cannot fix it anyway
> +    }
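
since a stuck freeze is rather unpleasant for the guest, it might be
worth verifying the thaw actually took effect. untested sketch, assuming
vm_mon_cmd routes "guest-*" commands to the agent and returns the QGA
result string ("frozen"/"thawed"):

    my $fsstate = eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-status"); };
    warn "guest filesystem still frozen after thaw?\n"
        if defined($fsstate) && $fsstate eq 'frozen';
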
> +
> +    my $cleanup_local_snapshots = sub {
> +	my ($volid_hash, $snapname) = @_;
> +	foreach my $volid (sort keys %$volid_hash) {
> +	    $logfunc->($start_time, "$jobid: delete snapshot '$snapname' on $volid");
> +	    eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname, $running); };
> +	    warn $@ if $@;
> +	}
> +    };
> +
> +    if ($err) {
> +	$cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
> +	die $err;
> +    }
> +
> +    eval {
> +
> +	# fixme: limit, insecure
> +	foreach my $volid (@$sorted_volids) {
> +	    if ($last_snapshots->{$volid} && $remote_snapshots->{$volid}) {
> +		$logfunc->($start_time, "$jobid: incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
> +		replicate_volume($ssh_info, $storecfg, $volid, $last_sync_snapname, $sync_snapname);
> +	    } else {
> +		$logfunc->($start_time, "$jobid: full sync '$volid' ($sync_snapname)");
> +		replicate_volume($ssh_info, $storecfg, $volid, undef, $sync_snapname);
> +	    }
> +	}
> +    };
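
just to spell out the condition above for other reviewers: an
incremental sync is only attempted if the base snapshot still exists on
*both* sides, i.e.

    my $can_incremental = $last_snapshots->{$volid} && $remote_snapshots->{$volid};

everything else falls back to a full sync, which seems like the safe
default.
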
> +    $err = $@;
> +
> +    if ($err) {
> +	$cleanup_local_snapshots->($replicate_snapshots, $sync_snapname); # try to cleanup
> +	# we do not cleanup the remote side here - this is done in
> +	# next run of prepare_local_job
> +	die $err;
> +    }
> +
> +    # remove old snapshots because they are no longer needed
> +    $cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);
> +
> +    remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time);
> +
> +    die $err if $err;
> +}
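
the final "die $err if $err;" looks like dead code - $err was already
re-thrown in the error branch right above, so it can never be set at
this point.
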
> +
>  my $run_replication = sub {
>      my ($stateobj, $jobcfg, $start_time, $logfunc) = @_;
>  
> @@ -248,7 +374,7 @@ my $run_replication = sub {
>  
>      $logfunc->($start_time, "$jobcfg->{id}: start replication job") if $logfunc;
>  
> -    eval { replicate($jobcfg, $start_time, $logfunc); };
> +    eval { replicate($jobcfg, $state->{last_sync}, $start_time, $logfunc); };
>      my $err = $@;
>  
>      $state->{duration} = tv_interval($t0);
> diff --git a/bin/test/replication_test5.log b/bin/test/replication_test5.log
> new file mode 100644
> index 00000000..26d50404
> --- /dev/null
> +++ b/bin/test/replication_test5.log
> @@ -0,0 +1,50 @@
> +1000 job_900_to_node2: new job next_sync => 900
> +1000 job_900_to_node2: start replication job
> +1000 job_900_to_node2: guest => 900, type => qemu, running => 0
> +1000 job_900_to_node2: volumes => local-zfs:vm-900-disk-1
> +1000 job_900_to_node2: create snapshot 'replicate_job_900_to_node2_1000_snap' on local-zfs:vm-900-disk-1
> +1000 job_900_to_node2: full sync 'local-zfs:vm-900-disk-1' (replicate_job_900_to_node2_1000_snap)
> +1000 job_900_to_node2: end replication job
> +1000 job_900_to_node2: changed config next_sync => 1800
> +1000 job_900_to_node2: changed state last_try => 1000, last_sync => 1000
> +1840 job_900_to_node2: start replication job
> +1840 job_900_to_node2: guest => 900, type => qemu, running => 0
> +1840 job_900_to_node2: volumes => local-zfs:vm-900-disk-1
> +1840 job_900_to_node2: create snapshot 'replicate_job_900_to_node2_1840_snap' on local-zfs:vm-900-disk-1
> +1840 job_900_to_node2: incremental sync 'local-zfs:vm-900-disk-1' (replicate_job_900_to_node2_1000_snap => replicate_job_900_to_node2_1840_snap)
> +1840 job_900_to_node2: delete snapshot 'replicate_job_900_to_node2_1000_snap' on local-zfs:vm-900-disk-1
> +1840 job_900_to_node2: end replication job
> +1840 job_900_to_node2: changed config next_sync => 2700
> +1840 job_900_to_node2: changed state last_try => 1840, last_sync => 1840
> +2740 job_900_to_node2: start replication job
> +2740 job_900_to_node2: guest => 900, type => qemu, running => 0
> +2740 job_900_to_node2: volumes => local-zfs:vm-900-disk-1,local-zfs:vm-900-disk-2
> +2740 job_900_to_node2: create snapshot 'replicate_job_900_to_node2_2740_snap' on local-zfs:vm-900-disk-1
> +2740 job_900_to_node2: create snapshot 'replicate_job_900_to_node2_2740_snap' on local-zfs:vm-900-disk-2
> +2740 job_900_to_node2: delete snapshot 'replicate_job_900_to_node2_2740_snap' on local-zfs:vm-900-disk-1
> +2740 job_900_to_node2: end replication job with error: no such volid 'local-zfs:vm-900-disk-2'
> +2740 job_900_to_node2: changed config next_sync => 3040
> +2740 job_900_to_node2: changed state last_try => 2740, fail_count => 1, error => no such volid 'local-zfs:vm-900-disk-2'
> +3040 job_900_to_node2: start replication job
> +3040 job_900_to_node2: guest => 900, type => qemu, running => 0
> +3040 job_900_to_node2: volumes => local-zfs:vm-900-disk-1,local-zfs:vm-900-disk-2
> +3040 job_900_to_node2: create snapshot 'replicate_job_900_to_node2_3040_snap' on local-zfs:vm-900-disk-1
> +3040 job_900_to_node2: create snapshot 'replicate_job_900_to_node2_3040_snap' on local-zfs:vm-900-disk-2
> +3040 job_900_to_node2: incremental sync 'local-zfs:vm-900-disk-1' (replicate_job_900_to_node2_1840_snap => replicate_job_900_to_node2_3040_snap)
> +3040 job_900_to_node2: full sync 'local-zfs:vm-900-disk-2' (replicate_job_900_to_node2_3040_snap)
> +3040 job_900_to_node2: delete snapshot 'replicate_job_900_to_node2_1840_snap' on local-zfs:vm-900-disk-1
> +3040 job_900_to_node2: end replication job
> +3040 job_900_to_node2: changed config next_sync => 3600
> +3040 job_900_to_node2: changed state last_try => 3040, last_sync => 3040, fail_count => 0, error => 
> +3640 job_900_to_node2: start replication job
> +3640 job_900_to_node2: guest => 900, type => qemu, running => 0
> +3640 job_900_to_node2: volumes => local-zfs:vm-900-disk-1,local-zfs:vm-900-disk-2
> +3640 job_900_to_node2: create snapshot 'replicate_job_900_to_node2_3640_snap' on local-zfs:vm-900-disk-1
> +3640 job_900_to_node2: create snapshot 'replicate_job_900_to_node2_3640_snap' on local-zfs:vm-900-disk-2
> +3640 job_900_to_node2: incremental sync 'local-zfs:vm-900-disk-1' (replicate_job_900_to_node2_3040_snap => replicate_job_900_to_node2_3640_snap)
> +3640 job_900_to_node2: incremental sync 'local-zfs:vm-900-disk-2' (replicate_job_900_to_node2_3040_snap => replicate_job_900_to_node2_3640_snap)
> +3640 job_900_to_node2: delete snapshot 'replicate_job_900_to_node2_3040_snap' on local-zfs:vm-900-disk-1
> +3640 job_900_to_node2: delete snapshot 'replicate_job_900_to_node2_3040_snap' on local-zfs:vm-900-disk-2
> +3640 job_900_to_node2: end replication job
> +3640 job_900_to_node2: changed config next_sync => 4500
> +3640 job_900_to_node2: changed state last_try => 3640, last_sync => 3640
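
judging by this log, the snapshot names come out as
"replicate_<jobid>_<timestamp>_snap", i.e. replication_snapshot_name is
presumably something along the lines of

    sub replication_snapshot_name {
        my ($jobid, $last_sync) = @_;
        return "replicate_${jobid}_${last_sync}_snap";
    }

might be worth documenting that format, since the stale-snapshot cleanup
in prepare() needs to recognize exactly these names.
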
> diff --git a/bin/test/replication_test5.pl b/bin/test/replication_test5.pl
> new file mode 100755
> index 00000000..a084cd34
> --- /dev/null
> +++ b/bin/test/replication_test5.pl
> @@ -0,0 +1,128 @@
> +#!/usr/bin/perl
> +
> +# Note:
> +# 1.) Start replication job with single disk
> +# 2.) add non-existent disk (replication fails)
> +# 3.) create disk (replication continues).
> +
> +use strict;
> +use warnings;
> +use JSON;
> +
> +use lib ('.', '../..');
> +
> +use Data::Dumper;
> +
> +use Test::MockModule;
> +use ReplicationTestEnv;
> +
> +use PVE::Tools;
> +
> +$ReplicationTestEnv::mocked_nodename = 'node1';
> +
> +use PVE::INotify;
> +use PVE::Cluster;
> +use PVE::QemuConfig;
> +use PVE::QemuServer;
> +use PVE::LXC::Config;
> +use PVE::LXC;
> +use PVE::Storage;
> +
> +my $replicated_volume_status = {};
> +
> +my $mocked_remote_prepare_local_job = sub {
> +    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync) = @_;
> +
> +    my $target = $ssh_info->{node};
> +
> +    my $last_snapshots = {};
> +
> +    return $last_snapshots if !defined($replicated_volume_status->{$target});
> +
> +    my $last_sync_snapname = PVE::Replication::replication_snapshot_name($jobid, $last_sync);
> +
> +    foreach my $volid (keys %{$replicated_volume_status->{$target}}) {
> +	my $snapname = $replicated_volume_status->{$target}->{$volid};
> +
> +	$last_snapshots->{$volid} = 1 if $last_sync_snapname eq $snapname;
> +    }
> +
> +    return $last_snapshots;
> +};
> +
> +my $mocked_remote_finalize_local_job = sub {
> +    my ($ssh_info, $jobid, $vmid, $volumes, $last_sync) = @_;
> +
> +    # do nothing
> +};
> +
> +my $mocked_replicate_volume = sub {
> +    my ($ssh_info, $storecfg, $volid, $base_snapshot, $sync_snapname) = @_;
> +
> +    my $target = $ssh_info->{node};
> +
> +    $replicated_volume_status->{$target}->{$volid} = $sync_snapname;
> +};
> +
> +my $pve_replication_module = Test::MockModule->new('PVE::Replication');
> +$pve_replication_module->mock(
> +    remote_prepare_local_job => $mocked_remote_prepare_local_job,
> +    remote_finalize_local_job => $mocked_remote_finalize_local_job,
> +    replicate_volume => $mocked_replicate_volume);
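
note that Test::MockModule restores the original subs once the mock
object gets destroyed, so $pve_replication_module needs to stay in
scope for the whole test run - fine here since it is a file-level
lexical, just something to keep in mind when refactoring.
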
> +
> +my $testjob = {
> +    'type'  => 'local',
> +    'target' => 'node1',
> +    'guest' => 900,
> +};
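
is $testjob actually used anywhere? looks like a leftover from an
earlier version (same for $status further down).
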
> +
> +$ReplicationTestEnv::mocked_replication_jobs = {
> +    job_900_to_node2 => {
> +	'type'  => 'local',
> +	'target' => 'node2',
> +	'guest' => 900,
> +    },
> +};
> +
> +$ReplicationTestEnv::mocked_vm_configs = {
> +    900 => {
> +	node => 'node1',
> +	snapshots => {},
> +	ide0 => 'local-zfs:vm-900-disk-1,size=4G',
> +	memory => 512,
> +	ide2 => 'none,media=cdrom',
> +    },
> +};
> +
> +ReplicationTestEnv::setup();
> +
> +ReplicationTestEnv::register_mocked_volid('local-zfs:vm-900-disk-1');
> +
> +my $ctime = 1000;
> +
> +my $status;
> +
> +ReplicationTestEnv::openlog();
> +
> +for (my $i = 0; $i < 15; $i++) {
> +    ReplicationTestEnv::track_jobs($ctime);
> +    $ctime += 60;
> +}
> +
> +# add a new disk (which does not exist yet, so replication fails)
> +$ReplicationTestEnv::mocked_vm_configs->{900}->{ide1} = 'local-zfs:vm-900-disk-2,size=4G';
> +for (my $i = 0; $i < 15; $i++) {
> +    ReplicationTestEnv::track_jobs($ctime);
> +    $ctime += 60;
> +}
> +
> +# register disk, so replication should succeed
> +ReplicationTestEnv::register_mocked_volid('local-zfs:vm-900-disk-2');
> +for (my $i = 0; $i < 15; $i++) {
> +    ReplicationTestEnv::track_jobs($ctime);
> +    $ctime += 60;
> +}
> +
> +ReplicationTestEnv::commit_log();
> +
> +exit(0);
> -- 
> 2.11.0
> 



