From a0f8194d22384cd223ac80f11776f625b575e195 Mon Sep 17 00:00:00 2001 From: Julien Maupetit Date: Mon, 22 Jan 2024 15:22:40 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A8(api)=20add=20Warren=20CLI=20`indic?= =?UTF-8?q?ator`=20command=20(and=20sub-commands)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The `indicator` command and its sub-commands allow to perform indicators calculations from the CLI. This will be useful to batch pre-calculate indicators. It ships with 3 sub-commands: - `list`: list indicators registered for batch computing using the CLI - `inspect`: show expected indicator arguments required to run - 'compute': compute the indicator given input arguments and optionally cache the result. --- bin/warren | 4 +- src/api/core/warren/cli.py | 139 ++++++++++++++- src/api/core/warren/tests/test_cli.py | 248 +++++++++++++++++++++++++- src/api/plugins/video/pyproject.toml | 8 + 4 files changed, 390 insertions(+), 9 deletions(-) diff --git a/bin/warren b/bin/warren index 6b0e3adf..662b2b78 100755 --- a/bin/warren +++ b/bin/warren @@ -3,8 +3,6 @@ declare DOCKER_USER DOCKER_USER="$(id -u):$(id -g)" -DOCKER_USER=${DOCKER_USER} docker compose run \ - --rm \ - --no-deps \ +DOCKER_USER=${DOCKER_USER} docker compose exec \ api \ warren "$@" diff --git a/src/api/core/warren/cli.py b/src/api/core/warren/cli.py index f06dff25..892fcde0 100644 --- a/src/api/core/warren/cli.py +++ b/src/api/core/warren/cli.py @@ -1,10 +1,25 @@ """Warren CLI entrypoint.""" +import asyncio +import json +import logging +import sys +from inspect import Parameter, Signature, signature +from typing import Optional + import click +from pydantic import BaseModel from warren import __version__ as warren_version from . import migrations as alembic_migrations +if sys.version_info < (3, 10): + from importlib_metadata import EntryPoint, EntryPoints, entry_points +else: + from importlib.metadata import EntryPoint, EntryPoints, entry_points + +logger = logging.getLogger(__name__) + @click.group(name="Warren") @click.version_option(version=warren_version) @@ -20,27 +35,143 @@ def migration(): @migration.command() @click.option("--verbose", "-v", is_flag=True, default=False) -def current(verbose): +def current(verbose: bool): """Show current database migration.""" alembic_migrations.current(verbose) @migration.command() @click.argument("revision", type=str) -def downgrade(revision): +def downgrade(revision: str): """Downgrade database migration to a target revision.""" alembic_migrations.downgrade(revision) @migration.command() @click.option("--verbose", "-v", is_flag=True, default=False) -def history(verbose): +def history(verbose: bool): """Show database migrations history.""" alembic_migrations.history(verbose) @migration.command() @click.argument("revision", type=str, default="head") -def upgrade(revision): +def upgrade(revision: str): """Upgrade database migration to a target revision.""" alembic_migrations.upgrade(revision) + + +# -- INDICATOR COMMAND -- +@cli.group(name="indicator") +def indicator(): + """Indicator commands.""" + + +def _get_indicator_entrypoints() -> EntryPoints: + """Get 'warren.indicators' entry points.""" + return entry_points(group="warren.indicators") + + +def _get_indicator(name: str) -> EntryPoint: + """Get an indicator from its entry point name.""" + try: + return next(filter(lambda ep: ep.value == name, _get_indicator_entrypoints())) + except StopIteration as exc: + raise click.BadParameter(f'Indicator "{name}" is not registered.') from exc + + +@indicator.command("list") +def indicator_list(): + """List registered active indicators.""" + for entry_point in _get_indicator_entrypoints(): + click.echo(entry_point.value) + + +@indicator.command("inspect") +@click.argument("indicator") +def indicator_inspect(indicator: str): + """Show indicator required arguments.""" + entry_point: EntryPoint = _get_indicator(indicator) + + # Load the indicator class + klass = entry_point.load() + indicator_signature: Signature = signature(klass) + + for parameter in indicator_signature.parameters.values(): + default = "no" + if parameter.default != Parameter.empty: + default = parameter.default + click.secho(f"{parameter.name}\t", bold=True, fg="cyan", nl=False) + click.echo(f"{parameter.kind}\t{default=}\t{parameter.annotation}") + + +@indicator.command( + "compute", + context_settings={ + "ignore_unknown_options": True, + "allow_extra_args": True, + }, +) +@click.pass_context +@click.argument("indicator") +@click.option("--cache", "-c", is_flag=True, default=False) +def indicator_compute(ctx: click.Context, indicator: str, cache: bool): + """Pre-compute a registered target indicator.""" + entry_point: EntryPoint = _get_indicator(indicator) + + # Load the indicator class + klass = entry_point.load() + indicator_signature: Signature = signature(klass) + compute_annotation = signature(klass.compute).return_annotation + + if compute_annotation == Signature.empty: + raise click.BadParameter( + ( + f"{indicator} Indicator compute method return " + "should be annotated to run from the CLI." + ) + ) + + if len(ctx.args) < len(indicator_signature.parameters): + raise click.UsageError( + ( + f"Parameters are missing for the '{indicator}' indicator. " + "See 'inspect' command output." + ) + ) + + # Parse indicator arguments + indicator_kwargs: dict = {} + for arg in ctx.args: + name, value = arg.split("=") + + # Get expected parameter from its name + parameter: Optional[Parameter] = indicator_signature.parameters.get(name) + if parameter is None: + raise click.BadParameter(f'Unknown indicator parameter "{name}".') + + if parameter.annotation == Parameter.empty: + raise click.BadParameter( + ( + f"{parameter}" + "Indicator parameters should be annotated to run from the CLI." + ) + ) + + # Cast value given parameter annotation + if issubclass(parameter.annotation, str): + pass + elif issubclass(parameter.annotation, (dict, list)): + value = json.loads(value) + elif issubclass(parameter.annotation, BaseModel): + value = parameter.annotation.parse_raw(value) + indicator_kwargs[name] = value + + instance = klass(**indicator_kwargs) + run = instance.compute + if cache and hasattr(instance, "cache_key"): + run = instance.get_or_compute + result = asyncio.run(run()) + click.echo( + result.json() if issubclass(compute_annotation, BaseModel) else str(result) + ) diff --git a/src/api/core/warren/tests/test_cli.py b/src/api/core/warren/tests/test_cli.py index 466e7240..003bdbb2 100644 --- a/src/api/core/warren/tests/test_cli.py +++ b/src/api/core/warren/tests/test_cli.py @@ -1,11 +1,15 @@ """Test Warren commands functions.""" -from unittest.mock import MagicMock +from unittest.mock import AsyncMock, MagicMock +import pytest from alembic import command as alembic_command +from click import BadParameter from click.testing import CliRunner +from pydantic import BaseModel +from warren_video.indicators import DailyUniqueCompletedViews from warren import migrations -from warren.cli import cli +from warren.cli import _get_indicator, _get_indicator_entrypoints, cli def test_migration_current_command(monkeypatch): @@ -57,3 +61,243 @@ def test_migration_upgrade_command(monkeypatch): runner.invoke(cli, ["migration", "upgrade", "123abc"]) alembic_command.upgrade.assert_called_with(migrations.ALEMBIC_CFG, "123abc") + + +def test_get_indicator_entrypoints(): + """Test _get_indicator_entrypoints utility.""" + assert [ + "daily_completed_views", + "daily_downloads", + "daily_unique_completed_views", + "daily_unique_downloads", + "daily_unique_views", + "daily_views", + ] == [e.name for e in _get_indicator_entrypoints()] + + +def test_get_indicator(): + """Test _get_indicator utility.""" + entry_point = _get_indicator("warren_video.indicators:DailyCompletedViews") + assert entry_point.name == "daily_completed_views" + + entry_point = _get_indicator("warren_video.indicators:DailyUniqueDownloads") + assert entry_point.name == "daily_unique_downloads" + + with pytest.raises(BadParameter, match='Indicator "foo" is not registered.'): + _get_indicator("foo") + + +def test_indicator_list_command(): + """Test warren indicator list command.""" + runner = CliRunner() + result = runner.invoke(cli, ["indicator", "list"]) + assert ( + "warren_video.indicators:DailyCompletedViews\n" + "warren_video.indicators:DailyDownloads\n" + "warren_video.indicators:DailyUniqueCompletedViews\n" + "warren_video.indicators:DailyUniqueDownloads\n" + "warren_video.indicators:DailyUniqueViews\n" + "warren_video.indicators:DailyViews\n" + ) == result.output + + +def test_indicator_inspect_command(monkeypatch): + """Test warren indicator inspect command.""" + runner = CliRunner() + result = runner.invoke( + cli, + ["indicator", "inspect", "warren_video.indicators:DailyUniqueCompletedViews"], + ) + assert ( + "video_id\tPOSITIONAL_OR_KEYWORD\tdefault='no'\t\n" + "span_range\tPOSITIONAL_OR_KEYWORD\tdefault='no'\t" + "\n" + ) == result.output + + # Test parameter default value + def init_mock(self, foo: int = 1): + pass + + monkeypatch.setattr(DailyUniqueCompletedViews, "__init__", init_mock) + + result = runner.invoke( + cli, + ["indicator", "inspect", "warren_video.indicators:DailyUniqueCompletedViews"], + ) + assert ("foo\tPOSITIONAL_OR_KEYWORD\tdefault=1\t\n") == result.output + + +def test_indicator_compute_command_usage(): + """Test warren indicator compute command usage.""" + runner = CliRunner() + + # Test missing parameter + result = runner.invoke( + cli, + [ + "indicator", + "compute", + "warren_video.indicators:DailyUniqueCompletedViews", + 'video_id="uuid://foo"', + ], + ) + assert result.exit_code == 2 + assert "Parameters are missing" in result.output + + # Test unknown parameter + result = runner.invoke( + cli, + [ + "indicator", + "compute", + "warren_video.indicators:DailyUniqueCompletedViews", + 'video_id="uuid://foo"', + "foo=1", + ], + ) + assert result.exit_code == 2 + assert 'Unknown indicator parameter "foo"' in result.output + + +def test_indicator_compute_command_for_standard_type_return(monkeypatch): + """Test warren indicator compute command for standard type return.""" + runner = CliRunner() + + # Test return value for a standard type + async def compute(self) -> dict: + return {"foo": 1} + + monkeypatch.setattr(DailyUniqueCompletedViews, "compute", compute) + result = runner.invoke( + cli, + [ + "indicator", + "compute", + "warren_video.indicators:DailyUniqueCompletedViews", + 'video_id="uuid://foo"', + "span_range={}", + ], + ) + assert "{'foo': 1}\n" == result.output + + +def test_indicator_compute_command_for_pydantic_type_return(monkeypatch): + """Test warren indicator compute command for pydantic model return.""" + runner = CliRunner() + + # Test return value for a Pydantic model + class Fake(BaseModel): + foo: int + + async def compute(self) -> Fake: + return Fake(foo=1) + + monkeypatch.setattr(DailyUniqueCompletedViews, "compute", compute) + result = runner.invoke( + cli, + [ + "indicator", + "compute", + "warren_video.indicators:DailyUniqueCompletedViews", + 'video_id="uuid://foo"', + "span_range={}", + ], + ) + assert '{"foo": 1}\n' == result.output + + +def test_indicator_compute_command_no_annotated_type_return(monkeypatch): + """Test warren indicator compute command for no return annotation.""" + runner = CliRunner() + + # Test invalid compute implementation (no annotations) + async def compute(self): + pass + + monkeypatch.setattr(DailyUniqueCompletedViews, "compute", compute) + result = runner.invoke( + cli, + [ + "indicator", + "compute", + "warren_video.indicators:DailyUniqueCompletedViews", + 'video_id="uuid://foo"', + "span_range={}", + ], + ) + assert result.exit_code == 2 + assert ( + "Indicator compute method return should be annotated to run from the CLI" + in result.output + ) + + +def test_indicator_compute_command_for_not_annotated_indicator(monkeypatch): + """Test warren indicator compute command for an indicator that is not annotated.""" + runner = CliRunner() + + def init_mock(self, foo): + pass + + monkeypatch.setattr(DailyUniqueCompletedViews, "__init__", init_mock) + result = runner.invoke( + cli, + [ + "indicator", + "compute", + "warren_video.indicators:DailyUniqueCompletedViews", + "foo=1", + ], + ) + assert result.exit_code == 2 + assert ( + "Indicator parameters should be annotated to run from the CLI" in result.output + ) + + +def test_indicator_compute_command_with_list_or_dict_parameter(monkeypatch): + """Test warren indicator compute command for an indicator that is not annotated.""" + runner = CliRunner() + + def init_mock(self, foo: list, bar: dict): + pass + + async def compute(self) -> dict: + return {"foo": 1} + + monkeypatch.setattr(DailyUniqueCompletedViews, "__init__", init_mock) + monkeypatch.setattr(DailyUniqueCompletedViews, "compute", compute) + result = runner.invoke( + cli, + [ + "indicator", + "compute", + "warren_video.indicators:DailyUniqueCompletedViews", + "foo=[1, 2, 3]", + "bar={}", + ], + ) + assert result.exit_code == 0 + assert "{'foo': 1}\n" == result.output + + +def test_indicator_compute_command_with_cache(monkeypatch): + """Test warren indicator compute command for no return annotation.""" + runner = CliRunner() + + get_or_compute_mock = AsyncMock() + monkeypatch.setattr( + DailyUniqueCompletedViews, "get_or_compute", get_or_compute_mock + ) + runner.invoke( + cli, + [ + "indicator", + "compute", + "-c", + "warren_video.indicators:DailyUniqueCompletedViews", + 'video_id="uuid://foo"', + "span_range={}", + ], + ) + get_or_compute_mock.assert_awaited() diff --git a/src/api/plugins/video/pyproject.toml b/src/api/plugins/video/pyproject.toml index d5198dbc..38d290f8 100644 --- a/src/api/plugins/video/pyproject.toml +++ b/src/api/plugins/video/pyproject.toml @@ -38,5 +38,13 @@ dynamic = ["version"] [project.entry-points."warren.routers"] video = "warren_video.api:router" +[project.entry-points."warren.indicators"] +daily_views = "warren_video.indicators:DailyViews" +daily_unique_views = "warren_video.indicators:DailyUniqueViews" +daily_completed_views = "warren_video.indicators:DailyCompletedViews" +daily_unique_completed_views = "warren_video.indicators:DailyUniqueCompletedViews" +daily_downloads = "warren_video.indicators:DailyDownloads" +daily_unique_downloads = "warren_video.indicators:DailyUniqueDownloads" + [tool.setuptools.dynamic] version = { attr = "warren_video.__version__" }