[pve-devel] applied: [PATCH manager] ext. metric: move to a transaction model

Thomas Lamprecht t.lamprecht at proxmox.com
Mon Nov 18 19:14:12 CET 2019


Signed-off-by: Thomas Lamprecht <t.lamprecht at proxmox.com>
---

the performance is really a lot better, the memory usage stayed about the
same... AFAICT, the Graphite plugin is about an order of magnitude worse than
the InfluxDB one, but I have no idea yet what the cause is; I tried doing a lot
of stuff differently here and there. Anyway, the performance increase for
connection-oriented protocols like TCP is really good, and as I had this lying
around for quite a bit, I applied it straight away.

 PVE/ExtMetric.pm        | 81 ++++++++++++++++++++++++++++++++++++++---
 PVE/Service/pvestatd.pm | 20 +++++++---
 PVE/Status/Graphite.pm  | 51 +++++++++++---------------
 PVE/Status/InfluxDB.pm  | 34 +++++------------
 PVE/Status/Plugin.pm    | 26 ++++++++-----
 5 files changed, 139 insertions(+), 73 deletions(-)

diff --git a/PVE/ExtMetric.pm b/PVE/ExtMetric.pm
index 342dc281..c5559073 100644
--- a/PVE/ExtMetric.pm
+++ b/PVE/ExtMetric.pm
@@ -14,22 +14,93 @@ PVE::Status::Plugin->init();
 sub foreach_plug($&) {
     my ($status_cfg, $code) = @_;
 
-    for my $plugin_config (values %{$status_cfg->{ids}}) {
+    for my $id (sort keys %{$status_cfg->{ids}}) {
+	my $plugin_config = $status_cfg->{ids}->{$id};
 	next if $plugin_config->{disable};
+
 	my $plugin = PVE::Status::Plugin->lookup($plugin_config->{type});
-	$code->($plugin, $plugin_config);
+	$code->($plugin, $id, $plugin_config);
     }
 }
 
 sub update_all($$@) {
-    my ($cfg, $subsystem, @params) = @_;
+    my ($transactions, $subsystem, @params) = @_;
 
     my $method = "update_${subsystem}_status";
 
+    my (undef, $fn, $line, $subr) = caller(1);
+    for my $txn (@$transactions) {
+	my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
+
+	$plugin->$method($txn, @params);
+
+	if (length($txn->{data}) > 48000) {
+	    # UDP stack cannot handle messages > 65k, if we've a lot of data we
+	    # do smaller batch sends then, but keep the connection alive
+	    transaction_flush($txn, 1);
+	}
+    }
+}
+
+# must return a transaction hash with the format:
+# {
+#    cfg => $plugin_config,
+#    connection => ..., # the connected socket
+#    data => '', # payload, will be sent at the transaction flush
+# }
+my $transactions;
+sub transactions_start {
+    my ($cfg) = @_;
+
+    @$transactions = ();
+
     foreach_plug($cfg, sub {
-	my ($plugin, $plugin_config) = @_;
-	$plugin->$method($plugin_config, @params);
+	my ($plugin, $id, $plugin_config) = @_;
+
+	my $connection = $plugin->_connect($plugin_config);
+
+	push @$transactions, {
+	    connection => $connection,
+	    cfg => $plugin_config,
+	    id => $id,
+	    data => '',
+	};
     });
+
+    return $transactions;
+}
+
+sub transaction_flush {
+    my ($txn, $keepconnected) = @_;
+
+    if (!$txn->{connection}) {
+	return if !$txn->{data}; # OK, if data was already sent/flushed
+	die "cannot flush metric data, no connection available!\n";
+    }
+    return if !defined($txn->{data}) || $txn->{data} eq '';
+
+    my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
+
+    my $data = delete $txn->{data};
+    eval { $plugin->send($txn->{connection}, $data) };
+    my $senderr = $@;
+
+    if (!$keepconnected) {
+	$plugin->_disconnect($txn->{connection});
+	$txn->{connection} = undef;
+	# avoid log spam, already got a send error; disconnect would fail too
+	warn "disconnect failed: $@" if $@ && !$senderr;
+    }
+    die "metrics send error '$txn->{id}': $senderr" if $senderr;
+};
+
+sub transactions_finish {
+    my ($transactions) = @_;
+
+    for my $txn (@$transactions) {
+	eval { transaction_flush($txn) };
+	warn "$@" if $@;
+    }
 }
 
 1;
diff --git a/PVE/Service/pvestatd.pm b/PVE/Service/pvestatd.pm
index 012d05f7..a50f3499 100755
--- a/PVE/Service/pvestatd.pm
+++ b/PVE/Service/pvestatd.pm
@@ -126,7 +126,9 @@ sub update_node_status {
     $node_metric->{cpustat}->@{qw(avg1 avg5 avg15)} = ($avg1, $avg5, $avg15);
     $node_metric->{cpustat}->{cpus} = $maxcpu;
 
-    PVE::ExtMetric::update_all($status_cfg, 'node', $nodename, $node_metric, $ctime);
+    my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
+    PVE::ExtMetric::update_all($transactions, 'node', $nodename, $node_metric, $ctime);
+    PVE::ExtMetric::transactions_finish($transactions);
 }
 
 sub auto_balloning {
@@ -161,12 +163,12 @@ sub update_qemu_status {
     my ($status_cfg) = @_;
 
     my $ctime = time();
-
     my $vmstatus = PVE::QemuServer::vmstatus(undef, 1);
 
     eval { auto_balloning($vmstatus); };
     syslog('err', "auto ballooning error: $@") if $@;
 
+    my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
     foreach my $vmid (keys %$vmstatus) {
 	my $d = $vmstatus->{$vmid};
 	my $data;
@@ -184,8 +186,10 @@ sub update_qemu_status {
 	}
 	PVE::Cluster::broadcast_rrd("pve2.3-vm/$vmid", $data);
 
-	PVE::ExtMetric::update_all($status_cfg, 'qemu', $vmid, $d, $ctime, $nodename);
+	PVE::ExtMetric::update_all($transactions, 'qemu', $vmid, $d, $ctime, $nodename);
     }
+
+    PVE::ExtMetric::transactions_finish($transactions);
 }
 
 sub remove_stale_lxc_consoles {
@@ -359,6 +363,8 @@ sub update_lxc_status {
     my $ctime = time();
     my $vmstatus = PVE::LXC::vmstatus();
 
+    my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
+
     foreach my $vmid (keys %$vmstatus) {
 	my $d = $vmstatus->{$vmid};
 	my $template = $d->{template} ? $d->{template} : "0";
@@ -378,8 +384,9 @@ sub update_lxc_status {
 	}
 	PVE::Cluster::broadcast_rrd("pve2.3-vm/$vmid", $data);
 
-	PVE::ExtMetric::update_all($status_cfg, 'lxc', $vmid, $d, $ctime, $nodename);
+	PVE::ExtMetric::update_all($transactions, 'lxc', $vmid, $d, $ctime, $nodename);
     }
+    PVE::ExtMetric::transactions_finish($transactions);
 }
 
 sub update_storage_status {
@@ -389,6 +396,8 @@ sub update_storage_status {
     my $ctime = time();
     my $info = PVE::Storage::storage_info($cfg);
 
+    my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
+
     foreach my $storeid (keys %$info) {
 	my $d = $info->{$storeid};
 	next if !$d->{active};
@@ -398,8 +407,9 @@ sub update_storage_status {
 	my $key = "pve2-storage/${nodename}/$storeid";
 	PVE::Cluster::broadcast_rrd($key, $data);
 
-	PVE::ExtMetric::update_all($status_cfg, 'storage', $nodename, $storeid, $d, $ctime);
+	PVE::ExtMetric::update_all($transactions, 'storage', $nodename, $storeid, $d, $ctime);
     }
+    PVE::ExtMetric::transactions_finish($transactions);
 }
 
 sub rotate_authkeys {
diff --git a/PVE/Status/Graphite.pm b/PVE/Status/Graphite.pm
index a0cec1c7..04542cad 100644
--- a/PVE/Status/Graphite.pm
+++ b/PVE/Status/Graphite.pm
@@ -32,13 +32,15 @@ sub properties {
 	},
 	timeout => {
 	    type => 'integer',
-	    description => "graphite tcp socket timeout (default=1)",
+	    description => "graphite TCP socket timeout (default=1)",
+	    minimum => 0,
+	    default => 1,
 	    optional => 1
 	},
 	proto => {
 	    type => 'string',
 	    enum => ['udp', 'tcp'],
-	    description => "send graphite data using tcp or udp (default)",
+	    description => "Protocol to send graphite data. TCP or UDP (default)",
 	    optional => 1,
 	},
     };
@@ -57,27 +59,27 @@ sub options {
 
 # Plugin implementation
 sub update_node_status {
-    my ($class, $plugin_config, $node, $data, $ctime) = @_;
+    my ($class, $txn, $node, $data, $ctime) = @_;
 
-    write_graphite_hash($plugin_config, $data, $ctime, "nodes.$node");
+    assemble($txn, $data, $ctime, "nodes.$node");
 
 }
 
 sub update_qemu_status {
-    my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
-    write_graphite_hash($plugin_config, $data, $ctime, "qemu.$vmid");
+    my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
+    assemble($txn, $data, $ctime, "qemu.$vmid");
 }
 
 sub update_lxc_status {
-    my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
+    my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
 
-    write_graphite_hash($plugin_config, $data, $ctime, "lxc.$vmid");
+    assemble($txn, $data, $ctime, "lxc.$vmid");
 }
 
 sub update_storage_status {
-    my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_;
+    my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
 
-    write_graphite_hash($plugin_config, $data, $ctime, "storages.$nodename.$storeid");
+    assemble($txn, $data, $ctime, "storages.$nodename.$storeid");
 }
 
 sub _connect {
@@ -105,21 +107,11 @@ sub _connect {
     return $carbon_socket;
 }
 
-sub write_graphite_hash {
-    my ($plugin_config, $d, $ctime, $object) = @_;
+sub assemble {
+    my ($txn, $data, $ctime, $object) = @_;
 
-    my $path = $plugin_config->{path} // 'proxmox';
-
-    my $carbon_socket = __PACKAGE__->_connect($plugin_config);
-
-    write_graphite($carbon_socket, $d, $ctime, $path.".$object");
-
-    $carbon_socket->close() if $carbon_socket;
-
-}
-
-sub write_graphite {
-    my ($carbon_socket, $d, $ctime, $path) = @_;
+    my $path = $txn->{cfg}->{path} // 'proxmox';
+    $path .= ".$object";
 
     # we do not want boolean/state information to export to graphite
     my $key_blacklist = {
@@ -129,13 +121,14 @@ sub write_graphite {
 	'serial' => 1,
     };
 
-    my $graphite_data = '';
+    $txn->{data} //= '';
     my $assemble_graphite_data;
     $assemble_graphite_data = sub {
 	my ($metric, $path) = @_;
 
 	for my $key (sort keys %$metric) {
-	    my $value = $d->{$key} // next;
+	    my $value = $data->{$key};
+	    next if !defined($value);
 
 	    $key =~ s/\./-/g;
 	    my $metricpath = $path . ".$key";
@@ -143,13 +136,11 @@ sub write_graphite {
 	    if (ref($value) eq 'HASH') {
 		$assemble_graphite_data->($value, $metricpath);
 	    } elsif ($value =~ m/^[+-]?[0-9]*\.?[0-9]+$/ && !$key_blacklist->{$key}) {
-		$graphite_data .= "$metricpath $value $ctime\n";
+		$txn->{data} .= "$metricpath $value $ctime\n";
 	    }
 	}
     };
-    $assemble_graphite_data->($d, $path);
-
-    $carbon_socket->send($graphite_data) if $graphite_data ne '';
+    $assemble_graphite_data->($data, $path);
 }
 
 PVE::JSONSchema::register_format('graphite-path', \&pve_verify_graphite_path);
diff --git a/PVE/Status/InfluxDB.pm b/PVE/Status/InfluxDB.pm
index f02c8854..21949400 100644
--- a/PVE/Status/InfluxDB.pm
+++ b/PVE/Status/InfluxDB.pm
@@ -33,16 +33,15 @@ sub options {
 
 # Plugin implementation
 sub update_node_status {
-    my ($class, $plugin_config, $node, $data, $ctime) = @_;
+    my ($class, $txn, $node, $data, $ctime) = @_;
 
     $ctime *= 1000000000;
 
-    write_influxdb_hash($plugin_config, $data, $ctime, "object=nodes,host=$node");
-
+    build_influxdb_payload(\$txn->{data}, $data, $ctime, "object=nodes,host=$node");
 }
 
 sub update_qemu_status {
-    my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
+    my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
 
     $ctime *= 1000000000;
 
@@ -51,11 +50,12 @@ sub update_qemu_status {
 	$object .= ",host=$data->{name}";
     }
     $object =~ s/\s/\\ /g;
-    write_influxdb_hash($plugin_config, $data, $ctime, $object);
+
+    build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
 }
 
 sub update_lxc_status {
-    my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
+    my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
 
     $ctime *= 1000000000;
 
@@ -65,11 +65,11 @@ sub update_lxc_status {
     }
     $object =~ s/\s/\\ /g;
 
-    write_influxdb_hash($plugin_config, $data, $ctime, $object);
+    build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
 }
 
 sub update_storage_status {
-    my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_;
+    my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
 
     $ctime *= 1000000000;
 
@@ -79,7 +79,7 @@ sub update_storage_status {
     }
     $object =~ s/\s/\\ /g;
 
-    write_influxdb_hash($plugin_config, $data, $ctime, $object);
+    build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
 }
 
 sub _connect {
@@ -97,20 +97,6 @@ sub _connect {
     return $socket;
 }
 
-sub write_influxdb_hash {
-    my ($plugin_config, $d, $ctime, $tags) = @_;
-
-    my $payload = {};
-
-    build_influxdb_payload($payload, $d, $ctime, $tags);
-
-    my $socket = __PACKAGE__->_connect($plugin_config);
-
-    $socket->send($payload->{string});
-
-    $socket->close() if $socket;
-}
-
 sub build_influxdb_payload {
     my ($payload, $data, $ctime, $tags, $measurement, $instance) = @_;
 
@@ -144,7 +130,7 @@ sub build_influxdb_payload {
 	my $tagstring = $tags;
 	$tagstring .= ",instance=$instance" if defined($instance);
 	my $valuestr =  join(',', @values);
-	$payload->{string} .= "$mm,$tagstring $valuestr $ctime\n";
+	$$payload .= "$mm,$tagstring $valuestr $ctime\n";
     }
 }
 
diff --git a/PVE/Status/Plugin.pm b/PVE/Status/Plugin.pm
index 59be30d6..402c5b4a 100644
--- a/PVE/Status/Plugin.pm
+++ b/PVE/Status/Plugin.pm
@@ -57,31 +57,39 @@ sub parse_section_header {
 
 sub _connect {
     my ($class, $cfg) = @_;
-
     die "please implement inside plugin";
 }
 
+sub _disconnect {
+    my ($class, $connection) = @_;
+
+    $connection->close(); # overwrite if not a simple socket
+}
+
+sub send {
+    my ($class, $connection, $data) = @_;
+
+    defined($connection->send($data))
+	or die "failed to send metrics: $!\n";
+}
+
 sub update_node_status {
-    my ($class, $plugin_config, $node, $data, $ctime) = @_;
-
+    my ($class, $txn, $node, $data, $ctime) = @_;
     die "please implement inside plugin";
 }
 
 sub update_qemu_status {
-    my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
-
+    my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
     die "please implement inside plugin";
 }
 
 sub update_lxc_status {
-    my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
-
+    my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
     die "please implement inside plugin";
 }
 
 sub update_storage_status {
-    my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_;
-
+    my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
     die "please implement inside plugin";
 }
 
-- 
2.20.1





More information about the pve-devel mailing list