From de21d4efdcae6ee21be096dc3b6fb21aaf654277 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Sat, 13 Nov 2021 17:19:07 +0100 Subject: [PATCH] implement rate limiter in shared memory This kind of rate limiter can be used among several processes (as long as all set the same rate/burst). --- Cargo.toml | 1 + src/cached_traffic_control.rs | 30 ++++++++-- src/lib.rs | 4 ++ src/shared_rate_limiter.rs | 110 ++++++++++++++++++++++++++++++++++ 4 files changed, 141 insertions(+), 4 deletions(-) create mode 100644 src/shared_rate_limiter.rs diff --git a/Cargo.toml b/Cargo.toml index 7d9644ac..e103f42e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,6 +107,7 @@ proxmox-section-config = "1" proxmox-tfa = { version = "1", features = [ "u2f" ] } proxmox-time = "1" proxmox-uuid = "1" +proxmox-shared-memory = "0.1.0" proxmox-acme-rs = "0.3" proxmox-apt = "0.8.0" diff --git a/src/cached_traffic_control.rs b/src/cached_traffic_control.rs index deb6e234..32c1d0c8 100644 --- a/src/cached_traffic_control.rs +++ b/src/cached_traffic_control.rs @@ -16,6 +16,8 @@ use pbs_api_types::TrafficControlRule; use pbs_config::ConfigVersionCache; +use super::SharedRateLimiter; + struct ParsedTcRule { config: TrafficControlRule, // original rule config networks: Vec, // parsed networks @@ -23,6 +25,7 @@ struct ParsedTcRule { } pub struct TrafficControlCache { + use_shared_memory: bool, last_update: i64, last_traffic_control_generation: usize, rules: Vec, @@ -85,17 +88,24 @@ fn cannonical_ip(ip: IpAddr) -> IpAddr { } fn create_limiter( + use_shared_memory: bool, + name: &str, rate: u64, burst: u64, - _direction: bool, // false => in, true => out ) -> Result, Error> { - Ok(Arc::new(Mutex::new(RateLimiter::new(rate, burst)))) + if use_shared_memory { + let limiter = SharedRateLimiter::mmap_shmem(name, rate, burst)?; + Ok(Arc::new(limiter)) + } else { + Ok(Arc::new(Mutex::new(RateLimiter::new(rate, burst)))) + } } impl TrafficControlCache { pub fn new() -> Self { Self { + use_shared_memory: true, rules: Vec::new(), limiter_map: HashMap::new(), last_traffic_control_generation: 0, @@ -162,7 +172,13 @@ impl TrafficControlCache { } None => { if let Some(rate_in) = rule.rate_in { - let limiter = create_limiter(rate_in, rule.burst_in.unwrap_or(rate_in), false)?; + let name = format!("{}.in", rule.name); + let limiter = create_limiter( + self.use_shared_memory, + &name, + rate_in, + rule.burst_in.unwrap_or(rate_in), + )?; entry.0 = Some(limiter); } } @@ -179,7 +195,13 @@ impl TrafficControlCache { } None => { if let Some(rate_out) = rule.rate_out { - let limiter = create_limiter(rate_out, rule.burst_out.unwrap_or(rate_out), true)?; + let name = format!("{}.out", rule.name); + let limiter = create_limiter( + self.use_shared_memory, + &name, + rate_out, + rule.burst_out.unwrap_or(rate_out), + )?; entry.1 = Some(limiter); } } diff --git a/src/lib.rs b/src/lib.rs index 8f5ed245..8e5b4d37 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,9 +33,13 @@ pub mod client_helpers; pub mod rrd_cache; +mod shared_rate_limiter; +pub use shared_rate_limiter::SharedRateLimiter; + mod cached_traffic_control; pub use cached_traffic_control::TrafficControlCache; + /// Get the server's certificate info (from `proxy.pem`). pub fn cert_info() -> Result { CertInfo::from_path(PathBuf::from(configdir!("/proxy.pem"))) diff --git a/src/shared_rate_limiter.rs b/src/shared_rate_limiter.rs new file mode 100644 index 00000000..3c282c85 --- /dev/null +++ b/src/shared_rate_limiter.rs @@ -0,0 +1,110 @@ +use std::path::PathBuf; +use std::mem::MaybeUninit; +use std::time::{Instant, Duration}; + +use anyhow::{bail, Error}; +use nix::sys::stat::Mode; + +use proxmox::tools::fs::{create_path, CreateOptions}; + +use proxmox_http::client::{RateLimit, RateLimiter, ShareableRateLimit}; +use proxmox_shared_memory::{Init, SharedMemory, SharedMutex}; +use proxmox_shared_memory::{check_subtype, initialize_subtype}; + +// openssl::sha::sha256(b"Proxmox Backup SharedRateLimiter v1.0")[0..8]; +pub const PROXMOX_BACKUP_SHARED_RATE_LIMITER_MAGIC_1_0: [u8; 8] = [6, 58, 213, 96, 161, 122, 130, 117]; + +const BASE_PATH: &str = pbs_buildcfg::rundir!("/shmem/tbf"); + +// Wrap RateLimiter, so that we can provide an Init impl +#[repr(C)] +struct WrapLimiter(RateLimiter); + +impl Init for WrapLimiter { + fn initialize(this: &mut MaybeUninit) { + // default does not matter here, because we override later + this.write(WrapLimiter(RateLimiter::new(1_000_000, 1_000_000))); + } +} + +#[repr(C)] +struct SharedRateLimiterData { + magic: [u8; 8], + tbf: SharedMutex, + padding: [u8; 4096 - 120], +} + +impl Init for SharedRateLimiterData { + fn initialize(this: &mut MaybeUninit) { + unsafe { + let me = &mut *this.as_mut_ptr(); + me.magic = PROXMOX_BACKUP_SHARED_RATE_LIMITER_MAGIC_1_0; + initialize_subtype(&mut me.tbf); + } + } + + fn check_type_magic(this: &MaybeUninit) -> Result<(), Error> { + unsafe { + let me = &*this.as_ptr(); + if me.magic != PROXMOX_BACKUP_SHARED_RATE_LIMITER_MAGIC_1_0 { + bail!("SharedRateLimiterData: wrong magic number"); + } + check_subtype(&me.tbf)?; + Ok(()) + } + } +} + +pub struct SharedRateLimiter { + shmem: SharedMemory +} + +impl SharedRateLimiter { + + pub fn mmap_shmem(name: &str, rate: u64, burst: u64) -> Result { + let mut path = PathBuf::from(BASE_PATH); + + let user = pbs_config::backup_user()?; + + let dir_opts = CreateOptions::new() + .perm(Mode::from_bits_truncate(0o770)) + .owner(user.uid) + .group(user.gid); + + create_path( + &path, + Some(dir_opts.clone()), + Some(dir_opts))?; + + path.push(name); + + let file_opts = CreateOptions::new() + .perm(Mode::from_bits_truncate(0o660)) + .owner(user.uid) + .group(user.gid); + + let shmem: SharedMemory = + SharedMemory::open(&path, file_opts)?; + + shmem.data().tbf.lock().0.update_rate(rate, burst); + + Ok(Self { shmem }) + } +} + +impl ShareableRateLimit for SharedRateLimiter { + fn update_rate(&self, rate: u64, bucket_size: u64) { + self.shmem.data().tbf.lock().0 + .update_rate(rate, bucket_size); + } + + fn average_rate(&self, current_time: Instant) -> f64 { + self.shmem.data().tbf.lock().0 + .average_rate(current_time) + } + + fn register_traffic(&self, current_time: Instant, data_len: u64) -> Duration { + self.shmem.data().tbf.lock().0 + .register_traffic(current_time, data_len) + } +}