[pve-devel] applied: [PATCH manager] metric server: improve flush on big data updates

Thomas Lamprecht t.lamprecht at proxmox.com
Fri May 8 17:23:01 CEST 2020


Signed-off-by: Thomas Lamprecht <t.lamprecht at proxmox.com>
---
 PVE/ExtMetric.pm       | 39 ++++++++-------------------------------
 PVE/Status/Graphite.pm | 14 +++++++-------
 PVE/Status/InfluxDB.pm | 21 ++++++++++++---------
 PVE/Status/Plugin.pm   | 36 ++++++++++++++++++++++++++++++++++++
 4 files changed, 63 insertions(+), 47 deletions(-)

diff --git a/PVE/ExtMetric.pm b/PVE/ExtMetric.pm
index 14d98317..448d3925 100644
--- a/PVE/ExtMetric.pm
+++ b/PVE/ExtMetric.pm
@@ -28,17 +28,10 @@ sub update_all($$@) {
 
     my $method = "update_${subsystem}_status";
 
-    my (undef, $fn, $line, $subr) = caller(1);
     for my $txn (@$transactions) {
 	my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
 
 	$plugin->$method($txn, @params);
-
-	if (length($txn->{data}) > 48000) {
-	    # UDP stack cannot handle messages > 65k, if we've alot of data we
-	    # do smaller batch sends then, but keep the connection alive
-	    transaction_flush($txn, 1);
-	}
     }
 }
 
@@ -69,36 +62,20 @@ sub transactions_start {
     return $transactions;
 }
 
-sub transaction_flush {
-    my ($txn, $keepconnected) = @_;
+sub transactions_finish {
+    my ($transactions) = @_;
 
-    if (!$txn->{connection}) {
-	return if !$txn->{data}; # OK, if data was already sent/flushed
-	die "cannot flush metric data, no connection available!\n";
-    }
-    return if !defined($txn->{data}) || $txn->{data} eq '';
+    for my $txn (@$transactions) {
+	my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
 
-    my $plugin = PVE::Status::Plugin->lookup($txn->{cfg}->{type});
+	eval { $plugin->flush_data($txn) };
+	my $flush_err = $@;
+	warn "$flush_err" if $flush_err;
 
-    my $data = delete $txn->{data};
-    eval { $plugin->send($txn->{connection}, $data) };
-    my $senderr = $@;
-
-    if (!$keepconnected) {
 	$plugin->_disconnect($txn->{connection});
 	$txn->{connection} = undef;
 	# avoid log spam, already got a send error; disconnect would fail too
-	warn "disconnect failed: $@" if $@ && !$senderr;
-    }
-    die "metrics send error '$txn->{id}': $senderr" if $senderr;
-};
-
-sub transactions_finish {
-    my ($transactions) = @_;
-
-    for my $txn (@$transactions) {
-	eval { transaction_flush($txn) };
-	warn "$@" if $@;
+	warn "disconnect failed: $@" if $@ && !$flush_err;
     }
 }
 
diff --git a/PVE/Status/Graphite.pm b/PVE/Status/Graphite.pm
index 28fa65fd..ecab5583 100644
--- a/PVE/Status/Graphite.pm
+++ b/PVE/Status/Graphite.pm
@@ -61,25 +61,26 @@ sub options {
 sub update_node_status {
     my ($class, $txn, $node, $data, $ctime) = @_;
 
-    assemble($txn, $data, $ctime, "nodes.$node");
+    return assemble($class, $txn, $data, $ctime, "nodes.$node");
 
 }
 
 sub update_qemu_status {
     my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
-    assemble($txn, $data, $ctime, "qemu.$vmid");
+
+    return assemble($class, $txn, $data, $ctime, "qemu.$vmid");
 }
 
 sub update_lxc_status {
     my ($class, $txn, $vmid, $data, $ctime, $nodename) = @_;
 
-    assemble($txn, $data, $ctime, "lxc.$vmid");
+    return assemble($class, $txn, $data, $ctime, "lxc.$vmid");
 }
 
 sub update_storage_status {
     my ($class, $txn, $nodename, $storeid, $data, $ctime) = @_;
 
-    assemble($txn, $data, $ctime, "storages.$nodename.$storeid");
+    return assemble($class, $txn, $data, $ctime, "storages.$nodename.$storeid");
 }
 
 sub _connect {
@@ -108,7 +109,7 @@ sub _connect {
 }
 
 sub assemble {
-    my ($txn, $data, $ctime, $object) = @_;
+    my ($class, $txn, $data, $ctime, $object) = @_;
 
     my $path = $txn->{cfg}->{path} // 'proxmox';
     $path .= ".$object";
@@ -121,7 +122,6 @@ sub assemble {
 	'serial' => 1,
     };
 
-    $txn->{data} //= '';
     my $assemble_graphite_data;
     $assemble_graphite_data = sub {
 	my ($metric, $path) = @_;
@@ -136,7 +136,7 @@ sub assemble {
 	    if (ref($value) eq 'HASH') {
 		$assemble_graphite_data->($value, $metricpath);
 	    } elsif ($value =~ m/^[+-]?[0-9]*\.?[0-9]+$/ && !$key_blacklist->{$key}) {
-		$txn->{data} .= "$metricpath $value $ctime\n";
+		$class->add_metric_data($txn, "$metricpath $value $ctime\n");
 	    }
 	}
     };
diff --git a/PVE/Status/InfluxDB.pm b/PVE/Status/InfluxDB.pm
index 21949400..c7bc15a9 100644
--- a/PVE/Status/InfluxDB.pm
+++ b/PVE/Status/InfluxDB.pm
@@ -5,6 +5,7 @@ use warnings;
 
 use POSIX qw(isnan isinf);
 use Scalar::Util 'looks_like_number';
+use IO::Socket::IP;
 
 use PVE::SafeSyslog;
 
@@ -37,7 +38,7 @@ sub update_node_status {
 
     $ctime *= 1000000000;
 
-    build_influxdb_payload(\$txn->{data}, $data, $ctime, "object=nodes,host=$node");
+    build_influxdb_payload($class, $txn, $data, $ctime, "object=nodes,host=$node");
 }
 
 sub update_qemu_status {
@@ -51,7 +52,7 @@ sub update_qemu_status {
     }
     $object =~ s/\s/\\ /g;
 
-    build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
+    build_influxdb_payload($class, $txn, $data, $ctime, $object);
 }
 
 sub update_lxc_status {
@@ -65,7 +66,7 @@ sub update_lxc_status {
     }
     $object =~ s/\s/\\ /g;
 
-    build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
+    build_influxdb_payload($class, $txn, $data, $ctime, $object);
 }
 
 sub update_storage_status {
@@ -79,7 +80,7 @@ sub update_storage_status {
     }
     $object =~ s/\s/\\ /g;
 
-    build_influxdb_payload(\$txn->{data}, $data, $ctime, $object);
+    build_influxdb_payload($class, $txn, $data, $ctime, $object);
 }
 
 sub _connect {
@@ -94,11 +95,13 @@ sub _connect {
         Proto       => 'udp',
     ) || die "couldn't create influxdb socket [$host]:$port - $@\n";
 
+    $socket->blocking(0);
+
     return $socket;
 }
 
 sub build_influxdb_payload {
-    my ($payload, $data, $ctime, $tags, $measurement, $instance) = @_;
+    my ($class, $txn, $data, $ctime, $tags, $measurement, $instance) = @_;
 
     my @values = ();
 
@@ -116,9 +119,9 @@ sub build_influxdb_payload {
 	    # value is a hash
 
 	    if (!defined($measurement)) {
-		build_influxdb_payload($payload, $value, $ctime, $tags, $key);
+		build_influxdb_payload($class, $txn, $value, $ctime, $tags, $key);
 	    } elsif(!defined($instance)) {
-		build_influxdb_payload($payload, $value, $ctime, $tags, $measurement, $key);
+		build_influxdb_payload($class, $txn, $value, $ctime, $tags, $measurement, $key);
 	    } else {
 		push @values, get_recursive_values($value);
 	    }
@@ -129,8 +132,8 @@ sub build_influxdb_payload {
 	my $mm = $measurement // 'system';
 	my $tagstring = $tags;
 	$tagstring .= ",instance=$instance" if defined($instance);
-	my $valuestr =  join(',', @values);
-	$$payload .= "$mm,$tagstring $valuestr $ctime\n";
+	my $valuestr = join(',', @values);
+	$class->add_metric_data($txn, "$mm,$tagstring $valuestr $ctime\n");
     }
 }
 
diff --git a/PVE/Status/Plugin.pm b/PVE/Status/Plugin.pm
index 402c5b4a..b1c91f8e 100644
--- a/PVE/Status/Plugin.pm
+++ b/PVE/Status/Plugin.pm
@@ -66,6 +66,42 @@ sub _disconnect {
     $connection->close(); # overwrite if not a simple socket
 }
 
+# UDP cannot do more than 64k at once. Override for different protocol limits.
+sub _send_batch_size {
+    my ($class, $cfg) = @_;
+    return 48000;
+}
+
+# call with the smallest $data chunks possible
+sub add_metric_data {
+    my ($class, $txn, $data) = @_;
+    return if !defined($data);
+
+    my $batch_size = $class->_send_batch_size();
+    my $data_length = length($data) // 0;
+    my $dataq_len = length($txn->{data}) // 0;
+
+    if ($dataq_len > ($batch_size / 2) && ($dataq_len + $data_length) > $batch_size) {
+	$class->flush_data($txn);
+    }
+    $txn->{data} //= '';
+    $txn->{data} .= "$data";
+}
+
+sub flush_data {
+    my ($class, $txn) = @_;
+
+    if (!$txn->{connection}) {
+	return if !$txn->{data}; # OK, if data was already sent/flushed
+	die "cannot flush metric data, no connection available!\n";
+    }
+    return if !defined($txn->{data}) || $txn->{data} eq '';
+
+    my $data = delete $txn->{data};
+    eval { $class->send($txn->{connection}, $data) };
+    die "metrics send error '$txn->{id}': $@" if $@;
+}
+
 sub send {
     my ($class, $connection, $data) = @_;
 
-- 
2.20.1





More information about the pve-devel mailing list