Skip to content

Commit

Permalink
Work on selectors based of feedback from pingora team
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesmunns committed Jun 19, 2024
1 parent fd63fcb commit 713dbdb
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 25 deletions.
2 changes: 1 addition & 1 deletion source/river/assets/test-config.kdl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ services {

connectors {
load-balance {
selection "RoundRobin"
selection "Ketama" key="UriPath"
discovery "Static"
health-check "None"
}
Expand Down
4 changes: 4 additions & 0 deletions source/river/src/config/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use pingora::{
upstreams::peer::HttpPeer,
};

use crate::proxy::request_selector::{null_selector, RequestSelector};

/// River's internal configuration
#[derive(Debug, Clone)]
pub struct Config {
Expand Down Expand Up @@ -104,6 +106,7 @@ pub enum ListenerKind {
#[derive(Debug, PartialEq, Clone)]
pub struct UpstreamOptions {
pub(crate) selection: SelectionKind,
pub(crate) selector: RequestSelector,
pub(crate) health_checks: HealthCheckKind,
pub(crate) discovery: DiscoveryKind,
}
Expand All @@ -112,6 +115,7 @@ impl Default for UpstreamOptions {
fn default() -> Self {
Self {
selection: SelectionKind::RoundRobin,
selector: null_selector,
health_checks: HealthCheckKind::None,
discovery: DiscoveryKind::Static,
}
Expand Down
85 changes: 75 additions & 10 deletions source/river/src/config/kdl.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
use std::{collections::BTreeMap, net::SocketAddr, path::PathBuf};
use std::{
collections::{BTreeMap, HashMap},
net::SocketAddr,
path::PathBuf,
};

use kdl::{KdlDocument, KdlEntry, KdlNode};
use miette::{bail, Diagnostic, SourceSpan};
use pingora::upstreams::peer::HttpPeer;

use crate::config::internal::{DiscoveryKind, HealthCheckKind, SelectionKind};
use crate::{
config::internal::{DiscoveryKind, HealthCheckKind, SelectionKind},
proxy::request_selector::{
null_selector, source_addr_and_uri_path_selector, uri_path_selector, RequestSelector,
},
};

use super::internal::{
Config, ListenerConfig, ListenerKind, PathControl, ProxyConfig, TlsConfig, UpstreamOptions,
Expand Down Expand Up @@ -231,23 +240,46 @@ fn extract_load_balance(doc: &KdlDocument, node: &KdlNode) -> miette::Result<Ups
let mut selection: Option<SelectionKind> = None;
let mut health: Option<HealthCheckKind> = None;
let mut discover: Option<DiscoveryKind> = None;
let mut selector: RequestSelector = null_selector;

for (node, name, args) in items {
match name {
"selection" => {
selection = Some(extract_one_str_arg(
doc,
node,
name,
args,
|val| match val {
let (sel, args) =
extract_one_str_arg_with_kv_args(doc, node, name, args, |val| match val {
"RoundRobin" => Some(SelectionKind::RoundRobin),
"Random" => Some(SelectionKind::Random),
"FNV" => Some(SelectionKind::Fnv),
"Ketama" => Some(SelectionKind::Ketama),
_ => None,
},
)?);
})?;
match sel {
SelectionKind::RoundRobin | SelectionKind::Random => {
// No key required, selection is random
}
SelectionKind::Fnv | SelectionKind::Ketama => {
let sel_ty = args.get("key").or_bail(
format!("selection {sel:?} requires a 'key' argument"),
doc,
node.span(),
)?;

selector = match sel_ty.as_str() {
"UriPath" => uri_path_selector,
"SourceAddrAndUriPath" => source_addr_and_uri_path_selector,
other => {
return Err(Bad::docspan(
format!("Unknown key: '{other}'"),
doc,
node.span(),
)
.into())
}
};
}
}

selection = Some(sel);
}
"health-check" => {
health = Some(extract_one_str_arg(
Expand Down Expand Up @@ -282,6 +314,7 @@ fn extract_load_balance(doc: &KdlDocument, node: &KdlNode) -> miette::Result<Ups
}
Ok(UpstreamOptions {
selection: selection.unwrap_or(SelectionKind::RoundRobin),
selector,
health_checks: health.unwrap_or(HealthCheckKind::None),
discovery: discover.unwrap_or(DiscoveryKind::Static),
})
Expand All @@ -301,6 +334,38 @@ fn extract_one_str_arg<T, F: FnOnce(&str) -> Option<T>>(
.or_bail(format!("Incorrect argument for '{name}'"), doc, node.span())
}

fn extract_one_str_arg_with_kv_args<T, F: FnOnce(&str) -> Option<T>>(
doc: &KdlDocument,
node: &KdlNode,
name: &str,
args: &[KdlEntry],
f: F,
) -> miette::Result<(T, HashMap<String, String>)> {
let (first, rest) =
args.split_first()
.or_bail(format!("Missing arguments for '{name}'"), doc, node.span())?;
let first = first.value().as_string().and_then(f).or_bail(
format!("Incorrect argument for '{name}'"),
doc,
node.span(),
)?;
let mut kvs = HashMap::new();
rest.iter().try_for_each(|arg| -> miette::Result<()> {
let key = arg
.name()
.or_bail("Should be a named argument", doc, arg.span())?
.value();
let val = arg
.value()
.as_string()
.or_bail("Should be a string value", doc, arg.span())?;
kvs.insert(key.to_string(), val.to_string());
Ok(())
})?;

Ok((first, kvs))
}

/// Extracts a single connector from the `connectors` section
fn extract_connector(
doc: &KdlDocument,
Expand Down
21 changes: 13 additions & 8 deletions source/river/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,9 @@ impl Modifiers {
}

/// Per-peer context. Not currently used
pub struct RiverContext {}
pub struct RiverContext {
selector_buf: Vec<u8>,
}

#[async_trait]
impl<BS> ProxyHttp for RiverProxyService<BS>
Expand All @@ -223,7 +225,9 @@ where
type CTX = RiverContext;

fn new_ctx(&self) -> Self::CTX {
RiverContext {}
RiverContext {
selector_buf: Vec::new(),
}
}

/// Handle the "upstream peer" phase, where we pick which upstream to proxy to.
Expand All @@ -235,13 +239,14 @@ where
session: &mut Session,
ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
let mut buf = [0u8; 64];
let hash = (self.request_selector)(&mut buf, ctx, session);
let key = (self.request_selector)(ctx, session);

let backend = self.load_balancer.select(key, 256);

// Manually clear the selector buf to avoid accidental leaks
ctx.selector_buf.clear();

let backend = self
.load_balancer
.select(hash, 256)
.ok_or_else(|| pingora::Error::new_str("oops"))?;
let backend = backend.ok_or_else(|| pingora::Error::new_str("oops"))?;

// For now, we only support one upstream
Ok(Box::new(HttpPeer::new(backend, true, "wrong".to_string())))
Expand Down
47 changes: 41 additions & 6 deletions source/river/src/proxy/request_selector.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,48 @@
use std::io::Write;

use pingora_proxy::Session;

use super::RiverContext;

pub type RequestSelector = for<'a> fn(&'a mut [u8], &mut RiverContext, &mut Session) -> &'a [u8];
/// A function used to determine the "key" to use for the selection process.
///
/// The function may choose an existing series of bytes, or may format into
/// the RiverContext.selector_buf field, using `write!` or similar formatting
/// options.
///
/// TODO: Should I just do `Cow<'a, [u8]>` instead of providing a buffer? The intent is
/// to avoid allocations on every select (reusing and growing one instead), but this might
/// have "weird" mem-leaky characteristics
pub type RequestSelector = for<'a> fn(&'a mut RiverContext, &'a mut Session) -> &'a [u8];

pub fn null_selector<'a>(
_buf: &'a mut [u8],
_ctxt: &mut RiverContext,
_ses: &mut Session,
) -> &'a [u8] {
/// Null selector, useful when using "Random" or "RoundRobin" selection and this key is not used
///
/// Performs no formatting
pub fn null_selector<'a>(_ctxt: &'a mut RiverContext, _ses: &'a mut Session) -> &'a [u8] {
&[]
}

/// Basic selector that looks at ONLY the URI of the request as the input key
///
/// Peforms no formatting
pub fn uri_path_selector<'a>(_ctxt: &'a mut RiverContext, ses: &'a mut Session) -> &'a [u8] {
ses.req_header().uri.path().as_bytes()
}

/// Selector that uses the source address (if available) and the URI of the request as the input key
///
/// Performs formatting into the selector buf
pub fn source_addr_and_uri_path_selector<'a>(
ctxt: &'a mut RiverContext,
ses: &'a mut Session,
) -> &'a [u8] {
write!(
&mut ctxt.selector_buf,
"{:?}:{}",
ses.as_downstream().client_addr(),
ses.req_header().uri.path(),
)
.expect("Formatting into a Vec<u8> should never fail");

ctxt.selector_buf.as_slice()
}

0 comments on commit 713dbdb

Please sign in to comment.