Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: add tokio implementation for pelikan_segcache_rs #135

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ switchboard = "0.2.1"
syn = "2.0.38"
thiserror = "1.0.49"
tiny_http = "0.12.0"
tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread"] }
toml = "0.8.2"
twox-hash = { version = "1.6.3", default-features = false }
urlencoding = "2.1.3"
Expand Down
4 changes: 3 additions & 1 deletion config/segcache.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
daemonize = false
[general]
# choose between 'mio' and 'tokio'
engine = "mio"

[admin]
# interfaces listening on
Expand Down
2 changes: 1 addition & 1 deletion src/server/pingserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ rustls-native-certs = "0.8.0"
serde = { workspace = true, features = ["derive"] }
server = { path = "../../core/server" }
session = { path = "../../session" }
tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
toml = { workspace = true }
tonic = { version = "0.12.2" }
warp = "0.3.7"
Expand Down
28 changes: 8 additions & 20 deletions src/server/segcache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,11 @@ homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }

[lib]
name = "pelikan_segcache_rs"
path = "src/lib.rs"
doc = true

[[bin]]
name = "pelikan_segcache_rs"
path = "src/main.rs"
doc = false

[[test]]
name = "integration"
path = "tests/integration.rs"
harness = false

[[test]]
name = "integration_multi"
path = "tests/integration_multi.rs"
harness = false

[[bench]]
name = "benchmark"
path = "benches/benchmark.rs"
harness = false

[features]
debug = ["entrystore/debug"]

Expand All @@ -43,10 +23,18 @@ clap = { workspace = true }
common = { path = "../../common" }
config = { path = "../../config" }
entrystore = { path = "../../entrystore" }
humantime = "2.1.0"
logger = { path = "../../logger" }
metriken = { workspace = true }
parking_lot = "0.12.3"
protocol-common = { path = "../../protocol/common" }
protocol-memcache = { path = "../../protocol/memcache" }
serde = { workspace = true, features = ["derive"] }
server = { path = "../../core/server", features = ["boringssl"] }
session = { path = "../../session" }
tokio = { workspace = true, features = ["full"] }
toml = { workspace = true }
warp = "0.3.7"

[dev-dependencies]
criterion = "0.5.1"
96 changes: 0 additions & 96 deletions src/server/segcache/benches/benchmark.rs

This file was deleted.

195 changes: 195 additions & 0 deletions src/server/segcache/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
use config::*;

use serde::{Deserialize, Serialize};

use std::io::Read;
use std::net::SocketAddr;
use std::time::Duration;

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct Config {
#[serde(default)]
pub general: General,
#[serde(default)]
pub metrics: Metrics,

// application modules
#[serde(default)]
pub admin: Admin,
#[serde(default)]
pub server: Server,
#[serde(default)]
pub worker: Worker,
#[serde(default)]
pub time: Time,
#[serde(default)]
pub tls: Tls,
#[serde(default)]
pub seg: Seg,

// ccommon
#[serde(default)]
pub buf: Buf,
#[serde(default)]
pub debug: Debug,
#[serde(default)]
pub klog: Klog,
#[serde(default)]
pub sockio: Sockio,
#[serde(default)]
pub tcp: Tcp,
}

#[derive(Serialize, Deserialize, Debug, Default)]
pub struct General {
pub engine: Engine,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Metrics {
#[serde(default = "interval")]
pub interval: String,
}

impl Default for Metrics {
fn default() -> Self {
Self {
interval: interval(),
}
}
}

impl Metrics {
pub fn interval(&self) -> Duration {
self.interval.parse::<humantime::Duration>().unwrap().into()
}
}

fn interval() -> String {
"1s".into()
}

#[derive(Serialize, Deserialize, Debug, Default, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Engine {
#[default]
Mio,
Tokio,
}

impl Config {
pub fn load(file: &str) -> Result<Self, std::io::Error> {
let mut file = std::fs::File::open(file)?;
let mut content = String::new();
file.read_to_string(&mut content)?;

let config: Config = match toml::from_str(&content) {
Ok(t) => t,
Err(e) => {
error!("{}", e);
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Error parsing config",
));
}
};

match config.metrics.interval.parse::<humantime::Duration>() {
Ok(interval) => {
if Into::<Duration>::into(interval) < Duration::from_millis(10) {
eprintln!("metrics interval cannot be less than 10ms");
std::process::exit(1);
}
}
Err(e) => {
eprintln!("metrics interval is not valid: {e}");
std::process::exit(1);
}
}

Ok(config)
}

pub fn listen(&self) -> SocketAddr {
self.server
.socket_addr()
.map_err(|e| {
error!("{}", e);
std::io::Error::new(std::io::ErrorKind::Other, "Bad listen address")
})
.map_err(|_| {
std::process::exit(1);
})
.unwrap()
}
}

impl AdminConfig for Config {
fn admin(&self) -> &Admin {
&self.admin
}
}

impl BufConfig for Config {
fn buf(&self) -> &Buf {
&self.buf
}
}

impl DebugConfig for Config {
fn debug(&self) -> &Debug {
&self.debug
}
}

impl KlogConfig for Config {
fn klog(&self) -> &Klog {
&self.klog
}
}

impl SegConfig for Config {
fn seg(&self) -> &Seg {
&self.seg
}
}

impl ServerConfig for Config {
fn server(&self) -> &Server {
&self.server
}
}

impl SockioConfig for Config {
fn sockio(&self) -> &Sockio {
&self.sockio
}
}

impl TcpConfig for Config {
fn tcp(&self) -> &Tcp {
&self.tcp
}
}

impl TimeConfig for Config {
fn time(&self) -> &Time {
&self.time
}
}

impl TlsConfig for Config {
fn tls(&self) -> &Tls {
&self.tls
}
}

impl WorkerConfig for Config {
fn worker(&self) -> &Worker {
&self.worker
}

fn worker_mut(&mut self) -> &mut Worker {
&mut self.worker
}
}
Loading
Loading