Skip to content

Commit

Permalink
feat: add generator code (#203)
Browse files Browse the repository at this point in the history
* feature: add generator code

* chore: formatting

* feat: operator to create a separate job for load generation

* feat: add api access for load gen

* fix: command name

* chore: review comment suggestions for better rust coding

* feat: docs

* fix: naming + clenaups

* fix: update cargo

* fix: update before build

* fix: add update before all targets

* chore: review comments

* chore: renaming + updating deps

---------

Co-authored-by: Samika Kashyap <[email protected]>
Co-authored-by: Samika Kashyap <[email protected]>
  • Loading branch information
3 people authored Jul 30, 2024
1 parent 54d1633 commit 24ed857
Show file tree
Hide file tree
Showing 22 changed files with 1,735 additions and 677 deletions.
1,299 changes: 646 additions & 653 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CARGO = CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse RUSTFLAGS="--cfg tokio_unstab
all: build check-fmt check-clippy test

.PHONY: test
test:
test: update
# Test with default features
${CARGO} test --locked
# Test with all features
Expand All @@ -16,7 +16,11 @@ test:
check-fmt:
cargo fmt --all -- --check

.PHONY: check-clippy
.PHONY: update
update:
${CARGO} update

.PHONY: check-clippy
check-clippy:
# Check with default features
${CARGO} clippy --workspace
Expand Down
2 changes: 1 addition & 1 deletion k8s/operator/manifests/operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ rules:
resources: ["clusterroles", "clusterrolebindings"]
verbs: ["create", "get", "patch"]
- apiGroups: ["keramik.3box.io"]
resources: ["networks", "networks/status", "simulations", "simulations/status"]
resources: ["networks", "networks/status", "simulations", "simulations/status", "loadgenerators", "loadgenerators/status"]
verbs: ["get", "list", "watch", "patch", "delete"]
- apiGroups: ["monitoring.coreos.com"]
resources: ["podmonitors"]
Expand Down
18 changes: 18 additions & 0 deletions keramik/src/developing_runner.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,21 @@ spec:
image: keramik/runner:dev
imagePullPolicy: IfNotPresent
```
## Setup Load Generator with the runner image
```yaml
# Custom load generator
---
apiVersion: "keramik.3box.io/v1alpha1"
kind: LoadGenerator
metadata:
name: load-gen
namespace: keramik-lgen-demo
spec:
scenario: "CreateModelInstancesSynced"
runTime: 3
image: "keramik/runner:dev"
imagePullPolicy: "IfNotPresent"
throttleRequests: 20
tasks: 2
```
43 changes: 43 additions & 0 deletions keramik/src/load_generator.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Load Generation

To run a load generator, you need to create a `LoadGenerator` resource. This resource is similar to a `Simulation` resource. However, the load generation can last for up to a week. They are used to generate sustained load on the system for a longer period of time.

## Parameters

- **`scenario`**: The scenario to run. Supported scenarios are:
- `CreateModelInstancesSynced`: Requires at least two ceramic instances. Creates models on one node and has the other node sync them.
- **`runTime`**: The duration to run the load generator, in hours.
- **`image`**: The image to use for the load generator. This is the same as the `image` in the `Simulation` resource.
- **`throttleRequests`**: WIP, not ready yet. The number of requests to send per second. This is the same as the `throttleRequests` in the `Simulation` resource.
- **`tasks`**: The number of tasks to run. Increasing the number of tasks will increase the load on the node. A value of 2 generates a steady load of 20 requests per second. Values between 2-100 are recommended. Keep in mind the increase of tasks to throughput is non-linear. A value of 100 generates what we consider high load, which is 200 TPS.

## Sample configuration

```yaml
apiVersion: "keramik.3box.io/v1alpha1"
kind: LoadGenerator
metadata:
name: load-gen
# Namespace of the network you wish to run against
namespace: keramik-<unique-name>-small
spec:
scenario: CreateModelInstancesSynced
runTime: 3
image: "keramik/runner:dev"
throttleRequests: 20
tasks: 2
```
If you want to run this against a defined network, set the namespace to the same as the network. In this example, the namespace is set to the same network applied when [the network was set up](./setup_network.md).
The load generator will automatically stop once the `runTime` is up. You should be able to see some success and error metrics at the end of the run. To see the metrics, you can use the `kubectl` command to get the logs of the load generator:


```shell
kubectl logs load-gen-<unique-string-for-each-run> -n keramik-<unique-name>-small
```
You can get the name of the load-gen pod by running:

```shell
kubectl get pods -n keramik-<unique-name>-small
```
6 changes: 4 additions & 2 deletions operator/src/crdgen.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use kube::CustomResourceExt;

use keramik_operator::lgen::spec::LoadGenerator;
use keramik_operator::network::Network;
use keramik_operator::simulation::Simulation;
use kube::CustomResourceExt;

fn main() {
print!("{}", serde_yaml::to_string(&Network::crd()).unwrap());
println!("---");
print!("{}", serde_yaml::to_string(&Simulation::crd()).unwrap());
println!("---");
print!("{}", serde_yaml::to_string(&LoadGenerator::crd()).unwrap());
}
200 changes: 200 additions & 0 deletions operator/src/lgen/controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use std::{sync::Arc, time::Duration};

use futures::stream::StreamExt;
use k8s_openapi::api::batch::v1::Job;
use kube::{
api::{Patch, PatchParams},
client::Client,
core::object::HasSpec,
runtime::Controller,
Api,
};
use kube::{
runtime::{
controller::Action,
watcher::{self, Config},
},
Resource, ResourceExt,
};
use opentelemetry::{global, KeyValue};
use rand::{distributions::Alphanumeric, thread_rng, Rng, RngCore};

use tracing::{debug, error, info};

use crate::{
labels::MANAGED_BY_LABEL_SELECTOR,
lgen::{
job::{job_spec, JobConfig, JobImageConfig},
spec::{LoadGenerator, LoadGeneratorState},
},
simulation::controller::monitoring_ready,
utils::Clock,
};

use crate::network::ipfs_rpc::{HttpRpcClient, IpfsRpcClient};

use crate::utils::{apply_job, Context};

/// The name of the load generator job.
pub const LOAD_GENERATOR_JOB_NAME: &str = "load-gen-job";

/// Handle errors during reconciliation.
fn on_error(
_network: Arc<LoadGenerator>,
_error: &Error,
_context: Arc<Context<impl IpfsRpcClient, impl RngCore, impl Clock>>,
) -> Action {
Action::requeue(Duration::from_secs(5))
}

/// Errors produced by the reconcile function.
#[derive(Debug, thiserror::Error)]
enum Error {
#[error("App error: {source}")]
App {
#[from]
source: anyhow::Error,
},
#[error("Kube error: {source}")]
Kube {
#[from]
source: kube::Error,
},
}

/// Start a controller for the LoadGenerator CRD.
pub async fn run() {
let k_client = Client::try_default().await.unwrap();
let context = Arc::new(
Context::new(k_client.clone(), HttpRpcClient).expect("should be able to create context"),
);

let load_generators: Api<LoadGenerator> = Api::all(k_client.clone());
let jobs = Api::<Job>::all(k_client.clone());

Controller::new(load_generators.clone(), Config::default())
.owns(
jobs,
watcher::Config::default().labels(MANAGED_BY_LABEL_SELECTOR),
)
.run(reconcile, on_error, context)
.for_each(|rec_res| async move {
match rec_res {
Ok((load_generator, _)) => {
info!(load_generator.name, "reconcile success");
}
Err(err) => {
error!(?err, "reconcile error")
}
}
})
.await;
}

/// Perform a reconcile pass for the LoadGenerator CRD
async fn reconcile(
load_generator: Arc<LoadGenerator>,
cx: Arc<Context<impl IpfsRpcClient, impl RngCore, impl Clock>>,
) -> Result<Action, Error> {
let meter = global::meter("keramik");
let runs = meter
.u64_counter("load_generator_reconcile_count")
.with_description("Number of load generator reconciles")
.init();

match reconcile_(load_generator, cx).await {
Ok(action) => {
runs.add(
1,
&[KeyValue {
key: "result".into(),
value: "ok".into(),
}],
);
Ok(action)
}
Err(err) => {
runs.add(
1,
&[KeyValue {
key: "result".into(),
value: "err".into(),
}],
);
Err(err)
}
}
}

/// Perform a reconcile pass for the LoadGenerator CRD
async fn reconcile_(
load_generator: Arc<LoadGenerator>,
cx: Arc<Context<impl IpfsRpcClient, impl RngCore, impl Clock>>,
) -> Result<Action, Error> {
let spec = load_generator.spec();

let status = if let Some(status) = &load_generator.status {
status.clone()
} else {
// Generate new status with random name and nonce
LoadGeneratorState {
nonce: thread_rng().gen(),
name: "load-gen-"
.chars()
.chain(
thread_rng()
.sample_iter(&Alphanumeric)
.take(6)
.map(char::from),
)
.collect::<String>(),
}
};
debug!(?spec, ?status, "reconcile");

let ns = load_generator.namespace().unwrap();

// The load generator does not deploy the monitoring resources but they must exist in order to
// collect the results of load generators.
let ready = monitoring_ready(cx.clone(), &ns).await?;

if !ready {
return Ok(Action::requeue(Duration::from_secs(10)));
}

let job_image_config = JobImageConfig::from(spec);

let job_config = JobConfig {
name: status.name.clone(),
scenario: spec.scenario.to_owned(),
tasks: spec.tasks.to_owned(),
run_time: spec.run_time.to_owned(),
nonce: status.nonce,
job_image_config: job_image_config.clone(),
throttle_requests: spec.throttle_requests,
};
let orefs = load_generator
.controller_owner_ref(&())
.map(|oref| vec![oref])
.unwrap_or_default();

apply_job(
cx.clone(),
&ns,
orefs.clone(),
LOAD_GENERATOR_JOB_NAME,
job_spec(job_config),
)
.await?;

let load_generators: Api<LoadGenerator> = Api::namespaced(cx.k_client.clone(), &ns);
let _patched = load_generators
.patch_status(
&load_generator.name_any(),
&PatchParams::default(),
&Patch::Merge(serde_json::json!({ "status": status })),
)
.await?;

Ok(Action::requeue(Duration::from_secs(10)))
}
Loading

0 comments on commit 24ed857

Please sign in to comment.