Skip to content

Commit

Permalink
[embedded-elt] Deprecate Sling factory method and SlingSource and Sli…
Browse files Browse the repository at this point in the history
…ngTarget Resources (#19998)

## Summary & Motivation

Deprecate the old Sling asset builders in favor of the new asset decorator.
  • Loading branch information
PedramNavid authored and shalabhc committed Feb 28, 2024
1 parent 8e64ef5 commit 5696a38
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,16 @@
MaterializeResult,
multi_asset,
)
from dagster._annotations import experimental
from dagster._annotations import deprecated
from dagster._utils.warnings import deprecation_warning

from dagster_embedded_elt.sling.resources import SlingMode, SlingResource


@experimental
@deprecated(
breaking_version="0.23.0",
additional_warn_text="Use `@sling_assets` instead.",
)
def build_sling_asset(
asset_spec: AssetSpec,
source_stream: str,
Expand Down Expand Up @@ -76,6 +80,12 @@ def build_sling_asset(
required_resource_keys={sling_resource_key},
)
def sync(context: AssetExecutionContext) -> MaterializeResult:
deprecation_warning(
"build_sling_asset",
breaking_version="0.23.0",
additional_warn_text="Use `@sling_assets` property instead.",
)

sling: SlingResource = getattr(context.resources, sling_resource_key)
last_row_count_observed = None
for stdout_line in sling.sync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
PermissiveConfig,
get_dagster_logger,
)
from dagster._annotations import experimental
from dagster._annotations import deprecated, experimental
from dagster._utils.env import environ
from dagster._utils.warnings import deprecation_warning
from pydantic import Field

from dagster_embedded_elt.sling.asset_decorator import get_streams_from_replication
Expand All @@ -29,6 +30,7 @@
logger = get_dagster_logger()

ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
DEPRECATION_WARNING_TEXT = "{name} has been deprecated, use `SlingConnectionResource` for both source and target connections."


class SlingMode(str, Enum):
Expand All @@ -44,6 +46,10 @@ class SlingMode(str, Enum):
BACKFILL = "backfill"


@deprecated(
breaking_version="0.23.0",
additional_warn_text=DEPRECATION_WARNING_TEXT.format(name="SlingSourceConnection"),
)
class SlingSourceConnection(PermissiveConfig):
"""A Sling Source Connection defines the source connection used by :py:class:`~dagster_elt.sling.SlingResource`.
Expand Down Expand Up @@ -77,6 +83,10 @@ class SlingSourceConnection(PermissiveConfig):
)


@deprecated(
breaking_version="0.23.0",
additional_warn_text=DEPRECATION_WARNING_TEXT.format(name="SlingTargetConnection"),
)
class SlingTargetConnection(PermissiveConfig):
"""A Sling Target Connection defines the target connection used by :py:class:`~dagster_elt.sling.SlingResource`.
Expand Down Expand Up @@ -181,6 +191,22 @@ class SlingResource(ConfigurableResource):
@contextlib.contextmanager
def _setup_config(self) -> Generator[None, None, None]:
"""Uses environment variables to set the Sling source and target connections."""
if self.source_connection:
deprecation_warning(
"source_connection",
"0.23",
"source_connection has been deprecated, provide a list of SlingConnectionResource to the `connections` parameter instead.",
stacklevel=4,
)

if self.target_connection:
deprecation_warning(
"target_connection",
"0.23",
"target_connection has been deprecated, provide a list of SlingConnectionResource to the `connections` parameter instead.",
stacklevel=4,
)

sling_source = None
sling_target = None
if self.source_connection:
Expand Down Expand Up @@ -235,6 +261,10 @@ def _exec_sling_cmd(
if proc.returncode != 0:
raise Exception("Sling command failed with error code %s", proc.returncode)

@deprecated(
breaking_version="0.23.0",
additional_warn_text="sync has been deprecated, use `replicate` instead.",
)
def sync(
self,
source_stream: str,
Expand Down

0 comments on commit 5696a38

Please sign in to comment.