[pve-devel] applied: [PATCH manager] ext. metric: move to a transaction model
Thomas Lamprecht
t.lamprecht at proxmox.com
Mon Nov 18 19:14:12 CET 2019
Signed-off-by: Thomas Lamprecht <t.lamprecht at proxmox.com>
---
The performance is really a lot better, while the memory usage stayed about the
same. AFAICT, the Graphite plugin is about an order of magnitude worse than the
InfluxDB one, but I have no idea yet what the cause is; I tried doing a lot of
things differently here and there. Anyway, the performance increase for
connection-oriented protocols like TCP is really good, and as I had this lying
around for quite a bit, I applied it straight away.
PVE/ExtMetric.pm | 81 ++++++++++++++++++++++++++++++++++++++---
PVE/Service/pvestatd.pm | 20 +++++++---
PVE/Status/Graphite.pm | 51 +++++++++++---------------
PVE/Status/InfluxDB.pm | 34 +++++------------
PVE/Status/Plugin.pm | 26 ++++++++-----
5 files changed, 139 insertions(+), 73 deletions(-)
diff --git a/PVE/ExtMetric.pm b/PVE/ExtMetric.pm
index 342dc281..c5559073 100644
--- a/PVE/ExtMetric.pm
+++ b/PVE/ExtMetric.pm
@@ -14,22 +14,93 @@ PVE::Status::Plugin->init();
sub foreach_plug($&) {
my ($status_cfg, $code) = @_;
- for my $plugin_config (values %{$status_cfg->{ids}}) {
+ for my $id (sort keys %{$status_cfg->{ids}}) {
+ my $plugin_config = $status_cfg->{ids}->{$id};
next if $plugin_config->{disable};
+
my $plugin = PVE::Status::Plugin->lookup($plugin_config->{type});
- $code->($plugin, $plugin_config);
+ $code->($plugin, $id, $plugin_config);
}
}
sub update_all($$@) {
- my ($cfg, $subsystem, @params) = @_;
+ my ($transactions, $subsystem, @params) = @_;
my $method = "update_${subsystem}_status";
+ my (undef, $fn, $line, $subr) = caller(1);
+ for my $txn (@$transactions) {
+ my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
+
+ $plugin->$method($txn, @params);
+
+ if (length($txn->{data}) > 48000) {
+ # UDP stack cannot handle messages > 65k, so if we have a lot of data we
+ # send it in smaller batches, but keep the connection alive
+ transaction_flush($txn, 1);
+ }
+ }
+}
+
+# must return a transaction hash with the format:
+# {
+# cfg => $plugin_config,
+# connection => ..., # the connected socket
+# data => '', # payload, will be sent at the transaction flush
+# }
+my $transactions;
+sub transactions_start {
+ my ($cfg) = @_;
+
+ @$transactions = ();
+
foreach_plug($cfg, sub {
- my ($plugin, $plugin_config) = @_;
- $plugin->$method($plugin_config, @params);
+ my ($plugin, $id, $plugin_config) = @_;
+
+ my $connection = $plugin->_connect($plugin_config);
+
+ push @$transactions, {
+ connection => $connection,
+ cfg => $plugin_config,
+ id => $id,
+ data => '',
+ };
});
+
+ return $transactions;
+}
+
+sub transaction_flush {
+ my ($txn, $keepconnected) = @_;
+
+ if (!$txn->{connection}) {
+ return if !$txn->{data}; # OK, if data was already sent/flushed
+ die "cannot flush metric data, no connection available!\n";
+ }
+ return if !defined($txn->{data}) || $txn->{data} eq '';
+
+ my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
+
+ my $data = delete $txn->{data};
+ eval { $plugin->send($txn->{connection}, $data) };
+ my $senderr = $@;
+
+ if (!$keepconnected) {
+ $plugin->_disconnect($txn->{connection});
+ $txn->{connection} = undef;
+ # avoid log spam, already got a send error; disconnect would fail too
+ warn "disconnect failed: $@" if $@ && !$senderr;
+ }
+ die "metrics send error '$txn->{id}': $senderr" if $senderr;
+};
+
+sub transactions_finish {
+ my ($transactions) = @_;
+
+ for my $txn (@$transactions) {
+ eval { transaction_flush($txn) };
+ warn "$@" if $@;
+ }
}
1;
diff --git a/PVE/Service/pvestatd.pm b/PVE/Service/pvestatd.pm
index 012d05f7..a50f3499 100755
--- a/PVE/Service/pvestatd.pm
+++ b/PVE/Service/pvestatd.pm
@@ -126,7 +126,9 @@ sub update_node_status {
$node_metric->{cpustat}->@{qw(avg1 avg5 avg15)} = ($avg1, $avg5, $avg15);
$node_metric->{cpustat}->{cpus} = $maxcpu;
- PVE::ExtMetric::update_all($status_cfg, 'node', $nodename, $node_metric, $ctime);
+ my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
+ PVE::ExtMetric::update_all($transactions, 'node', $nodename, $node_metric, $ctime);
+ PVE::ExtMetric::transactions_finish($transactions);
}
sub auto_balloning {
@@ -161,12 +163,12 @@ sub update_qemu_status {
my ($status_cfg) = @_;
my $ctime = time();
-
my $vmstatus = PVE::QemuServer::vmstatus(undef, 1);
eval { auto_balloning($vmstatus); };
syslog('err', "auto ballooning error: $@") if $@;
+ my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
foreach my $vmid (keys %$vmstatus) {
my $d = $vmstatus->{$vmid};
my $data;
@@ -184,8 +186,10 @@ sub update_qemu_status {
}
PVE::Cluster::broadcast_rrd("pve2.3-vm/$vmid", $data);
- PVE::ExtMetric::update_all($status_cfg, 'qemu', $vmid, $d, $ctime, $nodename);
+ PVE::ExtMetric::update_all($transactions, 'qemu', $vmid, $d, $ctime, $nodename);
}
+
+ PVE::ExtMetric::transactions_finish($transactions);
}
sub remove_stale_lxc_consoles {
@@ -359,6 +363,8 @@ sub update_lxc_status {
my $ctime = time();
my $vmstatus = PVE::LXC::vmstatus();
+ my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
+
foreach my $vmid (keys %$vmstatus) {
my $d = $vmstatus->{$vmid};
my $template = $d->{template} ? $d->{template} : "0";
@@ -378,8 +384,9 @@ sub update_lxc_status {
}
PVE::Cluster::broadcast_rrd("pve2.3-vm/$vmid", $data);
- PVE::ExtMetric::update_all($status_cfg, 'lxc', $vmid, $d, $ctime, $nodename);
+ PVE::ExtMetric::update_all($transactions, 'lxc', $vmid, $d, $ctime, $nodename);
}
+ PVE::ExtMetric::transactions_finish($transactions);
}
sub update_storage_status {
@@ -389,6 +396,8 @@ sub update_storage_status {
my $ctime = time();
my $info = PVE::Storage::storage_info($cfg);
+ my $transactions = PVE::ExtMetric::transactions_start($status_cfg);
+
foreach my $storeid (keys %$info) {
my $d = $info->{$storeid};
next if !$d->{active};
@@ -398,8 +407,9 @@ sub update_storage_status {
my $key = "pve2-storage/${nodename}/$storeid";
PVE::Cluster::broadcast_rrd($key, $data);
- PVE::ExtMetric::update_all($status_cfg, 'storage', $nodename, $storeid, $d, $ctime);
+ PVE::ExtMetric::update_all($transactions, 'storage', $nodename, $storeid, $d, $ctime);
}
+ PVE::ExtMetric::transactions_finish($transactions);
}
sub rotate_authkeys {
diff --git a/PVE/Status/Graphite.pm b/PVE/Status/Graphite.pm
index a0cec1c7..04542cad 100644
--- a/PVE/Status/Graphite.pm
+++ b/PVE/Status/Graphite.pm
@@ -32,13 +32,15 @@ sub properties {
},
timeout => {
type => 'integer',
- description => "graphite tcp socket timeout (default=1)",
+ description => "graphite TCP socket timeout (default=1)",
+ minimum => 0,
+ default => 1,
optional => 1
},
proto => {
type => 'string',
enum => ['udp', 'tcp'],
- description => "send graphite data using tcp or udp (default)",
+ description => "Protocol to send graphite data. TCP or UDP (default)",
optional => 1,
},
};
@@ -57,27 +59,27 @@ sub options {
# Plugin implementation
sub update_node_status {
- my ($class, $plugin_config, $node, $data, $ctime) = @_;
+ my ($class, $txn, $node, $data, $ctime) = @_;
- write_graphite_hash($plugin_config, $data, $ctime, "nodes.$node");
+ assemble($txn, $data, $ctime, "nodes.$node");
}
sub update_qemu_status {
- my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
- write_graphite_hash($plugin_config, $data, $ctime, "qemu.$vmid");
+ my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
+ assemble($txn, $data, $ctime, "qemu.$vmid");
}
sub update_lxc_status {
- my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
+ my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
- write_graphite_hash($plugin_config, $data, $ctime, "lxc.$vmid");
+ assemble($txn, $data, $ctime, "lxc.$vmid");
}
sub update_storage_status {
- my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_;
+ my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
- write_graphite_hash($plugin_config, $data, $ctime, "storages.$nodename.$storeid");
+ assemble($txn, $data, $ctime, "storages.$nodename.$storeid");
}
sub _connect {
@@ -105,21 +107,11 @@ sub _connect {
return $carbon_socket;
}
-sub write_graphite_hash {
- my ($plugin_config, $d, $ctime, $object) = @_;
+sub assemble {
+ my ($txn, $data, $ctime, $object) = @_;
- my $path = $plugin_config->{path} // 'proxmox';
-
- my $carbon_socket = __PACKAGE__->_connect($plugin_config);
-
- write_graphite($carbon_socket, $d, $ctime, $path.".$object");
-
- $carbon_socket->close() if $carbon_socket;
-
-}
-
-sub write_graphite {
- my ($carbon_socket, $d, $ctime, $path) = @_;
+ my $path = $txn->{cfg}->{path} // 'proxmox';
+ $path .= ".$object";
# we do not want boolean/state information to export to graphite
my $key_blacklist = {
@@ -129,13 +121,14 @@ sub write_graphite {
'serial' => 1,
};
- my $graphite_data = '';
+ $txn->{data} //= '';
my $assemble_graphite_data;
$assemble_graphite_data = sub {
my ($metric, $path) = @_;
for my $key (sort keys %$metric) {
- my $value = $d->{$key} // next;
+ my $value = $data->{$key};
+ next if !defined($value);
$key =~ s/\./-/g;
my $metricpath = $path . ".$key";
@@ -143,13 +136,11 @@ sub write_graphite {
if (ref($value) eq 'HASH') {
$assemble_graphite_data->($value, $metricpath);
} elsif ($value =~ m/^[+-]?[0-9]*\.?[0-9]+$/ && !$key_blacklist->{$key}) {
- $graphite_data .= "$metricpath $value $ctime\n";
+ $txn->{data} .= "$metricpath $value $ctime\n";
}
}
};
- $assemble_graphite_data->($d, $path);
-
- $carbon_socket->send($graphite_data) if $graphite_data ne '';
+ $assemble_graphite_data->($data, $path);
}
PVE::JSONSchema::register_format('graphite-path', \&pve_verify_graphite_path);
diff --git a/PVE/Status/InfluxDB.pm b/PVE/Status/InfluxDB.pm
index f02c8854..21949400 100644
--- a/PVE/Status/InfluxDB.pm
+++ b/PVE/Status/InfluxDB.pm
@@ -33,16 +33,15 @@ sub options {
# Plugin implementation
sub update_node_status {
- my ($class, $plugin_config, $node, $data, $ctime) = @_;
+ my ($class, $txn, $node, $data, $ctime) = @_;
$ctime *= 1000000000;
- write_influxdb_hash($plugin_config, $data, $ctime, "object=nodes,host=$node");
-
+ build_influxdb_payload(\$txn->{data}, $data, $ctime, "object=nodes,host=$node");
}
sub update_qemu_status {
- my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
+ my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
$ctime *= 1000000000;
@@ -51,11 +50,12 @@ sub update_qemu_status {
$object .= ",host=$data->{name}";
}
$object =~ s/\s/\\ /g;
- write_influxdb_hash($plugin_config, $data, $ctime, $object);
+
+ build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
}
sub update_lxc_status {
- my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
+ my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
$ctime *= 1000000000;
@@ -65,11 +65,11 @@ sub update_lxc_status {
}
$object =~ s/\s/\\ /g;
- write_influxdb_hash($plugin_config, $data, $ctime, $object);
+ build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
}
sub update_storage_status {
- my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_;
+ my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
$ctime *= 1000000000;
@@ -79,7 +79,7 @@ sub update_storage_status {
}
$object =~ s/\s/\\ /g;
- write_influxdb_hash($plugin_config, $data, $ctime, $object);
+ build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
}
sub _connect {
@@ -97,20 +97,6 @@ sub _connect {
return $socket;
}
-sub write_influxdb_hash {
- my ($plugin_config, $d, $ctime, $tags) = @_;
-
- my $payload = {};
-
- build_influxdb_payload($payload, $d, $ctime, $tags);
-
- my $socket = __PACKAGE__->_connect($plugin_config);
-
- $socket->send($payload->{string});
-
- $socket->close() if $socket;
-}
-
sub build_influxdb_payload {
my ($payload, $data, $ctime, $tags, $measurement, $instance) = @_;
@@ -144,7 +130,7 @@ sub build_influxdb_payload {
my $tagstring = $tags;
$tagstring .= ",instance=$instance" if defined($instance);
my $valuestr = join(',', @values);
- $payload->{string} .= "$mm,$tagstring $valuestr $ctime\n";
+ $$payload .= "$mm,$tagstring $valuestr $ctime\n";
}
}
diff --git a/PVE/Status/Plugin.pm b/PVE/Status/Plugin.pm
index 59be30d6..402c5b4a 100644
--- a/PVE/Status/Plugin.pm
+++ b/PVE/Status/Plugin.pm
@@ -57,31 +57,39 @@ sub parse_section_header {
sub _connect {
my ($class, $cfg) = @_;
-
die "please implement inside plugin";
}
+sub _disconnect {
+ my ($class, $connection) = @_;
+
+ $connection->close(); # overwrite if not a simple socket
+}
+
+sub send {
+ my ($class, $connection, $data) = @_;
+
+ defined($connection->send($data))
+ or die "failed to send metrics: $!\n";
+}
+
sub update_node_status {
- my ($class, $plugin_config, $node, $data, $ctime) = @_;
-
+ my ($class, $txn, $node, $data, $ctime) = @_;
die "please implement inside plugin";
}
sub update_qemu_status {
- my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
-
+ my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
die "please implement inside plugin";
}
sub update_lxc_status {
- my ($class, $plugin_config, $vmid, $data, $ctime, $nodename) = @_;
-
+ my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
die "please implement inside plugin";
}
sub update_storage_status {
- my ($class, $plugin_config, $nodename, $storeid, $data, $ctime) = @_;
-
+ my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
die "please implement inside plugin";
}
--
2.20.1
More information about the pve-devel
mailing list