Skip to content

Support functionalities to enhance task traceability with metadata for dependency search. #450

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 24 commits into
base: master
Choose a base branch
from

Conversation

TlexCypher
Copy link
Contributor

@TlexCypher TlexCypher commented Mar 5, 2025

Related works

#445
#446
#448

What does PR do?

In this Pull Request, I implement a metadata attribution feature that enables searching for tasks dependent on specific tasks executed with a given parameter set.

Why is this needed?

Gokart caches the execution results and parameter states of each task in GCS. As shown in the Related Works section, various metadata are attached to each GCS object to enhance traceability. A common use case is searching for tasks that depend on a specific task executed with a given parameter set. Currently, Gokart does not support searching and tracing task dependencies from GCS metadata. This PR introduces this functionality.

Pre-Requisists

The focus of this PR is embedding the necessary metadata to allow the CLI to search for specific dependencies. The search functionality itself will be implemented on the CLI side (CLI: https://github.com/TlexCypher/gcs-metadog).

Checklist

CI is passing
Code formatting follows project standards.
Necessary tests have been added.
Existing tests pass.

@TlexCypher TlexCypher marked this pull request as draft March 5, 2025 09:24
gokart/utils.py Outdated
K = TypeVar('K')


def map_flattenable_items(items: FlattenableItems[T], func: Callable[[T], K]) -> FlattenableItems[K]:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can use both Generics and isinstance at the same time, code would be below.

def map_flattenable_items(items: FlattenableItems[T], func: Callable[[T], K]) -> FlattenableItems[K]:
    if isinstance(items, dict):
        return {k: map_flattenable_items(v, func) for k, v in items.items()}
    if isinstance(str):
        return items
    if isinstance(items, Iterable[T]):
        return [map_flattenable_items(i, func) for i in items]
    return func(items)

@TlexCypher TlexCypher changed the title WIP: Feat/nestmode Support functionalities to add metadata to enable searching for tasks dependent on specific tasks executed with a given parameter set. Mar 5, 2025
@TlexCypher TlexCypher changed the title Support functionalities to add metadata to enable searching for tasks dependent on specific tasks executed with a given parameter set. Support functionalities to add metadata for searching tasks dependent on specific tasks executed with a given parameter set. Mar 5, 2025
@TlexCypher TlexCypher changed the title Support functionalities to add metadata for searching tasks dependent on specific tasks executed with a given parameter set. Support functionalities to enhance task traceability with metadata for dependency search. Mar 5, 2025
@TlexCypher TlexCypher marked this pull request as ready for review March 5, 2025 12:28
gokart/task.py Outdated
Comment on lines 368 to 387
@dataclass
class _RequiredTaskOutput:
task_name: str
output_path: str

_required_task_outputs = map_flattenable_items(
self.requires(),
func=lambda task: map_flattenable_items(
task.output(), func=lambda output: _RequiredTaskOutput(task_name=task.get_task_family(), output_path=output.path())
),
)
required_task_outputs: dict[str, str] | None = None
if isinstance(_required_task_outputs, list):
required_task_outputs = {r.task_name: r.output_path for r in _required_task_outputs}
elif isinstance(_required_task_outputs, dict):
required_task_outputs = _required_task_outputs
else:
required_task_outputs = (
{_required_task_outputs.task_name: _required_task_outputs.output_path} if isinstance(_required_task_outputs, _RequiredTaskOutput) else None
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[imo]
It would become more readable with extracting this section into a method, which returns required_task_outputs.

gokart/target.py Outdated
lock_at_dump: bool = True,
task_params: dict[str, str] | None = None,
custom_labels: dict[str, Any] | None = None,
required_task_outputs: dict[str, str] | None = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[imo]
This parameter seems to be just a metadata. But its name may indicate that it effects the functionality of the method or the class's attribute. It would be better to rename for avoiding such a misleading.

@mamo3gr
Copy link
Contributor

mamo3gr commented Mar 6, 2025

LGTM
I made some comments for improving code readability and leave them to your own choice to apply.

@TlexCypher
Copy link
Contributor Author

@mamo3gr Thank you for your thoughtful comments. I'm gonna deal with all of them.

gokart/utils.py Outdated
Comment on lines 80 to 81
if isinstance(items, str):
return items # type: ignore
Copy link
Member

@kitagry kitagry Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, T means `str, so you should apply func for this.

Suggested change
if isinstance(items, str):
return items # type: ignore
if isinstance(items, str):
return func(items) # type: ignore

gokart/utils.py Outdated
K = TypeVar('K')


def map_flattenable_items(items: FlattenableItems[T], func: Callable[[T], K]) -> FlattenableItems[K]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.python.org/3.13/library/functions.html#map

python original map define map(function, iterable), so you must suit python's manner.

Suggested change
def map_flattenable_items(items: FlattenableItems[T], func: Callable[[T], K]) -> FlattenableItems[K]:
def map_flattenable_items(func: Callable[[T], K], items: FlattenableItems[T]) -> FlattenableItems[K]:

gokart/utils.py Outdated
if isinstance(items, str):
return items # type: ignore
if isinstance(items, Iterable):
return [map_flattenable_items(i, func) for i in items]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When pass tuple[T], it should returns tuple[K]. But, in this implementation, this case is not cared for.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And, could you add testcase?

continue
merged_labels[label_name] = label_value
merged_labels: dict[str, str] = {}
for normalized_label in normalized_labels_list[:]:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for normalized_label in normalized_labels_list[:]:
for normalized_label in normalized_labels_list:

@TlexCypher TlexCypher requested a review from kitagry March 6, 2025 07:58
@TlexCypher TlexCypher requested review from kitagry and mamo3gr March 6, 2025 10:17
Comment on lines 134 to 137
if isinstance(required_task_outputs, tuple):
return tuple(required_task_output.serialize() for required_task_output in required_task_outputs)
if isinstance(required_task_outputs, Iterable):
return _list_flatten([GCSObjectMetadataClient._get_serialized_string(ro) for ro in required_task_outputs])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tuple is one type of Iterable, and python has many iterable type, e.x) list, set, tuple, so on. You can write better like the following.

Suggested change
if isinstance(required_task_outputs, tuple):
return tuple(required_task_output.serialize() for required_task_output in required_task_outputs)
if isinstance(required_task_outputs, Iterable):
return _list_flatten([GCSObjectMetadataClient._get_serialized_string(ro) for ro in required_task_outputs])
if isinstance(required_task_outputs, Iterable):
iter_type = type(required_task_outputs)
return iter_type(_list_flatten([GCSObjectMetadataClient._get_serialized_string(ro) for ro in required_task_outputs]))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kitagry
At first, basically, I think your suggestion is acceptable.
But some iterable object cannot be dump as json like set.
Any iterable object can be iteratable, so basically they are kind of list, and in most cases, even if some iterable objects would be serialized as list, I think in most cases we don't have significant problems.
So I think this implementation might be ok.
How do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late reply.

Yes, you are correct, and you can change iterable to list,

And, What I actually meant was regarding the utils.map_flattenable_items method. I'll make another comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for replying.
I deal with your suggested changes.

@TlexCypher TlexCypher requested a review from kitagry March 13, 2025 06:46
gokart/utils.py Outdated
if isinstance(items, str):
return func(items) # type: ignore
if isinstance(items, Iterable):
return [map_flattenable_items(func, i) for i in items]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return [map_flattenable_items(func, i) for i in items]
return map(lambda item: map_flattenable_items(func, i), items)

@TlexCypher TlexCypher requested a review from kitagry April 17, 2025 09:32
@TlexCypher
Copy link
Contributor Author

@kitagry Sorry for late actions.
I accept your suggested changes.
Could you review this PR again?

gokart/task.py Outdated
from gokart.required_task_output import RequiredTaskOutput
from gokart.utils import map_flattenable_items

if sys.version_info < (3, 13):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this part is not needed?

local_temporary_directory=./resource/tmp

[core]
logging_conf_file=logging.ini
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nits]
add end of newline

patched_metadata = GCSObjectMetadataClient._get_patched_obj_metadata(
copy.deepcopy(original_metadata),
task_params,
custom_labels,
required_task_outputs if required_task_outputs else None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be redundant

Suggested change
required_task_outputs if required_task_outputs else None,
required_task_outputs,

@@ -101,23 +107,49 @@ def _get_patched_obj_metadata(
# However, users who utilize custom_labels are no longer expected to search using the labels generated from task parameters.
# Instead, users are expected to search using the labels they provided.
# Therefore, in the event of a key conflict, the value registered by the user-provided labels will take precedence.
_merged_labels = GCSObjectMetadataClient._merge_custom_labels_and_task_params_labels(normalized_task_params_labels, normalized_custom_labels)
normalized_labels = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[imo]
I prefer this because of readability

Suggested change
normalized_labels = (
normalized_labels = [normalized_custom_labels, normalized_task_params_labels]
if not required_task_outputs
normalized_labels.append({'__required_task_outputs': json.dumps(GCSObjectMetadataClient._get_serialized_string(required_task_outputs))})

merged_labels: dict[str, str] = {}
for normalized_label in normalized_labels_list[:]:
for label_name, label_value in normalized_label.items():
if len(label_value) == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MUST] This code may fail, since it seems to assume that label_value is str.

I prefer checking if it is str, and then check the length as,

isinstance(label_value, str) and len(label_value)==0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for reviewing my code!
In my opinion, type checking is not necessary, because GCSObjectMetadataClient._normalize_labels convert all values stored in dictionary into string.
So, label_value definitely is string.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TlexCypher
Then maybe the input normalized_labels_list: list[dict[str, Any]] should be normalized_labels_list: list[dict[str, str]] ?

continue
merged_labels[label_name] = label_value
merged_labels: dict[str, str] = {}
for normalized_label in normalized_labels_list[:]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[weak-IMO]

for normalized_label in normalized_labels_list:
    for label_name, label_value in normalized_label.items():
        if len(label_value) == 0:

I thought this part a bit difficult to understand, since it is deeply nested.

It may get better if you extract for label_name, label_value in... part as a separate function, and apply it with a functools.reduce().

However, current code is OK though. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for great suggestion!

For this specific task of merging labels, the simple nested loop is likely more readable and Pythonic than using functools.reduce.

While reduce can be used, in this scenario, the straightforward nested loop (or perhaps the alternative 'flattening' approach) probably offers better clarity and maintainability.

How do you think?

Copy link
Contributor

@mski-iksm mski-iksm Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I preferred reduce approach, because it express the motivation of making merged_labels earlier, which makes the first time reader easier to understand.

merged_labels = reduce(...)

In the nested loop, you need to read to L.147 to understand the motivation of building merged_labels.

However, both approach is OK, since this is relatively small loop nest. :)

Copy link
Contributor

@mski-iksm mski-iksm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TlexCypher
I've made some comments but mainly LGTM! Thank you for your contribution!

Copy link
Collaborator

@hirosassa hirosassa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commented!

Comment on lines +119 to +126
def _iterable_flatten(nested_list: Iterable) -> list[str]:
flattened_list: list[str] = []
for item in nested_list:
if isinstance(item, Iterable):
flattened_list.extend(_iterable_flatten(item))
else:
flattened_list.append(item)
return flattened_list
Copy link
Collaborator

@hirosassa hirosassa Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using Iterator like below. This can be (maybe) reduce temporal memory usage in some case and it looks a bit elegant.

Suggested change
def _iterable_flatten(nested_list: Iterable) -> list[str]:
flattened_list: list[str] = []
for item in nested_list:
if isinstance(item, Iterable):
flattened_list.extend(_iterable_flatten(item))
else:
flattened_list.append(item)
return flattened_list
def _iterable_flatten(nested_list: Iterable) -> Iterator[str]:
for item in nested_list:
if isinstance(item, Iterable):
yield from _iterable_flatten(item)
else:
yield item

and we should change L130 as

return list(_iterable_flatten([GCSObjectMetadataClient._get_serialized_string(ro) for ro in required_task_outputs]))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants