Enabling Dataproc cluster labels (#26481)
## Summary & Motivation
It's currently not possible to attach labels to Dataproc clusters
created with `DataprocResource`.

This PR updates `DataprocResource` to accept an optional `labels` parameter
that is attached to the cluster on creation.
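
A minimal usage sketch of the new parameter (the project, region, cluster name, and label values below are illustrative, not taken from this PR):

```python
from dagster_gcp import DataprocResource

# Illustrative values only -- any RFC 1035-compliant labels work
dataproc = DataprocResource(
    project_id="my-project",
    region="us-central1",
    cluster_name="my-cluster",
    labels={"env": "dev", "team": "data"},  # the new optional parameter
)
```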

## How I Tested These Changes
- Added a new test asserting that `labels` set on `DataprocResource` are
attached at cluster creation
- Tested locally and verified that the labels were correctly attached

## Changelog

> [dagster-gcp] `DataprocResource` now accepts an optional `labels` parameter,
which is attached to the Dataproc clusters it creates.
thiagoazcampos authored Dec 17, 2024
1 parent 01082dd commit a806ab6
Showing 3 changed files with 82 additions and 3 deletions.
@@ -1,4 +1,4 @@
-from dagster import Field, StringSource
+from dagster import Field, Permissive, StringSource
 
 from dagster_gcp.dataproc.configs_dataproc_cluster import define_dataproc_cluster_config
 from dagster_gcp.dataproc.configs_dataproc_job import define_dataproc_job_config
@@ -11,12 +11,23 @@ def define_dataproc_create_cluster_config():
         Names of deleted clusters can be reused.""",
         is_required=True,
     )
+    labels = Field(
+        Permissive(),
+        description="""Optional. The labels to associate with this cluster. Label keys must
+        contain 1 to 63 characters, and must conform to RFC 1035
+        (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if
+        present, must contain 1 to 63 characters, and must conform to RFC 1035
+        (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated
+        with a cluster.""",
+        is_required=False,
+    )
 
     return {
         "projectId": _define_project_id_config(),
         "region": _define_region_config(),
         "clusterName": cluster_name,
         "cluster_config": define_dataproc_cluster_config(),
+        "labels": labels,
     }


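Since the config-schema path above uses `Permissive()`, the `labels` entry validates as an arbitrary string-to-string mapping. A hypothetical config fragment matching the schema returned by `define_dataproc_create_cluster_config` (values are illustrative):

```python
# Illustrative config dict; keys mirror define_dataproc_create_cluster_config
dataproc_config = {
    "projectId": "my-project",
    "region": "us-central1",
    "clusterName": "my-cluster",
    "labels": {"env": "dev", "owner": "data-team"},  # up to 32 RFC 1035-compliant labels
}
```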
@@ -33,8 +33,9 @@ def __init__(self, config):
 
         self.config = config
 
-        (self.project_id, self.region, self.cluster_name, self.cluster_config) = (
-            self.config.get(k) for k in ("projectId", "region", "clusterName", "cluster_config")
+        (self.project_id, self.region, self.cluster_name, self.cluster_config, self.labels) = (
+            self.config.get(k)
+            for k in ("projectId", "region", "clusterName", "cluster_config", "labels")
         )
 
     @property
@@ -60,6 +61,7 @@ def create_cluster(self):
                     "projectId": self.project_id,
                     "clusterName": self.cluster_name,
                     "config": self.cluster_config,
+                    "labels": self.labels,
                 },
             ).execute()
         )
@@ -177,6 +179,17 @@ def my_asset(dataproc: DataprocResource):
             " deleted clusters can be reused."
         )
     )
+    labels: Optional[dict[str, str]] = Field(
+        default=None,
+        description=(
+            "Optional. The labels to associate with this cluster. Label keys must"
+            " contain 1 to 63 characters, and must conform to RFC 1035"
+            " (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if"
+            " present, must contain 1 to 63 characters, and must conform to RFC 1035"
+            " (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated"
+            " with a cluster."
+        ),
+    )
     cluster_config_yaml_path: Optional[str] = Field(
         default=None,
         description=(
@@ -249,6 +262,7 @@ def get_client(self) -> DataprocClient:
             "region": self.region,
             "clusterName": self.cluster_name,
             "cluster_config": cluster_config,
+            "labels": self.labels,
         }
 
         return DataprocClient(config=client_config_dict)
@@ -22,6 +22,7 @@
 DATAPROC_CLUSTERS_URI = f"{DATAPROC_BASE_URI}/clusters"
 DATAPROC_JOBS_URI = f"{DATAPROC_BASE_URI}/jobs"
 DATAPROC_SCHEMA_URI = "https://www.googleapis.com/discovery/v1/apis/dataproc/v1/rest"
+DATAPROC_LABELS = {"first_label": "true", "second_label": "true"}
 
 EXPECTED_RESULTS = [
     # OAuth authorize credentials
@@ -239,6 +240,59 @@ def test_dataproc():
         assert result.success
 
 
+@pytest.mark.integration
+def test_dataproc_resource_labels():
+    """Tests pydantic dataproc cluster creation/deletion. Requests are captured by the responses library, so
+    no actual HTTP requests are made here.
+
+    Note that inspecting the HTTP requests can be useful for debugging, which can be done by adding:
+
+    import httplib2
+    httplib2.debuglevel = 4
+    """
+    with mock.patch("httplib2.Http", new=HttpSnooper):
+
+        @job
+        def test_dataproc():
+            configurable_dataproc_op()
+
+        result = test_dataproc.execute_in_process(
+            run_config=RunConfig(
+                ops={
+                    "configurable_dataproc_op": DataprocOpConfig(
+                        job_scoped_cluster=True,
+                        project_id=PROJECT_ID,
+                        region=REGION,
+                        job_config={
+                            "reference": {"projectId": PROJECT_ID},
+                            "placement": {"clusterName": CLUSTER_NAME},
+                            "hiveJob": {"queryList": {"queries": ["SHOW DATABASES"]}},
+                        },
+                    )
+                },
+            ),
+            resources={
+                "dataproc": DataprocResource(
+                    project_id=PROJECT_ID,
+                    cluster_name=CLUSTER_NAME,
+                    region=REGION,
+                    labels=DATAPROC_LABELS,
+                    cluster_config_dict={
+                        "softwareConfig": {
+                            "properties": {
+                                # Create a single-node cluster
+                                # This needs to be the string "true" when
+                                # serialized, not a boolean true
+                                "dataproc:dataproc.allow.zero.workers": "true"
+                            }
+                        }
+                    },
+                )
+            },
+        )
+        assert result.success
+
+
 @pytest.mark.integration
 def test_wait_for_job_with_timeout_pydantic():
     """Test submitting a job with timeout of 0 second so that it always fails."""
