[pve-devel] [PATCH manager 4/7] add PVE/Jobs to handle VZDump jobs

Dominik Csapak d.csapak at proxmox.com
Thu Oct 7 10:27:24 CEST 2021


this adds a SectionConfig handling for jobs (only 'vzdump' for now) that
represents a job that will be handled by pvescheduler and a basic
'job-state' handling for reading/writing state json files

this has some intersections with pvesrs state handling, but does not
use a single state file for all jobs, but seperate ones, like we
do it in the backup-server.

Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
 PVE/Jobs.pm        | 210 +++++++++++++++++++++++++++++++++++++++++++++
 PVE/Jobs/Makefile  |  16 ++++
 PVE/Jobs/Plugin.pm |  61 +++++++++++++
 PVE/Jobs/VZDump.pm |  54 ++++++++++++
 PVE/Makefile       |   3 +-
 5 files changed, 343 insertions(+), 1 deletion(-)
 create mode 100644 PVE/Jobs.pm
 create mode 100644 PVE/Jobs/Makefile
 create mode 100644 PVE/Jobs/Plugin.pm
 create mode 100644 PVE/Jobs/VZDump.pm

diff --git a/PVE/Jobs.pm b/PVE/Jobs.pm
new file mode 100644
index 00000000..f17676bb
--- /dev/null
+++ b/PVE/Jobs.pm
@@ -0,0 +1,210 @@
+package PVE::Jobs;
+
+use strict;
+use warnings;
+use JSON;
+
+use PVE::Cluster qw(cfs_read_file);
+use PVE::Jobs::Plugin;
+use PVE::Jobs::VZDump;
+use PVE::Tools;
+
+PVE::Jobs::VZDump->register();
+PVE::Jobs::Plugin->init();
+
+my $state_dir = "/var/lib/pve-manager/jobs";
+my $lock_dir = "/var/lock/pve-manager";
+
+my $get_state_file = sub {
+    my ($jobid, $type) = @_;
+    return "$state_dir/$type-$jobid.json";
+};
+
+my $default_state = {
+    state => 'created',
+    time => 0,
+};
+
+# lockless, since we use file_set_contents, which is atomic
+sub read_job_state {
+    my ($jobid, $type) = @_;
+    my $path = $get_state_file->($jobid, $type);
+    return $default_state if ! -e $path;
+
+    my $raw = PVE::Tools::file_get_contents($path);
+
+    return $default_state if $raw eq '';
+
+    # untaint $raw
+    if ($raw =~ m/^(\{.*\})$/) {
+	return decode_json($1);
+    }
+
+    die "invalid json data in '$path'\n";
+}
+
+sub lock_job_state {
+    my ($jobid, $type, $sub) = @_;
+
+    my $filename = "$lock_dir/$type-$jobid.lck";
+
+    my $res = PVE::Tools::lock_file($filename, 10, $sub);
+    die $@ if $@;
+
+    return $res;
+}
+
+# returns the state and checks if the job was 'started' and is now finished
+sub get_job_state {
+    my ($jobid, $type) = @_;
+
+    # first check unlocked to save time,
+    my $state = read_job_state($jobid, $type);
+    if ($state->{state} ne 'started') {
+	return $state; # nothing to check
+    }
+
+    return lock_job_state($jobid, $type, sub {
+	my $state = read_job_state($jobid, $type);
+
+	if ($state->{state} ne 'started') {
+	    return $state; # nothing to check
+	}
+
+	my ($task, $filename) = PVE::Tools::upid_decode($state->{upid}, 1);
+	die "unable to parse worker upid\n" if !$task;
+	die "no such task\n" if ! -f $filename;
+
+	my $pstart = PVE::ProcFSTools::read_proc_starttime($task->{pid});
+	if ($pstart && $pstart == $task->{pstart}) {
+	    return $state; # still running
+	}
+
+	my $new_state = {
+	    state => 'stopped',
+	    msg => PVE::Tools::upid_read_status($state->{upid}),
+	    upid => $state->{upid}
+	};
+
+	my $path = $get_state_file->($jobid, $type);
+	PVE::Tools::file_set_contents($path, encode_json($new_state));
+	return $new_state;
+    });
+}
+
+# must be called when the job is first created
+sub create_job {
+    my ($jobid, $type) = @_;
+
+    lock_job_state($jobid, $type, sub {
+	my $state = read_job_state($jobid, $type);
+
+	if ($state->{state} ne 'created') {
+	    die "job state already exists\n";
+	}
+
+	$state->{time} = time();
+
+	my $path = $get_state_file->($jobid, $type);
+	PVE::Tools::file_set_contents($path, encode_json($state));
+    });
+}
+
+# to be called when the job is removed
+sub remove_job {
+    my ($jobid, $type) = @_;
+    my $path = $get_state_file->($jobid, $type);
+    unlink $path;
+}
+
+# will be called when the job was started according to schedule
+sub started_job {
+    my ($jobid, $type, $upid) = @_;
+    lock_job_state($jobid, $type, sub {
+	my $state = {
+	    state => 'started',
+	    upid => $upid,
+	};
+
+	my $path = $get_state_file->($jobid, $type);
+	PVE::Tools::file_set_contents($path, encode_json($state));
+    });
+}
+
+# will be called when the job schedule is updated
+sub updated_job_schedule {
+    my ($jobid, $type) = @_;
+    lock_job_state($jobid, $type, sub {
+	my $old_state = read_job_state($jobid, $type);
+
+	if ($old_state->{state} eq 'started') {
+	    return; # do not update timestamp on running jobs
+	}
+
+	$old_state->{updated} = time();
+
+	my $path = $get_state_file->($jobid, $type);
+	PVE::Tools::file_set_contents($path, encode_json($old_state));
+    });
+}
+
+sub get_last_runtime {
+    my ($jobid, $type) = @_;
+
+    my $state = read_job_state($jobid, $type);
+
+    if (defined($state->{updated})) {
+	return $state->{updated};
+    }
+
+    if (my $upid = $state->{upid}) {
+	my ($task) = PVE::Tools::upid_decode($upid, 1);
+	die "unable to parse worker upid\n" if !$task;
+	return $task->{starttime};
+    }
+
+    return $state->{time} // 0;
+}
+
+sub run_jobs {
+    my $jobs_cfg = cfs_read_file('jobs.cfg');
+    my $nodename = PVE::INotify::nodename();
+
+    foreach my $id (sort keys %{$jobs_cfg->{ids}}) {
+	my $cfg = $jobs_cfg->{ids}->{$id};
+	my $type = $cfg->{type};
+	my $schedule = delete $cfg->{schedule};
+
+	# only schedule local jobs
+	next if defined($cfg->{node}) && $cfg->{node} eq $nodename;
+
+	# only schedule enabled jobs
+	next if defined($cfg->{enabled}) && !$cfg->{enabled};
+
+	my $last_run = get_last_runtime($id, $type);
+	my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
+	my $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $last_run) // 0;
+
+	if (time() >= $next_sync) {
+	    # only warn on errors, so that all jobs can run
+	    my $state = get_job_state($id, $type); # to update the state
+
+	    next if $state->{state} eq 'started'; # still running
+
+	    my $plugin = PVE::Jobs::Plugin->lookup($type);
+
+	    my $upid = eval { $plugin->run($cfg) };
+	    warn $@ if $@;
+	    if ($upid) {
+		started_job($id, $type, $upid);
+	    }
+	}
+    }
+}
+
+sub setup_dirs {
+    mkdir $state_dir;
+    mkdir $lock_dir;
+}
+
+1;
diff --git a/PVE/Jobs/Makefile b/PVE/Jobs/Makefile
new file mode 100644
index 00000000..6023c3ba
--- /dev/null
+++ b/PVE/Jobs/Makefile
@@ -0,0 +1,16 @@
+include ../../defines.mk
+
+PERLSOURCE =   \
+	Plugin.pm\
+	VZDump.pm
+
+all:
+
+.PHONY: clean
+clean:
+	rm -rf *~
+
+.PHONY: install
+install: ${PERLSOURCE}
+	install -d ${PERLLIBDIR}/PVE/Jobs
+	install -m 0644 ${PERLSOURCE} ${PERLLIBDIR}/PVE/Jobs
diff --git a/PVE/Jobs/Plugin.pm b/PVE/Jobs/Plugin.pm
new file mode 100644
index 00000000..69c31cf2
--- /dev/null
+++ b/PVE/Jobs/Plugin.pm
@@ -0,0 +1,61 @@
+package PVE::Jobs::Plugin;
+
+use strict;
+use warnings;
+
+use PVE::Cluster qw(cfs_register_file);
+
+use base qw(PVE::SectionConfig);
+
+cfs_register_file('jobs.cfg',
+		  sub { __PACKAGE__->parse_config(@_); },
+		  sub { __PACKAGE__->write_config(@_); });
+
+my $defaultData = {
+    propertyList => {
+	type => { description => "Section type." },
+	id => {
+	    description => "The ID of the VZDump job.",
+	    type => 'string',
+	    format => 'pve-configid',
+	},
+	enabled => {
+	    description => "Determines if the job is enabled.",
+	    type => 'boolean',
+	    default => 1,
+	    optional => 1,
+	},
+	schedule => {
+	    description => "Backup schedule. The format is a subset of `systemd` calendar events.",
+	    type => 'string', format => 'pve-calendar-event',
+	    maxLength => 128,
+	},
+    },
+};
+
+sub private {
+    return $defaultData;
+}
+
+sub parse_config {
+    my ($class, $filename, $raw) = @_;
+
+    my $cfg = $class->SUPER::parse_config($filename, $raw);
+
+    foreach my $id (sort keys %{$cfg->{ids}}) {
+	my $data = $cfg->{ids}->{$id};
+
+	$data->{id} = $id;
+	$data->{enabled}  //= 1;
+   }
+
+   return $cfg;
+}
+
+sub run {
+    my ($class, $cfg) = @_;
+    # implement in subclass
+    die "not implemented";
+}
+
+1;
diff --git a/PVE/Jobs/VZDump.pm b/PVE/Jobs/VZDump.pm
new file mode 100644
index 00000000..043b7ace
--- /dev/null
+++ b/PVE/Jobs/VZDump.pm
@@ -0,0 +1,54 @@
+package PVE::Jobs::VZDump;
+
+use strict;
+use warnings;
+
+use PVE::INotify;
+use PVE::VZDump::Common;
+use PVE::API2::VZDump;
+use PVE::Cluster;
+
+use base qw(PVE::Jobs::Plugin);
+
+sub type {
+    return 'vzdump';
+}
+
+my $props = PVE::VZDump::Common::json_config_properties();
+
+sub properties {
+    return $props;
+}
+
+sub options {
+    my $options = {
+	enabled => { optional => 1 },
+	schedule => {},
+    };
+    foreach my $opt (keys %$props) {
+	if ($props->{$opt}->{optional}) {
+	    $options->{$opt} = { optional => 1 };
+	} else {
+	    $options->{$opt} = {};
+	}
+    }
+
+    return $options;
+}
+
+sub run {
+    my ($class, $conf) = @_;
+
+    # remove all non vzdump related options
+    foreach my $opt (keys %$conf) {
+	delete $conf->{$opt} if !defined($props->{$opt});
+    }
+
+    $conf->{quiet} = 1; # do not write to stdout/stderr
+
+    PVE::Cluster::cfs_update(); # refresh vmlist
+
+    return PVE::API2::VZDump->vzdump($conf);
+}
+
+1;
diff --git a/PVE/Makefile b/PVE/Makefile
index 0071fab1..48b85d33 100644
--- a/PVE/Makefile
+++ b/PVE/Makefile
@@ -1,6 +1,6 @@
 include ../defines.mk
 
-SUBDIRS=API2 Status CLI Service Ceph
+SUBDIRS=API2 Status CLI Service Ceph Jobs
 
 PERLSOURCE = 			\
 	API2.pm			\
@@ -11,6 +11,7 @@ PERLSOURCE = 			\
 	CertHelpers.pm		\
 	ExtMetric.pm		\
 	HTTPServer.pm		\
+	Jobs.pm			\
 	NodeConfig.pm		\
 	Report.pm		\
 	VZDump.pm
-- 
2.30.2





More information about the pve-devel mailing list