[pve-devel] applied: [PATCH manager] metric server: improve flush on big data updates
Thomas Lamprecht
t.lamprecht at proxmox.com
Fri May 8 17:23:01 CEST 2020
Signed-off-by: Thomas Lamprecht <t.lamprecht at proxmox.com>
---
PVE/ExtMetric.pm | 39 ++++++++-------------------------------
PVE/Status/Graphite.pm | 14 +++++++-------
PVE/Status/InfluxDB.pm | 21 ++++++++++++---------
PVE/Status/Plugin.pm | 36 ++++++++++++++++++++++++++++++++++++
4 files changed, 63 insertions(+), 47 deletions(-)
diff --git a/PVE/ExtMetric.pm b/PVE/ExtMetric.pm
index 14d98317..448d3925 100644
--- a/PVE/ExtMetric.pm
+++ b/PVE/ExtMetric.pm
@@ -28,17 +28,10 @@ sub update_all($$@) {
my $method = "update_${subsystem}_status";
- my (undef, $fn, $line, $subr) = caller(1);
for my $txn (@$transactions) {
my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
$plugin->$method($txn, @params);
-
- if (length($txn->{data}) > 48000) {
- # UDP stack cannot handle messages > 65k, if we've alot of data we
- # do smaller batch sends then, but keep the connection alive
- transaction_flush($txn, 1);
- }
}
}
@@ -69,36 +62,20 @@ sub transactions_start {
return $transactions;
}
-sub transaction_flush {
- my ($txn, $keepconnected) = @_;
+sub transactions_finish {
+ my ($transactions) = @_;
- if (!$txn->{connection}) {
- return if !$txn->{data}; # OK, if data was already sent/flushed
- die "cannot flush metric data, no connection available!\n";
- }
- return if !defined($txn->{data}) || $txn->{data} eq '';
+ for my $txn (@$transactions) {
+ my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
- my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
+ eval { $plugin->flush_data($txn) };
+ my $flush_err = $@;
+ warn "$flush_err" if $flush_err;
- my $data = delete $txn->{data};
- eval { $plugin->send($txn->{connection}, $data) };
- my $senderr = $@;
-
- if (!$keepconnected) {
- $plugin->_disconnect($txn->{connection});
+ eval { $plugin->_disconnect($txn->{connection}) }; # a failing disconnect must not skip the remaining transactions
$txn->{connection} = undef;
# avoid log spam, already got a send error; disconnect would fail too
- warn "disconnect failed: $@" if $@ && !$senderr;
- }
- die "metrics send error '$txn->{id}': $senderr" if $senderr;
-};
-
-sub transactions_finish {
- my ($transactions) = @_;
-
- for my $txn (@$transactions) {
- eval { transaction_flush($txn) };
- warn "$@" if $@;
+ warn "disconnect failed: $@" if $@ && !$flush_err;
}
}
diff --git a/PVE/Status/Graphite.pm b/PVE/Status/Graphite.pm
index 28fa65fd..ecab5583 100644
--- a/PVE/Status/Graphite.pm
+++ b/PVE/Status/Graphite.pm
@@ -61,25 +61,26 @@ sub options {
sub update_node_status {
my ($class, $txn, $node, $data, $ctime) = @_;
- assemble($txn, $data, $ctime, "nodes.$node");
+ return assemble($class, $txn, $data, $ctime, "nodes.$node"); # $class is passed so assemble can queue lines via $class->add_metric_data
}
sub update_qemu_status {
my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
- assemble($txn, $data, $ctime, "qemu.$vmid");
+ # $nodename is not used: the guest metric path only contains the VMID
+ return assemble($class, $txn, $data, $ctime, "qemu.$vmid");
}
sub update_lxc_status {
my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
- assemble($txn, $data, $ctime, "lxc.$vmid");
+ return assemble($class, $txn, $data, $ctime, "lxc.$vmid");
}
sub update_storage_status {
my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
- assemble($txn, $data, $ctime, "storages.$nodename.$storeid");
+ return assemble($class, $txn, $data, $ctime, "storages.$nodename.$storeid");
}
sub _connect {
@@ -108,7 +109,7 @@ sub _connect {
}
sub assemble {
- my ($txn, $data, $ctime, $object) = @_;
+ my ($class, $txn, $data, $ctime, $object) = @_;
my $path = $txn->{cfg}->{path} // 'proxmox';
$path .= ".$object";
@@ -121,7 +122,6 @@ sub assemble {
'serial' => 1,
};
- $txn->{data} //= '';
my $assemble_graphite_data;
$assemble_graphite_data = sub {
my ($metric, $path) = @_;
@@ -136,7 +136,7 @@ sub assemble {
if (ref($value) eq 'HASH') {
$assemble_graphite_data->($value, $metricpath);
} elsif ($value =~ m/^[+-]?[0-9]*\.?[0-9]+$/ && !$key_blacklist->{$key}) {
- $txn->{data} .= "$metricpath $value $ctime\n";
+ $class->add_metric_data($txn, "$metricpath $value $ctime\n");
}
}
};
diff --git a/PVE/Status/InfluxDB.pm b/PVE/Status/InfluxDB.pm
index 21949400..c7bc15a9 100644
--- a/PVE/Status/InfluxDB.pm
+++ b/PVE/Status/InfluxDB.pm
@@ -5,6 +5,7 @@ use warnings;
use POSIX qw(isnan isinf);
use Scalar::Util 'looks_like_number';
+use IO::Socket::IP;
use PVE::SafeSyslog;
@@ -37,7 +38,7 @@ sub update_node_status {
$ctime *= 1000000000;
- build_influxdb_payload(\$txn->{data}, $data, $ctime, "object=nodes,host=$node");
+ build_influxdb_payload($class, $txn, $data, $ctime, "object=nodes,host=$node");
}
sub update_qemu_status {
@@ -51,7 +52,7 @@ sub update_qemu_status {
}
$object =~ s/\s/\\ /g;
- build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
+ build_influxdb_payload($class, $txn, $data, $ctime, $object);
}
sub update_lxc_status {
@@ -65,7 +66,7 @@ sub update_lxc_status {
}
$object =~ s/\s/\\ /g;
- build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
+ build_influxdb_payload($class, $txn, $data, $ctime, $object);
}
sub update_storage_status {
@@ -79,7 +80,7 @@ sub update_storage_status {
}
$object =~ s/\s/\\ /g;
- build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
+ build_influxdb_payload($class, $txn, $data, $ctime, $object);
}
sub _connect {
@@ -94,11 +95,13 @@ sub _connect {
Proto => 'udp',
) || die "couldn't create influxdb socket [$host]:$port - $@\n";
+ $socket->blocking(0);
+
return $socket;
}
sub build_influxdb_payload {
- my ($payload, $data, $ctime, $tags, $measurement, $instance) = @_;
+ my ($class, $txn, $data, $ctime, $tags, $measurement, $instance) = @_;
my @values = ();
@@ -116,9 +119,9 @@ sub build_influxdb_payload {
# value is a hash
if (!defined($measurement)) {
- build_influxdb_payload($payload, $value, $ctime, $tags, $key);
+ build_influxdb_payload($class, $txn, $value, $ctime, $tags, $key);
} elsif(!defined($instance)) {
- build_influxdb_payload($payload, $value, $ctime, $tags, $measurement, $key);
+ build_influxdb_payload($class, $txn, $value, $ctime, $tags, $measurement, $key);
} else {
push @values, get_recursive_values($value);
}
@@ -129,8 +132,8 @@ sub build_influxdb_payload {
my $mm = $measurement // 'system';
my $tagstring = $tags;
$tagstring .= ",instance=$instance" if defined($instance);
- my $valuestr = join(',', @values);
- $$payload .= "$mm,$tagstring $valuestr $ctime\n";
+ my $valuestr = join(',', @values);
+ $class->add_metric_data($txn, "$mm,$tagstring $valuestr $ctime\n");
}
}
diff --git a/PVE/Status/Plugin.pm b/PVE/Status/Plugin.pm
index 402c5b4a..b1c91f8e 100644
--- a/PVE/Status/Plugin.pm
+++ b/PVE/Status/Plugin.pm
@@ -66,6 +66,42 @@ sub _disconnect {
$connection->close(); # overwrite if not a simple socket
}
+# UDP can't send more than 64k at once; override for other protocol limits.
+sub _send_batch_size {
+    my ($class, $cfg) = @_; # $cfg: the plugin's config, for overrides
+    return 48_000;
+}
+
+# Queue $data for sending; flush the queue first if it is over half full and
+# appending would exceed the batch size. Call with the smallest chunks possible.
+sub add_metric_data {
+    my ($class, $txn, $data) = @_;
+    return if !defined($data);
+
+    my $batch_size = $class->_send_batch_size($txn->{cfg});
+    my $data_length = length($data);
+    my $dataq_len = length($txn->{data}) // 0; # length(undef) is undef
+
+    if ($dataq_len > ($batch_size / 2) && ($dataq_len + $data_length) > $batch_size) {
+        $class->flush_data($txn);
+    }
+    $txn->{data} .= $data; # .= on an undef (e.g. just flushed) value does not warn
+}
+
+# Send all data queued in $txn over its connection; dies on send errors.
+sub flush_data {
+    my ($class, $txn) = @_;
+    my $conn = $txn->{connection};
+    # nothing queued (e.g., already flushed) is OK even without a connection
+    return if !$conn && !$txn->{data};
+    die "cannot flush metric data, no connection available!\n" if !$conn;
+    return if !defined($txn->{data}) || $txn->{data} eq '';
+
+    my $payload = delete $txn->{data};
+    eval { $class->send($conn, $payload) };
+    die "metrics send error '$txn->{id}': $@" if $@;
+}
+
sub send {
my ($class, $connection, $data) = @_;
--
2.20.1
More information about the pve-devel mailing list