[pve-devel] [RFC pve-storage 1/3] This patch will include asynchronous storage replication.

Wolfgang Link w.link at proxmox.com
Mon Apr 3 16:53:40 CEST 2017


It is possible to sync a volume to another node at a defined interval.
So if a node fails, there will be a copy of the volumes of a VM
on another node.
With this copy it is possible to start the VM on that node.
---
 Makefile                               |  12 +-
 PVE/API2/Makefile                      |   1 +
 PVE/API2/ReplicationManager.pm         |  45 +++
 PVE/CLI/Makefile                       |   2 +-
 PVE/CLI/pverepm.pm                     | 154 ++++++++
 PVE/Makefile                           |   1 +
 PVE/ReplicaTools.pm                    | 619 +++++++++++++++++++++++++++++++++
 PVE/Storage/Makefile                   |   1 +
 PVE/Storage/Replica/AbstractStorage.pm |  51 +++
 PVE/Storage/Replica/Makefile           |   5 +
 PVE/Storage/Replica/ZFSStorage.pm      | 199 +++++++++++
 pverepm                                |   8 +
 12 files changed, 1095 insertions(+), 3 deletions(-)
 create mode 100644 PVE/API2/ReplicationManager.pm
 create mode 100644 PVE/CLI/pverepm.pm
 create mode 100644 PVE/ReplicaTools.pm
 create mode 100644 PVE/Storage/Replica/AbstractStorage.pm
 create mode 100644 PVE/Storage/Replica/Makefile
 create mode 100644 PVE/Storage/Replica/ZFSStorage.pm
 create mode 100644 pverepm

diff --git a/Makefile b/Makefile
index 57500e3..54c774b 100644
--- a/Makefile
+++ b/Makefile
@@ -33,15 +33,23 @@ pvesm.bash-completion:
 	perl -I. -T -e "use PVE::CLI::pvesm; PVE::CLI::pvesm->generate_bash_completions();" >$@.tmp
 	mv $@.tmp $@
 
+pverepm.bash-completion:
+	perl -I. -T -e "use PVE::CLI::pverepm; PVE::CLI::pverepm->generate_bash_completions();" >$@.tmp
+	mv $@.tmp $@
+
 .PHONY: install
-install: pvesm.1 pvesm.bash-completion
+install: pvesm.1 pvesm.bash-completion pverepm.bash-completion
 	install -d ${DESTDIR}${SBINDIR}
 	install -m 0755 pvesm ${DESTDIR}${SBINDIR}
+	install -m 0755 pverepm ${DESTDIR}${SBINDIR}
 	make -C PVE install
+	install -d ${DESTDIR}/var/lib/pve-replica
 	install -d ${DESTDIR}/usr/share/man/man1
 	install -m 0644 pvesm.1 ${DESTDIR}/usr/share/man/man1/
 	gzip -9 -n ${DESTDIR}/usr/share/man/man1/pvesm.1
 	install -m 0644 -D pvesm.bash-completion ${DESTDIR}${BASHCOMPLDIR}/pvesm
+	install -m 0644 -D pverepm.bash-completion ${DESTDIR}${BASHCOMPLDIR}/pverepm
+
 
 .PHONY: deb
 deb: ${DEB}
@@ -65,7 +73,7 @@ ${DEB}:
 .PHONY: clean
 clean:
 	make cleanup-docgen
-	rm -rf debian *.deb ${PACKAGE}-*.tar.gz dist *.1 *.tmp pvesm.bash-completion
+	rm -rf debian *.deb ${PACKAGE}-*.tar.gz dist *.1 *.tmp pvesm.bash-completion pverepm.bash-completion
 	find . -name '*~' -exec rm {} ';'
 
 .PHONY: distclean
diff --git a/PVE/API2/Makefile b/PVE/API2/Makefile
index 7b7226e..d67b459 100644
--- a/PVE/API2/Makefile
+++ b/PVE/API2/Makefile
@@ -3,4 +3,5 @@
 .PHONY: install
 install:
 	install -D -m 0644 Disks.pm ${DESTDIR}${PERLDIR}/PVE/API2/Disks.pm
+	install -D -m 0644 ReplicationManager.pm ${DESTDIR}${PERLDIR}/PVE/API2/ReplicationManager.pm
 	make -C Storage install
diff --git a/PVE/API2/ReplicationManager.pm b/PVE/API2/ReplicationManager.pm
new file mode 100644
index 0000000..c65168a
--- /dev/null
+++ b/PVE/API2/ReplicationManager.pm
@@ -0,0 +1,45 @@
+package PVE::API2::ReplicationManager;
+
+use warnings;
+use strict;
+
+use Data::Dumper qw(Dumper);
+
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::ReplicaTools;
+
+use PVE::RESTHandler;
+
+use base qw(PVE::RESTHandler);
+
+__PACKAGE__->register_method ({
+    name => 'list',
+    path => 'list',
+    method => 'GET',
+    description => "List of all replications",
+    permissions => {
+	user => 'all',
+    },
+    protected => 1,
+    proxyto => 'node',
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    node => get_standard_option('pve-node'),
+	    nodes => get_standard_option('pve-node-list' , {
+					 optional => 1,
+					 description => "Notes where the jobs is located."
+					 }),
+	},
+    },
+    returns => { type => 'object' },
+    code => sub {
+	my ($param) = @_;
+
+	PVE::ReplicaTools::check_nodes($param->{nodes}) if $param->{nodes};
+
+	my $nodes = $param->{nodes} ? $param->{nodes} : $param->{node};
+	return PVE::ReplicaTools::get_all_jobs($nodes);
+}});
+
+1;
diff --git a/PVE/CLI/Makefile b/PVE/CLI/Makefile
index 6c6e258..39f67e3 100644
--- a/PVE/CLI/Makefile
+++ b/PVE/CLI/Makefile
@@ -1,4 +1,4 @@
-SOURCES=pvesm.pm
+SOURCES=pvesm.pm pverepm.pm
 
 .PHONY: install
 install: ${SOURCES}
diff --git a/PVE/CLI/pverepm.pm b/PVE/CLI/pverepm.pm
new file mode 100644
index 0000000..bbb1c86
--- /dev/null
+++ b/PVE/CLI/pverepm.pm
@@ -0,0 +1,154 @@
+package PVE::CLI::pverepm;
+
+use strict;
+use warnings;
+
+use PVE::API2::ReplicationManager;
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::INotify;
+use PVE::RPCEnvironment;
+use PVE::Tools qw(extract_param);
+use PVE::SafeSyslog;
+use PVE::CLIHandler;
+
+use base qw(PVE::CLIHandler);
+
+my $nodename = PVE::INotify::nodename();
+
+sub setup_environment {
+    PVE::RPCEnvironment->setup_default_cli_env();
+}
+
+my $print_list = sub {
+    my $conf = shift;
+
+    printf("%-10s%-20s%-20s%-5s%-10s\n",
+	   "VMID", "DEST", "LAST SYNC","IVAL", "STATE");
+
+    foreach my $vmid (sort keys %$conf) {
+
+	printf("%-10s", $vmid);
+	printf("%-20s", $conf->{$vmid}->{tnode});
+	printf("%-20s", $conf->{$vmid}->{lastsync});
+	printf("%-5s", $conf->{$vmid}->{interval});
+	printf("%-10s\n", $conf->{$vmid}->{state});
+    }
+
+};
+
+my $get_replica_list = sub {
+
+    my $jobs = PVE::ReplicaTools::read_state();
+    my $list = {};
+
+    return $list if !defined($jobs);
+
+    foreach my $vmid (keys %$jobs) {
+
+	my $lastsync = $jobs->{$vmid}->{lastsync};
+
+	#interval in min
+	my $interval = $jobs->{$vmid}->{interval};
+	$lastsync = PVE::ReplicaTools::timestamp2time($lastsync) if
+	    !($lastsync eq '0');
+	my $now = time();
+
+	my $synctime = $lastsync + $interval * 60;
+	if ($now > $synctime && $jobs->{$vmid}->{state} eq 'ok' ){
+	    $list->{$synctime} = $vmid;
+	}
+    }
+
+    return $list;
+};
+
+my $replicate_vms  = sub {
+    my ($list) = @_;
+
+    my $oldest_rep = 0;
+
+    while (%$list) {
+
+	foreach my $synctime (keys %$list) {
+
+	    if ($oldest_rep == 0 || $oldest_rep > $synctime) {
+		$oldest_rep = $synctime;
+	    }
+	}
+
+	eval {
+	    PVE::ReplicaTools::sync_job($list->{$oldest_rep});
+	};
+	if (my $err = $@) {
+	    syslog ('err', $err );
+	}
+	delete $list->{$oldest_rep};
+	$oldest_rep = 0;
+
+    }
+};
+
+__PACKAGE__->register_method ({
+    name => 'run',
+    path => 'run',
+    method => 'POST',
+    description => "This method will run by the systemdtimer and sync all jobs",
+    permissions => {
+	description => {
+	    check => ['perm', '/', [ 'Sys.Console' ]],
+	},
+    },
+    protected => 1,
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	},
+    },
+    returns => { type => 'null' },
+    code => sub {
+
+	my $list = &$get_replica_list();
+	&$replicate_vms($list);
+	syslog ('info', time() );
+	return undef;
+    }});
+
+__PACKAGE__->register_method ({
+    name => 'destroyjob',
+    path => 'destroyjob',
+    method => 'DELETE',
+    description => "Destroy an async replication job",
+    permissions => {
+	description => {
+	    check => ['perm', '/storage', ['Datastore.Allocate']],
+	},
+    },
+    protected => 1,
+    parameters => {
+	additionalProperties => 0,
+	properties => {
+	    vmid => {
+		type => 'string', format => 'pve-vmid',
+		description => "The VMID of the guest.",
+		completion => \&PVE::Cluster::complete_local_vmid,
+	    },
+	},
+    },
+    returns => { type => 'null' },
+    code => sub {
+	my ($param) = @_;
+
+	my $vmid = extract_param($param, 'vmid');
+
+	PVE::ReplicaTools::destroy_replica($vmid);
+
+    }});
+
+our $cmddef = {
+    list => [ 'PVE::API2::ReplicationManager' , 'list' , [],  { node => $nodename },
+	      $print_list],
+    run => [ __PACKAGE__ , 'run'],
+    destroyjob => [ __PACKAGE__ , 'destroyjob', ['vmid']],
+};
+
+1;
diff --git a/PVE/Makefile b/PVE/Makefile
index ae2bd35..d77dd95 100644
--- a/PVE/Makefile
+++ b/PVE/Makefile
@@ -3,6 +3,7 @@
 .PHONY: install
 install:
 	install -D -m 0644 Storage.pm ${DESTDIR}${PERLDIR}/PVE/Storage.pm
+	install -D -m 0644 ReplicaTools.pm ${DESTDIR}${PERLDIR}/PVE/ReplicaTools.pm
 	install -D -m 0644 Diskmanage.pm ${DESTDIR}${PERLDIR}/PVE/Diskmanage.pm
 	make -C Storage install
 	make -C API2 install
diff --git a/PVE/ReplicaTools.pm b/PVE/ReplicaTools.pm
new file mode 100644
index 0000000..5ffa5d4
--- /dev/null
+++ b/PVE/ReplicaTools.pm
@@ -0,0 +1,619 @@
+package PVE::ReplicaTools;
+
+use warnings;
+use strict;
+
+use PVE::Tools qw(run_command);
+use PVE::Cluster;
+use PVE::QemuConfig;
+use PVE::LXC::Config;
+use PVE::Storage;
+use PVE::Storage::Replica::AbstractStorage;
+use PVE::Storage::Replica::ZFSStorage;
+
+use Time::Local;
+use JSON;
+use Data::Dumper qw(Dumper);
+
+my $STATE_DIR = '/var/lib/pve-replica/';
+my $STATE_FILE = "pve-replica.state";
+my $STATE_PATH = $STATE_DIR.$STATE_FILE;
+
+PVE::Cluster::cfs_update;
+my $local_node = PVE::INotify::nodename();
+
+my $cluster_nodes;
+
+my $get_guestconfig = sub {
+    my ($vmid) = @_;
+
+    my $vms = PVE::Cluster::get_vmlist();
+
+    my $type = $vms->{ids}->{$vmid}->{type};
+
+    my $guestconf;
+    if ($type =~ m/^qemu$/) {
+	$guestconf = PVE::QemuConfig->load_config($vmid);
+
+    } elsif ($type =~ m/^lxc$/) {
+	$guestconf = PVE::LXC::Config->load_config($vmid);
+
+    }
+
+    return wantarray ? ($guestconf, $type) : $guestconf;
+};
+
+#Will only be read by pverepd
+sub write_state {
+    my ($state) = @_;
+
+    PVE::Tools::file_set_contents($STATE_PATH, JSON::encode_json($state));
+}
+
+sub read_state {
+
+    return undef if !(-e $STATE_PATH);
+
+    my $raw = PVE::Tools::file_get_contents($STATE_PATH);
+
+    return {} if $raw eq '';
+    return JSON::decode_json($raw);
+}
+
+my $get_nodelist = sub {
+    my ($update) = @_;
+
+    if (defined($update) || !defined($cluster_nodes)) {
+
+	$cluster_nodes = PVE::Cluster::get_nodelist();
+    }
+
+    return $cluster_nodes;
+};
+
+sub get_node_ip {
+    my ($nodename, $migration_network) = @_;
+
+    my $localip;
+    my $migrate_network_addr;
+
+    $migrate_network_addr = PVE::Cluster::get_local_migration_ip($migration_network)
+	if defined($migration_network);
+
+    if ($migrate_network_addr) {
+	$localip = $migrate_network_addr;
+    } else {
+	$localip = PVE::Cluster::remote_node_ip($nodename, 1);
+    }
+
+    $localip = "[$localip]" if Net::IP::ip_is_ipv6($localip);
+
+    return $localip;
+}
+
+sub get_all_jobs {
+    my ($nodes) = @_;
+
+    my $nodelist;
+    if (!defined($nodes)) {
+	$nodelist = &$get_nodelist;
+    } else {
+	@$nodelist = PVE::Tools::split_list($nodes)
+    }
+
+    my $vms = PVE::Cluster::get_vmlist();
+    my $state = read_state();
+    my $jobs = {};
+
+    my $outfunc = sub {
+	my $line = shift;
+
+	$jobs = JSON::decode_json($line);
+    };
+
+    foreach my $node (@$nodelist) {
+	if (!($local_node eq $node)) {
+
+	    my $ip = get_node_ip($node);
+	    my @cmd = ('ssh', '-o', 'Batchmode', '--', 'pveredm', 'list',
+		       '--nodes', $node, '--json');
+
+	    run_command([@cmd], outfunc=>$outfunc)
+
+	} else {
+
+	    foreach my $vmid (keys %{$vms->{ids}}) {
+
+		next if !($vms->{ids}->{$vmid}->{node} eq $local_node);
+		next if !defined($state->{$vmid});
+
+		$jobs->{$vmid}->{limit} = $state->{$vmid}->{limit};
+		$jobs->{$vmid}->{interval} = $state->{$vmid}->{interval};
+		$jobs->{$vmid}->{tnode} = $state->{$vmid}->{tnode};
+		$jobs->{$vmid}->{lastsync} = $state->{$vmid}->{lastsync};
+		$jobs->{$vmid}->{state} = $state->{$vmid}->{state};
+	    }
+
+	}
+    }
+
+    return $jobs;
+}
+
+sub parse_volid {
+    my ($volid, $vm_type) = @_;
+
+    my $storage_config = PVE::Storage::config();
+
+    #remove config parameters if they exist
+    $volid =~ s|,.*||;
+
+    my $storage_id = PVE::Storage::parse_volume_id($volid);
+
+    my $storage_type = $storage_config->{ids}->{$storage_id}->{type};
+
+    my $plugin = PVE::Storage::Replica::AbstractStorage->lookup($storage_type);
+
+    my $vol = $plugin->parse_volid($volid, $vm_type, $storage_config);
+
+    return ($vol, $plugin);
+}
+
+sub sync_job {
+    my ($vmid, $param) = @_;
+
+    my $jobs = read_state();
+
+    my ($guest_conf, $vm_type) = &$get_guestconfig($vmid);
+    my $qga = 0;
+
+    my $tnode = $jobs->{$vmid}->{tnode};
+
+    if ($vm_type eq "qemu" && defined($guest_conf->{agent}) ) {
+	$qga = PVE::QemuServer::qga_check_running($vmid)
+	    if PVE::QemuServer::check_running($vmid);
+    }
+
+    #will not die if a disk is not syncable
+    my $disks = get_syncable_guestdisks($guest_conf, $vm_type);
+
+    #check if all nodes have the storage available
+    storage_exists_on_nodes($disks, "$tnode,$local_node");
+
+    $param->{limit} = $guest_conf
+	if (defined($param->{limit}) && defined($guest_conf->{replimit}));
+
+    my $timestamp = time2timestamp(time());
+
+    my $lastsync = $jobs->{$vmid}->{lastsync};
+
+    #freeze filesystem for data consistency
+    if ($qga == 1 ) {
+	print "Freeze guest filesystem\n";
+
+	eval {
+	    PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze");
+	};
+    }
+
+    my $snapname = "rep_$timestamp";
+
+    my $disks_status = {};
+    $disks_status->{snapname} = $snapname;
+
+
+    my $sync_job = sub {
+
+	#make snapshot of all datasets
+	foreach my $volid (@$disks) {
+	    my ($vol, $plugin) = parse_volid($volid, $vm_type);
+
+	    $disks_status->{$volid} = $vol;
+	    $disks_status->{$volid}->{synctime} = $timestamp;
+	    $disks_status->{$volid}->{lastsync} =
+		defined($lastsync) ? $lastsync : 0;
+
+	    eval {
+		$plugin->snapshot_add($vol, $snapname);
+	    };
+
+	    if (my $err = $@) {
+		if ($qga == 1) {
+		    print "Unfreeze guest filesystem\n";
+		    eval {
+			PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw");
+		    };
+		}
+		cleanup_snapshot($disks_status);
+		$jobs->{$vmid}->{state} = 'err';
+		write_state($jobs);
+
+		die $err;
+	    }
+
+	    $disks_status->{$volid}->{snapshot} = 1;
+	}
+
+	if ($qga == 1) {
+	    print "Unfreeze guest filesystem\n";
+	    eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
+	}
+
+	my $ip = get_node_ip($tnode, $param->{mig_network});
+
+	foreach my $volid (@$disks) {
+
+	    eval { send_image($disks_status->{$volid}, $param, $ip); };
+
+	    if (my $err = $@) {
+		cleanup_snapshot($disks_status, $ip);
+		$jobs->{$vmid}->{state} = 'err';
+		write_state($jobs);
+		die "$err";
+	    }
+
+	    $disks_status->{$volid}->{synced} = 1;
+	}
+	cleanup_snapshot($disks_status, $ip, 1) if
+	    $jobs->{$vmid}->{lastsync} !~ m/^0$/;
+
+	$jobs->{$vmid}->{lastsync} = $timestamp;
+	write_state($jobs);
+    };
+
+    PVE::Tools::lock_file_full($STATE_PATH, 60, 0 , $sync_job);
+    die $@ if $@;
+
+    return $timestamp;
+}
+
+sub get_snapshots {
+    my ($vol, $prefix, $nodes) = @_;
+
+    my $plugin = $vol->{plugin};
+    return $plugin->get_snapshots($vol, $prefix, $nodes);
+}
+
+sub snapshot_add {
+    my ($vol, $snapname, $ip) = @_;
+
+    my $plugin = $vol->{plugin};
+    $plugin->snapshot_add($vol, $snapname, $ip);
+}
+
+sub snapshot_destroy {
+    my ($vol, $snap, $ip, $noerr) = @_;
+
+    my $plugin = $vol->{plugin};
+    $plugin->snapshot_destroy($vol, $snap, $ip, $noerr);
+}
+
+sub send_image {
+    my ($vol, $param, $ip, $all_snaps_in_delta, $alter_path) = @_;
+
+    my $plugin = $vol->{plugin};
+    $plugin->send_image($vol, $param, $ip, $all_snaps_in_delta, $alter_path);
+}
+
+#Timestamp must be in format YYYY-MM-DD_hh:mm:ss
+sub timestamp2time {
+    my ($string) = @_;
+
+    $string =~ m/^(\d{4})-(\d{2})-(\d{2})_(\d{2}):(\d{2}):(\d{2})$/;
+    my $time = timelocal($6, $5, $4, $3, ($2-1), ($1-1900));
+
+    return $time;
+}
+
+sub time2timestamp {
+    my ($time) = @_;
+
+    my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) =
+	localtime($time);
+    my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
+			     $year+1900, $mon+1, $mday, $hour, $min, $sec);
+
+    #Return in format YYYY-MM-DD_hh:mm:ss
+    return $datestamp;
+}
+
+sub job_enable {
+    my ($vmid, $no_sync) = @_;
+
+    my $update_state = sub {
+	my ($state) = @_;
+
+	my $jobs = read_state();
+
+	my $config = &$get_guestconfig($vmid);
+	my $param = {};
+
+	$jobs->{$vmid}->{interval} = $config->{repinterval} ?
+	    $config->{repinterval} : 15;
+
+	if ( defined($config->{replimit})) {
+	    $jobs->{$vmid}->{limit} = $config->{replimit};
+	    $param->{limit} = $config->{replimit};
+	}
+
+	die "rettarget must be set\n" if !defined($config->{reptarget});
+	$jobs->{$vmid}->{tnode} = $config->{reptarget};
+
+	if (!defined($jobs->{$vmid}->{lastsync})) {
+
+	    if ( my $lastsync = get_lastsync($vmid)) {
+		$jobs->{$vmid}->{lastsync} = $lastsync;
+	    } else {
+		$jobs->{$vmid}->{lastsync} = 0;
+	    }
+	}
+
+	$param->{verbose} = 1;
+
+	$jobs->{$vmid}->{state} = 'ok';
+	write_state($jobs);
+
+	eval{
+	    sync_job($vmid, $param) if !defined($no_sync);
+	};
+	if (my $err = $@) {
+	    $jobs->{$vmid}->{state} = 'error';
+	    write_state($jobs);
+	    die $err;
+	}
+    };
+
+    PVE::Tools::lock_file_full($STATE_PATH, 5, 0 , $update_state);
+    die $@ if $@;
+}
+
+sub job_disable {
+    my ($vmid) = @_;
+
+    my $update_state = sub {
+
+	my $jobs = read_state();
+
+	if (defined($jobs->{$vmid})) {
+	    $jobs->{$vmid}->{state} = 'off';
+	    write_state($jobs);
+	} else {
+	    print "No replica service for $vmid\n";
+	}
+    };
+
+    PVE::Tools::lock_file_full($STATE_PATH, 5, 0 , $update_state);
+    die $@ if $@;
+}
+
+sub job_remove {
+    my ($vmid) = @_;
+
+    my $update_state = sub {
+
+	my $jobs = read_state();
+
+	if (defined($jobs->{$vmid})) {
+	    delete($jobs->{$vmid});
+	    write_state($jobs);
+	} else {
+	    print "No replica service for $vmid\n";
+	}
+    };
+
+    PVE::Tools::lock_file_full($STATE_PATH, 5, 0 , $update_state);
+    die $@ if $@;
+}
+
+sub storage_exists_on_nodes {
+    my ($disks, $nodes) = @_;
+
+    my $storage_config = PVE::Storage::config();
+
+    my @nodes = PVE::Tools::split_list($nodes);
+
+    foreach my $volid (@$disks) {
+	my ($storeid) = PVE::Storage::parse_volume_id($volid);
+	next if !defined($storage_config->{ids}->{$storeid}->{nodes});
+
+	foreach my $node (@nodes) {
+	    if (!$storage_config->{ids}->{$storeid}->{nodes}->{$node}) {
+		die "Storage not availible on node: $node\n";
+	    }
+	}
+    }
+
+}
+
+sub get_syncable_guestdisks {
+    my ($config, $vm_type, $get_err) = @_;
+
+    my @disks;
+    foreach my $k (sort (keys %$config)) {
+	next if $k !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs)$/;
+	next if $config->{$k} =~ m/media=cdrom/;
+
+	push @disks, $config->{$k};
+    }
+
+    my $storage_config = PVE::Storage::config();
+    my $syncable_disks = [];
+
+    foreach my $volid (@disks) {
+
+	if ($volid =~ m/rep=(?i:0|no|off|false)/) {
+	    warn "Disk: $volid will not include in sync\n"
+		if defined($get_err);
+	    next;
+	}
+
+	my ($storeid) = PVE::Storage::parse_volume_id($volid);
+	my $store_type = $storage_config->{ids}->{$storeid}->{type};
+	if ($store_type eq 'zfspool') {
+	    push @$syncable_disks, $volid;
+	} else {
+	    die "Can't replicate VM because Disk: $volid does not suport replication\n"
+		if defined($get_err);
+	}
+    }
+
+    return $syncable_disks;
+}
+
+sub destroy_all_snapshots {
+    my ($vmid, $prefix, $nodes, $dryrun) = @_;
+
+    $nodes = $local_node if !defined($nodes);
+
+    my ($guest_conf, $vm_type) = &$get_guestconfig($vmid);
+
+    my $disks = get_syncable_guestdisks($guest_conf, $vm_type, 0);
+
+    my $snapshots;
+
+    foreach my $volid (@$disks) {
+	my ($vol, $plugin) = parse_volid($volid, $vm_type);
+	$snapshots->{$vol->{volid}} = get_snapshots($vol, $prefix, $nodes);
+
+	$snapshots->{$vol->{volid}}->{vol} = $vol;
+	$snapshots->{$vol->{volid}}->{plugin} = $plugin;
+    }
+
+    if (!defined($dryrun)) {
+	foreach my $volid (keys %$snapshots) {
+
+	    foreach my $node (keys %{$snapshots->{$volid}}) {
+		next if $node eq 'plugin';
+		next if $node eq 'vol';
+
+		my $plugin = $snapshots->{$volid}->{plugin};
+		my $vol = $snapshots->{$volid}->{vol};
+		my $ip = $snapshots->{$volid}->{$node}->{ip} if
+		    $snapshots->{$volid}->{$node}->{ip};
+
+		if (defined($prefix)) {
+		    foreach my $snap (@{$snapshots->{$volid}->{$node}->{$prefix}}) {
+			$plugin->snapshot_destroy($vol, $snap, $ip, 1);
+		    }
+		} else {
+		    $plugin->snapshot_destroy($vol, undef, $ip, 1);
+		}
+	    }
+	}
+    }
+
+    return $snapshots;
+}
+
+sub cleanup_snapshot {
+    my ($disks, $ip, $remove_old) = @_;
+
+    foreach my $volid (keys %$disks) {
+
+	next if $volid eq 'snapname';
+
+	my $vol = $disks->{$volid};
+
+	my $plugin = $vol->{plugin};
+	my $snapname;
+
+	if ($remove_old) {
+	    $snapname = "rep_$vol->{lastsync}";
+	} else {
+	   $snapname = $disks->{snapname};
+	}
+
+	if (defined($remove_old) || $disks->{$volid}->{synced}) {
+	    $plugin->snapshot_destroy($vol, $snapname, $ip, 1);
+	}
+
+	if (defined($remove_old) || $disks->{$volid}->{snapshot}) {
+	    $plugin->snapshot_destroy($vol, $snapname, undef, 1);
+	}
+    }
+}
+
+sub check_nodes {
+    my ($nodes, $check_local) = @_;
+
+    foreach my $node (PVE::Tools::split_list($nodes)) {
+	die "Local host and taget host can't be the same\n"
+	    if defined($check_local) && $local_node eq $node;
+
+	die "Node: $node does not exists.\n" if
+	    !PVE::Cluster::check_node_exists($node);
+    }
+}
+
+sub destroy_replica {
+    my ($vmid) = @_;
+
+    my $code = sub {
+
+	my $jobs = read_state();
+
+	return if !defined($jobs->{$vmid});
+
+	my ($guest_conf, $vm_type) = &$get_guestconfig($vmid);
+
+	destroy_all_snapshots($vmid, 'rep', $local_node);
+	destroy_all_snapshots($vmid, undef, $guest_conf->{reptarget});
+
+	delete($jobs->{$vmid});
+
+	delete($guest_conf->{replimit});
+	delete($guest_conf->{repinterval});
+	delete($guest_conf->{reptarget});
+	delete($guest_conf->{replica});
+
+	if ($vm_type eq 'qemu') {
+	    PVE::QemuConfig->write_config($vmid, $guest_conf);
+	} else {
+	    PVE::LXC::Config->write_config($vmid, $guest_conf);
+	}
+	write_state($jobs);
+    };
+
+    PVE::Tools::lock_file_full($STATE_PATH, 30, 0 , $code);
+    die $@ if $@;
+}
+
+sub get_rep_snap {
+    my ($volid) = @_;
+
+    my $vmtype;
+
+    if ($volid =~ m|subvol-|) {
+	$vmtype = 'lxc';
+    } else {
+	$vmtype = 'qemu';
+    }
+
+    my ($vol, $plugin) = parse_volid($volid, $vmtype);
+    my $snap = get_snapshots($vol, 'rep', $local_node);
+
+    return  @{$snap->{$local_node}->{rep}}[0];
+}
+
+sub get_lastsync {
+    my ($vmid) = @_;
+
+    my ($conf, $vm_type) = &$get_guestconfig($vmid);
+
+    my $sync_vol = get_syncable_guestdisks($conf, $vm_type);
+
+    my $snap;
+    foreach my $volid (@$sync_vol) {
+	my $tmp_snap = get_rep_snap($volid);
+
+	if (defined($tmp_snap)) {
+	    $tmp_snap =~ m/^rep_(.*)$/;
+	    die "snapshots not coherent"
+		if defined($snap) && !($snap eq $1);
+	    $snap = $1;
+	}
+    }
+
+    return $snap;
+}
+1;
diff --git a/PVE/Storage/Makefile b/PVE/Storage/Makefile
index b924f21..bfcfc18 100644
--- a/PVE/Storage/Makefile
+++ b/PVE/Storage/Makefile
@@ -4,3 +4,4 @@ SOURCES=Plugin.pm DirPlugin.pm LVMPlugin.pm NFSPlugin.pm ISCSIPlugin.pm RBDPlugi
 install:
 	for i in ${SOURCES}; do install -D -m 0644 $$i ${DESTDIR}${PERLDIR}/PVE/Storage/$$i; done
 	make -C LunCmd install
+	make -C Replica install
diff --git a/PVE/Storage/Replica/AbstractStorage.pm b/PVE/Storage/Replica/AbstractStorage.pm
new file mode 100644
index 0000000..7a39f8b
--- /dev/null
+++ b/PVE/Storage/Replica/AbstractStorage.pm
@@ -0,0 +1,51 @@
+package PVE::Storage::Replica::AbstractStorage;
+
+use strict;
+use warnings;
+
+use PVE::Storage::Replica::ZFSStorage;
+
+sub lookup {
+    my ($class, $type) = @_;
+
+    my $plugin;
+    if ( $type eq 'zfspool') {
+	$plugin = 'PVE::Storage::Replica::ZFSStorage';
+    } else  {
+	die "unknown section type '$type'\n";
+    }
+
+    return $plugin;
+}
+
+sub parse_volid {
+    my ($self, $volid, $vm_type, $storage_config) = @_;
+
+    die "implement me";
+}
+
+sub get_snapshots {
+    my ($self, $vol, $prefix, $nodes) = @_;
+
+    die "implement me";
+}
+
+sub snapshot_add {
+    my ($self, $vol, $snapname) = @_;
+
+    die "implement me";
+}
+
+sub snapshot_destroy {
+    my ($self, $vol, $snapname, $ip, $noerr) = @_;
+
+    die "implement me";
+}
+
+sub send_image {
+    my ($self, $dataset, $param, $ip) = @_;
+
+    die "implement me";
+}
+
+1;
diff --git a/PVE/Storage/Replica/Makefile b/PVE/Storage/Replica/Makefile
new file mode 100644
index 0000000..fbd52b8
--- /dev/null
+++ b/PVE/Storage/Replica/Makefile
@@ -0,0 +1,5 @@
+SOURCES=AbstractStorage.pm ZFSStorage.pm
+
+.PHONY: install
+install:
+	for i in ${SOURCES}; do install -D -m 0644 $$i ${DESTDIR}${PERLDIR}/PVE/Storage/Replica/$$i; done
diff --git a/PVE/Storage/Replica/ZFSStorage.pm b/PVE/Storage/Replica/ZFSStorage.pm
new file mode 100644
index 0000000..3784b96
--- /dev/null
+++ b/PVE/Storage/Replica/ZFSStorage.pm
@@ -0,0 +1,199 @@
+package PVE::Storage::Replica::ZFSStorage;
+
+use strict;
+use warnings;
+use PVE::Tools qw(run_command);
+use PVE::ReplicaTools;
+use PVE::Storage;
+
+use Data::Dumper qw(Dumper);
+
+use base qw(PVE::Storage::Replica::AbstractStorage);
+
+my $recover_inc_send = sub {
+    my ($dataset, $param, $ip, $err) = @_;
+
+    return undef;
+};
+
+sub parse_volid {
+    my ($self, $volid, $vm_type, $storage_config) = @_;
+
+    my $vol = {
+	'storagetype' => 'zfspool',
+	'plugin' => $self,
+	'volid' => $volid
+    };
+
+    #remove config parameters if they exist
+    $volid =~ s|,.*||;
+
+    my $dataset = (PVE::Storage::parse_volume_id($volid))[1];
+    my $zfspath = PVE::Storage::path($storage_config, $volid);
+
+    my $pool;
+    my $subpath;
+
+    if ($vm_type eq 'qemu'&&
+	$zfspath =~ m/^\/dev\/zvol\/(\w+.*)(\/$dataset)$/) {
+
+	my @array = split('/', $1);
+	$pool = shift(@array);
+	if (0 < @array) {
+	    $subpath = join('/', @array);
+	}
+
+    } elsif ($vm_type eq 'lxc' &&
+	     $zfspath =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$dataset)$/) {
+
+	$pool = $1;
+
+	if ($2) {
+	    $subpath = $3;
+	}
+
+    } else {
+	die "ERROR: in path\n";
+    }
+
+    my $zpath = $vol->{pool} = "$pool/";
+    $zpath .= $vol->{subpath} =  "$subpath/" if defined($subpath);
+    $zpath .= $vol->{dataset} = "$dataset";
+    $vol->{zfspath} = $zpath;
+
+    return $vol;
+}
+
+sub get_snapshots {
+    my ($self, $vol, $prefix, $nodes) = @_;
+
+    my $zfspath = $vol->{zfspath};
+    my $volid = $vol->{volid};
+    my $local_node = PVE::INotify::nodename();
+
+    $prefix = '' if !defined($prefix);
+    my $snaps = {};
+
+    my @nodes = PVE::Tools::split_list($nodes);
+
+    foreach my $node ( @nodes ) {
+	my $cmd = ['zfs', 'list', '-r', '-H', '-S',
+		   'name', '-t', 'snap', '-o', 'name', $zfspath];
+
+	if (!($node eq $local_node)) {
+	    my $node_ip = PVE::ReplicaTools::get_node_ip($node);
+	    $snaps->{$node}->{ip} = $node_ip;
+
+	    unshift @$cmd, 'ssh', '-o', ' BatchMode=yes'
+		, "root\@".$node_ip, '--';
+	}
+
+	my @snaps;
+
+	my $outfunc = sub {
+	    my $line = shift;
+
+	    if ($line =~ m/^\Q$zfspath\E@(\Q$prefix\E.*)$/) {
+		push @snaps, $1;
+	    }
+	};
+
+	eval { run_command( [$cmd], outfunc => $outfunc , errfunc => sub{}); };
+
+	$snaps->{$node}->{$prefix} = \@snaps if !($@);
+
+    }
+
+    #return an empty list if the dataset does not exist.
+    return $snaps;
+}
+
+sub snapshot_add {
+    my ($self, $vol, $snapname) = @_;
+
+    my $zfspath = $vol->{zfspath};
+
+    my $cmd = [];
+    push @$cmd, 'zfs', 'snapshot', "$zfspath\@$snapname";
+
+    eval{ run_command($cmd); };
+
+    if (my $err = $@) {
+	$self->snapshot_destroy($vol, $snapname, undef, 1);
+	die "$err\n";
+    }
+}
+
+sub snapshot_destroy {
+    my ($self, $vol, $snapname, $ip, $noerr) = @_;
+
+    my $zfspath = $vol->{zfspath};
+
+    my @zfscmd;
+    if (defined $snapname){
+	push @zfscmd, 'zfs', 'destroy', "${zfspath}\@$snapname";
+    } else {
+	push @zfscmd, 'zfs', 'destroy', '-R', ${zfspath};
+    }
+
+    unshift @zfscmd,'ssh', '-o', 'BatchMode=yes', "root\@${ip}", '--'
+	if defined($ip);
+
+    eval { run_command([@zfscmd]); };
+
+    if (my $erro = $@ && !defined($noerr)) {
+	die "$erro\n";
+    }
+}
+
+sub send_image {
+    my ($self, $dataset, $param, $ip) = @_;
+
+    my $cmdsend = [];
+    my $cmdlimit = [];
+
+    push @$cmdsend, 'zfs', 'send', '-R';
+    push @$cmdsend, '-v' if defined($param->{verbose});
+
+    my $zfspath = $dataset->{zfspath};
+    my $lastsync = $dataset->{lastsync};
+    my $prefix = 'rep';
+
+    #untaint lastsync
+    $lastsync =~ m/(\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})/;
+    $lastsync = $1;
+
+    #undefined or 0 means no snapshot exists
+    if( defined($lastsync) && !($lastsync eq "0") ) {
+	push @$cmdsend, '-I', "$zfspath\@${prefix}_${lastsync}";
+    }
+
+    push @$cmdsend, '--', "$zfspath\@${prefix}_$dataset->{synctime}";
+
+    if ($param->{limit}){
+	my $bwl = $param->{limit}*1024;
+	push @$cmdlimit, 'cstream', '-t', $bwl;
+    }
+
+    my $cmdrecv = [];
+
+    push @$cmdrecv, 'ssh', '-o', 'BatchMode=yes', "root\@${ip}", '--' if $ip;
+    push @$cmdrecv, 'zfs', 'recv', '-F', '--';
+    push @$cmdrecv, $zfspath ;
+
+
+    if ($param->{limit}) {
+	eval { run_command([$cmdsend, $cmdlimit, $cmdrecv]) };
+    } else {
+	my $cds =  join(' ', @$cmdsend)." | ". join(' ', @$cmdrecv);
+
+	eval { run_command([$cmdsend, $cmdrecv]) };
+    }
+
+    if (my $err = $@) {
+	my $recovered = &$recover_inc_send($dataset, $param, $ip, $err);
+	die $err if !defined($recovered);
+    }
+}
+
+1;
diff --git a/pverepm b/pverepm
new file mode 100644
index 0000000..5f20fdf
--- /dev/null
+++ b/pverepm
@@ -0,0 +1,8 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use PVE::CLI::pverepm;
+
+PVE::CLI::pverepm->run_cli_handler();
-- 
2.1.4





More information about the pve-devel mailing list