[pve-devel] [RFC pve-storage 1/3] Add asynchronous storage replication
Wolfgang Link
w.link at proxmox.com
Mon Apr 3 16:53:40 CEST 2017
It is possible to sync a volume to another node at a defined interval,
so if a node fails there is a copy of a VM's volumes on another node.
With this copy it is possible to start the VM on that node.
---
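Notes (not part of the commit message):

A minimal usage sketch of the new CLI, assuming a guest with VMID 100 whose
config already carries the reptarget/repinterval/replimit keys this series
relies on (job_enable()/job_disable() have no CLI command in this patch yet):

    pverepm list            # list replication jobs on the local node
    pverepm run             # sync all pending jobs (meant for the systemd timer)
    pverepm destroyjob 100  # remove the job and all rep_* snapshots of guest 100
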
Makefile | 12 +-
PVE/API2/Makefile | 1 +
PVE/API2/ReplicationManager.pm | 45 +++
PVE/CLI/Makefile | 2 +-
PVE/CLI/pverepm.pm | 154 ++++++++
PVE/Makefile | 1 +
PVE/ReplicaTools.pm | 619 +++++++++++++++++++++++++++++++++
PVE/Storage/Makefile | 1 +
PVE/Storage/Replica/AbstractStorage.pm | 51 +++
PVE/Storage/Replica/Makefile | 5 +
PVE/Storage/Replica/ZFSStorage.pm | 199 +++++++++++
pverepm | 8 +
12 files changed, 1095 insertions(+), 3 deletions(-)
create mode 100644 PVE/API2/ReplicationManager.pm
create mode 100644 PVE/CLI/pverepm.pm
create mode 100644 PVE/ReplicaTools.pm
create mode 100644 PVE/Storage/Replica/AbstractStorage.pm
create mode 100644 PVE/Storage/Replica/Makefile
create mode 100644 PVE/Storage/Replica/ZFSStorage.pm
create mode 100644 pverepm
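For reviewers: per disk, a sync reduces to a ZFS snapshot named
rep_<timestamp> plus a (possibly incremental) send/receive to the target
node. Roughly, with hypothetical pool/disk names and a 10240 KB/s limit
(cstream takes bytes/s, hence limit*1024):

    zfs snapshot rpool/data/vm-100-disk-1@rep_2017-04-03_16:00:00
    # -I <last synced snap> only after the first run, cstream only with a limit
    zfs send -R -I rpool/data/vm-100-disk-1@rep_2017-04-03_15:45:00 \
        -- rpool/data/vm-100-disk-1@rep_2017-04-03_16:00:00 \
        | cstream -t 10485760 \
        | ssh -o BatchMode=yes root@<target-ip> -- \
          zfs recv -F -- rpool/data/vm-100-disk-1
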
diff --git a/Makefile b/Makefile
index 57500e3..54c774b 100644
--- a/Makefile
+++ b/Makefile
@@ -33,15 +33,23 @@ pvesm.bash-completion:
perl -I. -T -e "use PVE::CLI::pvesm; PVE::CLI::pvesm->generate_bash_completions();" >$@.tmp
mv $@.tmp $@
+pverepm.bash-completion:
+ perl -I. -T -e "use PVE::CLI::pverepm; PVE::CLI::pverepm->generate_bash_completions();" >$@.tmp
+ mv $@.tmp $@
+
.PHONY: install
-install: pvesm.1 pvesm.bash-completion
+install: pvesm.1 pvesm.bash-completion pverepm.bash-completion
install -d ${DESTDIR}${SBINDIR}
install -m 0755 pvesm ${DESTDIR}${SBINDIR}
+ install -m 0755 pverepm ${DESTDIR}${SBINDIR}
make -C PVE install
+ install -d ${DESTDIR}/var/lib/pve-replica
install -d ${DESTDIR}/usr/share/man/man1
install -m 0644 pvesm.1 ${DESTDIR}/usr/share/man/man1/
gzip -9 -n ${DESTDIR}/usr/share/man/man1/pvesm.1
install -m 0644 -D pvesm.bash-completion ${DESTDIR}${BASHCOMPLDIR}/pvesm
+ install -m 0644 -D pverepm.bash-completion ${DESTDIR}${BASHCOMPLDIR}/pverepm
+
.PHONY: deb
deb: ${DEB}
@@ -65,7 +73,7 @@ ${DEB}:
.PHONY: clean
clean:
make cleanup-docgen
- rm -rf debian *.deb ${PACKAGE}-*.tar.gz dist *.1 *.tmp pvesm.bash-completion
+ rm -rf debian *.deb ${PACKAGE}-*.tar.gz dist *.1 *.tmp pvesm.bash-completion pverepm.bash-completion
find . -name '*~' -exec rm {} ';'
.PHONY: distclean
diff --git a/PVE/API2/Makefile b/PVE/API2/Makefile
index 7b7226e..d67b459 100644
--- a/PVE/API2/Makefile
+++ b/PVE/API2/Makefile
@@ -3,4 +3,5 @@
.PHONY: install
install:
install -D -m 0644 Disks.pm ${DESTDIR}${PERLDIR}/PVE/API2/Disks.pm
+ install -D -m 0644 ReplicationManager.pm ${DESTDIR}${PERLDIR}/PVE/API2/ReplicationManager.pm
make -C Storage install
diff --git a/PVE/API2/ReplicationManager.pm b/PVE/API2/ReplicationManager.pm
new file mode 100644
index 0000000..c65168a
--- /dev/null
+++ b/PVE/API2/ReplicationManager.pm
@@ -0,0 +1,45 @@
+package PVE::API2::ReplicationManager;
+
+use warnings;
+use strict;
+
+use Data::Dumper qw(Dumper);
+
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::ReplicaTools;
+
+use PVE::RESTHandler;
+
+use base qw(PVE::RESTHandler);
+
+__PACKAGE__->register_method ({
+ name => 'list',
+ path => 'list',
+ method => 'GET',
+ description => "List of all replications",
+ permissions => {
+ user => 'all',
+ },
+ protected => 1,
+ proxyto => 'node',
+ parameters => {
+ additionalProperties => 0,
+ properties => {
+ node => get_standard_option('pve-node'),
+ nodes => get_standard_option('pve-node-list' , {
+ optional => 1,
+ description => "Notes where the jobs is located."
+ }),
+ },
+ },
+ returns => { type => 'object' },
+ code => sub {
+ my ($param) = @_;
+
+ PVE::ReplicaTools::check_nodes($param->{nodes}) if $param->{nodes};
+
+ my $nodes = $param->{nodes} ? $param->{nodes} : $param->{node};
+ return PVE::ReplicaTools::get_all_jobs($nodes);
+}});
+
+1;
diff --git a/PVE/CLI/Makefile b/PVE/CLI/Makefile
index 6c6e258..39f67e3 100644
--- a/PVE/CLI/Makefile
+++ b/PVE/CLI/Makefile
@@ -1,4 +1,4 @@
-SOURCES=pvesm.pm
+SOURCES=pvesm.pm pverepm.pm
.PHONY: install
install: ${SOURCES}
diff --git a/PVE/CLI/pverepm.pm b/PVE/CLI/pverepm.pm
new file mode 100644
index 0000000..bbb1c86
--- /dev/null
+++ b/PVE/CLI/pverepm.pm
@@ -0,0 +1,154 @@
+package PVE::CLI::pverepm;
+
+use strict;
+use warnings;
+
+use PVE::API2::ReplicationManager;
+use PVE::JSONSchema qw(get_standard_option);
+use PVE::INotify;
+use PVE::RPCEnvironment;
+use PVE::Tools qw(extract_param);
+use PVE::SafeSyslog;
+use PVE::CLIHandler;
+
+use base qw(PVE::CLIHandler);
+
+my $nodename = PVE::INotify::nodename();
+
+sub setup_environment {
+ PVE::RPCEnvironment->setup_default_cli_env();
+}
+
+my $print_list = sub {
+ my $conf = shift;
+
+ printf("%-10s%-20s%-20s%-5s%-10s\n",
+ "VMID", "DEST", "LAST SYNC","IVAL", "STATE");
+
+ foreach my $vmid (sort keys %$conf) {
+
+ printf("%-10s", $vmid);
+ printf("%-20s", $conf->{$vmid}->{tnode});
+ printf("%-20s", $conf->{$vmid}->{lastsync});
+ printf("%-5s", $conf->{$vmid}->{interval});
+ printf("%-10s\n", $conf->{$vmid}->{state});
+ }
+
+};
+
+my $get_replica_list = sub {
+
+ my $jobs = PVE::ReplicaTools::read_state();
+ my $list = {};
+
+ return $list if !defined($jobs);
+
+ foreach my $vmid (keys %$jobs) {
+
+ my $lastsync = $jobs->{$vmid}->{lastsync};
+
+ #interval in min
+ my $interval = $jobs->{$vmid}->{interval};
+ $lastsync = PVE::ReplicaTools::timestamp2time($lastsync) if
+ !($lastsync eq '0');
+ my $now = time();
+
+ my $synctime = $lastsync + $interval * 60;
+ if ($now > $synctime && $jobs->{$vmid}->{state} eq 'ok' ){
+ $list->{$synctime} = $vmid;
+ }
+ }
+
+ return $list;
+};
+
+my $replicate_vms = sub {
+ my ($list) = @_;
+
+ my $oldest_rep = 0;
+
+ while (%$list) {
+
+ foreach my $synctime (keys %$list) {
+
+ if ($oldest_rep == 0 || $oldest_rep > $synctime) {
+ $oldest_rep = $synctime;
+ }
+ }
+
+ eval {
+ PVE::ReplicaTools::sync_job($list->{$oldest_rep});
+ };
+ if (my $err = $@) {
+ syslog ('err', $err );
+ }
+ delete $list->{$oldest_rep};
+ $oldest_rep = 0;
+
+ }
+};
+
+__PACKAGE__->register_method ({
+ name => 'run',
+ path => 'run',
+ method => 'POST',
+ description => "This method will run by the systemdtimer and sync all jobs",
+ permissions => {
+ description => {
+ check => ['perm', '/', [ 'Sys.Console' ]],
+ },
+ },
+ protected => 1,
+ parameters => {
+ additionalProperties => 0,
+ properties => {
+ },
+ },
+ returns => { type => 'null' },
+ code => sub {
+
+ my $list = &$get_replica_list();
+ &$replicate_vms($list);
+ syslog('info', "finished replication run at " . time());
+ return undef;
+ }});
+
+__PACKAGE__->register_method ({
+ name => 'destroyjob',
+ path => 'destroyjob',
+ method => 'DELETE',
+ description => "Destroy an async replication job",
+ permissions => {
+ description => {
+ check => ['perm', '/storage', ['Datastore.Allocate']],
+ },
+ },
+ protected => 1,
+ parameters => {
+ additionalProperties => 0,
+ properties => {
+ vmid => {
+ type => 'string', format => 'pve-vmid',
+ description => "The VMID of the guest.",
+ completion => \&PVE::Cluster::complete_local_vmid,
+ },
+ },
+ },
+ returns => { type => 'null' },
+ code => sub {
+ my ($param) = @_;
+
+ my $vmid = extract_param($param, 'vmid');
+
+ PVE::ReplicaTools::destroy_replica($vmid);
+
+ }});
+
+our $cmddef = {
+ list => [ 'PVE::API2::ReplicationManager' , 'list' , [], { node => $nodename },
+ $print_list],
+ run => [ __PACKAGE__ , 'run'],
+ destroyjob => [ __PACKAGE__ , 'destroyjob', ['vmid']],
+};
+
+1;
diff --git a/PVE/Makefile b/PVE/Makefile
index ae2bd35..d77dd95 100644
--- a/PVE/Makefile
+++ b/PVE/Makefile
@@ -3,6 +3,7 @@
.PHONY: install
install:
install -D -m 0644 Storage.pm ${DESTDIR}${PERLDIR}/PVE/Storage.pm
+ install -D -m 0644 ReplicaTools.pm ${DESTDIR}${PERLDIR}/PVE/ReplicaTools.pm
install -D -m 0644 Diskmanage.pm ${DESTDIR}${PERLDIR}/PVE/Diskmanage.pm
make -C Storage install
make -C API2 install
diff --git a/PVE/ReplicaTools.pm b/PVE/ReplicaTools.pm
new file mode 100644
index 0000000..5ffa5d4
--- /dev/null
+++ b/PVE/ReplicaTools.pm
@@ -0,0 +1,619 @@
+package PVE::ReplicaTools;
+
+use warnings;
+use strict;
+
+use PVE::Tools qw(run_command);
+use PVE::Cluster;
+use PVE::QemuConfig;
+use PVE::LXC::Config;
+use PVE::Storage;
+use PVE::Storage::Replica::AbstractStorage;
+use PVE::Storage::Replica::ZFSStorage;
+
+use Time::Local;
+use JSON;
+use PVE::INotify;
+
+my $STATE_DIR = '/var/lib/pve-replica/';
+my $STATE_FILE = "pve-replica.state";
+my $STATE_PATH = $STATE_DIR.$STATE_FILE;
+
+PVE::Cluster::cfs_update;
+my $local_node = PVE::INotify::nodename();
+
+my $cluster_nodes;
+
+my $get_guestconfig = sub {
+ my ($vmid) = @_;
+
+ my $vms = PVE::Cluster::get_vmlist();
+
+ my $type = $vms->{ids}->{$vmid}->{type};
+
+ my $guestconf;
+ if ($type =~ m/^qemu$/) {
+ $guestconf = PVE::QemuConfig->load_config($vmid);
+
+ } elsif ($type =~ m/^lxc$/) {
+ $guestconf = PVE::LXC::Config->load_config($vmid);
+
+ }
+
+ return wantarray ? ($guestconf, $type) : $guestconf;
+};
+
+# Will only be read by pverepd
+sub write_state {
+ my ($state) = @_;
+
+ PVE::Tools::file_set_contents($STATE_PATH, JSON::encode_json($state));
+}
+
+sub read_state {
+
+ return undef if !(-e $STATE_PATH);
+
+ my $raw = PVE::Tools::file_get_contents($STATE_PATH);
+
+ return {} if $raw eq '';
+ return JSON::decode_json($raw);
+}
+
+my $get_nodelist = sub {
+ my ($update) = @_;
+
+ if (defined($update) || !defined($cluster_nodes)) {
+
+ $cluster_nodes = PVE::Cluster::get_nodelist();
+ }
+
+ return $cluster_nodes;
+};
+
+sub get_node_ip {
+ my ($nodename, $migration_network) = @_;
+
+ my $localip;
+ my $migrate_network_addr;
+
+ $migrate_network_addr = PVE::Cluster::get_local_migration_ip($migration_network)
+ if defined($migration_network);
+
+ if ($migrate_network_addr) {
+ $localip = $migrate_network_addr;
+ } else {
+ $localip = PVE::Cluster::remote_node_ip($nodename, 1);
+ }
+
+ $localip = "[$localip]" if Net::IP::ip_is_ipv6($localip);
+
+ return $localip;
+}
+
+sub get_all_jobs {
+ my ($nodes) = @_;
+
+ my $nodelist;
+ if (!defined($nodes)) {
+ $nodelist = &$get_nodelist;
+ } else {
+ @$nodelist = PVE::Tools::split_list($nodes)
+ }
+
+ my $vms = PVE::Cluster::get_vmlist();
+ my $state = read_state();
+ my $jobs = {};
+
+ my $outfunc = sub {
+ my $line = shift;
+
+ $jobs = JSON::decode_json($line);
+ };
+
+ foreach my $node (@$nodelist) {
+ if (!($local_node eq $node)) {
+
+ my $ip = get_node_ip($node);
+ my @cmd = ('ssh', '-o', 'BatchMode=yes', "root\@$ip", '--',
+ 'pverepm', 'list', '--nodes', $node, '--json');
+
+ run_command(\@cmd, outfunc => $outfunc);
+
+ } else {
+
+ foreach my $vmid (keys %{$vms->{ids}}) {
+
+ next if !($vms->{ids}->{$vmid}->{node} eq $local_node);
+ next if !defined($state->{$vmid});
+
+ $jobs->{$vmid}->{limit} = $state->{$vmid}->{limit};
+ $jobs->{$vmid}->{interval} = $state->{$vmid}->{interval};
+ $jobs->{$vmid}->{tnode} = $state->{$vmid}->{tnode};
+ $jobs->{$vmid}->{lastsync} = $state->{$vmid}->{lastsync};
+ $jobs->{$vmid}->{state} = $state->{$vmid}->{state};
+ }
+
+ }
+ }
+
+ return $jobs;
+}
+
+sub parse_volid {
+ my ($volid, $vm_type) = @_;
+
+ my $storage_config = PVE::Storage::config();
+
+ # remove config options if present
+ $volid =~ s|,.*||;
+
+ my $storage_id = PVE::Storage::parse_volume_id($volid);
+
+ my $storage_type = $storage_config->{ids}->{$storage_id}->{type};
+
+ my $plugin = PVE::Storage::Replica::AbstractStorage->lookup($storage_type);
+
+ my $vol = $plugin->parse_volid($volid, $vm_type, $storage_config);
+
+ return ($vol, $plugin);
+}
+
+sub sync_job {
+ my ($vmid, $param) = @_;
+
+ my $jobs = read_state();
+
+ my ($guest_conf, $vm_type) = &$get_guestconfig($vmid);
+ my $qga = 0;
+
+ my $tnode = $jobs->{$vmid}->{tnode};
+
+ if ($vm_type eq "qemu" && defined($guest_conf->{agent}) ) {
+ $qga = PVE::QemuServer::qga_check_running($vmid)
+ if PVE::QemuServer::check_running($vmid);
+ }
+
+ #will not die if a disk is not syncable
+ my $disks = get_syncable_guestdisks($guest_conf, $vm_type);
+
+ # check that the storage is available on all involved nodes
+ storage_exists_on_nodes($disks, "$tnode,$local_node");
+
+ $param->{limit} = $guest_conf->{replimit}
+ if (defined($param->{limit}) && defined($guest_conf->{replimit}));
+
+ my $timestamp = time2timestamp(time());
+
+ my $lastsync = $jobs->{$vmid}->{lastsync};
+
+ #freeze filesystem for data consistency
+ if ($qga == 1 ) {
+ print "Freeze guest filesystem\n";
+
+ eval {
+ PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze");
+ };
+ }
+
+ my $snapname = "rep_$timestamp";
+
+ my $disks_status = {};
+ $disks_status->{snapname} = $snapname;
+
+
+ my $sync_job = sub {
+
+ #make snapshot of all datasets
+ foreach my $volid (@$disks) {
+ my ($vol, $plugin) = parse_volid($volid, $vm_type);
+
+ $disks_status->{$volid} = $vol;
+ $disks_status->{$volid}->{synctime} = $timestamp;
+ $disks_status->{$volid}->{lastsync} =
+ defined($lastsync) ? $lastsync : 0;
+
+ eval {
+ $plugin->snapshot_add($vol, $snapname);
+ };
+
+ if (my $err = $@) {
+ if ($qga == 1) {
+ print "Unfreeze guest filesystem\n";
+ eval {
+ PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw");
+ };
+ }
+ cleanup_snapshot($disks_status);
+ $jobs->{$vmid}->{state} = 'err';
+ write_state($jobs);
+
+ die $err;
+ }
+
+ $disks_status->{$volid}->{snapshot} = 1;
+ }
+
+ if ($qga == 1) {
+ print "Unfreeze guest filesystem\n";
+ eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
+ }
+
+ my $ip = get_node_ip($tnode, $param->{mig_network});
+
+ foreach my $volid (@$disks) {
+
+ eval { send_image($disks_status->{$volid}, $param, $ip); };
+
+ if (my $err = $@) {
+ cleanup_snapshot($disks_status, $ip);
+ $jobs->{$vmid}->{state} = 'err';
+ write_state($jobs);
+ die "$err";
+ }
+
+ $disks_status->{$volid}->{synced} = 1;
+ }
+ cleanup_snapshot($disks_status, $ip, 1) if
+ $jobs->{$vmid}->{lastsync} !~ m/^0$/;
+
+ $jobs->{$vmid}->{lastsync} = $timestamp;
+ write_state($jobs);
+ };
+
+ PVE::Tools::lock_file_full($STATE_PATH, 60, 0 , $sync_job);
+ die $@ if $@;
+
+ return $timestamp;
+}
+
+sub get_snapshots {
+ my ($vol, $prefix, $nodes) = @_;
+
+ my $plugin = $vol->{plugin};
+ return $plugin->get_snapshots($vol, $prefix, $nodes);
+}
+
+sub snapshot_add {
+ my ($vol, $snapname, $ip) = @_;
+
+ my $plugin = $vol->{plugin};
+ $plugin->snapshot_add($vol, $snapname, $ip);
+}
+
+sub snapshot_destroy {
+ my ($vol, $snap, $ip, $noerr) = @_;
+
+ my $plugin = $vol->{plugin};
+ $plugin->snapshot_destroy($vol, $snap, $ip, $noerr);
+}
+
+sub send_image {
+ my ($vol, $param, $ip, $all_snaps_in_delta, $alter_path) = @_;
+
+ my $plugin = $vol->{plugin};
+ $plugin->send_image($vol, $param, $ip, $all_snaps_in_delta, $alter_path);
+}
+
+# Timestamp must be in the format YYYY-MM-DD_hh:mm:ss
+sub timestamp2time {
+ my ($string) = @_;
+
+ $string =~ m/^(\d{4})-(\d{2})-(\d{2})_(\d{2}):(\d{2}):(\d{2})$/ or die "unable to parse timestamp '$string'\n";
+ my $time = timelocal($6, $5, $4, $3, ($2-1), ($1-1900));
+
+ return $time;
+}
+
+sub time2timestamp {
+ my ($time) = @_;
+
+ my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) =
+ localtime($time);
+ my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
+ $year+1900, $mon+1, $mday, $hour, $min, $sec);
+
+ #Return in format YYYY-MM-DD_hh:mm:ss
+ return $datestamp;
+}
+
+sub job_enable {
+ my ($vmid, $no_sync) = @_;
+
+ my $update_state = sub {
+ my ($state) = @_;
+
+ my $jobs = read_state();
+
+ my $config = &$get_guestconfig($vmid);
+ my $param = {};
+
+ $jobs->{$vmid}->{interval} = $config->{repinterval} ?
+ $config->{repinterval} : 15;
+
+ if ( defined($config->{replimit})) {
+ $jobs->{$vmid}->{limit} = $config->{replimit};
+ $param->{limit} = $config->{replimit};
+ }
+
+ die "rettarget must be set\n" if !defined($config->{reptarget});
+ $jobs->{$vmid}->{tnode} = $config->{reptarget};
+
+ if (!defined($jobs->{$vmid}->{lastsync})) {
+
+ if ( my $lastsync = get_lastsync($vmid)) {
+ $jobs->{$vmid}->{lastsync} = $lastsync;
+ } else {
+ $jobs->{$vmid}->{lastsync} = 0;
+ }
+ }
+
+ $param->{verbose} = 1;
+
+ $jobs->{$vmid}->{state} = 'ok';
+ write_state($jobs);
+
+ eval{
+ sync_job($vmid, $param) if !defined($no_sync);
+ };
+ if (my $err = $@) {
+ $jobs->{$vmid}->{state} = 'err';
+ write_state($jobs);
+ die $err;
+ }
+ };
+
+ PVE::Tools::lock_file_full($STATE_PATH, 5, 0 , $update_state);
+ die $@ if $@;
+}
+
+sub job_disable {
+ my ($vmid) = @_;
+
+ my $update_state = sub {
+
+ my $jobs = read_state();
+
+ if (defined($jobs->{$vmid})) {
+ $jobs->{$vmid}->{state} = 'off';
+ write_state($jobs);
+ } else {
+ print "No replica service for $vmid\n";
+ }
+ };
+
+ PVE::Tools::lock_file_full($STATE_PATH, 5, 0 , $update_state);
+ die $@ if $@;
+}
+
+sub job_remove {
+ my ($vmid) = @_;
+
+ my $update_state = sub {
+
+ my $jobs = read_state();
+
+ if (defined($jobs->{$vmid})) {
+ delete($jobs->{$vmid});
+ write_state($jobs);
+ } else {
+ print "No replica service for $vmid\n";
+ }
+ };
+
+ PVE::Tools::lock_file_full($STATE_PATH, 5, 0 , $update_state);
+ die $@ if $@;
+}
+
+sub storage_exists_on_nodes {
+ my ($disks, $nodes) = @_;
+
+ my $storage_config = PVE::Storage::config();
+
+ my @nodes = PVE::Tools::split_list($nodes);
+
+ foreach my $volid (@$disks) {
+ my ($storeid) = PVE::Storage::parse_volume_id($volid);
+ next if !defined($storage_config->{ids}->{$storeid}->{nodes});
+
+ foreach my $node (@nodes) {
+ if (!$storage_config->{ids}->{$storeid}->{nodes}->{$node}) {
+ die "Storage not availible on node: $node\n";
+ }
+ }
+ }
+
+}
+
+sub get_syncable_guestdisks {
+ my ($config, $vm_type, $get_err) = @_;
+
+ my @disks;
+ foreach my $k (sort (keys %$config)) {
+ next if $k !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs)$/;
+ next if $config->{$k} =~ m/media=cdrom/;
+
+ push @disks, $config->{$k};
+ }
+
+ my $storage_config = PVE::Storage::config();
+ my $syncable_disks = [];
+
+ foreach my $volid (@disks) {
+
+ if ($volid =~ m/rep=(?i:0|no|off|false)/) {
+ warn "Disk: $volid will not include in sync\n"
+ if defined($get_err);
+ next;
+ }
+
+ my ($storeid) = PVE::Storage::parse_volume_id($volid);
+ my $store_type = $storage_config->{ids}->{$storeid}->{type};
+ if ($store_type eq 'zfspool') {
+ push @$syncable_disks, $volid;
+ } else {
+ die "Can't replicate VM because Disk: $volid does not suport replication\n"
+ if defined($get_err);
+ }
+ }
+
+ return $syncable_disks;
+}
+
+sub destroy_all_snapshots {
+ my ($vmid, $prefix, $nodes, $dryrun) = @_;
+
+ $nodes = $local_node if !defined($nodes);
+
+ my ($guest_conf, $vm_type) = &$get_guestconfig($vmid);
+
+ my $disks = get_syncable_guestdisks($guest_conf, $vm_type, 0);
+
+ my $snapshots;
+
+ foreach my $volid (@$disks) {
+ my ($vol, $plugin) = parse_volid($volid, $vm_type);
+ $snapshots->{$vol->{volid}} = get_snapshots($vol, $prefix, $nodes);
+
+ $snapshots->{$vol->{volid}}->{vol} = $vol;
+ $snapshots->{$vol->{volid}}->{plugin} = $plugin;
+ }
+
+ if (!defined($dryrun)) {
+ foreach my $volid (keys %$snapshots) {
+
+ foreach my $node (keys %{$snapshots->{$volid}}) {
+ next if $node eq 'plugin';
+ next if $node eq 'vol';
+
+ my $plugin = $snapshots->{$volid}->{plugin};
+ my $vol = $snapshots->{$volid}->{vol};
+ # 'my' with a statement modifier is a Perl pitfall, copy unconditionally
+ my $ip = $snapshots->{$volid}->{$node}->{ip};
+
+ if (defined($prefix)) {
+ foreach my $snap (@{$snapshots->{$volid}->{$node}->{$prefix}}) {
+ $plugin->snapshot_destroy($vol, $snap, $ip, 1);
+ }
+ } else {
+ $plugin->snapshot_destroy($vol, undef, $ip, 1);
+ }
+ }
+ }
+ }
+
+ return $snapshots;
+}
+
+sub cleanup_snapshot {
+ my ($disks, $ip, $remove_old) = @_;
+
+ foreach my $volid (keys %$disks) {
+
+ next if $volid eq 'snapname';
+
+ my $vol = $disks->{$volid};
+
+ my $plugin = $vol->{plugin};
+ my $snapname;
+
+ if ($remove_old) {
+ $snapname = "rep_$vol->{lastsync}";
+ } else {
+ $snapname = $disks->{snapname};
+ }
+
+ if (defined($remove_old) || $disks->{$volid}->{synced}) {
+ $plugin->snapshot_destroy($vol, $snapname, $ip, 1);
+ }
+
+ if (defined($remove_old) || $disks->{$volid}->{snapshot}) {
+ $plugin->snapshot_destroy($vol, $snapname, undef, 1);
+ }
+ }
+}
+
+sub check_nodes {
+ my ($nodes, $check_local) = @_;
+
+ foreach my $node (PVE::Tools::split_list($nodes)) {
+ die "Local host and taget host can't be the same\n"
+ if defined($check_local) && $local_node eq $node;
+
+ die "Node: $node does not exists.\n" if
+ !PVE::Cluster::check_node_exists($node);
+ }
+}
+
+sub destroy_replica {
+ my ($vmid) = @_;
+
+ my $code = sub {
+
+ my $jobs = read_state();
+
+ return if !defined($jobs->{$vmid});
+
+ my ($guest_conf, $vm_type) = &$get_guestconfig($vmid);
+
+ destroy_all_snapshots($vmid, 'rep', $local_node);
+ destroy_all_snapshots($vmid, undef, $guest_conf->{reptarget});
+
+ delete($jobs->{$vmid});
+
+ delete($guest_conf->{replimit});
+ delete($guest_conf->{repinterval});
+ delete($guest_conf->{reptarget});
+ delete($guest_conf->{replica});
+
+ if ($vm_type eq 'qemu') {
+ PVE::QemuConfig->write_config($vmid, $guest_conf);
+ } else {
+ PVE::LXC::Config->write_config($vmid, $guest_conf);
+ }
+ write_state($jobs);
+ };
+
+ PVE::Tools::lock_file_full($STATE_PATH, 30, 0 , $code);
+ die $@ if $@;
+}
+
+sub get_rep_snap {
+ my ($volid) = @_;
+
+ my $vmtype;
+
+ if ($volid =~ m|subvol-|) {
+ $vmtype = 'lxc';
+ } else {
+ $vmtype = 'qemu';
+ }
+
+ my ($vol, $plugin) = parse_volid($volid, $vmtype);
+ my $snap = get_snapshots($vol, 'rep', $local_node);
+
+ return @{$snap->{$local_node}->{rep}}[0];
+}
+
+sub get_lastsync {
+ my ($vmid) = @_;
+
+ my ($conf, $vm_type) = &$get_guestconfig($vmid);
+
+ my $sync_vol = get_syncable_guestdisks($conf, $vm_type);
+
+ my $snap;
+ foreach my $volid (@$sync_vol) {
+ my $tmp_snap = get_rep_snap($volid);
+
+ if (defined($tmp_snap)) {
+ $tmp_snap =~ m/^rep_(.*)$/;
+ die "snapshots not coherent"
+ if defined($snap) && !($snap eq $1);
+ $snap = $1;
+ }
+ }
+
+ return $snap;
+}
+1;
diff --git a/PVE/Storage/Makefile b/PVE/Storage/Makefile
index b924f21..bfcfc18 100644
--- a/PVE/Storage/Makefile
+++ b/PVE/Storage/Makefile
@@ -4,3 +4,4 @@ SOURCES=Plugin.pm DirPlugin.pm LVMPlugin.pm NFSPlugin.pm ISCSIPlugin.pm RBDPlugi
install:
for i in ${SOURCES}; do install -D -m 0644 $$i ${DESTDIR}${PERLDIR}/PVE/Storage/$$i; done
make -C LunCmd install
+ make -C Replica install
diff --git a/PVE/Storage/Replica/AbstractStorage.pm b/PVE/Storage/Replica/AbstractStorage.pm
new file mode 100644
index 0000000..7a39f8b
--- /dev/null
+++ b/PVE/Storage/Replica/AbstractStorage.pm
@@ -0,0 +1,51 @@
+package PVE::Storage::Replica::AbstractStorage;
+
+use strict;
+use warnings;
+
+use PVE::Storage::Replica::ZFSStorage;
+
+sub lookup {
+ my ($class, $type) = @_;
+
+ my $plugin;
+ if ( $type eq 'zfspool') {
+ $plugin = 'PVE::Storage::Replica::ZFSStorage';
+ } else {
+ die "unknown section type '$type'\n";
+ }
+
+ return $plugin;
+}
+
+sub parse_volid {
+ my ($self, $volid, $vm_type, $storage_config) = @_;
+
+ die "implement me";
+}
+
+sub get_snapshots {
+ my ($self, $vol, $prefix, $nodes) = @_;
+
+ die "implement me";
+}
+
+sub snapshot_add {
+ my ($self, $vol, $snapname) = @_;
+
+ die "implement me";
+}
+
+sub snapshot_destroy {
+ my ($self, $vol, $snapname, $ip, $noerr) = @_;
+
+ die "implement me";
+}
+
+sub send_image {
+ my ($self, $dataset, $param, $ip) = @_;
+
+ die "implement me";
+}
+
+1;
diff --git a/PVE/Storage/Replica/Makefile b/PVE/Storage/Replica/Makefile
new file mode 100644
index 0000000..fbd52b8
--- /dev/null
+++ b/PVE/Storage/Replica/Makefile
@@ -0,0 +1,5 @@
+SOURCES=AbstractStorage.pm ZFSStorage.pm
+
+.PHONY: install
+install:
+ for i in ${SOURCES}; do install -D -m 0644 $$i ${DESTDIR}${PERLDIR}/PVE/Storage/Replica/$$i; done
diff --git a/PVE/Storage/Replica/ZFSStorage.pm b/PVE/Storage/Replica/ZFSStorage.pm
new file mode 100644
index 0000000..3784b96
--- /dev/null
+++ b/PVE/Storage/Replica/ZFSStorage.pm
@@ -0,0 +1,199 @@
+package PVE::Storage::Replica::ZFSStorage;
+
+use strict;
+use warnings;
+use PVE::Tools qw(run_command);
+use PVE::ReplicaTools;
+use PVE::Storage;
+
+use PVE::INotify;
+
+use base qw(PVE::Storage::Replica::AbstractStorage);
+
+my $recover_inc_send = sub {
+ my ($dataset, $param, $ip, $err) = @_;
+ # TODO: recovering a broken incremental send is not implemented yet,
+ return undef;
+};
+
+sub parse_volid {
+ my ($self, $volid, $vm_type, $storage_config) = @_;
+
+ my $vol = {
+ 'storagetype' => 'zfspool',
+ 'plugin' => $self,
+ 'volid' => $volid
+ };
+
+ # remove config options if present
+ $volid =~ s|,.*||;
+
+ my $dataset = (PVE::Storage::parse_volume_id($volid))[1];
+ my $zfspath = PVE::Storage::path($storage_config, $volid);
+
+ my $pool;
+ my $subpath;
+
+ if ($vm_type eq 'qemu'&&
+ $zfspath =~ m/^\/dev\/zvol\/(\w+.*)(\/$dataset)$/) {
+
+ my @array = split('/', $1);
+ $pool = shift(@array);
+ if (0 < @array) {
+ $subpath = join('/', @array);
+ }
+
+ } elsif ($vm_type eq 'lxc' &&
+ $zfspath =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$dataset)$/) {
+
+ $pool = $1;
+
+ if ($2) {
+ $subpath = $3;
+ }
+
+ } else {
+ die "ERROR: in path\n";
+ }
+
+ my $zpath = $vol->{pool} = "$pool/";
+ $zpath .= $vol->{subpath} = "$subpath/" if defined($subpath);
+ $zpath .= $vol->{dataset} = "$dataset";
+ $vol->{zfspath} = $zpath;
+
+ return $vol;
+}
+
+sub get_snapshots {
+ my ($self, $vol, $prefix, $nodes) = @_;
+
+ my $zfspath = $vol->{zfspath};
+ my $volid = $vol->{volid};
+ my $local_node = PVE::INotify::nodename();
+
+ $prefix = '' if !defined($prefix);
+ my $snaps = {};
+
+ my @nodes = PVE::Tools::split_list($nodes);
+
+ foreach my $node ( @nodes ) {
+ my $cmd = ['zfs', 'list', '-r', '-H', '-S',
+ 'name', '-t', 'snap', '-o', 'name', $zfspath];
+
+ if (!($node eq $local_node)) {
+ my $node_ip = PVE::ReplicaTools::get_node_ip($node);
+ $snaps->{$node}->{ip} = $node_ip;
+
+ unshift @$cmd, 'ssh', '-o', 'BatchMode=yes',
+ "root\@".$node_ip, '--';
+ }
+
+ my @snaps;
+
+ my $outfunc = sub {
+ my $line = shift;
+
+ if ($line =~ m/^\Q$zfspath\E@(\Q$prefix\E.*)$/) {
+ push @snaps, $1;
+ }
+ };
+
+ eval { run_command( [$cmd], outfunc => $outfunc , errfunc => sub{}); };
+
+ $snaps->{$node}->{$prefix} = \@snaps if !($@);
+
+ }
+
+ # returns an empty list if the dataset does not exist
+ return $snaps;
+}
+
+sub snapshot_add {
+ my ($self, $vol, $snapname) = @_;
+
+ my $zfspath = $vol->{zfspath};
+
+ my $cmd = [];
+ push @$cmd, 'zfs', 'snapshot', "$zfspath\@$snapname";
+
+ eval{ run_command($cmd); };
+
+ if (my $err = $@) {
+ $self->snapshot_destroy($vol, $snapname, undef, 1);
+ die "$err\n";
+ }
+}
+
+sub snapshot_destroy {
+ my ($self, $vol, $snapname, $ip, $noerr) = @_;
+
+ my $zfspath = $vol->{zfspath};
+
+ my @zfscmd;
+ if (defined $snapname){
+ push @zfscmd, 'zfs', 'destroy', "${zfspath}\@$snapname";
+ } else {
+ push @zfscmd, 'zfs', 'destroy', '-R', ${zfspath};
+ }
+
+ unshift @zfscmd,'ssh', '-o', 'BatchMode=yes', "root\@${ip}", '--'
+ if defined($ip);
+
+ eval { run_command([@zfscmd]); };
+
+ if (my $erro = $@ && !defined($noerr)) {
+ die "$erro\n";
+ }
+}
+
+sub send_image {
+ my ($self, $dataset, $param, $ip) = @_;
+
+ my $cmdsend = [];
+ my $cmdlimit = [];
+
+ push @$cmdsend, 'zfs', 'send', '-R';
+ push @$cmdsend, '-v' if defined($param->{verbose});
+
+ my $zfspath = $dataset->{zfspath};
+ my $lastsync = $dataset->{lastsync};
+ my $prefix = 'rep';
+
+ # untaint lastsync; anything unparseable (e.g. '0') means no previous snapshot
+ $lastsync = (defined($lastsync) && $lastsync =~ m/(\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})/)
+ ? $1 : undef;
+
+ # undef or '0' means no previous snapshot exists
+ if( defined($lastsync) && !($lastsync eq "0") ) {
+ push @$cmdsend, '-I', "$zfspath\@${prefix}_${lastsync}";
+ }
+
+ push @$cmdsend, '--', "$zfspath\@${prefix}_$dataset->{synctime}";
+
+ if ($param->{limit}){
+ my $bwl = $param->{limit}*1024;
+ push @$cmdlimit, 'cstream', '-t', $bwl;
+ }
+
+ my $cmdrecv = [];
+
+ push @$cmdrecv, 'ssh', '-o', 'BatchMode=yes', "root\@${ip}", '--' if $ip;
+ push @$cmdrecv, 'zfs', 'recv', '-F', '--';
+ push @$cmdrecv, $zfspath ;
+
+
+ if ($param->{limit}) {
+ eval { run_command([$cmdsend, $cmdlimit, $cmdrecv]) };
+ } else {
+ # no bandwidth limit requested, pipe send directly into recv
+
+ eval { run_command([$cmdsend, $cmdrecv]) };
+ }
+
+ if (my $err = $@) {
+ my $recovered = &$recover_inc_send($dataset, $param, $ip, $err);
+ die $err if !defined($recovered);
+ }
+}
+
+1;
diff --git a/pverepm b/pverepm
new file mode 100644
index 0000000..5f20fdf
--- /dev/null
+++ b/pverepm
@@ -0,0 +1,8 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use PVE::CLI::pverepm;
+
+PVE::CLI::pverepm->run_cli_handler();
--
2.1.4