Skip to content

Commit

Permalink
Add a demo where materialize is used as a streaming microservice
Browse files Browse the repository at this point in the history
Co-authored-by: Brandon W Maister <[email protected]>

Closes #1307
  • Loading branch information
ruchirK authored Jan 27, 2020
1 parent ece59b0 commit 18422fb
Show file tree
Hide file tree
Showing 30 changed files with 1,168 additions and 72 deletions.
419 changes: 371 additions & 48 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"fuzz",
"src/billing-demo",
"src/ccsr",
"src/comm",
"src/dataflow-bin",
Expand Down
5 changes: 3 additions & 2 deletions bin/lint
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ copyright_files=$(grep -vE \
-e '\.(md|json|asc|png|jpe?g|svg|eot|ttf|woff2?)$' \
-e '^www/.*(\.html|\.scss|\.bnf|\.toml|\.yml)$' \
-e '^misc/docker/ci-builder/ssh_known_hosts$' \
-e '\.protospec$' \
-e '\.(protospec|pb)$' \
-e '\.in$' \
<<< "$files"
)

newline_files=$(grep -vE '\.(png|jpe?g|eot|ttf|woff2?|protospec)$' <<< "$files")
newline_files=$(grep -vE '\.(png|jpe?g|eot|ttf|woff2?|protospec|pb|in)$' <<< "$files")

shell_files=$(sort -u <(git ls-files '*.sh' '*.bash') <(git grep -l '#!.*bash' -- ':!*.md'))

Expand Down
8 changes: 8 additions & 0 deletions ci/deploy/docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,11 @@ for tag in "unstable-$BUILDKITE_COMMIT" latest; do
runv docker tag "materialize/ci-peeker:$MATERIALIZED_IMAGE_ID" "materialize/peeker:$tag"
runv docker push "materialize/peeker:$tag"
done

# Publish the billing-demo image: pull the ci-billing-demo image identified by
# MATERIALIZED_IMAGE_ID, then re-tag and push it as materialize/billing-demo
# under both the per-commit "unstable-<commit>" tag and "latest".
docker pull "materialize/ci-billing-demo:$MATERIALIZED_IMAGE_ID"

for tag in "unstable-$BUILDKITE_COMMIT" latest; do
echo "Processing docker tag for billing-demo: $tag"
runv docker tag "materialize/ci-billing-demo:$MATERIALIZED_IMAGE_ID" "materialize/billing-demo:$tag"
runv docker push "materialize/billing-demo:$tag"
done
2 changes: 2 additions & 0 deletions ci/test/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,15 @@ ci_collapsed_heading "Preparing Docker context"
mv target/release/testdrive misc/docker/ci-testdrive
mv target/release/sqllogictest misc/docker/ci-sqllogictest
mv target/release/peeker misc/docker/ci-peeker
mv target/release/billing-demo misc/docker/ci-billing-demo
}

images=(
materialized
testdrive
sqllogictest
peeker
billing-demo
cargo-test
)

Expand Down
1 change: 1 addition & 0 deletions misc/docker/ci-billing-demo/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/billing-demo
12 changes: 12 additions & 0 deletions misc/docker/ci-billing-demo/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Copyright 2019 Materialize, Inc. All rights reserved.
#
# This file is part of Materialize. Materialize may not be used or
# distributed without the express permission of Materialize, Inc.

FROM ubuntu:bionic

# Install the PostgreSQL client package.
# NOTE(review): presumably needed by billing-demo at runtime -- confirm.
RUN apt-get update && apt-get install -qy postgresql-client

# The billing-demo binary must already be present in the build context;
# ci/test/build.sh moves target/release/billing-demo into this directory.
COPY billing-demo /usr/local/bin

ENTRYPOINT ["billing-demo"]
33 changes: 33 additions & 0 deletions src/billing-demo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "billing-demo"
version = "0.1.0"
authors = ["Brandon W Maister <[email protected]>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0.26"
base64 = "0.11.0"
bytes = "0.5.3"
chrono = "0.4.10"
env_logger = "0.7.1"
futures = "0.3.1"
futures-channel = "0.3.1"
log = "0.4.8"
parse_duration = "2.0.1"
postgres-types = "0.1.0"
prost = "0.6.0"
rand = "0.7.3"
rand_distr = "0.2.2"
rdkafka = { git = "https://github.com/fede1024/rust-rdkafka.git", features = ["cmake-build"] }
structopt = "0.3.7"
thiserror = "1.0.9"
tokio = { version = "0.2.9", features = ["full"] }
tokio-postgres = "0.5.1"
url = "2.1.1"
uuid-b64 = "0.1.1"

[build-dependencies]
prost-build = "0.6.0"
protoc = "2.10.1"
70 changes: 70 additions & 0 deletions src/billing-demo/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2020 Materialize, Inc. All rights reserved.
//
// This file is part of Materialize. Materialize may not be used or
// distributed without the express permission of Materialize, Inc.

use std::env;
use std::path::Path;

use protoc::{DescriptorSetOutArgs, Protoc};

/// Result type used throughout this build script; any error is boxed so that
/// `?` can propagate errors of different concrete types.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

/// Build-script entry point.
///
/// Runs [`run`]; on failure prints the error followed by every underlying
/// cause, then exits with status 1 so that cargo reports the build as failed.
fn main() {
    if let Err(e) = run() {
        println!("ERROR: {}", e);
        for cause in error_causes(&*e) {
            println!(" caused by: {}", cause);
        }
        std::process::exit(1);
    }
}

/// Returns the rendered message of every error in `err`'s `source()` chain,
/// outermost cause first. `err` itself is not included.
///
/// Each step calls `source()` on the *previous cause*, so the walk always
/// advances and terminates when a cause has no further source. (Calling
/// `source()` on the top-level error in a `while let` condition would yield
/// the same cause forever.)
fn error_causes(err: &(dyn std::error::Error + 'static)) -> Vec<String> {
    std::iter::successors(err.source(), |cause| cause.source())
        .map(|cause| cause.to_string())
        .collect()
}

/// Performs all build-time work for the billing demo.
///
/// 1. Generates Rust code from the protobuf specs under `./resources`.
/// 2. Delegates to `compile_proto_descriptors` for the `.pb` descriptor sets.
/// 3. Exports the absolute path of `resources/views` to the crate as the
///    compile-time environment variable `VIEWS_DIR`.
///
/// # Errors
///
/// Returns an error if protobuf code generation fails, if descriptor
/// generation fails, or if `resources/views` cannot be resolved.
fn run() -> Result<()> {
    let specs = &["billing.proto"];

    // This script emits a `rerun-if-env-changed` directive, and emitting any
    // `rerun-if-*` directive disables cargo's default "rerun when any file in
    // the package changes" behavior. Track the proto specs explicitly so that
    // editing them regenerates the Rust bindings.
    for spec in specs.iter() {
        println!("cargo:rerun-if-changed=resources/{}", spec);
    }

    // build the rust code
    prost_build::compile_protos(specs, &["./resources"])?;

    compile_proto_descriptors(specs)?;

    let views_dir = Path::new(".")
        .join("resources/views")
        .canonicalize()
        .map_err(|e| format!("unable to resolve resources/views: {}", e))?;
    println!("cargo:rustc-env=VIEWS_DIR={}", views_dir.display());

    Ok(())
}

/// Exposes (and optionally regenerates) a protobuf descriptor set per spec.
///
/// For every entry of `specs` the descriptor path is
/// `<absolute resources/gen>/<file stem>.pb`. That path is exported to the
/// crate as the compile-time environment variable `DESCRIPTOR_<file stem>`.
///
/// The descriptor file itself is only (re)written when the
/// `MZ_GENERATE_PROTO` environment variable is set; otherwise the file already
/// present in `resources/gen` is used as-is.
///
/// # Errors
///
/// Returns an error if `resources/gen` cannot be resolved, if a spec has no
/// usable file stem, or if `protoc` fails to write a descriptor set.
fn compile_proto_descriptors(specs: &[&str]) -> Result<()> {
    let env_var = "MZ_GENERATE_PROTO";
    println!("cargo:rerun-if-env-changed={}", env_var);

    // Both of these are the same for every spec, so compute them once.
    let regenerate = env::var_os(env_var).is_some();
    let gen_dir = Path::new(".")
        .join("resources/gen")
        .canonicalize()
        .map_err(|e| format!("unable to resolve resources/gen: {}", e))?;

    for spec in specs {
        let basename = Path::new(spec)
            .file_stem()
            .and_then(|stem| stem.to_str())
            .ok_or_else(|| format!("proto spec has no usable file stem: {}", spec))?;
        let out = format!("{}/{}.pb", gen_dir.display(), basename);

        if regenerate {
            let protoc = Protoc::from_env_path();
            let args = DescriptorSetOutArgs {
                out: &out,
                includes: &["resources"],
                input: &[spec],
                include_imports: false,
            };
            protoc.write_descriptor_set(args)?;
        }

        println!("cargo:rustc-env=DESCRIPTOR_{}={}", basename, out);
    }

    Ok(())
}
35 changes: 35 additions & 0 deletions src/billing-demo/resources/billing.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2020 Materialize, Inc. All rights reserved.
//
// This file is part of Materialize. Materialize may not be used or
// distributed without the express permission of Materialize, Inc.

syntax = "proto3";

package billing;

// Resource attributes attached to a Record (see Record.info).
// NOTE(review): going by the field names, the *_gb fields are sizes in
// gigabytes and cpu_num is a CPU count -- confirm with the producer.
message ResourceInfo {
int32 cpu_num = 1;
int32 memory_gb = 2;
int32 disk_gb = 3;
int32 client_id = 4;
int32 vm_id = 5;
}

// A single metered value for one interval, together with the resource it
// was measured on.
message Record {
string id = 1;
// Interval bounds are carried as strings.
// NOTE(review): the SQL views bucket on prefixes of interval_start, so this
// is presumably a textual timestamp starting with the date -- confirm format.
string interval_start = 2;
string interval_end = 3;

string meter = 4; // What's being measured
uint32 value = 5;

ResourceInfo info = 6;
}

// A group of Records sent together, with its own id and interval bounds.
// Field numbers 2, 5 and 6 are not used by this message.
message Batch {
string id = 1; // idempotency key
string interval_start = 3;
string interval_end = 4;

repeated Record records = 7;
}
21 changes: 21 additions & 0 deletions src/billing-demo/resources/gen/billing.pb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

�
billing.protobilling"�
ResourceInfo
cpu_num (RcpuNum
memory_gb (RmemoryGb
disk_gb (RdiskGb
client_id (RclientId
vm_id (RvmId"�
Record
id ( Rid%
interval_start ( RintervalStart!
interval_end ( R intervalEnd
meter ( Rmeter
value (Rvalue)
info ( 2.billing.ResourceInfoRinfo"�
Batch
id ( Rid%
interval_start ( RintervalStart!
interval_end ( R intervalEnd)
records ( 2.billing.RecordRrecordsbproto3
Expand Down
4 changes: 4 additions & 0 deletions src/billing-demo/resources/views/billing_agg_by_day.sql.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE VIEW billing_agg_by_day AS
SELECT substr as day, client_id, meter, cpu_num, memory_gb, disk_gb, sum(value)
FROM billing_records
GROUP BY substr(interval_start, 0, 11), client_id, meter, cpu_num, memory_gb, disk_gb;
4 changes: 4 additions & 0 deletions src/billing-demo/resources/views/billing_agg_by_hour.sql.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE VIEW billing_agg_by_hour AS
SELECT substr as hour, client_id, meter, cpu_num, memory_gb, disk_gb, sum(value)
FROM billing_records
GROUP BY substr(interval_start, 0, 14), client_id, meter, cpu_num, memory_gb, disk_gb;
4 changes: 4 additions & 0 deletions src/billing-demo/resources/views/billing_agg_by_minute.sql.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE VIEW billing_agg_by_minute AS
SELECT substr as minute, client_id, meter, cpu_num, memory_gb, disk_gb, sum(value)
FROM billing_records
GROUP BY substr(interval_start, 0, 17), client_id, meter, cpu_num, memory_gb, disk_gb;
4 changes: 4 additions & 0 deletions src/billing-demo/resources/views/billing_agg_by_month.sql.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
CREATE VIEW billing_agg_by_month AS
SELECT substr as month, client_id, meter, cpu_num, memory_gb, disk_gb, sum(value)
FROM billing_records
GROUP BY substr(interval_start, 0, 8), client_id, meter, cpu_num, memory_gb, disk_gb;
7 changes: 7 additions & 0 deletions src/billing-demo/resources/views/billing_batches.sql.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE VIEW billing_batches AS
SELECT
{source_name}.id,
{source_name}.interval_start,
{source_name}.interval_end
FROM
{source_name};
5 changes: 5 additions & 0 deletions src/billing-demo/resources/views/billing_raw_data.sql.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE VIEW billing_raw_data AS
SELECT
*
FROM
{source_name};
16 changes: 16 additions & 0 deletions src/billing-demo/resources/views/billing_records.sql.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
CREATE VIEW billing_records AS
SELECT
    r.value->>'id' id,
    {source_name}.id batch_id,
    r.value->>'interval_start' interval_start,
    r.value->>'interval_end' interval_end,
    r.value->>'meter' meter,
    (r.value->'value')::int value,
    (r.value->'info'->'client_id')::int client_id,
    (r.value->'info'->'vm_id')::int vm_id,
    (r.value->'info'->'cpu_num')::int cpu_num,
    (r.value->'info'->'memory_gb')::int memory_gb,
    (r.value->'info'->'disk_gb')::int disk_gb
FROM
    {source_name},
    jsonb_array_elements(records) AS r;
98 changes: 98 additions & 0 deletions src/billing-demo/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2020 Materialize, Inc. All rights reserved.
//
// This file is part of Materialize. Materialize may not be used or
// distributed without the express permission of Materialize, Inc.

use std::time::Duration;

use parse_duration::parse as parse_duration;
use structopt::StructOpt;

// Command-line arguments for the billing demo.
//
// NOTE: structopt turns the `///` comments on the fields below into the
// `--help` text of the corresponding option, so they are user-facing. This
// header is deliberately a plain `//` comment so that it does not become the
// program's "about" text.
#[derive(Clone, Debug, StructOpt)]
pub struct Args {
    /// The materialized host
    #[structopt(long, default_value = "localhost")]
    pub materialized_host: String,

    /// The materialized port
    #[structopt(long, default_value = "6875")]
    pub materialized_port: u16,

    /// The total number of messages to create
    #[structopt(long, default_value = "100000000")]
    pub message_count: usize,

    /// Amount of time to sleep between messages, e.g. 2ms
    #[structopt(long, parse(try_from_str = parse_duration))]
    pub message_sleep: Option<Duration>,

    /// The kafka host
    #[structopt(long, default_value = "localhost")]
    pub kafka_host: String,

    /// The kafka port
    #[structopt(long, default_value = "9092")]
    pub kafka_port: u16,

    /// The kafka topic
    #[structopt(long, default_value = "billing")]
    pub kafka_topic: String,

    /// The name of the source
    #[structopt(long, default_value = "billing_source")]
    pub source_name: String,

    /// The name of the view
    #[structopt(long, default_value = "billing")]
    pub view_name: String,

    /// Keep any existing source and view instead of deleting them before starting
    ///
    /// By default the 'source-name' and 'view-name' are deleted before starting
    #[structopt(long)]
    pub preserve_source: bool,
}

impl Args {
pub(crate) fn kafka_config(&self) -> KafkaConfig {
KafkaConfig {
url: self.kafka_url(),
group_id: "materialize.billing".into(),
topic: self.kafka_topic.clone(),
message_count: self.message_count,
message_sleep: self.message_sleep,
}
}

pub(crate) fn mz_config(&self) -> MzConfig {
MzConfig {
host: self.materialized_host.clone(),
port: self.materialized_port,
kafka_url: self.kafka_url(),
kafka_topic: self.kafka_topic.clone(),
source_name: self.source_name.clone(),
view_name: self.view_name.clone(),
preserve_source: self.preserve_source,
}
}

pub(crate) fn kafka_url(&self) -> String {
format!("{}:{}", self.kafka_host, self.kafka_port)
}
}

/// Kafka-side settings, as produced by `Args::kafka_config`.
#[derive(Debug)]
pub struct KafkaConfig {
/// Kafka address in `host:port` form (see `Args::kafka_url`).
pub url: String,
/// Group id; `Args::kafka_config` always sets this to `materialize.billing`.
pub group_id: String,
/// Topic name, taken from `--kafka-topic`.
pub topic: String,
/// The total number of messages to create.
pub message_count: usize,
/// Amount of time to sleep between messages; `None` when not specified.
pub message_sleep: Option<Duration>,
}

/// materialized-side settings, as produced by `Args::mz_config`.
#[derive(Debug)]
pub struct MzConfig {
/// The materialized host, taken from `--materialized-host`.
pub host: String,
/// The materialized port, taken from `--materialized-port`.
pub port: u16,
/// Kafka address in `host:port` form (see `Args::kafka_url`).
pub kafka_url: String,
/// Topic name, taken from `--kafka-topic`.
pub kafka_topic: String,
/// Source name, taken from `--source-name`.
pub source_name: String,
/// View name, taken from `--view-name`.
pub view_name: String,
/// Value of the `--preserve-source` flag.
pub preserve_source: bool,
}
Loading

0 comments on commit 18422fb

Please sign in to comment.