Skip to content

Commit

Permalink
[dagster-pipes-rust] - Expose data version in report_asset_materializ…
Browse files Browse the repository at this point in the history
…ation (#67)

* expose data version in report_asset_materialization

* add to changelog
  • Loading branch information
marijncv authored Dec 25, 2024
1 parent 56085b8 commit ed6fe91
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 4 deletions.
1 change: 1 addition & 0 deletions libraries/pipes/implementations/rust/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Added

- (pull/67) Added the `data_version` parameter to `report_asset_materialization`
- (pull/60) Added `AssetCheckSeverity` to the jsonschema definitions
- (pull/59) Moved dagster pipes version into a constant
- (pull/61) Simplify construction of `PipesMetadataValue`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() -> Result<(), DagsterPipesError> {
let mut context = open_dagster_pipes()?;

let asset_metadata = HashMap::from([("row_count", PipesMetadataValue::from(100))]);
context.report_asset_materialization("example_rust_subprocess_asset", asset_metadata)?;
context.report_asset_materialization("example_rust_subprocess_asset", asset_metadata, None)?;

let check_metadata = HashMap::from([("quality", PipesMetadataValue::from(100))]);
context.report_asset_check(
Expand Down
7 changes: 4 additions & 3 deletions libraries/pipes/implementations/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ where
&mut self,
asset_key: &str,
metadata: HashMap<&str, PipesMetadataValue>,
data_version: Option<&str>,
) -> Result<(), MessageWriteError> {
let params: HashMap<&str, Option<serde_json::Value>> = HashMap::from([
("asset_key", Some(json!(asset_key))),
("metadata", Some(json!(metadata))),
("data_version", None), // TODO - support data versions
("data_version", data_version.map(|version| json!(version))),
]);

let msg = PipesMessage::new(Method::ReportAssetMaterialization, Some(params));
Expand Down Expand Up @@ -192,7 +193,7 @@ mod tests {
},
};
context
.report_asset_materialization("asset1", asset_metadata)
.report_asset_materialization("asset1", asset_metadata, Some("v1"))
.expect("Failed to report asset materialization");

assert_eq!(
Expand Down Expand Up @@ -267,7 +268,7 @@ mod tests {
}
}))
),
("data_version", None),
("data_version", Some(json!("v1"))),
])),
)
);
Expand Down

0 comments on commit ed6fe91

Please sign in to comment.