diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/configs.py b/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/configs.py
index d969d7f820823..ca94099613767 100644
--- a/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/configs.py
+++ b/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/configs.py
@@ -1,4 +1,4 @@
-from dagster import Field, StringSource
+from dagster import Field, Permissive, StringSource
 
 from dagster_gcp.dataproc.configs_dataproc_cluster import define_dataproc_cluster_config
 from dagster_gcp.dataproc.configs_dataproc_job import define_dataproc_job_config
@@ -11,12 +11,23 @@ def define_dataproc_create_cluster_config():
         Names of deleted clusters can be reused.""",
         is_required=True,
     )
+    labels = Field(
+        Permissive(),
+        description="""Optional. The labels to associate with this cluster. Label keys must
+        contain 1 to 63 characters, and must conform to RFC 1035
+        (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if
+        present, must contain 1 to 63 characters, and must conform to RFC 1035
+        (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated
+        with a cluster.""",
+        is_required=False,
+    )
 
     return {
         "projectId": _define_project_id_config(),
         "region": _define_region_config(),
         "clusterName": cluster_name,
         "cluster_config": define_dataproc_cluster_config(),
+        "labels": labels,
     }
 
 
diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/resources.py b/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/resources.py
index bd739f7197621..35db40399f0ea 100644
--- a/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/resources.py
+++ b/python_modules/libraries/dagster-gcp/dagster_gcp/dataproc/resources.py
@@ -33,8 +33,9 @@ def __init__(self, config):
         self.config = config
 
-        (self.project_id, self.region, self.cluster_name, self.cluster_config) = (
-            self.config.get(k) for k in ("projectId", "region", "clusterName", "cluster_config")
+        (self.project_id, self.region, self.cluster_name, self.cluster_config, self.labels) = (
+            self.config.get(k)
+            for k in ("projectId", "region", "clusterName", "cluster_config", "labels")
         )
 
     @property
@@ -60,6 +61,7 @@ def create_cluster(self):
                     "projectId": self.project_id,
                     "clusterName": self.cluster_name,
                     "config": self.cluster_config,
+                    "labels": self.labels,
                 },
             ).execute()
         )
@@ -177,6 +179,17 @@ def my_asset(dataproc: DataprocResource):
             " deleted clusters can be reused."
         )
     )
+    labels: Optional[dict[str, str]] = Field(
+        default=None,
+        description=(
+            "Optional. The labels to associate with this cluster. Label keys must"
+            " contain 1 to 63 characters, and must conform to RFC 1035"
+            " (https://www.ietf.org/rfc/rfc1035.txt). Label values may be empty, but, if"
+            " present, must contain 1 to 63 characters, and must conform to RFC 1035"
+            " (https://www.ietf.org/rfc/rfc1035.txt). No more than 32 labels can be associated"
+            " with a cluster."
+        ),
+    )
     cluster_config_yaml_path: Optional[str] = Field(
         default=None,
         description=(
@@ -249,6 +262,7 @@ def get_client(self) -> DataprocClient:
             "region": self.region,
             "clusterName": self.cluster_name,
             "cluster_config": cluster_config,
+            "labels": self.labels,
         }
 
         return DataprocClient(config=client_config_dict)
diff --git a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/dataproc_tests/test_resources.py b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/dataproc_tests/test_resources.py
index c1ff37fb269da..9cf0b1ea2147a 100644
--- a/python_modules/libraries/dagster-gcp/dagster_gcp_tests/dataproc_tests/test_resources.py
+++ b/python_modules/libraries/dagster-gcp/dagster_gcp_tests/dataproc_tests/test_resources.py
@@ -22,6 +22,7 @@
 DATAPROC_CLUSTERS_URI = f"{DATAPROC_BASE_URI}/clusters"
 DATAPROC_JOBS_URI = f"{DATAPROC_BASE_URI}/jobs"
 DATAPROC_SCHEMA_URI = "https://www.googleapis.com/discovery/v1/apis/dataproc/v1/rest"
+DATAPROC_LABELS = {"first_label": "true", "second_label": "true"}
 
 EXPECTED_RESULTS = [
     # OAuth authorize credentials
@@ -239,6 +240,59 @@ def test_dataproc():
         assert result.success
 
 
+@pytest.mark.integration
+def test_dataproc_resource_labels():
+    """Tests pydantic dataproc cluster creation/deletion. Requests are captured by the responses library, so
+    no actual HTTP requests are made here.
+
+    Note that inspecting the HTTP requests can be useful for debugging, which can be done by adding:
+
+    import httplib2
+    httplib2.debuglevel = 4
+    """
+    with mock.patch("httplib2.Http", new=HttpSnooper):
+
+        @job
+        def test_dataproc():
+            configurable_dataproc_op()
+
+        result = test_dataproc.execute_in_process(
+            run_config=RunConfig(
+                ops={
+                    "configurable_dataproc_op": DataprocOpConfig(
+                        job_scoped_cluster=True,
+                        project_id=PROJECT_ID,
+                        region=REGION,
+                        job_config={
+                            "reference": {"projectId": PROJECT_ID},
+                            "placement": {"clusterName": CLUSTER_NAME},
+                            "hiveJob": {"queryList": {"queries": ["SHOW DATABASES"]}},
+                        },
+                    )
+                },
+            ),
+            resources={
+                "dataproc": DataprocResource(
+                    project_id=PROJECT_ID,
+                    cluster_name=CLUSTER_NAME,
+                    region=REGION,
+                    labels=DATAPROC_LABELS,
+                    cluster_config_dict={
+                        "softwareConfig": {
+                            "properties": {
+                                # Create a single-node cluster
+                                # This needs to be the string "true" when
+                                # serialized, not a boolean true
+                                "dataproc:dataproc.allow.zero.workers": "true"
+                            }
+                        }
+                    },
+                )
+            },
+        )
+        assert result.success
+
+
 @pytest.mark.integration
 def test_wait_for_job_with_timeout_pydantic():
     """Test submitting a job with timeout of 0 second so that it always fails."""
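
Usage sketch for reviewers (not part of the diff): with this change, labels can be passed directly to the Pythonic DataprocResource, mirroring the new test above. The project, region, cluster name, and label pairs below are illustrative placeholders, not values from the PR.

from dagster import Definitions, asset
from dagster_gcp import DataprocResource

dataproc = DataprocResource(
    project_id="my-project",    # placeholder project
    region="us-central1",       # placeholder region
    cluster_name="my-cluster",  # placeholder cluster name
    # Keys/values must conform to RFC 1035 (1 to 63 chars); at most 32 labels.
    labels={"team": "data-eng", "env": "dev"},
    cluster_config_dict={
        "softwareConfig": {
            # Single-node cluster, as in the test; the value must be the
            # string "true", not a boolean
            "properties": {"dataproc:dataproc.allow.zero.workers": "true"}
        }
    },
)

@asset
def my_asset(dataproc: DataprocResource):
    # get_client() threads `labels` into the client config dict, so
    # create_cluster() includes them in the clusters().create(...) body.
    client = dataproc.get_client()
    client.create_cluster()

defs = Definitions(assets=[my_asset], resources={"dataproc": dataproc})

For the legacy config-schema path, the same mapping is accepted under the new "labels" key added to define_dataproc_create_cluster_config(), since the field is a Permissive() dict.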