Skip to content

Commit

Permalink
[4/n] add python api for replacing local file references with source …
Browse files Browse the repository at this point in the history
…control links (#21675)

## Summary

Adds the `link_to_source_control` utility fn which converts all local source code reference metadata in the passed assets to references to source control. These local paths are mapped to the path in source control using a user-passed local git root. The path from the git root to the file locally is used as the filepath within source control.

For example:

```python
@asset 
def my_asset() -> pd.DataFrame:
  ...

@asset 
def my_other_asset() -> pd.DataFrame:
  ...

defs = Definitions(
  assets=link_to_source_control(
    with_source_code_references([my_asset, my_other_asset]),
    source_control_url="https://github.com/dagster-io/dagster",
    source_control_branch="master",
    repository_root_absolute_path=file_relative_path(__file__, "../../"),
  )
)

```

A stacked PR will introduce a utility method that will wrap both `link_to_source_control` and `with_source_code_references` and will decide whether to link to source control based on whether the definitions are being loaded in a cloud context.

## Test Plan

Unit tests.
  • Loading branch information
benpankow authored May 15, 2024
1 parent 8cde44c commit 52cc1b8
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
MetadataValue,
TableColumnLineageMetadataValue,
)
from dagster._core.definitions.metadata.source_code import LocalFileCodeReference
from dagster._core.events import (
DagsterEventType,
HandledOutputData,
Expand Down Expand Up @@ -165,6 +166,7 @@ def iterate_metadata_entries(metadata: Mapping[str, MetadataValue]) -> Iterator[
label=reference.label,
)
for reference in value.code_references
if isinstance(reference, LocalFileCodeReference)
],
)
elif isinstance(value, TableMetadataValue):
Expand Down
1 change: 0 additions & 1 deletion python_modules/dagster/dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@
TextMetadataValue as TextMetadataValue,
TimestampMetadataValue as TimestampMetadataValue,
UrlMetadataValue as UrlMetadataValue,
with_source_code_references as with_source_code_references,
)
from dagster._core.definitions.metadata.table import (
TableColumn as TableColumn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
CodeReferencesMetadataSet as CodeReferencesMetadataSet,
CodeReferencesMetadataValue as CodeReferencesMetadataValue,
LocalFileCodeReference as LocalFileCodeReference,
UrlCodeReference as UrlCodeReference,
link_to_source_control as link_to_source_control,
with_source_code_references as with_source_code_references,
)
from .table import ( # re-exported
Expand Down
159 changes: 139 additions & 20 deletions python_modules/dagster/dagster/_core/definitions/metadata/source_code.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import inspect
import os
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Expand All @@ -10,8 +11,6 @@
Union,
)

import pydantic

import dagster._check as check
from dagster._annotations import experimental
from dagster._model import DagsterModel
Expand Down Expand Up @@ -40,6 +39,17 @@ class LocalFileCodeReference(DagsterModel):
label: Optional[str] = None


@experimental
@whitelist_for_serdes
class UrlCodeReference(DagsterModel):
"""Represents a source location which points at a URL, for example
in source control.
"""

url: str
label: Optional[str] = None


@experimental
@whitelist_for_serdes
class CodeReferencesMetadataValue(DagsterModel, MetadataValue["CodeReferencesMetadataValue"]):
Expand All @@ -48,19 +58,19 @@ class CodeReferencesMetadataValue(DagsterModel, MetadataValue["CodeReferencesMet
asset is defined.
Attributes:
sources (List[LocalFileCodeReference]):
sources (List[Union[LocalFileCodeReference, SourceControlCodeReference]]):
A list of code references for the asset, such as file locations or
references to source control.
"""

code_references: List[LocalFileCodeReference]
code_references: List[Union[LocalFileCodeReference, UrlCodeReference]]

@property
def value(self) -> "CodeReferencesMetadataValue":
return self


def source_path_from_fn(fn: Callable[..., Any]) -> Optional[LocalFileCodeReference]:
def local_source_path_from_fn(fn: Callable[..., Any]) -> Optional[LocalFileCodeReference]:
cwd = os.getcwd()

origin_file = os.path.abspath(os.path.join(cwd, inspect.getsourcefile(fn))) # type: ignore
Expand All @@ -75,7 +85,7 @@ class CodeReferencesMetadataSet(NamespacedMetadataSet):
source code for the asset can be found.
"""

code_references: CodeReferencesMetadataValue
code_references: Optional[CodeReferencesMetadataValue] = None

@classmethod
def namespace(cls) -> str:
Expand All @@ -101,24 +111,25 @@ def _with_code_source_single_definition(
if isinstance(assets_def.op.compute_fn, DecoratedOpFunction)
else assets_def.op.compute_fn
)
source_path = source_path_from_fn(base_fn)
source_path = local_source_path_from_fn(base_fn)

if source_path:
sources = [source_path]

for key in assets_def.keys:
# defer to any existing metadata
sources_for_asset = [*sources]
try:
existing_source_code_metadata = CodeReferencesMetadataSet.extract(
metadata_by_key.get(key, {})
)
sources_for_asset = [
*existing_source_code_metadata.code_references.code_references,
*sources,
]
except pydantic.ValidationError:
pass
# merge with any existing metadata
existing_source_code_metadata = CodeReferencesMetadataSet.extract(
metadata_by_key.get(key, {})
)
existing_code_references = (
existing_source_code_metadata.code_references.code_references
if existing_source_code_metadata.code_references
else []
)
sources_for_asset: List[Union[LocalFileCodeReference, UrlCodeReference]] = [
*existing_code_references,
*sources,
]

metadata_by_key[key] = {
**metadata_by_key.get(key, {}),
Expand All @@ -130,11 +141,119 @@ def _with_code_source_single_definition(
return assets_def.with_attributes(metadata_by_key=metadata_by_key)


def convert_local_path_to_source_control_path(
base_source_control_url: str,
repository_root_absolute_path: str,
local_path: LocalFileCodeReference,
) -> UrlCodeReference:
source_file_from_repo_root = os.path.relpath(
local_path.file_path, repository_root_absolute_path
)

return UrlCodeReference(
url=f"{base_source_control_url}/{source_file_from_repo_root}#L{local_path.line_number}",
label=local_path.label,
)


def _convert_local_path_to_source_control_path_single_definition(
base_source_control_url: str,
repository_root_absolute_path: str,
assets_def: Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"],
) -> Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]:
from dagster._core.definitions.assets import AssetsDefinition

# SourceAsset doesn't have an op definition to point to - cacheable assets
# will be supported eventually but are a bit trickier
if not isinstance(assets_def, AssetsDefinition):
return assets_def

metadata_by_key = dict(assets_def.metadata_by_key) or {}

for key in assets_def.keys:
existing_source_code_metadata = CodeReferencesMetadataSet.extract(
metadata_by_key.get(key, {})
)
if not existing_source_code_metadata.code_references:
continue

sources_for_asset: List[Union[LocalFileCodeReference, UrlCodeReference]] = [
convert_local_path_to_source_control_path(
base_source_control_url,
repository_root_absolute_path,
source,
)
if isinstance(source, LocalFileCodeReference)
else source
for source in existing_source_code_metadata.code_references.code_references
]

metadata_by_key[key] = {
**metadata_by_key.get(key, {}),
**CodeReferencesMetadataSet(
code_references=CodeReferencesMetadataValue(code_references=sources_for_asset)
),
}

return assets_def.with_attributes(metadata_by_key=metadata_by_key)


def _build_github_url(url: str, branch: str) -> str:
return f"{url}/tree/{branch}"


def _build_gitlab_url(url: str, branch: str) -> str:
return f"{url}/-/tree/{branch}"


@experimental
def link_to_source_control(
assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]],
source_control_url: str,
source_control_branch: str,
repository_root_absolute_path: Union[Path, str],
) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]]:
"""Wrapper function which converts local file path code references to source control URLs
based on the provided source control URL and branch.
Args:
assets_defs (Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]):
The asset definitions to which source control metadata should be attached.
Only assets with local file code references (such as those created by
`with_source_code_references`) will be converted.
source_control_url (str): The base URL for the source control system. For example,
"https://github.com/dagster-io/dagster".
source_control_branch (str): The branch in the source control system, such as "master".
repository_root_absolute_path (Union[Path, str]): The absolute path to the root of the
repository on disk. This is used to calculate the relative path to the source file
from the repository root and append it to the source control URL.
"""
if "gitlab.com" in source_control_url:
source_control_url = _build_gitlab_url(source_control_url, source_control_branch)
elif "github.com" in source_control_url:
source_control_url = _build_github_url(source_control_url, source_control_branch)
else:
raise ValueError(
"Invalid `source_control_url`."
" Only GitHub and GitLab are supported for linking to source control at this time."
)

return [
_convert_local_path_to_source_control_path_single_definition(
base_source_control_url=source_control_url,
repository_root_absolute_path=str(repository_root_absolute_path),
assets_def=assets_def,
)
for assets_def in assets_defs
]


@experimental
def with_source_code_references(
assets_defs: Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]],
) -> Sequence[Union["AssetsDefinition", "SourceAsset", "CacheableAssetsDefinition"]]:
"""Wrapper function which attaches source code metadata to the provided asset definitions.
"""Wrapper function which attaches local code reference metadata to the provided asset definitions.
This points to the filepath and line number where the asset body is defined.
Args:
assets_defs (Sequence[Union[AssetsDefinition, SourceAsset, CacheableAssetsDefinition]]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def make_list_of_assets():
def james_brown():
pass

@asset
@asset(metadata={"foo": "bar"})
def fats_domino():
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,34 @@
from dagster import AssetsDefinition, load_assets_from_modules
from dagster._core.definitions.metadata import (
LocalFileCodeReference,
UrlCodeReference,
link_to_source_control,
with_source_code_references,
)
from dagster._utils import file_relative_path

# path of the `dagster` package on the filesystem
DAGSTER_PACKAGE_PATH = os.path.normpath(file_relative_path(__file__, "../../"))
GIT_ROOT_PATH = os.path.normpath(os.path.join(DAGSTER_PACKAGE_PATH, "../../"))

# path of the current file relative to the `dagster` package root
PATH_IN_PACKAGE = "/dagster_tests/asset_defs_tests/"

# {path to module}:{path to file relative to module root}:{line number}
EXPECTED_ORIGINS = {
"james_brown": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:12",
"chuck_berry": (
DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/module_with_assets.py:16"
),
"little_richard": (DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:4"),
"fats_domino": DAGSTER_PACKAGE_PATH + PATH_IN_PACKAGE + "asset_package/__init__.py:16",
"miles_davis": (
DAGSTER_PACKAGE_PATH
+ PATH_IN_PACKAGE
+ "asset_package/asset_subpackage/another_module_with_assets.py:6"
),
}


def test_asset_code_origins() -> None:
from dagster_tests.asset_defs_tests import asset_package
Expand All @@ -28,33 +52,12 @@ def test_asset_code_origins() -> None:

collection_with_source_metadata = with_source_code_references(collection)

# path of the `dagster` module on the filesystem
dagster_module_path = os.path.normpath(file_relative_path(__file__, "../../"))

# path of the current file relative to the `dagster` module root
path_in_module = "/dagster_tests/asset_defs_tests/"

# {path to module}:{path to file relative to module root}:{line number}
expected_origins = {
"james_brown": dagster_module_path + path_in_module + "asset_package/__init__.py:12",
"chuck_berry": (
dagster_module_path + path_in_module + "asset_package/module_with_assets.py:16"
),
"little_richard": (dagster_module_path + path_in_module + "asset_package/__init__.py:4"),
"fats_domino": dagster_module_path + path_in_module + "asset_package/__init__.py:16",
"miles_davis": (
dagster_module_path
+ path_in_module
+ "asset_package/asset_subpackage/another_module_with_assets.py:6"
),
}

for asset in collection_with_source_metadata:
if isinstance(asset, AssetsDefinition):
op_name = asset.op.name
assert op_name in expected_origins, f"Missing expected origin for op {op_name}"
assert op_name in EXPECTED_ORIGINS, f"Missing expected origin for op {op_name}"

expected_file_path, expected_line_number = expected_origins[op_name].split(":")
expected_file_path, expected_line_number = EXPECTED_ORIGINS[op_name].split(":")

for key in asset.keys:
assert "dagster/code_references" in asset.metadata_by_key[key]
Expand Down Expand Up @@ -83,3 +86,54 @@ def test_asset_code_origins() -> None:

assert meta.file_path == expected_file_path
assert meta.line_number == int(expected_line_number)


def test_asset_code_origins_source_control() -> None:
from dagster_tests.asset_defs_tests import asset_package

from .asset_package import module_with_assets

collection = load_assets_from_modules([asset_package, module_with_assets])

for asset in collection:
if isinstance(asset, AssetsDefinition):
for key in asset.keys:
# `chuck_berry` is the only asset with source code metadata manually
# attached to it
if asset.op.name == "chuck_berry":
assert "dagster/code_references" in asset.metadata_by_key[key]
else:
assert "dagster/code_references" not in asset.metadata_by_key[key]

collection_with_source_metadata = with_source_code_references(collection)
collection_with_source_control_metadata = link_to_source_control(
collection_with_source_metadata,
source_control_url="https://github.com/dagster-io/dagster",
source_control_branch="master",
repository_root_absolute_path=GIT_ROOT_PATH,
)

for asset in collection_with_source_control_metadata:
if isinstance(asset, AssetsDefinition):
op_name = asset.op.name
assert op_name in EXPECTED_ORIGINS, f"Missing expected origin for op {op_name}"

expected_file_path, expected_line_number = EXPECTED_ORIGINS[op_name].split(":")

for key in asset.keys:
assert "dagster/code_references" in asset.metadata_by_key[key]

assert isinstance(
asset.metadata_by_key[key]["dagster/code_references"].code_references[-1],
UrlCodeReference,
)
meta = cast(
UrlCodeReference,
asset.metadata_by_key[key]["dagster/code_references"].code_references[-1],
)

assert meta.url == (
"https://github.com/dagster-io/dagster/tree/master/python_modules/dagster"
+ (expected_file_path[len(DAGSTER_PACKAGE_PATH) :])
+ f"#L{expected_line_number}"
)

0 comments on commit 52cc1b8

Please sign in to comment.