Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flytekit: Rename map_task to map, replace min_successes and min_success_ratio with tolerance, rename max_parallelism to concurrency #3099

Closed
wants to merge 43 commits into from

Conversation

ChihTsungLu
Copy link

@ChihTsungLu ChihTsungLu commented Jan 27, 2025

Tracking issue

Related to flyteorg/flyte#6139

Why are the changes needed?

The current Flytekit has several areas that could be improved for a better developer experience:

  1. The map_task name is unnecessarily verbose when imported via the recommended import flytekit as fl
  2. The failure tolerance parameters (min_successes and min_success_ratio) are powerful but overly verbose
  3. The max_parallelism parameter naming in workflow and LaunchPlan needs to be aligned with map_task's concurrency parameter

What changes were proposed in this pull request?

  1. Rename map_task to map

    • While this conflicts with Python's built-in map, it's acceptable since we recommend using import flytekit as fl
    • All changes will maintain backwards compatibility
  2. Simplify failure tolerance parameters

    • Deprecate min_successes and min_success_ratio
    • Introduce new tolerance parameter that accepts both float and int types
    • Maintain backwards compatibility with existing parameters
  3. Standardize parallelism parameter

    • Deprecate max_parallelism argument in workflow and LaunchPlan
    • Introduce new concurrency parameter to match map_task's parameter
    • Maintain backwards compatibility with existing parameter

Known issue

The changes introduce the concurrency field in Flytekit, which is not currently defined in flyteidl's LaunchPlanSpec

valueError

Code to Address the Issue:
The following code handles the transition between the concurrency and max_parallelism fields:

    @classmethod
    def from_flyte_idl(cls, pb2):
        """
        :param flyteidl.admin.launch_plan_pb2.LaunchPlanSpec pb2:
        :rtype: LaunchPlanSpec
        """

        auth_role = None
        # First check the newer field, auth_role.
        if pb2.auth_role is not None and (pb2.auth_role.assumable_iam_role or pb2.auth_role.kubernetes_service_account):
            auth_role = _common.AuthRole.from_flyte_idl(pb2.auth_role)
        # Fallback to the deprecated field.
        elif pb2.auth is not None:
            if pb2.auth.assumable_iam_role:
                auth_role = _common.AuthRole(assumable_iam_role=pb2.auth.assumable_iam_role)
            else:
                auth_role = _common.AuthRole(assumable_iam_role=pb2.auth.kubernetes_service_account)

        # Handle concurrency/max_parallelism transition
        concurrency = None
        max_parallelism = None

        if hasattr(pb2, "concurrency"):
            try:
                if pb2.HasField("concurrency"):
                    concurrency = pb2.concurrency
            except ValueError:
                pass  # Field doesn't exist in protobuf yet

        # Fallback to max_parallelism (deprecated field)
        if hasattr(pb2, "max_parallelism"):
            max_parallelism = pb2.max_parallelism

        # Use concurrency if available, otherwise use max_parallelism
        final_concurrency = concurrency if concurrency is not None else max_parallelism

        return cls(
            workflow_id=_identifier.Identifier.from_flyte_idl(pb2.workflow_id),
            entity_metadata=LaunchPlanMetadata.from_flyte_idl(pb2.entity_metadata),
            default_inputs=_interface.ParameterMap.from_flyte_idl(pb2.default_inputs),
            fixed_inputs=_literals.LiteralMap.from_flyte_idl(pb2.fixed_inputs),
            labels=_common.Labels.from_flyte_idl(pb2.labels),
            annotations=_common.Annotations.from_flyte_idl(pb2.annotations),
            auth_role=auth_role,
            raw_output_data_config=_common.RawOutputDataConfig.from_flyte_idl(pb2.raw_output_data_config),
            concurrency=final_concurrency,
            max_parallelism=pb2.max_parallelism,
            security_context=security.SecurityContext.from_flyte_idl(pb2.security_context)
            if pb2.security_context
            else None,
            overwrite_cache=pb2.overwrite_cache if pb2.overwrite_cache else None,
        )

How was this patch tested?

Ran tests with the command: make test

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Summary by Bito

This PR implements API improvements in Flytekit by standardizing parallelism parameters and simplifying the API. Key changes include replacing 'max_parallelism' with 'concurrency', introducing a 'tolerance' parameter to replace 'min_successes' and 'min_success_ratio', and updating Dockerfile configurations. The changes maintain backward compatibility through deprecation warnings and include comprehensive test updates. The modifications aim to streamline the API and improve developer experience.

Unit tests added: True

Estimated effort to review (1-5, lower is better): 5

ChihTsungLu and others added 30 commits January 27, 2025 17:20
Signed-off-by: lu00122 <[email protected]>
flyteorg#3059)

* Make FlyteUserRuntimeException to return error_code in the ContainerError

Signed-off-by: Rafael Ribeiro Raposo <[email protected]>
Signed-off-by: lu00122 <[email protected]>
* update

Signed-off-by: Future-Outlier <[email protected]>

* ficx

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* test

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* udpate

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* update

Signed-off-by: Future-Outlier <[email protected]>

* updatre

Signed-off-by: Future-Outlier <[email protected]>

---------

Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: lu00122 <[email protected]>
* feat: separate remote signal functions

Signed-off-by: mao3267 <[email protected]>

* refactor: make lint

Signed-off-by: mao3267 <[email protected]>

* test: add integration test for separated signal functions

Signed-off-by: mao3267 <[email protected]>

* fix: register workflow to admin

Signed-off-by: mao3267 <[email protected]>

* fix: integration test and approve function

Signed-off-by: mao3267 <[email protected]>

* fix: remove approve node output

Signed-off-by: mao3267 <[email protected]>

* fix: replace single sleep command to retry statement

Signed-off-by: mao3267 <[email protected]>

* fix: update comments

Signed-off-by: mao3267 <[email protected]>

* fix: simplify duplicate retry operations

Signed-off-by: mao3267 <[email protected]>

---------

Signed-off-by: mao3267 <[email protected]>
Signed-off-by: lu00122 <[email protected]>
flyteorg#3067)

* Do not copy over ca-certifcates.crt if the base image has one

Signed-off-by: Thomas J. Fan <[email protected]>

* Only copy over cat-certificates.crt if it does not exist in base image

Signed-off-by: Thomas J. Fan <[email protected]>

---------

Signed-off-by: Thomas J. Fan <[email protected]>
Signed-off-by: lu00122 <[email protected]>
…of parent node (flyteorg#2982)

* test

Signed-off-by: Paul Dittamo <[email protected]>

* add support for with_overrides for map tasks

Signed-off-by: Paul Dittamo <[email protected]>

* expand unit test

Signed-off-by: Paul Dittamo <[email protected]>

* cleanup

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
Signed-off-by: lu00122 <[email protected]>
… to task exception (flyteorg#3057)

* Fix: Always propagate pytorch task worker process exception timestamp to task exception

Signed-off-by: Fabio Grätz <[email protected]>

* Fix exist recoverable error test

Signed-off-by: Fabio Grätz <[email protected]>

---------

Signed-off-by: Fabio Grätz <[email protected]>
Co-authored-by: Fabio Grätz <[email protected]>
Signed-off-by: lu00122 <[email protected]>
Signed-off-by: lu00122 <[email protected]>
Signed-off-by: lu00122 <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: lu00122 <[email protected]>
* Allow for user-defined dataclass type tranformers

Signed-off-by: Eduardo Apolinario <[email protected]>

* Finish comment and remote user-defined dataclass transformer from registry

Signed-off-by: Eduardo Apolinario <[email protected]>

---------

Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Signed-off-by: lu00122 <[email protected]>
)

* [flyteagent][CLI] Make agent prometheus port configurable

Signed-off-by: Future-Outlier <[email protected]>

* lint

Signed-off-by: Future-Outlier <[email protected]>

* update promethus port comment from Eduardo

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Eduardo Apolinario  <[email protected]>

* lint

Signed-off-by: Future-Outlier <[email protected]>

---------

Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Signed-off-by: lu00122 <[email protected]>
Signed-off-by: Thomas J. Fan <[email protected]>
Signed-off-by: lu00122 <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: lu00122 <[email protected]>
…ep learning (GNN) use cases (flyteorg#3004)

Signed-off-by: Shuying Liang <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: JiaWei Jiang <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: Niels Bantilan <[email protected]>
Co-authored-by: Han-Ru Chen (Future-Outlier) <[email protected]>
Co-authored-by: Thomas J. Fan <[email protected]>
Co-authored-by: 江家瑋 <[email protected]>
Co-authored-by: Yee Hing Tong <[email protected]>
Co-authored-by: Niels Bantilan <[email protected]>
Signed-off-by: lu00122 <[email protected]>
…#3056)

* Treat builtins.NoneType explicitly when parsing type descriptions

Signed-off-by: Adrian Loy <[email protected]>

* Modified tests to cover NoneType case

Signed-off-by: Adrian Loy <[email protected]>

---------

Signed-off-by: Adrian Loy <[email protected]>
Signed-off-by: lu00122 <[email protected]>
* Adds secret env name

Signed-off-by: Thomas J. Fan <[email protected]>

* Use env_var

Signed-off-by: Thomas J. Fan <[email protected]>

* Bump flyteidl

Signed-off-by: Thomas J. Fan <[email protected]>

* Smaller change

Signed-off-by: Thomas J. Fan <[email protected]>

* Add integration test

Signed-off-by: Thomas J. Fan <[email protected]>

* Add integration test

Signed-off-by: Thomas J. Fan <[email protected]>

* Use env_var

Signed-off-by: Thomas J. Fan <[email protected]>

* Check for file and env_var

Signed-off-by: Thomas J. Fan <[email protected]>

* Add check for kubectl

Signed-off-by: Thomas J. Fan <[email protected]>

---------

Signed-off-by: Thomas J. Fan <[email protected]>
Signed-off-by: lu00122 <[email protected]>
Signed-off-by: amitani <[email protected]>
Signed-off-by: lu00122 <[email protected]>
* Enable pydoclint

Signed-off-by: Eduardo Apolinario <[email protected]>

* Regenerate baseline

Signed-off-by: Eduardo Apolinario <[email protected]>

* Regenerate baseline for real this time

Signed-off-by: Eduardo Apolinario <[email protected]>

---------

Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Signed-off-by: lu00122 <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Signed-off-by: lu00122 <[email protected]>
Signed-off-by: Eduardo Apolinario <[email protected]>
Co-authored-by: Eduardo Apolinario <[email protected]>
Signed-off-by: lu00122 <[email protected]>
Signed-off-by: Yee Hing Tong <[email protected]>
Signed-off-by: lu00122 <[email protected]>
@flyte-bot
Copy link
Contributor

flyte-bot commented Jan 27, 2025

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
Feature Improvement - API Parameter Standardization

__init__.py - Deprecated map_task in favor of map function

array_node_map_task.py - Introduced tolerance parameter and deprecated min_successes/min_success_ratio

launch_plan.py - Replaced max_parallelism with concurrency parameter

options.py - Updated Options class with concurrency parameter

execution.py - Added concurrency field and deprecated max_parallelism

launch_plan.py - Implemented concurrency parameter handling

Testing - Test Updates for API Changes

test_array_node.py - Updated tests to use map instead of map_task

test_array_node_map_task.py - Modified tests for new tolerance parameter

test_pod.py - Updated map_task references to map

test_task.py - Updated test cases for map function

array_map.py - Updated workflow tests for map function

pydantic_wf.py - Modified tests to use new map function

Documentation - Documentation Updates

pydoclint-errors-baseline.txt - Removed documentation errors related to Options class

Feature Improvement - API Parameter Standardization

__init__.py - Deprecated map_task in favor of map function

array_node_map_task.py - Introduced tolerance parameter and deprecated min_successes/min_success_ratio

launch_plan.py - Replaced max_parallelism with concurrency parameter

options.py - Updated Options class with concurrency parameter

execution.py - Added concurrency field and deprecated max_parallelism

launch_plan.py - Implemented concurrency parameter handling

Testing - Test Updates for API Changes

test_array_node_map_task.py - Updated map_task references to map function

test_artifacts.py - Updated imports and function calls to use map

test_interface.py - Modified test cases for map function

test_launch_plan.py - Updated tests to use concurrency instead of max_parallelism

test_node_creation.py - Updated map_task references to map

test_partials.py - Updated imports to use map

test_type_hints.py - Modified tests to use new map function

test_remote.py - Updated imports and function calls to use map

test_listdir.py - Updated imports and function calls to use map

@@ -12,4 +12,4 @@ def fn(x: int, y: int) -> int:
@workflow
def workflow_with_maptask(data: typing.List[int], y: int) -> typing.List[int]:
partial_fn = partial(fn, y=y)
return map_task(partial_fn)(x=data)
return map(partial_fn)(x=data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential change in execution behavior

Consider if replacing map_task with map is intentional as it may change the execution behavior. map_task typically handles task-level parallelization while map is a standard Python function.

Code suggestion
Check the AI-generated fix before applying
Suggested change
return map(partial_fn)(x=data)
return map_task(partial_fn)(x=data)

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

@@ -598,7 +598,7 @@ def t2(x: typing.List[int]) -> int:

@workflow
def my_wf(a: typing.List[int]) -> int:
x = map_task(t1, metadata=TaskMetadata(retries=1))(a=a)
x = map(t1, metadata=TaskMetadata(retries=1))(a=a)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using map_task instead of map

The function name has been changed from map_task to map. This change appears to be using the built-in map function instead of the intended map_task function.

Code suggestion
Check the AI-generated fix before applying
Suggested change
x = map(t1, metadata=TaskMetadata(retries=1))(a=a)
x = map_task(t1, metadata=TaskMetadata(retries=1))(a=a)

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

@@ -369,11 +370,12 @@ def _raw_execute(self, **kwargs) -> Any:
return outputs


def map_task(
def map(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider keeping descriptive function name

Consider keeping the original function name map_task since renaming to map could conflict with Python's built-in map function and cause confusion. The original name was more descriptive of the function's purpose.

Code suggestion
Check the AI-generated fix before applying
Suggested change
def map(
def map_task(

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines +133 to +134
max_parallelism: Optional[int] = None, # Deprecated: Use concurrency instead
concurrency: Optional[int] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider removing deprecated max_parallelism parameter

Consider removing the deprecated max_parallelism parameter since concurrency is now the preferred way to control parallel execution. Having both parameters may cause confusion.

Code suggestion
Check the AI-generated fix before applying
Suggested change
max_parallelism: Optional[int] = None, # Deprecated: Use concurrency instead
concurrency: Optional[int] = None,
concurrency: Optional[int] = None,
max_parallelism: Optional[int] = None, # Deprecated: Use concurrency instead
if max_parallelism is not None:
warnings.warn(
"max_parallelism is deprecated, use concurrency instead",
DeprecationWarning,
stacklevel=2
)

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

@@ -326,7 +336,8 @@ def get_or_create(
labels,
annotations,
raw_output_data_config,
max_parallelism,
concurrency=concurrency,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor init method signature

The 'init' method has too many parameters (14 > 5) and is missing docstring and return type annotation.

Code suggestion
Check the AI-generated fix before applying
 @@ -339,20 +339,25 @@
 + class LaunchPlanConfig:
 +     """Configuration for LaunchPlan initialization."""
 +     def __init__(
 +         self,
 +         schedule: Optional[_schedule_model.Schedule] = None,
 +         notifications: Optional[List[_common_models.Notification]] = None,
 +         labels: Optional[_common_models.Labels] = None,
 +         annotations: Optional[_common_models.Annotations] = None,
 +         raw_output_data_config: Optional[_common_models.RawOutputDataConfig] = None,
 +         max_parallelism: Optional[int] = None,
 +         security_context: Optional[security.SecurityContext] = None,
 +         trigger: Optional[LaunchPlanTriggerBase] = None,
 +         overwrite_cache: Optional[bool] = None,
 +         auto_activate: bool = False,
 +     ):
 +         self.schedule = schedule
 +         self.notifications = notifications
 +         self.labels = labels
 +         self.annotations = annotations
 +         self.raw_output_data_config = raw_output_data_config
 +         self.max_parallelism = max_parallelism
 +         self.security_context = security_context
 +         self.trigger = trigger
 +         self.overwrite_cache = overwrite_cache
 +         self.auto_activate = auto_activate
 +
 - def __init__(
 -     self,
 -     name: str,
 -     workflow: _annotated_workflow.WorkflowBase,
 -     parameters: _interface_models.ParameterMap,
 -     fixed_inputs: _literal_models.LiteralMap,
 -     schedule: Optional[_schedule_model.Schedule] = None,
 -     notifications: Optional[List[_common_models.Notification]] = None,
 -     labels: Optional[_common_models.Labels] = None,
 -     annotations: Optional[_common_models.Annotations] = None,
 -     raw_output_data_config: Optional[_common_models.RawOutputDataConfig] = None,
 -     max_parallelism: Optional[int] = None,
 -     security_context: Optional[security.SecurityContext] = None,
 -     trigger: Optional[LaunchPlanTriggerBase] = None,
 -     overwrite_cache: Optional[bool] = None,
 -     auto_activate: bool = False,
 - ):
 + def __init__(
 +     self,
 +     name: str,
 +     workflow: _annotated_workflow.WorkflowBase,
 +     parameters: _interface_models.ParameterMap,
 +     fixed_inputs: _literal_models.LiteralMap,
 +     config: Optional[LaunchPlanConfig] = None,
 + ) -> None:
 +     """Initialize a LaunchPlan instance.
 +     
 +     Args:
 +         name: Name of the launch plan
 +         workflow: Workflow to create launch plan for
 +         parameters: Parameter map for the launch plan
 +         fixed_inputs: Fixed input map for the launch plan
 +         config: Optional configuration settings
 +     """

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

@@ -575,7 +575,7 @@ def say_hello(name: str) -> str:
for index, map_input_str in enumerate(list_strs):
monkeypatch.setenv("BATCH_JOB_ARRAY_INDEX_VAR_NAME", "name")
monkeypatch.setenv("name", str(index))
t = map_task(say_hello)
t = map(say_hello)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using map_task instead of map

Consider using map_task instead of map as it appears to be the intended API based on the imports and test context. The map function could be confused with Python's built-in map.

Code suggestion
Check the AI-generated fix before applying
Suggested change
t = map(say_hello)
t = map_task(say_hello)

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

@@ -109,15 +109,15 @@ def test_map_task_with_pickle():
def say_hello(name: typing.Any) -> str:
return f"hello {name}!"

map_task(say_hello)(name=["abc", "def"])
map(say_hello)(name=["abc", "def"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using map_task instead of map

Consider using map_task instead of map as it appears to be the intended API based on the test file name and imports. Using map could lead to confusion with Python's built-in map function.

Code suggestion
Check the AI-generated fix before applying
Suggested change
map(say_hello)(name=["abc", "def"])
map_task(say_hello)(name=["abc", "def"])

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

@@ -316,7 +316,7 @@ def test_bounded_inputs_vars_order(serialization_settings):
def task1(a: int, b: float, c: str) -> str:
return f"{a} - {b} - {c}"

mt = map_task(functools.partial(task1, c=1.0, b="hello", a=1))
mt = map(functools.partial(task1, c=1.0, b="hello", a=1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using explicit map function name

The function call to map_task() has been changed to map(). This could lead to confusion as it's unclear whether this is Python's built-in map() or a custom implementation. Consider being explicit about which map function is being used.

Code suggestion
Check the AI-generated fix before applying
Suggested change
mt = map(functools.partial(task1, c=1.0, b="hello", a=1))
mt = map_task(functools.partial(task1, c=1.0, b="hello", a=1))

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

@@ -492,7 +492,7 @@ def test_supported_node_type():
def test_task():
...

map_task(test_task)
map(test_task)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using map_task instead of map

Consider using map_task instead of map as it appears to be the intended decorator based on the imports and test context. Using map could potentially call Python's built-in map function instead of Flyte's mapping functionality.

Code suggestion
Check the AI-generated fix before applying
Suggested change
map(test_task)
map_task(test_task)

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

@@ -575,7 +575,7 @@ def say_hello(name: str) -> str:
for index, map_input_str in enumerate(list_strs):
monkeypatch.setenv("BATCH_JOB_ARRAY_INDEX_VAR_NAME", "name")
monkeypatch.setenv("name", str(index))
t = map_task(say_hello)
t = map(say_hello)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Verify intended use of map function

Consider if using map() instead of map_task() is intentional as this could change the behavior of task mapping.

Code suggestion
Check the AI-generated fix before applying
Suggested change
t = map(say_hello)
t = map_task(say_hello)

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

from flytekit._version import __version__
from flytekit.configuration import Config
from flytekit.core.array_node_map_task import map_task
from flytekit.core.array_node_map_task import map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider maintaining both map imports

Consider keeping both map and map_task imports to maintain backward compatibility. The alias map_task = map on line 273 suggests this was intentional.

Code suggestion
Check the AI-generated fix before applying
Suggested change
from flytekit.core.array_node_map_task import map
from flytekit.core.array_node_map_task import map as map_task

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

@@ -760,6 +760,18 @@ class FlyteLaunchPlan(hash_mixin.HashOnReferenceMixin, RemoteEntity, _launch_pla
"""A class encapsulating a remote Flyte launch plan."""

def __init__(self, id, *args, **kwargs):
if "concurrency" in kwargs:
kwargs["max_parallelism"] = kwargs.pop("concurreny")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix typo in concurrency parameter name

There appears to be a typo in the key name concurreny. This could cause the concurrency parameter to not be properly handled.

Code suggestion
Check the AI-generated fix before applying
Suggested change
kwargs["max_parallelism"] = kwargs.pop("concurreny")
kwargs["max_parallelism"] = kwargs.pop("concurrency")

Code Review Run #7379ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Dockerfile Outdated
USER flytekit

ENV FLYTE_INTERNAL_IMAGE="$DOCKER_IMAGE"
&& :
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we changing this?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the confusion, this was an unintended change in the Dockerfile. I've reverted it back as they were originally.

@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 3, 2025

Code Review Agent Run #dd90ce

Actionable Suggestions - 3
  • tests/flytekit/unit/core/test_array_node_map_task.py - 2
  • flytekit/core/array_node_map_task.py - 1
    • Remove duplicate parameter assignments in calls · Line 435-444
Review Details
  • Files reviewed - 4 · Commit Range: 22db713..85ddf8b
    • Dockerfile
    • flytekit/core/array_node_map_task.py
    • pydoclint-errors-baseline.txt
    • tests/flytekit/unit/core/test_array_node_map_task.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

@@ -373,44 +373,21 @@ def my_mappable_task_1(a: int) -> typing.Optional[str]:
[my_mappable_task, my_mappable_task_1]
)
def test_map_task_override(serialization_settings, task_func):
array_node_map_task = map_task(task_func)
array_node_map_task = map(task_func)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using more explicit map_task function

Consider using the more explicit map_task() instead of map() to maintain clarity and avoid potential confusion with Python's built-in map() function.

Code suggestion
Check the AI-generated fix before applying
Suggested change
array_node_map_task = map(task_func)
array_node_map_task = map_task(task_func)

Code Review Run #dd90ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


od = OrderedDict()
wf_spec = get_serializable(od, serialization_settings, wf)
map(my_mappable_task)(a=x).with_overrides(container_image="random:image")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using defined task parameter

The variable my_mappable_task appears to be undefined. Consider using task_func instead which is passed as a parameter to the test function.

Code suggestion
Check the AI-generated fix before applying
Suggested change
map(my_mappable_task)(a=x).with_overrides(container_image="random:image")
map(task_func)(a=x).with_overrides(container_image="random:image")

Code Review Run #dd90ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

Comment on lines 435 to 444
min_successes=final_min_successes,
min_success_ratio=final_min_ratio,
)
return array_node_map_task(
task_function=target,
concurrency=concurrency,
min_successes=min_successes,
min_success_ratio=min_success_ratio,
min_successes=final_min_successes,
min_success_ratio=final_min_ratio,
min_successes=final_min_successes,
min_success_ratio=final_min_ratio,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove duplicate parameter assignments in calls

The parameters min_successes and min_success_ratio are passed twice in both array_node and array_node_map_task calls. Consider removing the duplicate parameter assignments.

Code suggestion
Check the AI-generated fix before applying
 -            min_successes=final_min_successes,
 -            min_success_ratio=final_min_ratio,
          )
      return array_node_map_task(
          task_function=target,
          concurrency=concurrency,
          min_successes=final_min_successes,
          min_success_ratio=final_min_ratio,
 -        min_successes=final_min_successes,
 -        min_success_ratio=final_min_ratio,

Code Review Run #dd90ce


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

@flyte-bot
Copy link
Contributor

flyte-bot commented Feb 3, 2025

Code Review Agent Run #e22476

Actionable Suggestions - 0
Review Details
  • Files reviewed - 1 · Commit Range: 85ddf8b..86ce73b
    • flytekit/core/array_node_map_task.py
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

AI Code Review powered by Bito Logo

@ChihTsungLu ChihTsungLu closed this Feb 4, 2025
@wild-endeavor
Copy link
Contributor

hey @ChihTsungLu just wondering why you closed this? were you planning on re-opening as a different pr? couldn't tell if something was fundamentally off with the change.

thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.