Skip to content

Commit

Permalink
✨ Implement MessageWriter trait (#20)
Browse files Browse the repository at this point in the history
* Post-release version update

* ✨ Implement simplified PipesMessageWriter trait

* 📝 Add docs

* 🔥 Simplify `DefaultWriter::Channel`

* 🐛 Fix error in example main.rs

* 📝 Update changelog

* Fix with Clippy lints

* Derive Clone for quicktype types

* Create constructor for PipesMessage

* ✅ Add unit tests

* ✚ Add `rstest` dev dep to test suite

* ✅ Add unit tests for message writer

And modify its return type to be an enum instead of Box

* fixup! Derive Clone for quicktype types
  • Loading branch information
christeefy authored Dec 12, 2024
1 parent a289cf9 commit dcf81cb
Show file tree
Hide file tree
Showing 12 changed files with 554 additions and 74 deletions.
7 changes: 5 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
## [UNRELEASED]

### Added
- (pull/9) Handle errors for `ParamsLoader` and `ContextLoader` implementations
- (pull/25) Use `cargo nextest` for GitHub CI jobs
- (pull/20) Added `FileChannel`, `StreamChannel`, `BufferedStreamChannel` implementing `MessageWriterChannel`
- (pull/20) Added a simplified `PipesDefaultMessageWriter` implementing `MessageWriter`
- (pull/20) Defined `MessageWriter` and the associated `MessageWriterChannel` traits
- (pull/14) Derived `PartialEq` for all types generated by `quicktype`
- (pull/14) Renamed `ParamsLoader` and `ContextLoader` traits to `LoadParams` and `LoadContext` respectively
- (pull/14) Fixed failing unit tests in `context_loader.rs`
- (pull/25) Use `cargo nextest` for GitHub CI jobs
- (pull/9) Handle errors for `ParamsLoader` and `ContextLoader` implementations

## 0.1.6

Expand Down
93 changes: 93 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ flate2 = "1"
thiserror = "2.0.3"

[dev-dependencies]
rstest = { version = "0.23.0", default-features = false }
tempfile = "3.14.0"

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion quicktype.sh
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#!bash

printf -- '%s\0' jsonschema/pipes/*.schema.json | xargs -0 \
quicktype -s schema -l rust --visibility public --derive-debug --derive-partial-eq -o src/types.rs
quicktype -s schema -l rust \
--visibility public --derive-debug --derive-clone --derive-partial-eq -o \
src/types.rs
109 changes: 52 additions & 57 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
mod context_loader;
mod params_loader;
mod types;
mod types_ext;
mod writer;

use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::Write;

use context_loader::PayloadErrorKind;
use params_loader::ParamsError;
use serde::{Deserialize, Serialize};
use serde::Serialize;
use serde_json::json;
use serde_json::Map;
use serde_json::Value;
use thiserror::Error;

use crate::context_loader::DefaultLoader as PipesDefaultContextLoader;
pub use crate::context_loader::LoadContext;
use crate::context_loader::PayloadErrorKind;
use crate::params_loader::EnvVarLoader as PipesEnvVarParamsLoader;
pub use crate::params_loader::LoadParams;
use crate::params_loader::ParamsError;
pub use crate::types::{Method, PipesContextData, PipesMessage};
use crate::writer::message_writer::get_opened_payload;
use crate::writer::message_writer::DefaultWriter as PipesDefaultMessageWriter;
pub use crate::writer::message_writer::MessageWriter;
pub use crate::writer::message_writer_channel::MessageWriterChannel;

#[derive(Serialize)]
#[serde(rename_all = "UPPERCASE")]
Expand All @@ -29,25 +34,47 @@ pub enum AssetCheckSeverity {
// partial translation of
// https://github.com/dagster-io/dagster/blob/258d9ca0db/python_modules/dagster-pipes/dagster_pipes/__init__.py#L859-L871
#[derive(Debug)]
pub struct PipesContext {
pub struct PipesContext<W>
where
W: MessageWriter,
{
data: PipesContextData,
writer: PipesFileMessageWriter,
message_channel: W::Channel,
}

impl PipesContext {
impl<W> PipesContext<W>
where
W: MessageWriter,
{
/// Creates a `PipesContext` from already-loaded context data and a message writer.
///
/// Opens a message channel by calling `message_writer.open(message_params)`, then
/// immediately writes a [`Method::Opened`] message on that channel before returning.
/// The params of that message are whatever `get_opened_payload(message_writer)` returns.
///
/// The returned context stores `context_data` and the opened channel; the writer
/// itself is only borrowed for the duration of this call.
pub fn new(
context_data: PipesContextData,
message_params: Map<String, Value>,
message_writer: &W,
) -> Self {
// `mut` because the channel is written to right below.
let mut message_channel = message_writer.open(message_params);
let opened_payload = get_opened_payload(message_writer);
// Built as a struct literal here (other call sites in this file use `PipesMessage::new`).
let opened_message = PipesMessage {
dagster_pipes_version: "0.1".to_string(), // TODO: Convert to `const`
method: Method::Opened,
params: Some(opened_payload),
};
message_channel.write_message(opened_message);

Self {
data: context_data,
message_channel,
}
}

pub fn report_asset_materialization(&mut self, asset_key: &str, metadata: serde_json::Value) {
let params: HashMap<String, Option<serde_json::Value>> = HashMap::from([
("asset_key".to_string(), Some(json!(asset_key))),
("metadata".to_string(), Some(metadata)),
("data_version".to_string(), None), // TODO - support data versions
]);

let msg = PipesMessage {
dagster_pipes_version: "0.1".to_string(),
method: Method::ReportAssetMaterialization,
params: Some(params),
};
self.writer.write_message(&msg);
let msg = PipesMessage::new(Method::ReportAssetMaterialization, Some(params));
self.message_channel.write_message(msg);
}

pub fn report_asset_check(
Expand All @@ -66,36 +93,11 @@ impl PipesContext {
("metadata".to_string(), Some(metadata)),
]);

let msg = PipesMessage {
dagster_pipes_version: "0.1".to_string(),
method: Method::ReportAssetCheck,
params: Some(params),
};
self.writer.write_message(&msg);
}
}

#[derive(Debug)]
struct PipesFileMessageWriter {
path: String,
}
impl PipesFileMessageWriter {
fn write_message(&mut self, message: &PipesMessage) {
let serialized_msg = serde_json::to_string(&message).unwrap();
let mut file = OpenOptions::new().append(true).open(&self.path).unwrap();
writeln!(file, "{serialized_msg}").unwrap();

// TODO - optional `stderr` based writing
//eprintln!("{}", serialized_msg);
let msg = PipesMessage::new(Method::ReportAssetCheck, Some(params));
self.message_channel.write_message(msg);
}
}

#[derive(Debug, Deserialize)]
struct PipesMessagesParams {
path: Option<String>, // write to file
stdio: Option<String>, // stderr | stdout (unsupported)
}

#[derive(Debug, Error)]
#[non_exhaustive]
pub enum DagsterPipesError {
Expand All @@ -111,26 +113,19 @@ pub enum DagsterPipesError {
// partial translation of
// https://github.com/dagster-io/dagster/blob/258d9ca0db/python_modules/dagster-pipes/dagster_pipes/__init__.py#L798-L838
#[must_use]
pub fn open_dagster_pipes() -> Result<PipesContext, DagsterPipesError> {
pub fn open_dagster_pipes() -> Result<PipesContext<PipesDefaultMessageWriter>, DagsterPipesError> {
let params_loader = PipesEnvVarParamsLoader::new();
let context_loader = PipesDefaultContextLoader::new();
let message_writer = PipesDefaultMessageWriter::new();

let context_params = params_loader.load_context_params()?;
let message_params = params_loader.load_message_params()?;

let context_data = context_loader.load_context(context_params)?;

let message_params = params_loader.load_message_params()?;
// TODO: Refactor into MessageWriter impl
let path = match &message_params["path"] {
Value::String(string) => string.clone(),
_ => panic!("Expected message \"path\" in bootstrap payload"),
};

//if stdio != "stderr" {
// panic!("only stderr supported for dagster pipes messages")
//}

Ok(PipesContext {
data: context_data,
writer: PipesFileMessageWriter { path },
})
Ok(PipesContext::new(
context_data,
message_params,
&message_writer,
))
}
2 changes: 1 addition & 1 deletion src/params_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const DAGSTER_PIPES_MESSAGES_ENV_VAR: &str = "DAGSTER_PIPES_MESSAGES";

/// Load params passed from the orchestration process by the context injector and
/// message reader. These params are used to respectively bootstrap the
/// [`PipesContextLoader`] and [`PipesMessageWriter`].
/// [`PipesContextLoader`] and [`MessageWriter`](crate::MessageWriter).
pub trait LoadParams {
/// Whether or not this process has been provided with information
/// to create a `PipesContext` or should instead return a mock.
Expand Down
Loading

0 comments on commit dcf81cb

Please sign in to comment.