-
Notifications
You must be signed in to change notification settings - Fork 312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flytekit: Rename map_task to map, replace min_successes
and min_success_ratio
with tolerance, rename max_parallelism to concurrency
#3099
Changes from all commits
0868c82
aab9a75
a31fe74
524470e
4b8f473
3add6fc
91da9b4
2ae765d
06671a3
9513557
9c5a217
90b33f9
2c58ddc
97c465f
4f47c6a
baec8c2
650153d
1b343cd
a45d4a7
902b838
e14b123
c9b5150
3916b92
566ba0b
20d8c01
718dd49
99567a7
8b5f1f2
b102619
0eb2133
ca2671d
6844f75
119a077
22db713
b352a46
285a719
f359f32
1cd6cbf
ef27cc4
a3877d1
98ff5b0
85ddf8b
86ce73b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -3,6 +3,7 @@ | |||||
import hashlib | ||||||
import math | ||||||
import os # TODO: use flytekit logger | ||||||
import warnings | ||||||
from contextlib import contextmanager | ||||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union, cast | ||||||
|
||||||
|
@@ -369,11 +370,12 @@ def _raw_execute(self, **kwargs) -> Any: | |||||
return outputs | ||||||
|
||||||
|
||||||
def map_task( | ||||||
def map( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider keeping descriptive function name
Consider keeping the original function name Code suggestionCheck the AI-generated fix before applying
Suggested change
Code Review Run #7379ce Is this a valid issue, or was it incorrectly flagged by the Agent?
|
||||||
target: Union[LaunchPlan, PythonFunctionTask, "FlyteLaunchPlan"], | ||||||
concurrency: Optional[int] = None, | ||||||
min_successes: Optional[int] = None, | ||||||
min_success_ratio: float = 1.0, | ||||||
tolerance: Optional[Union[float, int]] = None, | ||||||
min_successes: Optional[int] = None, # Deprecated | ||||||
min_success_ratio: Optional[float] = None, # Deprecated | ||||||
**kwargs, | ||||||
): | ||||||
""" | ||||||
|
@@ -385,23 +387,51 @@ def map_task( | |||||
size. If the size of the input exceeds the concurrency value, then multiple batches will be run serially until | ||||||
all inputs are processed. If set to 0, this means unbounded concurrency. If left unspecified, this means the | ||||||
array node will inherit parallelism from the workflow | ||||||
:param min_successes: The minimum number of successful executions | ||||||
:param min_success_ratio: The minimum ratio of successful executions | ||||||
:param min_successes: The minimum number of successful executions [Deprecated] Use tolerance instead | ||||||
:param min_success_ratio: The minimum ratio of successful executions [Deprecated] Use tolerance instead | ||||||
:param tolerance: Failure tolerance threshold. | ||||||
If float (0-1): represents minimum success ratio | ||||||
If int (>1): represents minimum number of successes | ||||||
""" | ||||||
from flytekit.remote import FlyteLaunchPlan | ||||||
|
||||||
if min_successes is not None and min_success_ratio != 1.0: | ||||||
warnings.warn( | ||||||
"min_success and min_success_ratio are deprecated. Please use 'tolerance' parameter instead", | ||||||
DeprecationWarning, | ||||||
stacklevel=2, | ||||||
) | ||||||
|
||||||
computed_min_ratio = 1.0 | ||||||
computed_min_success = None | ||||||
|
||||||
if tolerance is not None: | ||||||
if isinstance(tolerance, float): | ||||||
if not 0 <= tolerance <= 1: | ||||||
raise ValueError("tolerance must be between 0 and 1") | ||||||
computed_min_ratio = tolerance | ||||||
elif isinstance(tolerance, int): | ||||||
if tolerance < 1: | ||||||
raise ValueError("tolerance must be greater than 0") | ||||||
computed_min_success = tolerance | ||||||
else: | ||||||
raise TypeError("tolerance must be float or int") | ||||||
|
||||||
final_min_ratio = computed_min_ratio if min_success_ratio is None else min_success_ratio | ||||||
final_min_successes = computed_min_success if min_successes is None else min_successes | ||||||
|
||||||
if isinstance(target, (LaunchPlan, FlyteLaunchPlan, ReferenceTask)): | ||||||
return array_node( | ||||||
target=target, | ||||||
concurrency=concurrency, | ||||||
min_successes=min_successes, | ||||||
min_success_ratio=min_success_ratio, | ||||||
min_successes=final_min_successes, | ||||||
min_success_ratio=final_min_ratio, | ||||||
) | ||||||
return array_node_map_task( | ||||||
task_function=target, | ||||||
concurrency=concurrency, | ||||||
min_successes=min_successes, | ||||||
min_success_ratio=min_success_ratio, | ||||||
min_successes=final_min_successes, | ||||||
min_success_ratio=final_min_ratio, | ||||||
**kwargs, | ||||||
) | ||||||
|
||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,6 +1,7 @@ | ||||||||||||||||||||||
from __future__ import annotations | ||||||||||||||||||||||
|
||||||||||||||||||||||
import typing | ||||||||||||||||||||||
import warnings | ||||||||||||||||||||||
from typing import Any, Callable, Dict, List, Optional, Type | ||||||||||||||||||||||
|
||||||||||||||||||||||
from flytekit.core import workflow as _annotated_workflow | ||||||||||||||||||||||
|
@@ -129,7 +130,8 @@ def create( | |||||||||||||||||||||
labels: Optional[_common_models.Labels] = None, | ||||||||||||||||||||||
annotations: Optional[_common_models.Annotations] = None, | ||||||||||||||||||||||
raw_output_data_config: Optional[_common_models.RawOutputDataConfig] = None, | ||||||||||||||||||||||
max_parallelism: Optional[int] = None, | ||||||||||||||||||||||
max_parallelism: Optional[int] = None, # Deprecated: Use concurrency instead | ||||||||||||||||||||||
concurrency: Optional[int] = None, | ||||||||||||||||||||||
Comment on lines
+133
to
+134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider removing deprecated max_parallelism parameter
Consider removing the deprecated Code suggestionCheck the AI-generated fix before applying
Suggested change
Code Review Run #7379ce Is this a valid issue, or was it incorrectly flagged by the Agent?
|
||||||||||||||||||||||
security_context: Optional[security.SecurityContext] = None, | ||||||||||||||||||||||
auth_role: Optional[_common_models.AuthRole] = None, | ||||||||||||||||||||||
trigger: Optional[LaunchPlanTriggerBase] = None, | ||||||||||||||||||||||
|
@@ -183,7 +185,8 @@ def create( | |||||||||||||||||||||
labels=labels, | ||||||||||||||||||||||
annotations=annotations, | ||||||||||||||||||||||
raw_output_data_config=raw_output_data_config, | ||||||||||||||||||||||
max_parallelism=max_parallelism, | ||||||||||||||||||||||
concurrency=concurrency, # Pass new parameter | ||||||||||||||||||||||
max_parallelism=max_parallelism, # Pass deprecated parameter | ||||||||||||||||||||||
security_context=security_context, | ||||||||||||||||||||||
trigger=trigger, | ||||||||||||||||||||||
overwrite_cache=overwrite_cache, | ||||||||||||||||||||||
|
@@ -213,7 +216,8 @@ def get_or_create( | |||||||||||||||||||||
labels: Optional[_common_models.Labels] = None, | ||||||||||||||||||||||
annotations: Optional[_common_models.Annotations] = None, | ||||||||||||||||||||||
raw_output_data_config: Optional[_common_models.RawOutputDataConfig] = None, | ||||||||||||||||||||||
max_parallelism: Optional[int] = None, | ||||||||||||||||||||||
concurrency: Optional[int] = None, | ||||||||||||||||||||||
max_parallelism: Optional[int] = None, # Deprecated | ||||||||||||||||||||||
security_context: Optional[security.SecurityContext] = None, | ||||||||||||||||||||||
auth_role: Optional[_common_models.AuthRole] = None, | ||||||||||||||||||||||
trigger: Optional[LaunchPlanTriggerBase] = None, | ||||||||||||||||||||||
|
@@ -241,9 +245,10 @@ def get_or_create( | |||||||||||||||||||||
:param annotations: Optional annotations to attach to executions created by this launch plan. | ||||||||||||||||||||||
:param raw_output_data_config: Optional location of offloaded data for things like S3, etc. | ||||||||||||||||||||||
:param auth_role: Add an auth role if necessary. | ||||||||||||||||||||||
:param max_parallelism: Controls the maximum number of tasknodes that can be run in parallel for the entire | ||||||||||||||||||||||
workflow. This is useful to achieve fairness. Note: MapTasks are regarded as one unit, and | ||||||||||||||||||||||
parallelism/concurrency of MapTasks is independent from this. | ||||||||||||||||||||||
:param concurrency: Controls the maximum number of tasknodes that can be run in parallel for the entire | ||||||||||||||||||||||
workflow. This is useful to achieve fairness. Note: MapTasks are regarded as one unit, and | ||||||||||||||||||||||
parallelism/concurrency of MapTasks is independent from this. | ||||||||||||||||||||||
:param max_parallelism: [Deprecated] Use concurrency instead. | ||||||||||||||||||||||
:param trigger: [alpha] This is a new syntax for specifying schedules. | ||||||||||||||||||||||
:param overwrite_cache: If set to True, the execution will always overwrite cache | ||||||||||||||||||||||
:param auto_activate: If set to True, the launch plan will be activated automatically on registration. | ||||||||||||||||||||||
|
@@ -258,6 +263,7 @@ def get_or_create( | |||||||||||||||||||||
or annotations is not None | ||||||||||||||||||||||
or raw_output_data_config is not None | ||||||||||||||||||||||
or auth_role is not None | ||||||||||||||||||||||
or concurrency is not None | ||||||||||||||||||||||
or max_parallelism is not None | ||||||||||||||||||||||
or security_context is not None | ||||||||||||||||||||||
or trigger is not None | ||||||||||||||||||||||
|
@@ -296,7 +302,11 @@ def get_or_create( | |||||||||||||||||||||
("labels", labels, cached_outputs["_labels"]), | ||||||||||||||||||||||
("annotations", annotations, cached_outputs["_annotations"]), | ||||||||||||||||||||||
("raw_output_data_config", raw_output_data_config, cached_outputs["_raw_output_data_config"]), | ||||||||||||||||||||||
("max_parallelism", max_parallelism, cached_outputs["_max_parallelism"]), | ||||||||||||||||||||||
( | ||||||||||||||||||||||
"concurrency", | ||||||||||||||||||||||
concurrency if concurrency is not None else max_parallelism, | ||||||||||||||||||||||
cached_outputs.get("_concurrency", cached_outputs.get("")), | ||||||||||||||||||||||
), | ||||||||||||||||||||||
("security_context", security_context, cached_outputs["_security_context"]), | ||||||||||||||||||||||
("overwrite_cache", overwrite_cache, cached_outputs["_overwrite_cache"]), | ||||||||||||||||||||||
("auto_activate", auto_activate, cached_outputs["_auto_activate"]), | ||||||||||||||||||||||
|
@@ -326,7 +336,8 @@ def get_or_create( | |||||||||||||||||||||
labels, | ||||||||||||||||||||||
annotations, | ||||||||||||||||||||||
raw_output_data_config, | ||||||||||||||||||||||
max_parallelism, | ||||||||||||||||||||||
concurrency=concurrency, | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Refactor init method signature
The 'init' method has too many parameters (14 > 5) and is missing docstring and return type annotation. Code suggestionCheck the AI-generated fix before applying
Code Review Run #7379ce Is this a valid issue, or was it incorrectly flagged by the Agent?
|
||||||||||||||||||||||
max_parallelism=max_parallelism, | ||||||||||||||||||||||
auth_role=auth_role, | ||||||||||||||||||||||
security_context=security_context, | ||||||||||||||||||||||
trigger=trigger, | ||||||||||||||||||||||
|
@@ -347,7 +358,8 @@ def __init__( | |||||||||||||||||||||
labels: Optional[_common_models.Labels] = None, | ||||||||||||||||||||||
annotations: Optional[_common_models.Annotations] = None, | ||||||||||||||||||||||
raw_output_data_config: Optional[_common_models.RawOutputDataConfig] = None, | ||||||||||||||||||||||
max_parallelism: Optional[int] = None, | ||||||||||||||||||||||
concurrency: Optional[int] = None, | ||||||||||||||||||||||
max_parallelism: Optional[int] = None, # Deprecated | ||||||||||||||||||||||
security_context: Optional[security.SecurityContext] = None, | ||||||||||||||||||||||
trigger: Optional[LaunchPlanTriggerBase] = None, | ||||||||||||||||||||||
overwrite_cache: Optional[bool] = None, | ||||||||||||||||||||||
|
@@ -367,7 +379,14 @@ def __init__( | |||||||||||||||||||||
self._labels = labels | ||||||||||||||||||||||
self._annotations = annotations | ||||||||||||||||||||||
self._raw_output_data_config = raw_output_data_config | ||||||||||||||||||||||
self._max_parallelism = max_parallelism | ||||||||||||||||||||||
self._concurrency = concurrency | ||||||||||||||||||||||
self._max_parallelism = concurrency if concurrency is not None else max_parallelism | ||||||||||||||||||||||
if max_parallelism is not None: | ||||||||||||||||||||||
warnings.warn( | ||||||||||||||||||||||
"max_parallelism is deprecated and will be removed in a future version. Use concurrency instead.", | ||||||||||||||||||||||
DeprecationWarning, | ||||||||||||||||||||||
stacklevel=2, | ||||||||||||||||||||||
) | ||||||||||||||||||||||
self._security_context = security_context | ||||||||||||||||||||||
self._trigger = trigger | ||||||||||||||||||||||
self._overwrite_cache = overwrite_cache | ||||||||||||||||||||||
|
@@ -385,7 +404,8 @@ def clone_with( | |||||||||||||||||||||
labels: Optional[_common_models.Labels] = None, | ||||||||||||||||||||||
annotations: Optional[_common_models.Annotations] = None, | ||||||||||||||||||||||
raw_output_data_config: Optional[_common_models.RawOutputDataConfig] = None, | ||||||||||||||||||||||
max_parallelism: Optional[int] = None, | ||||||||||||||||||||||
concurrency: Optional[int] = None, | ||||||||||||||||||||||
max_parallelism: Optional[int] = None, # Dreprecated | ||||||||||||||||||||||
security_context: Optional[security.SecurityContext] = None, | ||||||||||||||||||||||
trigger: Optional[LaunchPlanTriggerBase] = None, | ||||||||||||||||||||||
overwrite_cache: Optional[bool] = None, | ||||||||||||||||||||||
|
@@ -401,6 +421,7 @@ def clone_with( | |||||||||||||||||||||
labels=labels or self.labels, | ||||||||||||||||||||||
annotations=annotations or self.annotations, | ||||||||||||||||||||||
raw_output_data_config=raw_output_data_config or self.raw_output_data_config, | ||||||||||||||||||||||
concurrency=concurrency or self.concurrency, | ||||||||||||||||||||||
max_parallelism=max_parallelism or self.max_parallelism, | ||||||||||||||||||||||
security_context=security_context or self.security_context, | ||||||||||||||||||||||
trigger=trigger, | ||||||||||||||||||||||
|
@@ -466,7 +487,17 @@ def raw_output_data_config(self) -> Optional[_common_models.RawOutputDataConfig] | |||||||||||||||||||||
|
||||||||||||||||||||||
@property | ||||||||||||||||||||||
def max_parallelism(self) -> Optional[int]: | ||||||||||||||||||||||
return self._max_parallelism | ||||||||||||||||||||||
"""[Deprecated] Use concurrency instead. This property is maintained for backward compatibility""" | ||||||||||||||||||||||
warnings.warn( | ||||||||||||||||||||||
"max_parallelism is deprecated and will be removed in a future version. Use concurrency instead.", | ||||||||||||||||||||||
DeprecationWarning, | ||||||||||||||||||||||
stacklevel=2, | ||||||||||||||||||||||
) | ||||||||||||||||||||||
return self._concurrency | ||||||||||||||||||||||
|
||||||||||||||||||||||
@property | ||||||||||||||||||||||
def concurrency(self) -> Optional[int]: | ||||||||||||||||||||||
return self._concurrency | ||||||||||||||||||||||
|
||||||||||||||||||||||
@property | ||||||||||||||||||||||
def security_context(self) -> Optional[security.SecurityContext]: | ||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider keeping both
map
andmap_task
imports to maintain backward compatibility. The aliasmap_task = map
on line 273 suggests this was intentional.Code suggestion
Code Review Run #7379ce
Is this a valid issue, or was it incorrectly flagged by the Agent?