Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pythonic resources][rfc] Adjust EnvVar behavior to explicitly disallow direct usage #14490

Merged
merged 7 commits into from
Sep 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions python_modules/dagster/dagster/_config/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Mapping, NamedTuple, Sequence, Union

import dagster._check as check
from dagster._config.field_utils import EnvVar, IntEnvVar
from dagster._utils.error import SerializableErrorInfo

from .config_type import ConfigTypeKind
Expand Down Expand Up @@ -378,19 +379,23 @@ def create_scalar_error(context: ContextData, config_value: object) -> Evaluatio
)


def create_pydantic_env_var_error(context: ContextData, config_value: object) -> EvaluationError:
correct_env_var = str({"env": config_value})
def create_pydantic_env_var_error(
context: ContextData, config_value: Union[EnvVar, IntEnvVar]
) -> EvaluationError:
env_var_name = config_value.env_var_name

correct_env_var = str({"env": env_var_name})
return EvaluationError(
stack=context.stack,
reason=DagsterEvaluationErrorReason.RUNTIME_TYPE_MISMATCH,
message=(
f'Invalid use of environment variable wrapper. Value "{config_value}" is wrapped with'
f'Invalid use of environment variable wrapper. Value "{env_var_name}" is wrapped with'
" EnvVar(), which is reserved for passing to structured pydantic config objects only."
" To provide an environment variable to a run config dictionary, replace"
f' EnvVar("{config_value}") with {correct_env_var}, or pass a structured RunConfig'
f' EnvVar("{env_var_name}") with {correct_env_var}, or pass a structured RunConfig'
" object."
),
error_data=RuntimeMismatchErrorData(context.config_type_snap, repr(config_value)),
error_data=RuntimeMismatchErrorData(context.config_type_snap, repr(env_var_name)),
)


Expand Down
54 changes: 53 additions & 1 deletion python_modules/dagster/dagster/_config/field_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# encoding: utf-8
import hashlib
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Mapping, Optional, Sequence
import os
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Mapping, Optional, Sequence, Type

import dagster._check as check
from dagster._annotations import public
Expand Down Expand Up @@ -474,6 +475,16 @@ def config_dictionary_from_values(
return check.is_dict(_config_value_to_dict_representation(None, values))


def _create_direct_access_exception(cls: Type, env_var_name: str) -> Exception:
return RuntimeError(
f'Attempted to directly retrieve environment variable {cls.__name__}("{env_var_name}").'
f" {cls.__name__} defers resolution of the environment variable value until run time, and"
" should only be used as input to Dagster config or resources.\n\nTo access the"
f" environment variable value, call `get_value` on the {cls.__name__}, or use os.getenv"
" directly."
)


class IntEnvVar(int):
"""Class used to represent an environment variable in the Dagster config system.

Expand All @@ -489,14 +500,55 @@ def create(cls, name: str) -> "IntEnvVar":
var.name = name
return var

def __int__(self) -> int:
"""Raises an exception of the EnvVar value is directly accessed. Users should instead use
the `get_value` method, or use the EnvVar as an input to Dagster config or resources.
"""
raise _create_direct_access_exception(self.__class__, self.env_var_name)

def __str__(self) -> str:
return str(int(self))

def get_value(self, default: Optional[int] = None) -> Optional[int]:
"""Returns the value of the environment variable, or the default value if the
environment variable is not set. If no default is provided, None will be returned.
"""
value = os.getenv(self.name, default=default)
return int(value) if value else None

@property
def env_var_name(self) -> str:
"""Returns the name of the environment variable."""
return self.name


class EnvVar(str):
"""Class used to represent an environment variable in the Dagster config system.

This class is intended to be used to populate config fields or resources.
The environment variable will be resolved to a string value when the config is
loaded.

To access the value of the environment variable, use the `get_value` method.
"""

@classmethod
def int(cls, name: str) -> "IntEnvVar":
return IntEnvVar.create(name=name)

def __str__(self) -> str:
"""Raises an exception of the EnvVar value is directly accessed. Users should instead use
the `get_value` method, or use the EnvVar as an input to Dagster config or resources.
"""
raise _create_direct_access_exception(self.__class__, self.env_var_name)

@property
def env_var_name(self) -> str:
"""Returns the name of the environment variable."""
return super().__str__()

def get_value(self, default: Optional[str] = None) -> Optional[str]:
"""Returns the value of the environment variable, or the default value if the
environment variable is not set. If no default is provided, None will be returned.
"""
return os.getenv(self.env_var_name, default=default)
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def _config_value_to_dict_representation(field: Optional[ModelField], value: Any
elif isinstance(value, list):
return [_config_value_to_dict_representation(None, v) for v in value]
elif isinstance(value, EnvVar):
return {"env": str(value)}
return {"env": value.env_var_name}
elif isinstance(value, IntEnvVar):
return {"env": value.name}
if isinstance(value, Config):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,80 @@
import os

import pytest
from dagster import Definitions, EnvVar, RunConfig, asset
from dagster._config.pythonic_config import Config
from dagster._core.errors import DagsterInvalidConfigError
from dagster._core.test_utils import environ


def test_direct_use_env_var_ok() -> None:
with environ({"A_STR": "foo"}):
assert EnvVar("A_STR").get_value() == "foo"
assert EnvVar("A_STR").get_value(default="bar") == "foo"

if "A_NON_EXISTENT_VAR" in os.environ:
del os.environ["A_NON_EXISTENT_VAR"]

assert EnvVar("A_NON_EXISTENT_VAR").get_value() is None
assert EnvVar("A_NON_EXISTENT_VAR").get_value(default="bar") == "bar"


def test_direct_use_int_env_var_ok() -> None:
with environ({"AN_INT": "55"}):
assert EnvVar.int("AN_INT").get_value() == 55
assert EnvVar.int("AN_INT").get_value(default=10) == 55

if "A_NON_EXISTENT_VAR" in os.environ:
del os.environ["A_NON_EXISTENT_VAR"]

assert EnvVar.int("A_NON_EXISTENT_VAR").get_value() is None
assert EnvVar.int("A_NON_EXISTENT_VAR").get_value(default=100) == 100


def test_direct_use_env_var_err() -> None:
with pytest.raises(
RuntimeError,
match=(
'Attempted to directly retrieve environment variable EnvVar\\("A_STR"\\). EnvVar defers'
" resolution of the environment variable value until run time, and should only be used"
" as input to Dagster config or resources.\n\n"
),
):
str(EnvVar("A_STR"))

with pytest.raises(
RuntimeError,
match=(
'Attempted to directly retrieve environment variable EnvVar\\("A_STR"\\). EnvVar defers'
" resolution of the environment variable value until run time, and should only be used"
" as input to Dagster config or resources.\n\n"
),
):
print(EnvVar("A_STR")) # noqa: T201


def test_direct_use_int_env_var_err() -> None:
with pytest.raises(
RuntimeError,
match=(
'Attempted to directly retrieve environment variable IntEnvVar\\("AN_INT"\\). IntEnvVar'
" defers resolution of the environment variable value until run time, and should only"
" be used as input to Dagster config or resources."
),
):
int(EnvVar.int("AN_INT"))

with pytest.raises(
RuntimeError,
match=(
'Attempted to directly retrieve environment variable IntEnvVar\\("AN_INT"\\). IntEnvVar'
" defers resolution of the environment variable value until run time, and should only"
" be used as input to Dagster config or resources."
),
):
print(EnvVar.int("AN_INT")) # noqa: T201


def test_str_env_var() -> None:
executed = {}

Expand Down