Implement "single" or "grouped" URI matching
jamesmunns committed Aug 29, 2024
1 parent 82ccad5 commit 3f8cff1
Showing 8 changed files with 283 additions and 133 deletions.
source/river/assets/test-config.kdl (5 changes: 4 additions & 1 deletion)
@@ -58,8 +58,11 @@ services {
 // * Up to the last 2000 URI paths will be remembered
 // * Each URI path can make a burst of 20 requests
 // * The bucket for each URI will refill at a rate of 5 requests per 1 millisecond
-rule kind="uri" pattern="static/.*" \
+rule kind="specific-uri" pattern="static/.*" \
     max-buckets=2000 tokens-per-bucket=20 refill-qty=5 refill-rate-ms=1
+
+rule kind="any-matching-uri" pattern=r".*\.mp4" \
+    tokens-per-bucket=50 refill-qty=2 refill-rate-ms=3
 }
 
 // Connectors are the "upstream" interfaces that we connect with. We can name as many
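The two new rule kinds differ in how they key their token buckets: "specific-uri" keeps a separate bucket per distinct matching path (bounded by max-buckets), while "any-matching-uri" pools every matching path into one shared bucket. A self-contained sketch of that keying difference, using a plain counter in place of river's leaky-bucket raters (toy code, not river's API):

use std::collections::HashMap;

fn main() {
    // "specific-uri": one budget per distinct path
    let mut per_path_buckets: HashMap<String, u32> = HashMap::new();
    // "any-matching-uri": one budget shared by every match
    let mut shared_bucket: u32 = 0;

    for path in ["static/a.css", "static/b.css", "video/x.mp4", "video/y.mp4"] {
        if path.starts_with("static/") {
            *per_path_buckets.entry(path.to_string()).or_default() += 1;
        }
        if path.ends_with(".mp4") {
            shared_bucket += 1;
        }
    }

    assert_eq!(per_path_buckets.len(), 2); // two independent budgets
    assert_eq!(shared_bucket, 2); // one combined budget for all .mp4 requests
}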
source/river/src/config/internal.rs (4 changes: 2 additions & 2 deletions)
@@ -15,7 +15,7 @@ use pingora::{
 use tracing::warn;
 
 use crate::proxy::{
-    rate_limiting::RaterInstanceConfig,
+    rate_limiting::AllRateConfig,
     request_selector::{null_selector, RequestSelector},
 };
 
@@ -115,7 +115,7 @@ impl Config {
 ///
 #[derive(Debug, Default, Clone, PartialEq)]
 pub struct RateLimitingConfig {
-    pub(crate) rules: Vec<RaterInstanceConfig>,
+    pub(crate) rules: Vec<AllRateConfig>,
 }
 
 /// Add Path Control Modifiers
source/river/src/config/kdl/mod.rs (70 changes: 46 additions & 24 deletions)
@@ -10,7 +10,11 @@ use crate::{
         PathControl, ProxyConfig, SelectionKind, TlsConfig, UpstreamOptions,
     },
     proxy::{
-        rate_limiting::{AllRequestKeyKind, RaterConfig, RaterInstanceConfig, RegexShim},
+        rate_limiting::{
+            multi::{MultiRaterConfig, MultiRequestKeyKind},
+            single::{SingleInstanceConfig, SingleRequestKeyKind},
+            AllRateConfig, RegexShim,
+        },
         request_selector::{
             null_selector, source_addr_and_uri_path_selector, uri_path_selector, RequestSelector,
         },
@@ -324,7 +328,7 @@ fn make_rate_limiter(
     doc: &KdlDocument,
     node: &KdlNode,
     args: BTreeMap<&str, &KdlValue>,
-) -> miette::Result<RaterInstanceConfig> {
+) -> miette::Result<AllRateConfig> {
     let take_num = |key: &str| -> miette::Result<usize> {
         let Some(val) = args.get(key) else {
             return Err(Bad::docspan(format!("Missing key: '{key}'"), doc, node.span()).into());
@@ -359,39 +363,57 @@ fn make_rate_limiter(
 
     // mandatory/common fields
     let kind = take_str("kind")?;
-    let max_buckets = take_num("max-buckets")?;
     let tokens_per_bucket = take_num("tokens-per-bucket")?;
     let refill_qty = take_num("refill-qty")?;
    let refill_rate_ms = take_num("refill-rate-ms")?;
 
-    let rater_cfg = RaterConfig {
-        threads: threads_per_service,
-        max_buckets,
+    let multi_cfg = || -> miette::Result<MultiRaterConfig> {
+        let max_buckets = take_num("max-buckets")?;
+        Ok(MultiRaterConfig {
+            threads: threads_per_service,
+            max_buckets,
+            max_tokens_per_bucket: tokens_per_bucket,
+            refill_interval_millis: refill_rate_ms,
+            refill_qty,
+        })
+    };
+
+    let single_cfg = || SingleInstanceConfig {
         max_tokens_per_bucket: tokens_per_bucket,
         refill_interval_millis: refill_rate_ms,
         refill_qty,
     };
 
+    let regex_pattern = || -> miette::Result<RegexShim> {
+        let pattern = take_str("pattern")?;
+        let Ok(pattern) = RegexShim::new(pattern) else {
+            return Err(Bad::docspan(
+                format!("'{pattern}' should be a valid regular expression"),
+                doc,
+                node.span(),
+            )
+            .into());
+        };
+        Ok(pattern)
+    };
+
     match kind {
-        "source-ip" => Ok(RaterInstanceConfig {
-            rater_cfg,
-            kind: AllRequestKeyKind::SourceIp,
+        "source-ip" => Ok(AllRateConfig::Multi {
+            kind: MultiRequestKeyKind::SourceIp,
+            config: multi_cfg()?,
         }),
+        "specific-uri" => Ok(AllRateConfig::Multi {
+            kind: MultiRequestKeyKind::Uri {
+                pattern: regex_pattern()?,
+            },
+            config: multi_cfg()?,
+        }),
+        "any-matching-uri" => Ok(AllRateConfig::Single {
+            kind: SingleRequestKeyKind::UriGroup {
+                pattern: regex_pattern()?,
+            },
+            config: single_cfg(),
+        }),
-        "uri" => {
-            let pattern = take_str("pattern")?;
-            let Ok(pattern) = RegexShim::new(pattern) else {
-                return Err(Bad::docspan(
-                    format!("'{pattern}' should be a valid regular expression"),
-                    doc,
-                    node.span(),
-                )
-                .into());
-            };
-            Ok(RaterInstanceConfig {
-                rater_cfg,
-                kind: AllRequestKeyKind::Uri { pattern },
-            })
-        }
         other => Err(Bad::docspan(
            format!("'{other}' is not a known kind of rate limiting"),
            doc,
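The match arms above construct one of two variants. The shapes implied by this diff look roughly like the sketch below; the real definitions live in source/river/src/proxy/rate_limiting/, which is not shown in this view, so visibility and field types are inferred (take_num returns usize) and RegexShim is stubbed:

// Sketch of the config types as used above; not the exact source.
pub struct RegexShim(pub regex::Regex); // stub for river's comparison-friendly regex wrapper

pub enum MultiRequestKeyKind {
    SourceIp,
    Uri { pattern: RegexShim },
}

pub enum SingleRequestKeyKind {
    UriGroup { pattern: RegexShim },
}

pub struct MultiRaterConfig {
    pub threads: usize,
    pub max_buckets: usize,
    pub max_tokens_per_bucket: usize,
    pub refill_interval_millis: usize,
    pub refill_qty: usize,
}

pub struct SingleInstanceConfig {
    pub max_tokens_per_bucket: usize,
    pub refill_interval_millis: usize,
    pub refill_qty: usize,
}

pub enum AllRateConfig {
    Single { kind: SingleRequestKeyKind, config: SingleInstanceConfig },
    Multi { kind: MultiRequestKeyKind, config: MultiRaterConfig },
}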
source/river/src/config/kdl/test.rs (25 changes: 17 additions & 8 deletions)
@@ -1,14 +1,13 @@
 use std::{collections::BTreeMap, net::SocketAddr};
 
 use pingora::upstreams::peer::HttpPeer;
-use regex::Regex;
 
 use crate::{
     config::internal::{
         FileServerConfig, ListenerConfig, ListenerKind, ProxyConfig, UpstreamOptions,
     },
     proxy::{
-        rate_limiting::{RaterConfig, RaterInstanceConfig, RegexShim},
+        rate_limiting::{multi::MultiRaterConfig, AllRateConfig, RegexShim},
         request_selector::uri_path_selector,
     },
 };
@@ -93,28 +92,38 @@ fn load_test() {
             },
             rate_limiting: crate::config::internal::RateLimitingConfig {
                 rules: vec![
-                    RaterInstanceConfig {
-                        rater_cfg: RaterConfig {
+                    AllRateConfig::Multi {
+                        config: MultiRaterConfig {
                             threads: 8,
                             max_buckets: 4000,
                             max_tokens_per_bucket: 10,
                             refill_interval_millis: 10,
                             refill_qty: 1,
                         },
-                        kind: crate::proxy::rate_limiting::RequestKeyKind::SourceIp,
+                        kind: crate::proxy::rate_limiting::multi::MultiRequestKeyKind::SourceIp,
                     },
-                    RaterInstanceConfig {
-                        rater_cfg: RaterConfig {
+                    AllRateConfig::Multi {
+                        config: MultiRaterConfig {
                             threads: 8,
                             max_buckets: 2000,
                             max_tokens_per_bucket: 20,
                             refill_interval_millis: 1,
                             refill_qty: 5,
                         },
-                        kind: crate::proxy::rate_limiting::RequestKeyKind::Uri {
+                        kind: crate::proxy::rate_limiting::multi::MultiRequestKeyKind::Uri {
                             pattern: RegexShim::new("static/.*").unwrap(),
                         },
                     },
+                    AllRateConfig::Single {
+                        config: crate::proxy::rate_limiting::single::SingleInstanceConfig {
+                            max_tokens_per_bucket: 50,
+                            refill_interval_millis: 3,
+                            refill_qty: 2,
+                        },
+                        kind: crate::proxy::rate_limiting::single::SingleRequestKeyKind::UriGroup {
+                            pattern: RegexShim::new(r".*\.mp4").unwrap(),
+                        },
+                    },
                 ],
             },
         },
source/river/src/proxy/mod.rs (91 changes: 57 additions & 34 deletions)
@@ -31,10 +31,7 @@ use crate::{
 };
 
 use self::{
-    rate_limiting::{
-        multi::{MultiRequestKeyKind, Rater, RaterInstance},
-        AllRequestKeyKind,
-    },
+    rate_limiting::{multi::MultiRaterInstance, single::SingleInstance, Outcome},
     request_filters::RequestFilterMod,
 };
@@ -45,7 +42,8 @@ pub mod request_selector;
 pub mod response_modifiers;
 
 pub struct RateLimiters {
-    request_filter_stage: Vec<RaterInstance>,
+    request_filter_stage_multi: Vec<MultiRaterInstance>,
+    request_filter_stage_single: Vec<SingleInstance>,
 }
 
 /// The [RiverProxyService] is intended to capture the behaviors used to extend
@@ -112,28 +110,21 @@
             .expect("static should not error");
         // end of TODO
 
-        // NOTE: Using a silly filter map here because we might have rules in the future
-        // that can't be matched until later stages, such as if we wanted rate limiting
-        // buckets for each upstream that we choose - I want the match statement in this
-        // to no longer compile when we add more RequestKeyKinds.
-        #[allow(clippy::unnecessary_filter_map)]
-        let request_filter_stage = conf
-            .rate_limiting
-            .rules
-            .iter()
-            .filter_map(|cfg| match &cfg.kind {
-                AllRequestKeyKind::SourceIp => Some(RaterInstance {
-                    rater: Rater::new(cfg.rater_cfg.clone()),
-                    kind: MultiRequestKeyKind::SourceIp,
-                }),
-                AllRequestKeyKind::Uri { pattern } => Some(RaterInstance {
-                    rater: Rater::new(cfg.rater_cfg.clone()),
-                    kind: MultiRequestKeyKind::Uri {
-                        pattern: pattern.clone(),
-                    },
-                }),
-            })
-            .collect();
+        let mut request_filter_stage_multi = vec![];
+        let mut request_filter_stage_single = vec![];
+
+        for rule in conf.rate_limiting.rules {
+            match rule {
+                rate_limiting::AllRateConfig::Single { kind, config } => {
+                    let rater = SingleInstance::new(config, kind);
+                    request_filter_stage_single.push(rater);
+                }
+                rate_limiting::AllRateConfig::Multi { kind, config } => {
+                    let rater = MultiRaterInstance::new(config, kind);
+                    request_filter_stage_multi.push(rater);
+                }
+            }
+        }
 
         let mut my_proxy = pingora_proxy::http_proxy_service_with_name(
             &server.configuration,
@@ -142,7 +133,8 @@
             load_balancer: upstreams,
             request_selector: conf.upstream_options.selector,
             rate_limiters: RateLimiters {
-                request_filter_stage,
+                request_filter_stage_multi,
+                request_filter_stage_single,
             },
         },
         &conf.name,
@@ -280,9 +272,40 @@
     where
         Self::CTX: Send + Sync,
     {
-        // First: do rate limiting at this stage - quickly check if there are none and skip over
-        // entirely if so
-        if !self.rate_limiters.request_filter_stage.is_empty() {
+        let multis = self
+            .rate_limiters
+            .request_filter_stage_multi
+            .iter()
+            .filter_map(|l| l.get_ticket(session));
+
+        let singles = self
+            .rate_limiters
+            .request_filter_stage_single
+            .iter()
+            .filter_map(|l| l.get_ticket(session));
+
+        // Attempt to get all tokens
+        //
+        // TODO: If https://github.com/udoprog/leaky-bucket/issues/17 is resolved we could
+        // remember the buckets that we did get approved for, and "return" the unused tokens.
+        //
+        // For now, if some tickets succeed but subsequent tickets fail, the preceding
+        // approved tokens are just "burned".
+        //
+        // TODO: If https://github.com/udoprog/leaky-bucket/issues/34 is resolved we could
+        // support a "max debt" number, allowing us to delay if acquisition of the token
+        // would happen soon-ish, instead of immediately 429-ing if the token we need is
+        // about to become available.
+        if singles
+            .chain(multis)
+            .any(|t| t.now_or_never() == Outcome::Declined)
+        {
+            tracing::trace!("Rejecting due to rate limiting failure");
+            session.downstream_session.respond_error(429).await;
+            return Ok(true);
+        }
+
+        if !self.rate_limiters.request_filter_stage_single.is_empty() {
             // Attempt to get all tokens
             //
             // TODO: If https://github.com/udoprog/leaky-bucket/issues/17 is resolved we could
@@ -295,13 +318,13 @@
             // support a "max debt" number, allowing us to delay if acquisition of the token
             // would happen soon-ish, instead of immediately 429-ing if the token we need is
             // about to become available.
-            for limiter in self.rate_limiters.request_filter_stage.iter() {
+            for limiter in self.rate_limiters.request_filter_stage_single.iter() {
                 if let Some(ticket) = limiter.get_ticket(session) {
                     match ticket.now_or_never() {
-                        rate_limiting::Outcome::Approved => {
+                        Outcome::Approved => {
                             // Approved, move on
                         }
-                        rate_limiting::Outcome::Declined => {
+                        Outcome::Declined => {
                             tracing::trace!("Rejecting due to rate limiting failure");
                             session.downstream_session.respond_error(429).await;
                             return Ok(true);
(Diff truncated: the remaining 3 of the 8 changed files are not shown in this capture.)
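The "burned" tokens caveat in the TODO above is easy to see in miniature: tickets are polled one at a time, so a limiter that approves still spends its token even if a later limiter declines and the request is rejected anyway. A self-contained sketch with toy types (not river's or leaky-bucket's API):

#[derive(Debug, PartialEq)]
enum Outcome {
    Approved,
    Declined,
}

// Stand-in for one rate limiter's bucket.
struct Bucket {
    tokens: u32,
}

impl Bucket {
    // Take a token if one is available, without waiting.
    fn now_or_never(&mut self) -> Outcome {
        if self.tokens > 0 {
            self.tokens -= 1;
            Outcome::Approved
        } else {
            Outcome::Declined
        }
    }
}

fn main() {
    let mut per_uri = Bucket { tokens: 1 }; // rule that still has budget
    let mut group = Bucket { tokens: 0 }; // rule that is exhausted

    let declined = [&mut per_uri, &mut group]
        .into_iter()
        .any(|b| b.now_or_never() == Outcome::Declined);

    assert!(declined); // the request is rejected with a 429...
    assert_eq!(per_uri.tokens, 0); // ...but the first bucket's token was still burned
}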
