proxmox-http: impl RateLimiterVec

Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
This commit is contained in:
Dietmar Maurer 2021-11-15 08:17:21 +01:00
parent c5d396cdb9
commit 81e959548b
2 changed files with 121 additions and 43 deletions

View File

@ -3,7 +3,7 @@
//! Contains a lightweight wrapper around `hyper` with support for TLS connections.
mod rate_limiter;
pub use rate_limiter::{RateLimit, RateLimiter, ShareableRateLimit};
pub use rate_limiter::{RateLimit, RateLimiter, RateLimiterVec, ShareableRateLimit};
mod rate_limited_stream;
pub use rate_limited_stream::RateLimitedStream;

View File

@ -1,6 +1,8 @@
use std::time::{Duration, Instant};
use std::convert::TryInto;
use anyhow::{bail, Error};
/// Rate limiter interface.
pub trait RateLimit {
/// Update rate and bucket size
@ -23,23 +25,68 @@ pub trait ShareableRateLimit: Send + Sync {
fn register_traffic(&self, current_time: Instant, data_len: u64) -> Duration;
}
/// Token bucket based rate limiter
///
/// IMPORTANT: We use this struct in shared memory, so please do not
/// change/modify the layout (do not add fields)
#[repr(C)]
pub struct RateLimiter {
rate: u64, // tokens/second
bucket_size: u64, // TBF bucket size
#[derive(Clone)]
#[repr(C)]
struct TbfState {
traffic: u64, // overall traffic
last_update: Instant,
consumed_tokens: u64,
}
impl RateLimiter {
impl TbfState {
const NO_DELAY: Duration = Duration::from_millis(0);
fn refill_bucket(&mut self, rate: u64, current_time: Instant) {
let time_diff = match current_time.checked_duration_since(self.last_update) {
Some(duration) => duration.as_nanos(),
None => return,
};
if time_diff == 0 { return; }
self.last_update = current_time;
let allowed_traffic = ((time_diff.saturating_mul(rate as u128)) / 1_000_000_000)
.try_into().unwrap_or(u64::MAX);
self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
}
fn register_traffic(
&mut self,
rate: u64,
bucket_size: u64,
current_time: Instant,
data_len: u64,
) -> Duration {
self.refill_bucket(rate, current_time);
self.traffic += data_len;
self.consumed_tokens += data_len;
if self.consumed_tokens <= bucket_size {
return Self::NO_DELAY;
}
Duration::from_nanos((self.consumed_tokens - bucket_size).saturating_mul(1_000_000_000)/rate)
}
}
/// Token bucket based rate limiter
///
/// IMPORTANT: We use this struct in shared memory, so please do not
/// change/modify the layout (do not add fields)
#[repr(C)]
pub struct RateLimiter {
rate: u64, // tokens/second
bucket_size: u64, // TBF bucket size
state: TbfState,
}
impl RateLimiter {
/// Creates a new instance, using [Instant::now] as start time.
pub fn new(rate: u64, bucket_size: u64) -> Self {
let start_time = Instant::now();
@ -50,32 +97,15 @@ impl RateLimiter {
pub fn with_start_time(rate: u64, bucket_size: u64, start_time: Instant) -> Self {
Self {
rate,
traffic: 0,
bucket_size,
last_update: start_time,
// start with empty bucket (all tokens consumed)
consumed_tokens: bucket_size,
state: TbfState {
traffic: 0,
last_update: start_time,
// start with empty bucket (all tokens consumed)
consumed_tokens: bucket_size,
},
}
}
fn refill_bucket(&mut self, current_time: Instant) {
let time_diff = match current_time.checked_duration_since(self.last_update) {
Some(duration) => duration.as_nanos(),
None => {
//log::error!("update_time: got negative time diff");
return;
}
};
if time_diff == 0 { return; }
self.last_update = current_time;
let allowed_traffic = ((time_diff.saturating_mul(self.rate as u128)) / 1_000_000_000)
.try_into().unwrap_or(u64::MAX);
self.consumed_tokens = self.consumed_tokens.saturating_sub(allowed_traffic);
}
}
impl RateLimit for RateLimiter {
@ -83,27 +113,19 @@ impl RateLimit for RateLimiter {
fn update_rate(&mut self, rate: u64, bucket_size: u64) {
self.rate = rate;
if bucket_size < self.bucket_size && self.consumed_tokens > bucket_size {
self.consumed_tokens = bucket_size; // start again
if bucket_size < self.bucket_size && self.state.consumed_tokens > bucket_size {
self.state.consumed_tokens = bucket_size; // start again
}
self.bucket_size = bucket_size;
}
fn traffic(&self) -> u64 {
self.traffic
self.state.traffic
}
fn register_traffic(&mut self, current_time: Instant, data_len: u64) -> Duration {
self.refill_bucket(current_time);
self.traffic += data_len;
self.consumed_tokens += data_len;
if self.consumed_tokens <= self.bucket_size {
return Self::NO_DELAY;
}
Duration::from_nanos((self.consumed_tokens - self.bucket_size).saturating_mul(1_000_000_000)/ self.rate)
self.state.register_traffic(self.rate, self.bucket_size, current_time, data_len)
}
}
@ -121,3 +143,59 @@ impl <R: RateLimit + Send> ShareableRateLimit for std::sync::Mutex<R> {
self.lock().unwrap().register_traffic(current_time, data_len)
}
}
/// Array of rate limiters.
///
/// A group of rate limiters with same configuration.
pub struct RateLimiterVec {
rate: u64, // tokens/second
bucket_size: u64, // TBF bucket size
state: Vec<TbfState>,
}
impl RateLimiterVec {
/// Creates a new instance, using [Instant::now] as start time.
pub fn new(group_size: usize, rate: u64, bucket_size: u64) -> Self {
let start_time = Instant::now();
Self::with_start_time(group_size, rate, bucket_size, start_time)
}
/// Creates a new instance with specified `rate`, `bucket_size` and `start_time`.
pub fn with_start_time(group_size: usize, rate: u64, bucket_size: u64, start_time: Instant) -> Self {
let state = TbfState {
traffic: 0,
last_update: start_time,
// start with empty bucket (all tokens consumed)
consumed_tokens: bucket_size,
};
Self {
rate,
bucket_size,
state: vec![state; group_size],
}
}
/// Return the number of TBF entries (group_size)
pub fn len(&self) -> usize {
self.state.len()
}
/// Traffic for the specified index
pub fn traffic(&self, index: usize) -> Result<u64, Error> {
if index >= self.state.len() {
bail!("RateLimiterVec::traffic - index out of range");
}
Ok(self.state[index].traffic)
}
/// Register traffic at the specified index
pub fn register_traffic(&mut self, index: usize, current_time: Instant, data_len: u64) -> Result<Duration, Error> {
if index >= self.state.len() {
bail!("RateLimiterVec::register_traffic - index out of range");
}
Ok(self.state[index].register_traffic(self.rate, self.bucket_size, current_time, data_len))
}
}