[pve-devel] [PATCH http-server 07/10] support streaming data form fh to client
Fabian Grünbichler
f.gruenbichler at proxmox.com
Wed Apr 21 15:25:16 CEST 2021
some nits inline, looks good to me otherwise!
On April 21, 2021 1:15 pm, Stefan Reiter wrote:
> Use an explicit AnyEvent::Handle similar to websocket proxying.
>
> Needs some special care to make sure we apply backpressure correctly to
> avoid caching too much data. Note that because of AnyEvent restrictions,
> specifying a "fh" to point to a file or a packet-based socket may result
> in unwanted behaviour[0].
>
> [0]: https://metacpan.org/pod/AnyEvent::Handle#DESCRIPTION
>
> Signed-off-by: Stefan Reiter <s.reiter at proxmox.com>
> ---
> PVE/APIServer/AnyEvent.pm | 97 +++++++++++++++++++++++++++++++++++++--
> 1 file changed, 93 insertions(+), 4 deletions(-)
>
> diff --git a/PVE/APIServer/AnyEvent.pm b/PVE/APIServer/AnyEvent.pm
> index 60a2a1c..643ae88 100644
> --- a/PVE/APIServer/AnyEvent.pm
> +++ b/PVE/APIServer/AnyEvent.pm
> @@ -189,7 +189,7 @@ sub finish_response {
> }
>
> sub response {
> - my ($self, $reqstate, $resp, $mtime, $nocomp, $delay) = @_;
> + my ($self, $reqstate, $resp, $mtime, $nocomp, $delay, $stream_fh) = @_;
>
> #print "$$: send response: " . Dumper($resp);
>
> @@ -231,7 +231,7 @@ sub response {
> $resp->header('Server' => "pve-api-daemon/3.0");
>
> my $content_length;
> - if ($content) {
> + if ($content && !$stream_fh) {
>
> $content_length = length($content);
>
> @@ -258,11 +258,93 @@ sub response {
> #print "SEND(without content) $res\n" if $self->{debug};
>
> $res .= "\015\012";
> - $res .= $content if $content;
> + $res .= $content if $content && !$stream_fh;
>
> $self->log_request($reqstate, $reqstate->{request});
>
> - if ($delay && $delay > 0) {
> + if ($stream_fh) {
nit the code inside these braces might be worthy of becoming its own sub?
> + # write headers and preamble
> + $reqstate->{hdl}->push_write($res);
> +
> + # then attach an AnyEvent::Handle to pass through the data
> + my $buf_size = 4*1024*1024;
> +
> + my $on_read;
> + $on_read = sub {
> + my ($hdl) = @_;
> + my $reqhdl = $reqstate->{hdl};
> + return if !$reqhdl;
> +
> + my $wbuf_len = length($reqhdl->{wbuf});
> + my $rbuf_len = length($hdl->{rbuf});
> + # TODO: Take into account $reqhdl->{wbuf_max} ? Right now
> + # that's unbounded, so just assume $buf_size
> + my $to_read = $buf_size - $wbuf_len;
> + $to_read = $rbuf_len if $rbuf_len < $to_read;
> + if ($to_read > 0) {
> + my $data = substr($hdl->{rbuf}, 0, $to_read, '');
> + $reqhdl->push_write($data);
> + $rbuf_len -= $to_read;
> + } elsif ($hdl->{_eof}) {
> + # workaround: AnyEvent gives us a fake EPIPE if we don't consume
> + # any data when called at EOF, so unregister ourselves - data is
> + # flushed by on_eof anyway
> + # see: https://sources.debian.org/src/libanyevent-perl/7.170-2/lib/AnyEvent/Handle.pm/#L1329
> + $hdl->on_read();
> + return;
> + }
> +
> + # apply backpressure so we don't accept any more data into
> + # buffer if the client isn't downloading fast enough
> + # note: read_size can double upon read, and we also need to
> + # account for one more read after start_read, so *4
> + if ($rbuf_len + $hdl->{read_size}*4 > $buf_size) {
> + # stop reading
stop reading until write buffer is empty
> + $hdl->on_read();
> + my $prev_on_drain = $reqhdl->{on_drain};
> + $reqhdl->on_drain(sub {
> + my ($wrhdl) = @_;
> + # write buffer is empty, continue reading
on_drain was called because write_buffer is empty, start reading again
> + $hdl->on_read($on_read);
> + if ($prev_on_drain) {
> + $wrhdl->on_drain($prev_on_drain);
> + $prev_on_drain->($wrhdl);
> + }
> + });
> + }
> + };
> +
> + $reqstate->{proxyhdl} = AnyEvent::Handle->new(
> + fh => $stream_fh,
> + rbuf_max => $buf_size,
> + timeout => 0,
> + on_read => $on_read,
> + on_eof => sub {
> + my ($hdl) = @_;
> + eval {
> + if (my $reqhdl = $reqstate->{hdl}) {
> + $self->log_aborted_request($reqstate);
> + # write out any remaining data
> + $reqhdl->push_write($hdl->{rbuf}) if length($hdl->{rbuf}) > 0;
> + $hdl->{rbuf} = "";
> + $reqhdl->push_shutdown();
> + $self->finish_response($reqstate);
> + }
> + };
> + if (my $err = $@) { syslog('err', "$err"); }
> + $on_read = undef;
> + },
> + on_error => sub {
> + my ($hdl, $fatal, $message) = @_;
> + eval {
> + $self->log_aborted_request($reqstate, $message);
> + $self->client_do_disconnect($reqstate);
> + };
> + if (my $err = $@) { syslog('err', "$err"); }
> + $on_read = undef;
> + },
> + );
> + } elsif ($delay && $delay > 0) {
> my $w; $w = AnyEvent->timer(after => $delay, cb => sub {
> undef $w; # delete reference
> $reqstate->{hdl}->push_write($res);
> @@ -322,6 +404,13 @@ sub send_file_start {
> if (ref($download) eq 'HASH') {
> $fh = $download->{fh};
> $mime = $download->{'content-type'};
> +
> + if ($download->{stream}) {
> + my $header = HTTP::Headers->new(Content_Type => $mime);
> + my $resp = HTTP::Response->new(200, "OK", $header);
> + $self->response($reqstate, $resp, undef, 1, 0, $fh);
> + return;
> + }
> } else {
> my $filename = $download;
> $fh = IO::File->new($filename, '<') ||
> --
> 2.20.1
>
>
>
> _______________________________________________
> pve-devel mailing list
> pve-devel at lists.proxmox.com
> https://lists.proxmox.com/cgi-bin/mailman/listinfo/pve-devel
>
>
>
More information about the pve-devel
mailing list