[pve-devel] [PATCH manager v2 2/6] add PVE/Jobs to handle VZDump jobs
Dominik Csapak
d.csapak at proxmox.com
Mon Nov 8 14:07:54 CET 2021
this adds a SectionConfig handling for jobs (only 'vzdump' for now) that
represents a job that will be handled by pvescheduler and a basic
'job-state' handling for reading/writing state json files
this has some intersections with pvesr's state handling, but does not
use a single state file for all jobs, but separate ones, like we
do it in the backup-server.
Signed-off-by: Dominik Csapak <d.csapak at proxmox.com>
---
PVE/Jobs.pm | 286 +++++++++++++++++++++++++++++++++++++++++++++
PVE/Jobs/Makefile | 16 +++
PVE/Jobs/Plugin.pm | 61 ++++++++++
PVE/Jobs/VZDump.pm | 54 +++++++++
PVE/Makefile | 3 +-
5 files changed, 419 insertions(+), 1 deletion(-)
create mode 100644 PVE/Jobs.pm
create mode 100644 PVE/Jobs/Makefile
create mode 100644 PVE/Jobs/Plugin.pm
create mode 100644 PVE/Jobs/VZDump.pm
diff --git a/PVE/Jobs.pm b/PVE/Jobs.pm
new file mode 100644
index 00000000..2fe197d2
--- /dev/null
+++ b/PVE/Jobs.pm
@@ -0,0 +1,286 @@
+package PVE::Jobs;
+
+use strict;
+use warnings;
+use JSON;
+
+use PVE::Cluster qw(cfs_read_file cfs_lock_file);
+use PVE::Jobs::Plugin;
+use PVE::Jobs::VZDump;
+use PVE::Tools;
+
+PVE::Jobs::VZDump->register();
+PVE::Jobs::Plugin->init();
+
+# directory containing one JSON state file per job
+my $state_dir = "/var/lib/pve-manager/jobs";
+# directory containing the per-job lock files
+my $lock_dir = "/var/lock/pve-manager";
+
+# builds the path of the state file for the job with the given id and type
+my $get_state_file = sub {
+    my ($jobid, $type) = @_;
+    return join('/', $state_dir, "$type-$jobid.json");
+};
+
+# state used for jobs whose state file is missing or empty
+my $default_state = {
+    time => 0,
+    state => 'created',
+};
+
+# Reads the state of the job with the given id and type from its state file.
+#
+# Returns nothing (undef in scalar context) if no state file exists, a fresh
+# copy of the default state if the file is empty and the decoded JSON object
+# otherwise. Dies if the content does not look like a JSON object.
+#
+# lockless: readers take no lock and depend on writers replacing the state
+# file atomically (NOTE(review): presumably guaranteed by
+# PVE::Tools::file_set_contents - confirm)
+sub read_job_state {
+    my ($jobid, $type) = @_;
+    my $path = $get_state_file->($jobid, $type);
+    return if ! -e $path;
+
+    my $raw = PVE::Tools::file_get_contents($path);
+
+    # hand out a copy: callers modify the returned hash and must never alter
+    # the shared module-level default
+    return { %$default_state } if $raw eq '';
+
+    # untaint $raw
+    if ($raw =~ m/^(\{.*\})$/) {
+        return decode_json($1);
+    }
+
+    die "invalid json data in '$path'\n";
+}
+
+# runs $sub while holding the lock file of the given job (10 as timeout
+# argument for PVE::Tools::lock_file), re-throws any error and returns the
+# result of the locked call
+sub lock_job_state {
+    my ($jobid, $type, $sub) = @_;
+
+    my $lockfile = join('/', $lock_dir, "$type-$jobid.lck");
+
+    my $result = PVE::Tools::lock_file($lockfile, 10, $sub);
+    if (my $err = $@) {
+        die $err;
+    }
+
+    return $result;
+}
+
+# returns the exit status of the task referenced by the upid in $state, or
+# nothing if the job has no upid yet or its task process is still alive;
+# dies if the upid cannot be parsed or the task file does not exist
+my $get_job_status = sub {
+    my ($state) = @_;
+
+    my $upid = $state->{upid};
+    return if !defined($upid); # not started
+
+    my ($task, $filename) = PVE::Tools::upid_decode($upid, 1);
+    die "unable to parse worker upid - $state->{upid}\n" if !$task;
+    die "no such task\n" if ! -f $filename;
+
+    # same pid with the same start time means the worker is still alive
+    my $pstart = PVE::ProcFSTools::read_proc_starttime($task->{pid});
+    my $still_running = $pstart && $pstart == $task->{pstart};
+    return if $still_running;
+
+    return PVE::Tools::upid_read_status($upid);
+};
+
+# checks if the job is already finished if it was started before and
+# updates the statefile accordingly
+#
+# Does nothing if the job has no state file, is not in 'started' state or its
+# task is still running. Errors from reading the state or the task status are
+# propagated to the caller.
+sub update_job_stopped {
+    my ($jobid, $type) = @_;
+
+    # first check unlocked to save time,
+    my $state = read_job_state($jobid, $type);
+    return if !defined($state) || $state->{state} ne 'started'; # removed or not started
+
+    return if !defined($get_job_status->($state)); # still running
+
+    lock_job_state($jobid, $type, sub {
+        # re-read under the lock, the state may have changed in the meantime
+        my $state = read_job_state($jobid, $type);
+        return if !defined($state) || $state->{state} ne 'started'; # removed or not started
+
+        my $status = $get_job_status->($state);
+        # the job may have finished and been started again between the
+        # unlocked check and taking the lock; never mark a running job stopped
+        return if !defined($status);
+
+        my $new_state = {
+            state => 'stopped',
+            msg => $status,
+            upid => $state->{upid}
+        };
+
+        if ($state->{updated}) { # save updated time stamp
+            $new_state->{updated} = $state->{updated};
+        }
+
+        my $path = $get_state_file->($jobid, $type);
+        PVE::Tools::file_set_contents($path, encode_json($new_state));
+    });
+
+    return;
+}
+
+# must be called when the job is first created
+#
+# Writes the initial state file (state 'created', time set to now) while
+# holding the job lock. Dies with "job state already exists" if a state file
+# with any state other than 'created' is already present.
+sub create_job {
+    my ($jobid, $type) = @_;
+
+    lock_job_state($jobid, $type, sub {
+        # start from a copy, the timestamp below must not end up in the
+        # shared default state
+        my $state = read_job_state($jobid, $type) // { %$default_state };
+
+        if ($state->{state} ne 'created') {
+            die "job state already exists\n";
+        }
+
+        $state->{time} = time();
+
+        my $path = $get_state_file->($jobid, $type);
+        PVE::Tools::file_set_contents($path, encode_json($state));
+    });
+}
+
+# to be called when the job is removed; deletes the state file of the job
+# and returns the result of unlink
+sub remove_job {
+    my ($jobid, $type) = @_;
+    return unlink($get_state_file->($jobid, $type));
+}
+
+# checks if the job can be started and sets the state to 'starting'
+# returns 1 if the job can be started, 0 otherwise
+#
+# A job cannot be started if its state file is gone (job removed) or if it is
+# already in 'started' state. The decision is taken under the job lock and
+# its result is what gets returned to the caller.
+sub starting_job {
+    my ($jobid, $type) = @_;
+
+    # first check unlocked to save time
+    my $state = read_job_state($jobid, $type);
+    return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
+
+    my $can_start = lock_job_state($jobid, $type, sub {
+        my $state = read_job_state($jobid, $type);
+        return 0 if !defined($state) || $state->{state} eq 'started'; # removed or already started
+
+        my $new_state = {
+            state => 'starting',
+            time => time(),
+        };
+
+        my $path = $get_state_file->($jobid, $type);
+        PVE::Tools::file_set_contents($path, encode_json($new_state));
+
+        return 1;
+    });
+
+    # propagate the locked check, the state may have changed after the
+    # unlocked one above
+    return $can_start ? 1 : 0;
+}
+
+# records the outcome of a start attempt for a job in 'starting' state:
+# 'stopped' with $err as message if an error is given, 'started' with the
+# given $upid otherwise; dies if the job is in any other state
+sub started_job {
+    my ($jobid, $type, $upid, $err) = @_;
+
+    my $record_outcome = sub {
+        my $current = read_job_state($jobid, $type);
+        return if !defined($current); # job was removed, do not update
+
+        if ($current->{state} ne 'starting') {
+            die "unexpected state '$current->{state}'\n";
+        }
+
+        my $next = defined($err)
+            ? { state => 'stopped', msg => $err, time => time() }
+            : { state => 'started', upid => $upid };
+
+        PVE::Tools::file_set_contents($get_state_file->($jobid, $type), encode_json($next));
+    };
+
+    lock_job_state($jobid, $type, $record_outcome);
+}
+
+# will be called when the job schedule is updated
+#
+# Stores the current time as 'updated' in the state of the job (keeping all
+# other state fields) while holding the job lock.
+sub updated_job_schedule {
+    my ($jobid, $type) = @_;
+    lock_job_state($jobid, $type, sub {
+        # fall back to a copy, the 'updated' stamp must not leak into the
+        # shared default state used for other jobs
+        my $old_state = read_job_state($jobid, $type) // { %$default_state };
+
+        $old_state->{updated} = time();
+
+        my $path = $get_state_file->($jobid, $type);
+        PVE::Tools::file_set_contents($path, encode_json($old_state));
+    });
+}
+
+# returns the time stamp the next run is calculated from: the 'updated'
+# time if set, else the start time encoded in the upid, else the 'time'
+# field of the state (0 if that is missing too)
+sub get_last_runtime {
+    my ($jobid, $type) = @_;
+
+    my $state = read_job_state($jobid, $type);
+    $state = $default_state if !defined($state);
+
+    my $updated = $state->{updated};
+    return $updated if defined($updated);
+
+    my $upid = $state->{upid};
+    if (!$upid) {
+        return $state->{time} // 0;
+    }
+
+    my ($task) = PVE::Tools::upid_decode($upid, 1);
+    die "unable to parse worker upid\n" if !$task;
+    return $task->{starttime};
+}
+
+# Synchronizes the state files with 'jobs.cfg' and then starts every job
+# that is due.
+#
+# For each configured job (in sorted id order): jobs bound to another node
+# are skipped, the state of a previously started job is refreshed, disabled
+# jobs are skipped, and the job is run through its plugin if the next event
+# calculated from its schedule and last run time has been reached. The
+# outcome of the start attempt is recorded with started_job().
+sub run_jobs {
+    synchronize_job_states_with_config();
+
+    my $jobs_cfg = cfs_read_file('jobs.cfg');
+    my $nodename = PVE::INotify::nodename();
+
+    foreach my $id (sort keys %{$jobs_cfg->{ids}}) {
+        my $cfg = $jobs_cfg->{ids}->{$id};
+        my $type = $cfg->{type};
+        # the schedule is taken out of the config handed to the plugin
+        my $schedule = delete $cfg->{schedule};
+
+        # only schedule local jobs
+        next if defined($cfg->{node}) && $cfg->{node} ne $nodename;
+
+        eval {
+            update_job_stopped($id, $type);
+        };
+        if (my $err = $@) {
+            warn "could not update job state, skipping - $err\n";
+            next;
+        }
+
+        # only schedule enabled jobs
+        next if defined($cfg->{enabled}) && !$cfg->{enabled};
+
+        my $last_run = get_last_runtime($id, $type);
+        my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
+        my $next_sync = PVE::CalendarEvent::compute_next_event($calspec, $last_run) // 0;
+
+        next if time() < $next_sync; # not due yet
+
+        my $plugin = PVE::Jobs::Plugin->lookup($type);
+        next if !starting_job($id, $type);
+
+        my $upid = eval { $plugin->run($cfg) };
+        if (my $err = $@) {
+            warn $err;
+            started_job($id, $type, undef, $err);
+        } elsif (!defined($upid)) {
+            # a 'started' state without upid could never be detected as
+            # finished again, so record this as a failed start instead
+            started_job($id, $type, undef, "job did not return a task UPID\n");
+        } elsif ($upid eq 'OK') { # some jobs return OK immediately
+            started_job($id, $type, undef, 'OK');
+        } else {
+            started_job($id, $type, $upid);
+        }
+    }
+}
+
+# creates and removes statefiles for job configs
+#
+# While holding the cluster-wide lock on 'jobs.cfg': creates a state file
+# for every configured job that has none yet, and removes every state file
+# in the state directory whose job id is no longer part of the config.
+# Errors raised while holding the lock are re-thrown.
+sub synchronize_job_states_with_config {
+    cfs_lock_file('jobs.cfg', undef, sub {
+        my $data = cfs_read_file('jobs.cfg');
+
+        for my $id (keys $data->{ids}->%*) {
+            my $job = $data->{ids}->{$id};
+            my $type = $job->{type};
+            my $jobstate = read_job_state($id, $type);
+            create_job($id, $type) if !defined($jobstate);
+        }
+
+        # state files are named '<type>-<id>.json'; the dot is escaped so
+        # that only a literal '.json' suffix is accepted
+        PVE::Tools::dir_glob_foreach($state_dir, '(.*?)-(.*)\.json', sub {
+            my ($path, $type, $id) = @_;
+
+            if (!defined($data->{ids}->{$id})) {
+                remove_job($id, $type);
+            }
+        });
+    });
+    die $@ if $@;
+}
+
+# creates the state and the lock directory; failures of mkdir (for example
+# because a directory already exists) are not treated as errors
+sub setup_dirs {
+    mkdir($state_dir);
+    return mkdir($lock_dir);
+}
+
+1;
diff --git a/PVE/Jobs/Makefile b/PVE/Jobs/Makefile
new file mode 100644
index 00000000..6023c3ba
--- /dev/null
+++ b/PVE/Jobs/Makefile
@@ -0,0 +1,16 @@
+include ../../defines.mk
+
+PERLSOURCE = \
+ Plugin.pm\
+ VZDump.pm
+
+all:
+
+.PHONY: clean
+clean:
+ rm -rf *~
+
+.PHONY: install
+install: ${PERLSOURCE}
+ install -d ${PERLLIBDIR}/PVE/Jobs
+ install -m 0644 ${PERLSOURCE} ${PERLLIBDIR}/PVE/Jobs
diff --git a/PVE/Jobs/Plugin.pm b/PVE/Jobs/Plugin.pm
new file mode 100644
index 00000000..69c31cf2
--- /dev/null
+++ b/PVE/Jobs/Plugin.pm
@@ -0,0 +1,61 @@
+package PVE::Jobs::Plugin;
+
+use strict;
+use warnings;
+
+use PVE::Cluster qw(cfs_register_file);
+
+use base qw(PVE::SectionConfig);
+
+# 'jobs.cfg' is parsed and written through parse_config/write_config of
+# this class
+cfs_register_file(
+    'jobs.cfg',
+    sub { __PACKAGE__->parse_config(@_); },
+    sub { __PACKAGE__->write_config(@_); },
+);
+
+# property definitions provided by the base plugin
+my $defaultData = {
+    propertyList => {
+        type => {
+            description => "Section type.",
+        },
+        id => {
+            description => "The ID of the VZDump job.",
+            type => 'string',
+            format => 'pve-configid',
+        },
+        enabled => {
+            description => "Determines if the job is enabled.",
+            type => 'boolean',
+            default => 1,
+            optional => 1,
+        },
+        schedule => {
+            description => "Backup schedule. The format is a subset of `systemd` calendar events.",
+            type => 'string',
+            format => 'pve-calendar-event',
+            maxLength => 128,
+        },
+    },
+};
+
+# returns the property definitions of the base plugin
+sub private {
+    return $defaultData;
+}
+
+# parses the raw config via the parent class and completes every section:
+# the section id is stored as 'id' and 'enabled' defaults to 1 if unset
+sub parse_config {
+    my ($class, $filename, $raw) = @_;
+
+    my $cfg = $class->SUPER::parse_config($filename, $raw);
+
+    for my $id (keys %{$cfg->{ids}}) {
+        my $entry = $cfg->{ids}->{$id};
+
+        $entry->{id} = $id;
+        $entry->{enabled} = 1 if !defined($entry->{enabled});
+    }
+
+    return $cfg;
+}
+
+# placeholder: the base class always dies here, subclasses have to bring
+# their own implementation
+sub run {
+    my ($class, $cfg) = @_;
+    die "not implemented";
+}
+
+1;
diff --git a/PVE/Jobs/VZDump.pm b/PVE/Jobs/VZDump.pm
new file mode 100644
index 00000000..043b7ace
--- /dev/null
+++ b/PVE/Jobs/VZDump.pm
@@ -0,0 +1,54 @@
+package PVE::Jobs::VZDump;
+
+use strict;
+use warnings;
+
+use PVE::INotify;
+use PVE::VZDump::Common;
+use PVE::API2::VZDump;
+use PVE::Cluster;
+
+use base qw(PVE::Jobs::Plugin);
+
+# section type handled by this plugin
+sub type { return 'vzdump'; }
+
+# property definitions, obtained once at load time from PVE::VZDump::Common
+my $props = PVE::VZDump::Common::json_config_properties();
+
+# returns the property definitions of this plugin
+sub properties { return $props; }
+
+# returns the options accepted in a 'vzdump' section: 'enabled' (optional),
+# 'schedule' (required) and every plugin property, each one optional exactly
+# if its property definition says so
+sub options {
+    my $options = {
+        enabled => { optional => 1 },
+        schedule => {},
+    };
+
+    for my $name (keys %$props) {
+        $options->{$name} = $props->{$name}->{optional} ? { optional => 1 } : {};
+    }
+
+    return $options;
+}
+
+# Starts a vzdump run using the vzdump related options of the given job
+# config.
+#
+# $conf is the job's section config; it is not modified. Returns whatever
+# PVE::API2::VZDump->vzdump returns, errors raised by it are propagated.
+sub run {
+    my ($class, $conf) = @_;
+
+    # pass on vzdump related options only; filter into a copy so that the
+    # config hash of the caller stays untouched
+    my $param = {};
+    foreach my $opt (keys %$conf) {
+        $param->{$opt} = $conf->{$opt} if defined($props->{$opt});
+    }
+
+    $param->{quiet} = 1; # do not write to stdout/stderr
+
+    PVE::Cluster::cfs_update(); # refresh vmlist
+
+    return PVE::API2::VZDump->vzdump($param);
+}
+
+1;
diff --git a/PVE/Makefile b/PVE/Makefile
index 0071fab1..48b85d33 100644
--- a/PVE/Makefile
+++ b/PVE/Makefile
@@ -1,6 +1,6 @@
include ../defines.mk
-SUBDIRS=API2 Status CLI Service Ceph
+SUBDIRS=API2 Status CLI Service Ceph Jobs
PERLSOURCE = \
API2.pm \
@@ -11,6 +11,7 @@ PERLSOURCE = \
CertHelpers.pm \
ExtMetric.pm \
HTTPServer.pm \
+ Jobs.pm \
NodeConfig.pm \
Report.pm \
VZDump.pm
--
2.30.2
More information about the pve-devel
mailing list