From 9dbce15bac2dcc60884488defa86afa24b0afe04 Mon Sep 17 00:00:00 2001 From: avivs Date: Sat, 9 Dec 2023 09:27:51 +0200 Subject: [PATCH] added OtherMethodOutput --- README.md | 19 +++++----- docs/index.md | 61 +++++++++++++++++++++--------- fastmessage/__init__.py | 1 + fastmessage/callable_wrapper.py | 10 ++++- fastmessage/common.py | 12 +++++- fastmessage/fastmessage_handler.py | 2 +- tests/method_validation_test.py | 54 +++++++++++++++++++++++++- 7 files changed, 127 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 4871040..d0b5f9e 100644 --- a/README.md +++ b/README.md @@ -26,17 +26,17 @@ $ pip install fastmessage ## Examples ```python -from fastmessage import FastMessage +from fastmessage import FastMessage, OtherMethodOutput from messageflux.iodevices.rabbitmq import RabbitMQInputDeviceManager, RabbitMQOutputDeviceManager fm = FastMessage() -@fm.map(output_device='next_year') # this sends its outputs to 'next_year' method +@fm.map() def hello(name: str, birthYear: int): age = 2023 - birthYear print(f'Hello {name}. your age is {age}') - return dict(age=age) + return OtherMethodOutput(next_year, age=age) # this sends its output to 'next_year' method @fm.map() @@ -52,22 +52,20 @@ if __name__ == "__main__": output_device_manager = RabbitMQOutputDeviceManager(hosts='my.rabbit.host', user='username', password='password') - + service = fm.create_service(input_device_manager=input_device_manager, - output_device_manager=output_device_manager) + output_device_manager=output_device_manager) service.start() # this runs the PipelineService and blocks ``` -This example shows two methods: ```hello``` and ```next_year```, each listening on its own queue +This example shows two methods: ```hello``` and ```next_year```, each listening on its own queue (with the same name) -the ```hello``` method is decorated with ```output_device='next_year'``` which means its output is directed to the -```next_year``` device (and the corrosponding method) - -the ```__main__``` creates an input and output device managers (```RabbitMQ``` in this case), and starts the service +the ```__main__``` creates an input and output device managers (```RabbitMQ``` in this case), and starts the service with these devices. every message that is sent to the ```hello``` queue should have the following format: + ```json { "name": "john", @@ -76,6 +74,7 @@ every message that is sent to the ```hello``` queue should have the following fo ``` in that case the process will print (in 2023...): + ``` Hello john. your age is 24 next year you will be 25 diff --git a/docs/index.md b/docs/index.md index a187e66..0202c1a 100644 --- a/docs/index.md +++ b/docs/index.md @@ -26,22 +26,22 @@ fm = FastMessage(default_output_device='output') @fm.map() def do_something(x: int, y: str): - pass # do something with x and y + pass # do something with x and y @fm.map() async def do_something_async(x: int, y: str): - pass # do something with x and y asynchronously + pass # do something with x and y asynchronously class SomeModel(BaseModel): - x: int - y: str + x: int + y: str @fm.map() def do_something_else(m: SomeModel, a: int): - return "some_value" # do somthing with m and a + return "some_value" # do somthing with m and a ``` @@ -104,8 +104,9 @@ There are special types which you can annotate the arguments for the callback wi from. useful for registering the same callback for several input devices * ```Message``` - arguments annotated with this type will receive the raw message which came from the device * ```MessageBundle``` - arguments annotated with this type will receive the complete MessageBundle (with device headers) -* ```MethodValidator``` - argument of this type, will receive an object that can help validate return values for other methods -Notice that arguments annotated with these types MUST NOT have default values (Since they always have values). +* ```MethodValidator``` - argument of this type, will receive an object that can help validate return values for other + methods + Notice that arguments annotated with these types MUST NOT have default values (Since they always have values). ```python @@ -118,20 +119,22 @@ fm = FastMessage() @fm.map(input_device='some_queue') def do_something(i: InputDeviceName, m: Message, mb: MessageBundle, x: int): - # i will be 'some_queue' - # m will be the message that arrived - # mb will be the MessageBundle that arrived - # x will be the serialized value of the message - pass # do something + # i will be 'some_queue' + # m will be the message that arrived + # mb will be the MessageBundle that arrived + # x will be the serialized value of the message + pass # do something @fm.map() def func1(mv: MethodValidator): - yield mv.validate_and_return(func2, x=3, y="hello") # this will succeed - yield mv.validate_and_return(func2, x=4) # this will raise MethodValidationError because y param is required but missing + yield mv.validate_and_return(func2, x=3, y="hello") # this will succeed + yield mv.validate_and_return(func2, + x=4) # this will raise MethodValidationError because y param is required but missing + @fm.map() -def func2(x:int, y:str): +def func2(x: int, y: str): pass ``` @@ -178,7 +181,31 @@ fm = FastMessage() @fm.map(input_device='some_queue', output_device='default_output_device') def do_something(x: int): - return CustomOutput(value=1, - output_device='other_output_device') # this will send the value 1 to 'other_output_device' instead of the default + return CustomOutput(value=1, + output_device='other_output_device') # this will send the value 1 to 'other_output_device' instead of the default ``` +### Returning Result to another method + +You can make the function return a result to another mapped method, while validating its values BEFORE sending the +output to the destination queue + +you do this by returning ```OtherMethodOutput``` class, that receives the callable as its first parameter, and the +arguments as kwargs: + +```python +from fastmessage import FastMessage, OtherMethodOutput + +fm = FastMessage() + + +@fm.map() +def func1(): + yield OtherMethodOutput(func2, x=3, y="hello") # this will succeed + yield OtherMethodOutput(func2, x=4) # this will raise MethodValidationError because y param is required but missing + + +@fm.map() +def func2(x: int, y: str): + pass +``` \ No newline at end of file diff --git a/fastmessage/__init__.py b/fastmessage/__init__.py index 7bbf23b..f180764 100644 --- a/fastmessage/__init__.py +++ b/fastmessage/__init__.py @@ -1,5 +1,6 @@ from .common import ( CustomOutput, + OtherMethodOutput, InputDeviceName, MultipleReturnValues, ) diff --git a/fastmessage/callable_wrapper.py b/fastmessage/callable_wrapper.py index c629433..1a24c1c 100644 --- a/fastmessage/callable_wrapper.py +++ b/fastmessage/callable_wrapper.py @@ -10,7 +10,7 @@ from pydantic.config import get_config from pydantic.typing import get_all_type_hints -from fastmessage.common import CustomOutput, InputDeviceName, MultipleReturnValues +from fastmessage.common import CustomOutput, InputDeviceName, MultipleReturnValues, OtherMethodOutput from fastmessage.common import _CALLABLE_TYPE, get_callable_name, _logger from fastmessage.exceptions import NotAllowedParamKindException, SpecialDefaultValueException from fastmessage.method_validator import MethodValidator @@ -56,6 +56,7 @@ def __init__(self, *, self._callable = wrapped_callable self._input_device_name = input_device_name self._output_device_name = output_device_name + self._method_validator = MethodValidator(self._fastmessage_handler) self._callable_analysis = self._analyze_callable(self._callable) self._model: Type[BaseModel] = self._create_model(model_name=self._get_model_name(), @@ -179,7 +180,7 @@ def __call__(self, elif param_info.annotation is Message: kwargs[param_name] = message_bundle.message elif param_info.annotation is MethodValidator: - kwargs[param_name] = MethodValidator(self._fastmessage_handler) + kwargs[param_name] = self._method_validator model: BaseModel = self._model.parse_raw(message_bundle.message.bytes) kwargs.update(dict(model)) @@ -211,6 +212,11 @@ def _get_pipeline_results(self, elif isinstance(value, CustomOutput): return self._get_pipeline_results(value=value.value, default_output_device=value.output_device) + elif isinstance(value, OtherMethodOutput): + custom_output = self._method_validator.validate_and_return(value.method, **value.kwargs) + + return self._get_pipeline_results(value=custom_output.value, + default_output_device=custom_output.output_device) else: pipeline_result = self._get_single_pipeline_result(value=value, output_device=default_output_device) diff --git a/fastmessage/common.py b/fastmessage/common.py index a8c488f..d51e2ba 100644 --- a/fastmessage/common.py +++ b/fastmessage/common.py @@ -1,6 +1,6 @@ import logging from dataclasses import dataclass -from typing import Callable, Any, TypeVar +from typing import Callable, Any, TypeVar, Union from fastmessage.exceptions import UnnamedCallableException @@ -44,3 +44,13 @@ class CustomOutput: """ output_device: str value: Any + + +class OtherMethodOutput: + """ + a result that contains the other method to send the result to + """ + + def __init__(self, method: Union[str, Callable], **kwargs): + self.method = method + self.kwargs = kwargs diff --git a/fastmessage/fastmessage_handler.py b/fastmessage/fastmessage_handler.py index bf31115..08ccfe8 100644 --- a/fastmessage/fastmessage_handler.py +++ b/fastmessage/fastmessage_handler.py @@ -25,7 +25,7 @@ def __init__(self, default_output_device: Optional[str] = None, validation_error_handler: Optional[Callable[ [InputDevice, MessageBundle, ValidationError], - Optional[Union[PipelineResult, List[PipelineResult]]]]] = None): + Optional[Union[PipelineResult, Iterable[PipelineResult]]]]] = None): """ :param default_output_device: an optional default output device to send callback results to, diff --git a/tests/method_validation_test.py b/tests/method_validation_test.py index 30904bc..e5e732d 100644 --- a/tests/method_validation_test.py +++ b/tests/method_validation_test.py @@ -1,6 +1,6 @@ import pytest -from fastmessage import FastMessage, MissingCallbackException +from fastmessage import FastMessage, MissingCallbackException, OtherMethodOutput from fastmessage.exceptions import MethodValidationError from fastmessage.method_validator import MethodValidator from messageflux.iodevices.base.common import MessageBundle, Message @@ -31,6 +31,30 @@ def func_output(x: int, y: str): assert result[0].message_bundle.message.bytes == b'"Success: x=3, y=hello"' +def test_by_othermethodoutput(): + fm: FastMessage = FastMessage() + + @fm.map() + def func_input(): + return OtherMethodOutput(func_output, x=3, y="hello") + + @fm.map(input_device='func2_device', output_device='output') + def func_output(x: int, y: str): + return f"Success: x={x}, y={y}" + + result = fm.handle_message(FakeInputDevice('func_input'), MessageBundle(message=Message(data=b'{"y": 10}'))) + assert result is not None + result = list(result) + assert len(result) == 1 + assert result[0].output_device_name == "func2_device" + + result = fm.handle_message(FakeInputDevice(result[0].output_device_name), result[0].message_bundle) + assert result is not None + result = list(result) + assert len(result) == 1 + assert result[0].message_bundle.message.bytes == b'"Success: x=3, y=hello"' + + def test_by_input_device_name(): fm: FastMessage = FastMessage() @@ -70,6 +94,21 @@ def func_output(x: int, y: str): _ = fm.handle_message(FakeInputDevice('func_input'), MessageBundle(message=Message(data=b'{"y": 10}'))) +def test_validation_error_by_output(): + fm: FastMessage = FastMessage() + + @fm.map() + def func_input(): + return OtherMethodOutput(func_output, x=3) + + @fm.map(input_device='func2_device', output_device='output') + def func_output(x: int, y: str): + return f"Success: x={x}, y={y}" + + with pytest.raises(MethodValidationError): + _ = fm.handle_message(FakeInputDevice('func_input'), MessageBundle(message=Message(data=b'{"y": 10}'))) + + def test_missing_callback(): fm: FastMessage = FastMessage() @@ -77,8 +116,21 @@ def test_missing_callback(): def func_input(method_validator: MethodValidator): return method_validator.validate_and_return(func_output, x=3) + # notice there's no mapping decorator here... def func_output(x: int, y: str): return f"Success: x={x}, y={y}" with pytest.raises(MissingCallbackException): _ = fm.handle_message(FakeInputDevice('func_input'), MessageBundle(message=Message(data=b'{"y": 10}'))) + + +def test_missing_callback_by_output(): + fm: FastMessage = FastMessage() + + @fm.map() + def func_input(): + return OtherMethodOutput(func_output, x=3) + + # notice there's no mapping decorator here... + def func_output(x: int, y: str): + return f"Success: x={x}, y={y}"