From a806ab6c91112c4e6ba968f68841196988e74e76 Mon Sep 17 00:00:00 2001
From: Thiago Azevedo <35049565+thiagoazcampos@users.noreply.github.com>
Date: Tue, 17 Dec 2024 12:14:12 -0300
Subject: [PATCH] Enabling Dataproc cluster labels (#26481)

## Summary & Motivation

It is currently not possible to attach labels to Dataproc clusters created with
`DataprocResource`. This PR changes `DataprocResource` so that it accepts an
optional `labels` parameter, which is attached to the cluster at creation time.

## How I Tested These Changes

- Added a new test asserting that `labels` is correctly set on `DataprocResource`
- Tested locally and verified that the labels were correctly attached

## Changelog

> [dagster-gcp] `DataprocResource` now accepts an optional `labels` parameter,
> which is attached to the Dataproc clusters it creates.

---
 .../dagster_gcp/dataproc/configs.py           | 13 ++++-
 .../dagster_gcp/dataproc/resources.py         | 18 ++++++-
 .../dataproc_tests/test_resources.py          | 54 +++++++++++++++++++
 3 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/configs.py b/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/configs.py
index d969d7f820823..ca94099613767 100644
--- a/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/configs.py
+++ b/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/configs.py
@@ -1,4 +1,4 @@
-from dagster import Field, StringSource
+from dagster import Field, Permissive, StringSource
 
 from dagster_gcp.dataproc.configs_dataproc_cluster import define_dataproc_cluster_config
 from dagster_gcp.dataproc.configs_dataproc_job import define_dataproc_job_config
@@ -11,12 +11,23 @@ def define_dataproc_create_cluster_config():
         Names of deleted clusters can be reused.""",
         is_required=True,
     )
+    labels = Field(
+        Permissive(),
+        description="""Optional. The labels to associate with this cluster. Label keys must
+        contain 1 to 63 characters, and must conform to RFC 1035
+        (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if
+        present, must contain 1 to 63 characters, and must conform to RFC 1035
+        (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated
+        with a cluster.""",
+        is_required=False,
+    )
 
     return {
         "projectId": _define_project_id_config(),
         "region": _define_region_config(),
         "clusterName": cluster_name,
         "cluster_config": define_dataproc_cluster_config(),
+        "labels": labels,
     }
 
diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/resources.py b/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/resources.py
index bd739f7197621..35db40399f0ea 100644
--- a/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/resources.py
+++ b/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/resources.py
@@ -33,8 +33,9 @@ def __init__(self, config):
         self.config = config
 
-        (self.project_id, self.region, self.cluster_name, self.cluster_config) = (
-            self.config.get(k) for k in ("projectId", "region", "clusterName", "cluster_config")
+        (self.project_id, self.region, self.cluster_name, self.cluster_config, self.labels) = (
+            self.config.get(k)
+            for k in ("projectId", "region", "clusterName", "cluster_config", "labels")
         )
 
     @property
@@ -60,6 +61,7 @@ def create_cluster(self):
                     "projectId": self.project_id,
                     "clusterName": self.cluster_name,
                     "config": self.cluster_config,
+                    "labels": self.labels,
                 },
             ).execute()
         )
@@ -177,6 +179,17 @@ def my_asset(dataproc: DataprocResource):
             " deleted clusters can be reused."
         )
     )
+    labels: Optional[dict[str, str]] = Field(
+        default=None,
+        description=(
+            "Optional. The labels to associate with this cluster. Label keys must"
+            " contain 1 to 63 characters, and must conform to RFC 1035"
+            " (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if"
+            " present, must contain 1 to 63 characters, and must conform to RFC 1035"
+            " (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated"
+            " with a cluster."
+        ),
+    )
     cluster_config_yaml_path: Optional[str] = Field(
         default=None,
         description=(
@@ -249,6 +262,7 @@ def get_client(self) -> DataprocClient:
             "region": self.region,
             "clusterName": self.cluster_name,
             "cluster_config": cluster_config,
+            "labels": self.labels,
         }
 
         return DataprocClient(config=client_config_dict)
diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/dataproc_tests/test_resources.py b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/dataproc_tests/test_resources.py
index c1ff37fb269da..9cf0b1ea2147a 100644
--- a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/dataproc_tests/test_resources.py
+++ b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/dataproc_tests/test_resources.py
@@ -22,6 +22,7 @@
 DATAPROC_CLUSTERS_URI = f"{DATAPROC_BASE_URI}/clusters"
 DATAPROC_JOBS_URI = f"{DATAPROC_BASE_URI}/jobs"
 DATAPROC_SCHEMA_URI = "https://www.googleapis.com/discovery/v1/apis/dataproc/v1/rest"
+DATAPROC_LABELS = {"first_label": "true", "second_label": "true"}
 
 EXPECTED_RESULTS = [
     # OAuth authorize credentials
@@ -239,6 +240,59 @@ def test_dataproc():
         assert result.success
 
 
+@pytest.mark.integration
+def test_dataproc_resource_labels():
+    """Tests pydantic dataproc cluster creation/deletion. Requests are captured by the responses library, so
+    no actual HTTP requests are made here.
+
+    Note that inspecting the HTTP requests can be useful for debugging, which can be done by adding:
+
+    import httplib2
+    httplib2.debuglevel = 4
+    """
+    with mock.patch("httplib2.Http", new=HttpSnooper):
+
+        @job
+        def test_dataproc():
+            configurable_dataproc_op()
+
+        result = test_dataproc.execute_in_process(
+            run_config=RunConfig(
+                ops={
+                    "configurable_dataproc_op": DataprocOpConfig(
+                        job_scoped_cluster=True,
+                        project_id=PROJECT_ID,
+                        region=REGION,
+                        job_config={
+                            "reference": {"projectId": PROJECT_ID},
+                            "placement": {"clusterName": CLUSTER_NAME},
+                            "hiveJob": {"queryList": {"queries": ["SHOW DATABASES"]}},
+                        },
+                    )
+                },
+            ),
+            resources={
+                "dataproc": DataprocResource(
+                    project_id=PROJECT_ID,
+                    cluster_name=CLUSTER_NAME,
+                    region=REGION,
+                    labels=DATAPROC_LABELS,
+                    cluster_config_dict={
+                        "softwareConfig": {
+                            "properties": {
+                                # Create a single-node cluster
+                                # This needs to be the string "true" when
+                                # serialized, not a boolean true
+                                "dataproc:dataproc.allow.zero.workers": "true"
+                            }
+                        }
+                    },
+                )
+            },
+        )
+        assert result.success
+
+
 @pytest.mark.integration
 def test_wait_for_job_with_timeout_pydantic():
     """Test submitting a job with timeout of 0 second so that it always fails."""
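
---

For reference, a minimal usage sketch of the new parameter, not part of the committed patch. The project, region, cluster name, and label values are hypothetical placeholders; the parameter names (`project_id`, `region`, `cluster_name`, `labels`, `cluster_config_dict`) and `get_client()` come from the diff above.

```python
from dagster import Definitions, asset
from dagster_gcp import DataprocResource

# All identifier values below are hypothetical placeholders.
dataproc_resource = DataprocResource(
    project_id="my-gcp-project",
    region="us-central1",
    cluster_name="my-cluster",
    # New in this PR: labels are attached to the cluster when it is created.
    labels={"team": "data-platform", "env": "dev"},
    cluster_config_dict={
        "softwareConfig": {
            "properties": {
                # Single-node cluster; the value must be the string "true",
                # not a boolean, to serialize correctly.
                "dataproc:dataproc.allow.zero.workers": "true"
            }
        }
    },
)


@asset
def my_asset(dataproc: DataprocResource):
    # get_client() returns the DataprocClient wrapper; the labels flow
    # through to the body of the clusters.create request.
    client = dataproc.get_client()
    ...


defs = Definitions(assets=[my_asset], resources={"dataproc": dataproc_resource})
```

With the legacy config-based API, the same labels are passed under the top-level `labels` key added to `define_dataproc_create_cluster_config()` in `configs.py`.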