mirror of
git://git.proxmox.com/git/pve-http-server.git
synced 2025-08-02 16:21:56 +03:00
support streaming data form fh to client
Use an explicit AnyEvent::Handle similar to websocket proxying. Needs some special care to make sure we apply backpressure correctly to avoid caching too much data. Note that because of AnyEvent restrictions, specifying a "fh" to point to a file or a packet-based socket may result in unwanted behaviour[0]. [0]: https://metacpan.org/pod/AnyEvent::Handle#DESCRIPTION Signed-off-by: Stefan Reiter <s.reiter@proxmox.com>
This commit is contained in:
committed by
Thomas Lamprecht
parent
9afe1e89ea
commit
389ad881f9
@ -188,8 +188,93 @@ sub finish_response {
|
||||
}
|
||||
}
|
||||
|
||||
sub response_stream {
|
||||
my ($self, $reqstate, $stream_fh) = @_;
|
||||
|
||||
# disable timeout, we don't know how big the data is
|
||||
$reqstate->{hdl}->timeout(0);
|
||||
|
||||
my $buf_size = 4*1024*1024;
|
||||
|
||||
my $on_read;
|
||||
$on_read = sub {
|
||||
my ($hdl) = @_;
|
||||
my $reqhdl = $reqstate->{hdl};
|
||||
return if !$reqhdl;
|
||||
|
||||
my $wbuf_len = length($reqhdl->{wbuf});
|
||||
my $rbuf_len = length($hdl->{rbuf});
|
||||
# TODO: Take into account $reqhdl->{wbuf_max} ? Right now
|
||||
# that's unbounded, so just assume $buf_size
|
||||
my $to_read = $buf_size - $wbuf_len;
|
||||
$to_read = $rbuf_len if $rbuf_len < $to_read;
|
||||
if ($to_read > 0) {
|
||||
my $data = substr($hdl->{rbuf}, 0, $to_read, '');
|
||||
$reqhdl->push_write($data);
|
||||
$rbuf_len -= $to_read;
|
||||
} elsif ($hdl->{_eof}) {
|
||||
# workaround: AnyEvent gives us a fake EPIPE if we don't consume
|
||||
# any data when called at EOF, so unregister ourselves - data is
|
||||
# flushed by on_eof anyway
|
||||
# see: https://sources.debian.org/src/libanyevent-perl/7.170-2/lib/AnyEvent/Handle.pm/#L1329
|
||||
$hdl->on_read();
|
||||
return;
|
||||
}
|
||||
|
||||
# apply backpressure so we don't accept any more data into
|
||||
# buffer if the client isn't downloading fast enough
|
||||
# note: read_size can double upon read, and we also need to
|
||||
# account for one more read after start_read, so *4
|
||||
if ($rbuf_len + $hdl->{read_size}*4 > $buf_size) {
|
||||
# stop reading until write buffer is empty
|
||||
$hdl->on_read();
|
||||
my $prev_on_drain = $reqhdl->{on_drain};
|
||||
$reqhdl->on_drain(sub {
|
||||
my ($wrhdl) = @_;
|
||||
# on_drain called because write buffer is empty, continue reading
|
||||
$hdl->on_read($on_read);
|
||||
if ($prev_on_drain) {
|
||||
$wrhdl->on_drain($prev_on_drain);
|
||||
$prev_on_drain->($wrhdl);
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
$reqstate->{proxyhdl} = AnyEvent::Handle->new(
|
||||
fh => $stream_fh,
|
||||
rbuf_max => $buf_size,
|
||||
timeout => 0,
|
||||
on_read => $on_read,
|
||||
on_eof => sub {
|
||||
my ($hdl) = @_;
|
||||
eval {
|
||||
if (my $reqhdl = $reqstate->{hdl}) {
|
||||
$self->log_aborted_request($reqstate);
|
||||
# write out any remaining data
|
||||
$reqhdl->push_write($hdl->{rbuf}) if length($hdl->{rbuf}) > 0;
|
||||
$hdl->{rbuf} = "";
|
||||
$reqhdl->push_shutdown();
|
||||
$self->finish_response($reqstate);
|
||||
}
|
||||
};
|
||||
if (my $err = $@) { syslog('err', "$err"); }
|
||||
$on_read = undef;
|
||||
},
|
||||
on_error => sub {
|
||||
my ($hdl, $fatal, $message) = @_;
|
||||
eval {
|
||||
$self->log_aborted_request($reqstate, $message);
|
||||
$self->client_do_disconnect($reqstate);
|
||||
};
|
||||
if (my $err = $@) { syslog('err', "$err"); }
|
||||
$on_read = undef;
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
sub response {
|
||||
my ($self, $reqstate, $resp, $mtime, $nocomp, $delay) = @_;
|
||||
my ($self, $reqstate, $resp, $mtime, $nocomp, $delay, $stream_fh) = @_;
|
||||
|
||||
#print "$$: send response: " . Dumper($resp);
|
||||
|
||||
@ -231,7 +316,7 @@ sub response {
|
||||
$resp->header('Server' => "pve-api-daemon/3.0");
|
||||
|
||||
my $content_length;
|
||||
if ($content) {
|
||||
if ($content && !$stream_fh) {
|
||||
|
||||
$content_length = length($content);
|
||||
|
||||
@ -258,11 +343,16 @@ sub response {
|
||||
#print "SEND(without content) $res\n" if $self->{debug};
|
||||
|
||||
$res .= "\015\012";
|
||||
$res .= $content if $content;
|
||||
$res .= $content if $content && !$stream_fh;
|
||||
|
||||
$self->log_request($reqstate, $reqstate->{request});
|
||||
|
||||
if ($delay && $delay > 0) {
|
||||
if ($stream_fh) {
|
||||
# write headers and preamble...
|
||||
$reqstate->{hdl}->push_write($res);
|
||||
# ...then stream data via an AnyEvent::Handle
|
||||
$self->response_stream($reqstate, $stream_fh);
|
||||
} elsif ($delay && $delay > 0) {
|
||||
my $w; $w = AnyEvent->timer(after => $delay, cb => sub {
|
||||
undef $w; # delete reference
|
||||
$reqstate->{hdl}->push_write($res);
|
||||
@ -322,6 +412,13 @@ sub send_file_start {
|
||||
if (ref($download) eq 'HASH') {
|
||||
$fh = $download->{fh};
|
||||
$mime = $download->{'content-type'};
|
||||
|
||||
if ($download->{stream}) {
|
||||
my $header = HTTP::Headers->new(Content_Type => $mime);
|
||||
my $resp = HTTP::Response->new(200, "OK", $header);
|
||||
$self->response($reqstate, $resp, undef, 1, 0, $fh);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
my $filename = $download;
|
||||
$fh = IO::File->new($filename, '<') ||
|
||||
|
Reference in New Issue
Block a user