From e9b6301284ecf6bffec69f3eda2a42242e133227 Mon Sep 17 00:00:00 2001 From: Hal Blackburn Date: Mon, 17 Mar 2025 07:09:18 +0000 Subject: [PATCH 01/10] chore: improve typing of functions returning AnyCloudEvent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit kafka.conversion.from_binary() and from_structured() return AnyCloudEvent type var according to their event_type argument, but when event_type is None, type checkers cannot infer the return type. We now use an overload to declare that the return type is http.CloudEvent when event_type is None. Previously users had to explicitly annotate this type when calling without event_type. This happens quite a lot in this repo's test_kafka_conversions.py — this fixes quite a few type errors like: > error: Need type annotation for "result" [var-annotated] Signed-off-by: Hal Blackburn --- cloudevents/kafka/conversion.py | 48 ++++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/cloudevents/kafka/conversion.py b/cloudevents/kafka/conversion.py index bfddca6..bdf2aca 100644 --- a/cloudevents/kafka/conversion.py +++ b/cloudevents/kafka/conversion.py @@ -111,11 +111,29 @@ def to_binary( return KafkaMessage(headers, message_key, data) +@typing.overload def from_binary( message: KafkaMessage, - event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None, + event_type: None = None, + data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, +) -> http.CloudEvent: + pass + + +@typing.overload +def from_binary( + message: KafkaMessage, + event_type: typing.Type[AnyCloudEvent], data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, ) -> AnyCloudEvent: + pass + + +def from_binary( + message: KafkaMessage, + event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None, + data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, +) -> typing.Union[http.CloudEvent, AnyCloudEvent]: """ Returns a CloudEvent from a KafkaMessage in binary format. @@ -144,10 +162,11 @@ def from_binary( raise cloud_exceptions.DataUnmarshallerError( f"Failed to unmarshall data with error: {type(e).__name__}('{e}')" ) + result: typing.Union[http.CloudEvent, AnyCloudEvent] if event_type: result = event_type.create(attributes, data) else: - result = http.CloudEvent.create(attributes, data) # type: ignore + result = http.CloudEvent.create(attributes, data) return result @@ -210,12 +229,32 @@ def to_structured( return KafkaMessage(headers, message_key, value) +@typing.overload def from_structured( message: KafkaMessage, - event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None, + event_type: None = None, + data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, + envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None, +) -> http.CloudEvent: + pass + + +@typing.overload +def from_structured( + message: KafkaMessage, + event_type: typing.Type[AnyCloudEvent], data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None, ) -> AnyCloudEvent: + pass + + +def from_structured( + message: KafkaMessage, + event_type: typing.Optional[typing.Type[AnyCloudEvent]] = None, + data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, + envelope_unmarshaller: typing.Optional[types.UnmarshallerType] = None, +) -> typing.Union[http.CloudEvent, AnyCloudEvent]: """ Returns a CloudEvent from a KafkaMessage in structured format. @@ -264,8 +303,9 @@ def from_structured( attributes["datacontenttype"] = val.decode() else: attributes[header.lower()] = val.decode() + result: typing.Union[AnyCloudEvent, http.CloudEvent] if event_type: result = event_type.create(attributes, data) else: - result = http.CloudEvent.create(attributes, data) # type: ignore + result = http.CloudEvent.create(attributes, data) return result From eacac4fbbc4e21999d13e3aa5a05281a32ad2782 Mon Sep 17 00:00:00 2001 From: Hal Blackburn Date: Mon, 17 Mar 2025 07:34:05 +0000 Subject: [PATCH 02/10] chore: type v1.Event chainable Set*() methods The v1.Event self-returning Set*() methods like SetData() were returning BaseEvent, which doesn't declare the same Set* methods. As a result, chaining more than one Set* method would make the return type unknown. This was causing type errors in test_event_pipeline.py. The Set*() methods now return the Self type. Signed-off-by: Hal Blackburn --- cloudevents/sdk/event/v1.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/cloudevents/sdk/event/v1.py b/cloudevents/sdk/event/v1.py index 18d1f3a..0f2e1d5 100644 --- a/cloudevents/sdk/event/v1.py +++ b/cloudevents/sdk/event/v1.py @@ -11,10 +11,15 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from __future__ import annotations + import typing from cloudevents.sdk.event import base, opt +if typing.TYPE_CHECKING: + from typing_extensions import Self + class Event(base.BaseEvent): _ce_required_fields = {"id", "source", "type", "specversion"} @@ -79,39 +84,39 @@ def Extensions(self) -> dict: return {} return dict(result) - def SetEventType(self, eventType: str) -> base.BaseEvent: + def SetEventType(self, eventType: str) -> Self: self.Set("type", eventType) return self - def SetSource(self, source: str) -> base.BaseEvent: + def SetSource(self, source: str) -> Self: self.Set("source", source) return self - def SetEventID(self, eventID: str) -> base.BaseEvent: + def SetEventID(self, eventID: str) -> Self: self.Set("id", eventID) return self - def SetEventTime(self, eventTime: typing.Optional[str]) -> base.BaseEvent: + def SetEventTime(self, eventTime: typing.Optional[str]) -> Self: self.Set("time", eventTime) return self - def SetSubject(self, subject: typing.Optional[str]) -> base.BaseEvent: + def SetSubject(self, subject: typing.Optional[str]) -> Self: self.Set("subject", subject) return self - def SetSchema(self, schema: typing.Optional[str]) -> base.BaseEvent: + def SetSchema(self, schema: typing.Optional[str]) -> Self: self.Set("dataschema", schema) return self - def SetContentType(self, contentType: typing.Optional[str]) -> base.BaseEvent: + def SetContentType(self, contentType: typing.Optional[str]) -> Self: self.Set("datacontenttype", contentType) return self - def SetData(self, data: typing.Optional[object]) -> base.BaseEvent: + def SetData(self, data: typing.Optional[object]) -> Self: self.Set("data", data) return self - def SetExtensions(self, extensions: typing.Optional[dict]) -> base.BaseEvent: + def SetExtensions(self, extensions: typing.Optional[dict]) -> Self: self.Set("extensions", extensions) return self From f81c902843f8959a4d4e7a24d79c678409cfe386 Mon Sep 17 00:00:00 2001 From: Hal Blackburn Date: Mon, 17 Mar 2025 06:49:38 +0000 Subject: [PATCH 03/10] chore: fix type errors in tests mypy was failing with lots of type errors in test modules. I've not annotated all fixtures, mostly fixed existing type errors. Signed-off-by: Hal Blackburn --- cloudevents/tests/test_converters.py | 6 +- .../test_event_from_request_converter.py | 2 +- cloudevents/tests/test_event_pipeline.py | 2 +- cloudevents/tests/test_http_events.py | 13 +- cloudevents/tests/test_kafka_conversions.py | 7 +- cloudevents/tests/test_marshaller.py | 4 +- cloudevents/tests/test_pydantic_events.py | 151 +++++++++++++----- 7 files changed, 132 insertions(+), 53 deletions(-) diff --git a/cloudevents/tests/test_converters.py b/cloudevents/tests/test_converters.py index b91d6b3..50d783b 100644 --- a/cloudevents/tests/test_converters.py +++ b/cloudevents/tests/test_converters.py @@ -21,7 +21,7 @@ def test_binary_converter_raise_unsupported(): with pytest.raises(exceptions.UnsupportedEvent): cnvtr = binary.BinaryHTTPCloudEventConverter() - cnvtr.read(None, {}, None, None) + cnvtr.read(None, {}, None, None) # type: ignore[arg-type] # intentionally wrong type # noqa: E501 def test_base_converters_raise_exceptions(): @@ -35,8 +35,8 @@ def test_base_converters_raise_exceptions(): with pytest.raises(Exception): cnvtr = base.Converter() - cnvtr.write(None, None) + cnvtr.write(None, None) # type: ignore[arg-type] # intentionally wrong type with pytest.raises(Exception): cnvtr = base.Converter() - cnvtr.read(None, None, None, None) + cnvtr.read(None, None, None, None) # type: ignore[arg-type] # intentionally wrong type # noqa: E501 diff --git a/cloudevents/tests/test_event_from_request_converter.py b/cloudevents/tests/test_event_from_request_converter.py index 901284b..362b1ca 100644 --- a/cloudevents/tests/test_event_from_request_converter.py +++ b/cloudevents/tests/test_event_from_request_converter.py @@ -25,7 +25,7 @@ @pytest.mark.parametrize("event_class", [v03.Event, v1.Event]) def test_binary_converter_upstream(event_class): m = marshaller.NewHTTPMarshaller([binary.NewBinaryHTTPCloudEventConverter()]) - event = m.FromRequest(event_class(), data.headers[event_class], None, lambda x: x) + event = m.FromRequest(event_class(), data.headers[event_class], b"", lambda x: x) assert event is not None assert event.EventType() == data.ce_type assert event.EventID() == data.ce_id diff --git a/cloudevents/tests/test_event_pipeline.py b/cloudevents/tests/test_event_pipeline.py index efc7974..dae3dc2 100644 --- a/cloudevents/tests/test_event_pipeline.py +++ b/cloudevents/tests/test_event_pipeline.py @@ -77,7 +77,7 @@ def test_object_event_v1(): _, structured_body = m.ToRequest(event) assert isinstance(structured_body, bytes) structured_obj = json.loads(structured_body) - error_msg = f"Body was {structured_body}, obj is {structured_obj}" + error_msg = f"Body was {structured_body!r}, obj is {structured_obj}" assert isinstance(structured_obj, dict), error_msg assert isinstance(structured_obj["data"], dict), error_msg assert len(structured_obj["data"]) == 1, error_msg diff --git a/cloudevents/tests/test_http_events.py b/cloudevents/tests/test_http_events.py index b21c372..3d4c8d5 100644 --- a/cloudevents/tests/test_http_events.py +++ b/cloudevents/tests/test_http_events.py @@ -11,6 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from __future__ import annotations import bz2 import io @@ -241,11 +242,11 @@ def test_structured_to_request(specversion): assert headers["content-type"] == "application/cloudevents+json" for key in attributes: assert body[key] == attributes[key] - assert body["data"] == data, f"|{body_bytes}|| {body}" + assert body["data"] == data, f"|{body_bytes!r}|| {body}" @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_attributes_view_accessor(specversion: str): +def test_attributes_view_accessor(specversion: str) -> None: attributes: dict[str, typing.Any] = { "specversion": specversion, "type": "word.found.name", @@ -333,7 +334,7 @@ def test_valid_structured_events(specversion): events_queue = [] num_cloudevents = 30 for i in range(num_cloudevents): - event = { + raw_event = { "id": f"id{i}", "source": f"source{i}.com.test", "type": "cloudevent.test.type", @@ -343,7 +344,7 @@ def test_valid_structured_events(specversion): events_queue.append( from_http( {"content-type": "application/cloudevents+json"}, - json.dumps(event), + json.dumps(raw_event), ) ) @@ -454,7 +455,7 @@ def test_invalid_data_format_structured_from_http(): headers = {"Content-Type": "application/cloudevents+json"} data = 20 with pytest.raises(cloud_exceptions.InvalidStructuredJSON) as e: - from_http(headers, data) + from_http(headers, data) # type: ignore[arg-type] # intentionally wrong type assert "Expected json of type (str, bytes, bytearray)" in str(e.value) @@ -526,7 +527,7 @@ def test_generic_exception(): e.errisinstance(cloud_exceptions.MissingRequiredFields) with pytest.raises(cloud_exceptions.GenericException) as e: - from_http({}, 123) + from_http({}, 123) # type: ignore[arg-type] # intentionally wrong type e.errisinstance(cloud_exceptions.InvalidStructuredJSON) with pytest.raises(cloud_exceptions.GenericException) as e: diff --git a/cloudevents/tests/test_kafka_conversions.py b/cloudevents/tests/test_kafka_conversions.py index 5580773..584a05e 100644 --- a/cloudevents/tests/test_kafka_conversions.py +++ b/cloudevents/tests/test_kafka_conversions.py @@ -19,6 +19,7 @@ import pytest from cloudevents import exceptions as cloud_exceptions +from cloudevents.abstract.event import AnyCloudEvent from cloudevents.http import CloudEvent from cloudevents.kafka.conversion import ( KafkaMessage, @@ -36,7 +37,9 @@ def simple_serialize(data: dict) -> bytes: def simple_deserialize(data: bytes) -> dict: - return json.loads(data.decode()) + value = json.loads(data.decode()) + assert isinstance(value, dict) + return value def failing_func(*args): @@ -47,7 +50,7 @@ class KafkaConversionTestBase: expected_data = {"name": "test", "amount": 1} expected_custom_mapped_key = "custom-key" - def custom_key_mapper(self, _) -> str: + def custom_key_mapper(self, _: AnyCloudEvent) -> str: return self.expected_custom_mapped_key @pytest.fixture diff --git a/cloudevents/tests/test_marshaller.py b/cloudevents/tests/test_marshaller.py index 9060989..6561418 100644 --- a/cloudevents/tests/test_marshaller.py +++ b/cloudevents/tests/test_marshaller.py @@ -50,14 +50,14 @@ def test_from_request_wrong_unmarshaller(): with pytest.raises(exceptions.InvalidDataUnmarshaller): m = marshaller.NewDefaultHTTPMarshaller() _ = m.FromRequest( - event=v1.Event(), headers={}, body="", data_unmarshaller=object() + event=v1.Event(), headers={}, body="", data_unmarshaller=object() # type: ignore[arg-type] # intentionally wrong type # noqa: E501 ) def test_to_request_wrong_marshaller(): with pytest.raises(exceptions.InvalidDataMarshaller): m = marshaller.NewDefaultHTTPMarshaller() - _ = m.ToRequest(v1.Event(), data_marshaller="") + _ = m.ToRequest(v1.Event(), data_marshaller="") # type: ignore[arg-type] # intentionally wrong type # noqa: E501 def test_from_request_cannot_read(binary_headers): diff --git a/cloudevents/tests/test_pydantic_events.py b/cloudevents/tests/test_pydantic_events.py index 3e536f0..30ad1fe 100644 --- a/cloudevents/tests/test_pydantic_events.py +++ b/cloudevents/tests/test_pydantic_events.py @@ -11,6 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +from __future__ import annotations import bz2 import io @@ -28,10 +29,13 @@ from cloudevents.pydantic.v1.event import CloudEvent as PydanticV1CloudEvent from cloudevents.pydantic.v2.conversion import from_http as pydantic_v2_from_http from cloudevents.pydantic.v2.event import CloudEvent as PydanticV2CloudEvent -from cloudevents.sdk import converters +from cloudevents.sdk import converters, types from cloudevents.sdk.converters.binary import is_binary from cloudevents.sdk.converters.structured import is_structured +if typing.TYPE_CHECKING: + from typing_extensions import TypeAlias + invalid_test_headers = [ { "ce-source": "", @@ -70,7 +74,30 @@ app = Sanic("test_pydantic_http_events") -_pydantic_implementation = { + +AnyPydanticCloudEvent: TypeAlias = typing.Union[ + PydanticV1CloudEvent, PydanticV2CloudEvent +] + + +class FromHttpFn(typing.Protocol): + def __call__( + self, + headers: typing.Dict[str, str], + data: typing.Optional[typing.AnyStr], + data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, + ) -> AnyPydanticCloudEvent: + pass + + +class PydanticImplementation(typing.TypedDict): + event: typing.Type[AnyPydanticCloudEvent] + validation_error: typing.Type[Exception] + from_http: FromHttpFn + pydantic_version: typing.Literal["v1", "v2"] + + +_pydantic_implementation: typing.Mapping[str, PydanticImplementation] = { "v1": { "event": PydanticV1CloudEvent, "validation_error": PydanticV1ValidationError, @@ -87,7 +114,9 @@ @pytest.fixture(params=["v1", "v2"]) -def cloudevents_implementation(request): +def cloudevents_implementation( + request: pytest.FixtureRequest, +) -> PydanticImplementation: return _pydantic_implementation[request.param] @@ -108,7 +137,9 @@ async def echo(request, pydantic_version): @pytest.mark.parametrize("body", invalid_cloudevent_request_body) -def test_missing_required_fields_structured(body, cloudevents_implementation): +def test_missing_required_fields_structured( + body: dict, cloudevents_implementation: PydanticImplementation +) -> None: with pytest.raises(cloud_exceptions.MissingRequiredFields): _ = cloudevents_implementation["from_http"]( {"Content-Type": "application/cloudevents+json"}, json.dumps(body) @@ -116,20 +147,26 @@ def test_missing_required_fields_structured(body, cloudevents_implementation): @pytest.mark.parametrize("headers", invalid_test_headers) -def test_missing_required_fields_binary(headers, cloudevents_implementation): +def test_missing_required_fields_binary( + headers: dict, cloudevents_implementation: PydanticImplementation +) -> None: with pytest.raises(cloud_exceptions.MissingRequiredFields): _ = cloudevents_implementation["from_http"](headers, json.dumps(test_data)) @pytest.mark.parametrize("headers", invalid_test_headers) -def test_missing_required_fields_empty_data_binary(headers, cloudevents_implementation): +def test_missing_required_fields_empty_data_binary( + headers: dict, cloudevents_implementation: PydanticImplementation +) -> None: # Test for issue #115 with pytest.raises(cloud_exceptions.MissingRequiredFields): _ = cloudevents_implementation["from_http"](headers, None) @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_emit_binary_event(specversion, cloudevents_implementation): +def test_emit_binary_event( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: headers = { "ce-id": "my-id", "ce-source": "", @@ -159,7 +196,9 @@ def test_emit_binary_event(specversion, cloudevents_implementation): @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_emit_structured_event(specversion, cloudevents_implementation): +def test_emit_structured_event( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: headers = {"Content-Type": "application/cloudevents+json"} body = { "id": "my-id", @@ -188,7 +227,11 @@ def test_emit_structured_event(specversion, cloudevents_implementation): "converter", [converters.TypeBinary, converters.TypeStructured] ) @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_roundtrip_non_json_event(converter, specversion, cloudevents_implementation): +def test_roundtrip_non_json_event( + converter: str, + specversion: str, + cloudevents_implementation: PydanticImplementation, +) -> None: input_data = io.BytesIO() for _ in range(100): for j in range(20): @@ -217,7 +260,9 @@ def test_roundtrip_non_json_event(converter, specversion, cloudevents_implementa @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_missing_ce_prefix_binary_event(specversion, cloudevents_implementation): +def test_missing_ce_prefix_binary_event( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: prefixed_headers = {} headers = { "ce-id": "my-id", @@ -240,9 +285,11 @@ def test_missing_ce_prefix_binary_event(specversion, cloudevents_implementation) @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_valid_binary_events(specversion, cloudevents_implementation): +def test_valid_binary_events( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: # Test creating multiple cloud events - events_queue = [] + events_queue: list[AnyPydanticCloudEvent] = [] headers = {} num_cloudevents = 30 for i in range(num_cloudevents): @@ -258,7 +305,7 @@ def test_valid_binary_events(specversion, cloudevents_implementation): ) for i, event in enumerate(events_queue): - data = event.data + assert isinstance(event.data, dict) assert event["id"] == f"id{i}" assert event["source"] == f"source{i}.com.test" assert event["specversion"] == specversion @@ -266,7 +313,9 @@ def test_valid_binary_events(specversion, cloudevents_implementation): @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_structured_to_request(specversion, cloudevents_implementation): +def test_structured_to_request( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: attributes = { "specversion": specversion, "type": "word.found.name", @@ -283,11 +332,13 @@ def test_structured_to_request(specversion, cloudevents_implementation): assert headers["content-type"] == "application/cloudevents+json" for key in attributes: assert body[key] == attributes[key] - assert body["data"] == data, f"|{body_bytes}|| {body}" + assert body["data"] == data, f"|{body_bytes!r}|| {body}" @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_attributes_view_accessor(specversion: str, cloudevents_implementation): +def test_attributes_view_accessor( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: attributes: dict[str, typing.Any] = { "specversion": specversion, "type": "word.found.name", @@ -296,9 +347,7 @@ def test_attributes_view_accessor(specversion: str, cloudevents_implementation): } data = {"message": "Hello World!"} - event: cloudevents_implementation["event"] = cloudevents_implementation["event"]( - attributes, data - ) + event = cloudevents_implementation["event"](attributes, data) event_attributes: typing.Mapping[str, typing.Any] = event.get_attributes() assert event_attributes["specversion"] == attributes["specversion"] assert event_attributes["type"] == attributes["type"] @@ -308,7 +357,9 @@ def test_attributes_view_accessor(specversion: str, cloudevents_implementation): @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_binary_to_request(specversion, cloudevents_implementation): +def test_binary_to_request( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: attributes = { "specversion": specversion, "type": "word.found.name", @@ -327,7 +378,9 @@ def test_binary_to_request(specversion, cloudevents_implementation): @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_empty_data_structured_event(specversion, cloudevents_implementation): +def test_empty_data_structured_event( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: # Testing if cloudevent breaks when no structured data field present attributes = { "specversion": specversion, @@ -352,7 +405,9 @@ def test_empty_data_structured_event(specversion, cloudevents_implementation): @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_empty_data_binary_event(specversion, cloudevents_implementation): +def test_empty_data_binary_event( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: # Testing if cloudevent breaks when no structured data field present headers = { "Content-Type": "application/octet-stream", @@ -372,12 +427,14 @@ def test_empty_data_binary_event(specversion, cloudevents_implementation): @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_valid_structured_events(specversion, cloudevents_implementation): +def test_valid_structured_events( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: # Test creating multiple cloud events - events_queue = [] + events_queue: list[AnyPydanticCloudEvent] = [] num_cloudevents = 30 for i in range(num_cloudevents): - event = { + raw_event = { "id": f"id{i}", "source": f"source{i}.com.test", "type": "cloudevent.test.type", @@ -387,11 +444,12 @@ def test_valid_structured_events(specversion, cloudevents_implementation): events_queue.append( cloudevents_implementation["from_http"]( {"content-type": "application/cloudevents+json"}, - json.dumps(event), + json.dumps(raw_event), ) ) for i, event in enumerate(events_queue): + assert isinstance(event.data, dict) assert event["id"] == f"id{i}" assert event["source"] == f"source{i}.com.test" assert event["specversion"] == specversion @@ -399,7 +457,9 @@ def test_valid_structured_events(specversion, cloudevents_implementation): @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_structured_no_content_type(specversion, cloudevents_implementation): +def test_structured_no_content_type( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: # Test creating multiple cloud events data = { "id": "id", @@ -410,6 +470,7 @@ def test_structured_no_content_type(specversion, cloudevents_implementation): } event = cloudevents_implementation["from_http"]({}, json.dumps(data)) + assert isinstance(event.data, dict) assert event["id"] == "id" assert event["source"] == "source.com.test" assert event["specversion"] == specversion @@ -437,7 +498,9 @@ def test_is_binary(): @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_cloudevent_repr(specversion, cloudevents_implementation): +def test_cloudevent_repr( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: headers = { "Content-Type": "application/octet-stream", "ce-specversion": specversion, @@ -454,7 +517,9 @@ def test_cloudevent_repr(specversion, cloudevents_implementation): @pytest.mark.parametrize("specversion", ["1.0", "0.3"]) -def test_none_data_cloudevent(specversion, cloudevents_implementation): +def test_none_data_cloudevent( + specversion: str, cloudevents_implementation: PydanticImplementation +) -> None: event = cloudevents_implementation["event"]( { "source": "", @@ -466,7 +531,7 @@ def test_none_data_cloudevent(specversion, cloudevents_implementation): to_structured(event) -def test_wrong_specversion(cloudevents_implementation): +def test_wrong_specversion(cloudevents_implementation: PydanticImplementation) -> None: headers = {"Content-Type": "application/cloudevents+json"} data = json.dumps( { @@ -481,15 +546,19 @@ def test_wrong_specversion(cloudevents_implementation): assert "Found invalid specversion 0.2" in str(e.value) -def test_invalid_data_format_structured_from_http(cloudevents_implementation): +def test_invalid_data_format_structured_from_http( + cloudevents_implementation: PydanticImplementation, +) -> None: headers = {"Content-Type": "application/cloudevents+json"} data = 20 with pytest.raises(cloud_exceptions.InvalidStructuredJSON) as e: - cloudevents_implementation["from_http"](headers, data) + cloudevents_implementation["from_http"](headers, data) # type: ignore[type-var] # intentionally wrong type # noqa: E501 assert "Expected json of type (str, bytes, bytearray)" in str(e.value) -def test_wrong_specversion_to_request(cloudevents_implementation): +def test_wrong_specversion_to_request( + cloudevents_implementation: PydanticImplementation, +) -> None: event = cloudevents_implementation["event"]({"source": "s", "type": "t"}, None) with pytest.raises(cloud_exceptions.InvalidRequiredFields) as e: event["specversion"] = "0.2" @@ -513,7 +582,9 @@ def test_is_structured(): assert not is_structured(headers) -def test_empty_json_structured(cloudevents_implementation): +def test_empty_json_structured( + cloudevents_implementation: PydanticImplementation, +) -> None: headers = {"Content-Type": "application/cloudevents+json"} data = "" with pytest.raises(cloud_exceptions.MissingRequiredFields) as e: @@ -521,7 +592,9 @@ def test_empty_json_structured(cloudevents_implementation): assert "Failed to read specversion from both headers and data" in str(e.value) -def test_uppercase_headers_with_none_data_binary(cloudevents_implementation): +def test_uppercase_headers_with_none_data_binary( + cloudevents_implementation: PydanticImplementation, +) -> None: headers = { "Ce-Id": "my-id", "Ce-Source": "", @@ -538,7 +611,7 @@ def test_uppercase_headers_with_none_data_binary(cloudevents_implementation): assert new_data is None -def test_generic_exception(cloudevents_implementation): +def test_generic_exception(cloudevents_implementation: PydanticImplementation) -> None: headers = {"Content-Type": "application/cloudevents+json"} data = json.dumps( { @@ -554,7 +627,7 @@ def test_generic_exception(cloudevents_implementation): e.errisinstance(cloud_exceptions.MissingRequiredFields) with pytest.raises(cloud_exceptions.GenericException) as e: - cloudevents_implementation["from_http"]({}, 123) + cloudevents_implementation["from_http"]({}, 123) # type: ignore[type-var] # intentionally wrong type # noqa: E501 e.errisinstance(cloud_exceptions.InvalidStructuredJSON) with pytest.raises(cloud_exceptions.GenericException) as e: @@ -569,7 +642,9 @@ def test_generic_exception(cloudevents_implementation): e.errisinstance(cloud_exceptions.DataMarshallerError) -def test_non_dict_data_no_headers_bug(cloudevents_implementation): +def test_non_dict_data_no_headers_bug( + cloudevents_implementation: PydanticImplementation, +) -> None: # Test for issue #116 headers = {"Content-Type": "application/cloudevents+json"} data = "123" From b70ef0ad6b0f1c427ff3ca6913ec8e1c1e007c0b Mon Sep 17 00:00:00 2001 From: Hal Blackburn Date: Tue, 25 Mar 2025 05:18:00 +0000 Subject: [PATCH 04/10] chore: allow non-dict headers types in from_http() from_http() conversion function was requiring its headers argument to be a typing.Dict, which makes it incompatible with headers types of http libraries, which support features like multiple values per key. typing.Mapping and even _typeshed.SupportsItems do not cover these types. For example, samples/http-image-cloudevents/image_sample_server.py was failing to type check where it calls `from_http(request.headers, ...)`. To support these kind of headers types in from_http(), we now define our own SupportsDuplicateItems protocol, which is broader than _typeshed.SupportsItems. I've only applied this to from_http(), as typing.Mapping is OK for most other methods that accept dict-like objects, and using this more lenient interface everywhere would impose restrictions on our implementation, even though it might be more flexible for users. Signed-off-by: Hal Blackburn --- cloudevents/conversion.py | 2 +- cloudevents/http/conversion.py | 2 +- cloudevents/pydantic/v1/conversion.py | 2 +- cloudevents/pydantic/v2/conversion.py | 2 +- cloudevents/sdk/types.py | 16 ++++++++++++++++ 5 files changed, 20 insertions(+), 4 deletions(-) diff --git a/cloudevents/conversion.py b/cloudevents/conversion.py index c73e3ed..e307344 100644 --- a/cloudevents/conversion.py +++ b/cloudevents/conversion.py @@ -91,7 +91,7 @@ def from_json( def from_http( event_type: typing.Type[AnyCloudEvent], - headers: typing.Mapping[str, str], + headers: types.SupportsDuplicateItems[str, str], data: typing.Optional[typing.Union[str, bytes]], data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, ) -> AnyCloudEvent: diff --git a/cloudevents/http/conversion.py b/cloudevents/http/conversion.py index a7da926..c0830ee 100644 --- a/cloudevents/http/conversion.py +++ b/cloudevents/http/conversion.py @@ -37,7 +37,7 @@ def from_json( def from_http( - headers: typing.Dict[str, str], + headers: types.SupportsDuplicateItems[str, str], data: typing.Optional[typing.Union[str, bytes]], data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, ) -> CloudEvent: diff --git a/cloudevents/pydantic/v1/conversion.py b/cloudevents/pydantic/v1/conversion.py index dcf0b7d..ded202f 100644 --- a/cloudevents/pydantic/v1/conversion.py +++ b/cloudevents/pydantic/v1/conversion.py @@ -21,7 +21,7 @@ def from_http( - headers: typing.Dict[str, str], + headers: types.SupportsDuplicateItems[str, str], data: typing.Optional[typing.AnyStr], data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, ) -> CloudEvent: diff --git a/cloudevents/pydantic/v2/conversion.py b/cloudevents/pydantic/v2/conversion.py index 6510854..47ce282 100644 --- a/cloudevents/pydantic/v2/conversion.py +++ b/cloudevents/pydantic/v2/conversion.py @@ -22,7 +22,7 @@ def from_http( - headers: typing.Dict[str, str], + headers: types.SupportsDuplicateItems[str, str], data: typing.Optional[typing.AnyStr], data_unmarshaller: typing.Optional[types.UnmarshallerType] = None, ) -> CloudEvent: diff --git a/cloudevents/sdk/types.py b/cloudevents/sdk/types.py index e6ab46e..6baef6b 100644 --- a/cloudevents/sdk/types.py +++ b/cloudevents/sdk/types.py @@ -14,9 +14,25 @@ import typing +_K_co = typing.TypeVar("_K_co", covariant=True) +_V_co = typing.TypeVar("_V_co", covariant=True) + # Use consistent types for marshal and unmarshal functions across # both JSON and Binary format. MarshallerType = typing.Callable[[typing.Any], typing.AnyStr] UnmarshallerType = typing.Callable[[typing.AnyStr], typing.Any] + + +class SupportsDuplicateItems(typing.Protocol[_K_co, _V_co]): + """ + Dict-like objects with an items() method that may produce duplicate keys. + """ + + # This is wider than _typeshed.SupportsItems, which expects items() to + # return type an AbstractSet. werkzeug's Headers class satisfies this type, + # but not _typeshed.SupportsItems. + + def items(self) -> typing.Iterable[typing.Tuple[_K_co, _V_co]]: + pass From 66aa64b3aff95c0ebde4de2717e5c37fab8894a8 Mon Sep 17 00:00:00 2001 From: Hal Blackburn Date: Tue, 25 Mar 2025 06:14:47 +0000 Subject: [PATCH 05/10] build: run mypy via tox Tox now runs mypy on cloudevents itself, and the samples. Signed-off-by: Hal Blackburn --- requirements/dev.txt | 1 + requirements/mypy.txt | 5 +++++ tox.ini | 20 +++++++++++++++++++- 3 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 requirements/mypy.txt diff --git a/requirements/dev.txt b/requirements/dev.txt index 6387294..fa91028 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -5,3 +5,4 @@ pep8-naming flake8-print tox pre-commit +mypy diff --git a/requirements/mypy.txt b/requirements/mypy.txt new file mode 100644 index 0000000..2f2229c --- /dev/null +++ b/requirements/mypy.txt @@ -0,0 +1,5 @@ +mypy +# mypy has the pydantic plugin enabled +pydantic>=2.0.0,<3.0 +types-requests +deprecation>=2.0,<3.0 diff --git a/tox.ini b/tox.ini index 0436a1b..1b95d5f 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py{38,39,310,311,312},lint +envlist = py{38,39,310,311,312},lint,mypy,mypy-samples-{image,json} skipsdist = True [testenv] @@ -30,3 +30,21 @@ commands = black --check . isort -c cloudevents samples flake8 cloudevents samples --ignore W503,E731 --extend-ignore E203 --max-line-length 88 + +[testenv:mypy] +basepython = python3.11 +deps = + -r{toxinidir}/requirements/mypy.txt + # mypy needs test dependencies to check test modules + -r{toxinidir}/requirements/test.txt +commands = mypy cloudevents + +[testenv:mypy-samples-{image,json}] +basepython = python3.11 +setenv = + mypy-samples-image: SAMPLE_DIR={toxinidir}/samples/http-image-cloudevents + mypy-samples-json: SAMPLE_DIR={toxinidir}/samples/http-json-cloudevents +deps = + -r{toxinidir}/requirements/mypy.txt + -r{env:SAMPLE_DIR}/requirements.txt +commands = mypy {env:SAMPLE_DIR} From a3291f54538a4f49e781a7c26ab00f5c6912f048 Mon Sep 17 00:00:00 2001 From: Hal Blackburn Date: Tue, 25 Mar 2025 06:25:27 +0000 Subject: [PATCH 06/10] build(ci): run mypy in CI alongside linting Signed-off-by: Hal Blackburn --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 107bf9e..b5eb4b8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -17,7 +17,7 @@ jobs: - name: Install dev dependencies run: python -m pip install -r requirements/dev.txt - name: Run linting - run: python -m tox -e lint + run: python -m tox -e lint,mypy,mypy-samples-image,mypy-samples-json test: strategy: From 029d122efa01262e0dbf1db061b2818eb48f1abb Mon Sep 17 00:00:00 2001 From: Hal Blackburn Date: Tue, 25 Mar 2025 06:26:01 +0000 Subject: [PATCH 07/10] chore: fix minor mypy type complaint in samples Signed-off-by: Hal Blackburn --- samples/http-image-cloudevents/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/http-image-cloudevents/client.py b/samples/http-image-cloudevents/client.py index 021c1f5..ee00394 100644 --- a/samples/http-image-cloudevents/client.py +++ b/samples/http-image-cloudevents/client.py @@ -25,7 +25,7 @@ image_bytes = resp.content -def send_binary_cloud_event(url: str): +def send_binary_cloud_event(url: str) -> None: # Create cloudevent attributes = { "type": "com.example.string", @@ -42,7 +42,7 @@ def send_binary_cloud_event(url: str): print(f"Sent {event['id']} of type {event['type']}") -def send_structured_cloud_event(url: str): +def send_structured_cloud_event(url: str) -> None: # Create cloudevent attributes = { "type": "com.example.base64", From f611c299b6efb45c2ffe78b83ab22fb5dc1e0766 Mon Sep 17 00:00:00 2001 From: Hal Blackburn Date: Tue, 25 Mar 2025 06:32:00 +0000 Subject: [PATCH 08/10] feat: use Mapping, not Dict for input arguments Mapping imposes less restrictions on callers, because it's read-only and allows non-dict types to be passed without copying them as dict(), or passing dict-like values and ignoring the resulting type error. Signed-off-by: Hal Blackburn --- cloudevents/abstract/event.py | 2 +- cloudevents/conversion.py | 2 +- cloudevents/http/conversion.py | 2 +- cloudevents/http/event.py | 6 ++++-- cloudevents/pydantic/v1/conversion.py | 2 +- cloudevents/pydantic/v1/event.py | 6 ++++-- cloudevents/pydantic/v2/conversion.py | 2 +- cloudevents/pydantic/v2/event.py | 6 ++++-- 8 files changed, 17 insertions(+), 11 deletions(-) diff --git a/cloudevents/abstract/event.py b/cloudevents/abstract/event.py index c18ca34..18c6df1 100644 --- a/cloudevents/abstract/event.py +++ b/cloudevents/abstract/event.py @@ -32,7 +32,7 @@ class CloudEvent: @classmethod def create( cls: typing.Type[AnyCloudEvent], - attributes: typing.Dict[str, typing.Any], + attributes: typing.Mapping[str, typing.Any], data: typing.Optional[typing.Any], ) -> AnyCloudEvent: """ diff --git a/cloudevents/conversion.py b/cloudevents/conversion.py index e307344..94d15fe 100644 --- a/cloudevents/conversion.py +++ b/cloudevents/conversion.py @@ -260,7 +260,7 @@ def best_effort_encode_attribute_value(value: typing.Any) -> typing.Any: def from_dict( event_type: typing.Type[AnyCloudEvent], - event: typing.Dict[str, typing.Any], + event: typing.Mapping[str, typing.Any], ) -> AnyCloudEvent: """ Constructs an Event object of a given `event_type` from diff --git a/cloudevents/http/conversion.py b/cloudevents/http/conversion.py index c0830ee..5d75a31 100644 --- a/cloudevents/http/conversion.py +++ b/cloudevents/http/conversion.py @@ -58,7 +58,7 @@ def from_http( def from_dict( - event: typing.Dict[str, typing.Any], + event: typing.Mapping[str, typing.Any], ) -> CloudEvent: """ Constructs a CloudEvent from a dict `event` representation. diff --git a/cloudevents/http/event.py b/cloudevents/http/event.py index c7a066d..f3c0063 100644 --- a/cloudevents/http/event.py +++ b/cloudevents/http/event.py @@ -34,11 +34,13 @@ class CloudEvent(abstract.CloudEvent): @classmethod def create( - cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any] + cls, + attributes: typing.Mapping[str, typing.Any], + data: typing.Optional[typing.Any], ) -> "CloudEvent": return cls(attributes, data) - def __init__(self, attributes: typing.Dict[str, str], data: typing.Any = None): + def __init__(self, attributes: typing.Mapping[str, str], data: typing.Any = None): """ Event Constructor :param attributes: a dict with cloudevent attributes. Minimally diff --git a/cloudevents/pydantic/v1/conversion.py b/cloudevents/pydantic/v1/conversion.py index ded202f..649de27 100644 --- a/cloudevents/pydantic/v1/conversion.py +++ b/cloudevents/pydantic/v1/conversion.py @@ -63,7 +63,7 @@ def from_json( def from_dict( - event: typing.Dict[str, typing.Any], + event: typing.Mapping[str, typing.Any], ) -> CloudEvent: """ Construct an CloudEvent from a dict `event` representation. diff --git a/cloudevents/pydantic/v1/event.py b/cloudevents/pydantic/v1/event.py index d18736a..98c6136 100644 --- a/cloudevents/pydantic/v1/event.py +++ b/cloudevents/pydantic/v1/event.py @@ -100,7 +100,9 @@ class CloudEvent(abstract.CloudEvent, BaseModel): # type: ignore @classmethod def create( - cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any] + cls, + attributes: typing.Mapping[str, typing.Any], + data: typing.Optional[typing.Any], ) -> "CloudEvent": return cls(attributes, data) @@ -155,7 +157,7 @@ def create( def __init__( # type: ignore[no-untyped-def] self, - attributes: typing.Optional[typing.Dict[str, typing.Any]] = None, + attributes: typing.Optional[typing.Mapping[str, typing.Any]] = None, data: typing.Optional[typing.Any] = None, **kwargs, ): diff --git a/cloudevents/pydantic/v2/conversion.py b/cloudevents/pydantic/v2/conversion.py index 47ce282..f68b7d6 100644 --- a/cloudevents/pydantic/v2/conversion.py +++ b/cloudevents/pydantic/v2/conversion.py @@ -64,7 +64,7 @@ def from_json( def from_dict( - event: typing.Dict[str, typing.Any], + event: typing.Mapping[str, typing.Any], ) -> CloudEvent: """ Construct an CloudEvent from a dict `event` representation. diff --git a/cloudevents/pydantic/v2/event.py b/cloudevents/pydantic/v2/event.py index 643794c..d73306c 100644 --- a/cloudevents/pydantic/v2/event.py +++ b/cloudevents/pydantic/v2/event.py @@ -44,7 +44,9 @@ class CloudEvent(abstract.CloudEvent, BaseModel): # type: ignore @classmethod def create( - cls, attributes: typing.Dict[str, typing.Any], data: typing.Optional[typing.Any] + cls, + attributes: typing.Mapping[str, typing.Any], + data: typing.Optional[typing.Any], ) -> "CloudEvent": return cls(attributes, data) @@ -103,7 +105,7 @@ def create( def __init__( # type: ignore[no-untyped-def] self, - attributes: typing.Optional[typing.Dict[str, typing.Any]] = None, + attributes: typing.Optional[typing.Mapping[str, typing.Any]] = None, data: typing.Optional[typing.Any] = None, **kwargs, ): From dccd3cf29d48908fd4e2d5231a00f38d1454be6d Mon Sep 17 00:00:00 2001 From: Hal Blackburn Date: Tue, 25 Mar 2025 06:49:43 +0000 Subject: [PATCH 09/10] chore: fix tests on py3.8 Tests were failing because the sanic dependency dropped support for py3.8 in its current release. sanic is now pinned to the last compatible version for py3.8 only. Signed-off-by: Hal Blackburn --- requirements/test.txt | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/requirements/test.txt b/requirements/test.txt index 3e32e4a..5787e09 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -4,7 +4,12 @@ flake8-print pytest pytest-cov # web app tests -sanic + +# sanic stopped supporting 3.8 in 24.12: +# https://sanic.dev/en/release-notes/changelog.html#version-24120- +sanic ; python_version >= '3.9' +sanic<24.12.0 ; python_version < '3.9' + sanic-testing aiohttp Pillow From 130cdfa46a334d97bbd87055e42a2c7591194999 Mon Sep 17 00:00:00 2001 From: Hal Blackburn Date: Tue, 1 Apr 2025 18:00:45 +0000 Subject: [PATCH 10/10] feat: support new model_validate_json() kwargs Pydantic added by_alias and by_name keyword arguments to BaseModel.model_validate_json in 2.11.1: https://github.com/pydantic/pydantic/commit/acb0f10fda1c78441e052c57b4288bc91431f852 This caused mypy to report that that the Pydantic v2 CloudEvent did not override model_validate_json() correctly. Our override now accepts these newly-added arguments. They have no effect, as the implementation does not use Pydantic to validate the JSON, but we also don't use field aliases, so the only effect they could have in the superclass would be to raise an error if they're both False. Signed-off-by: Hal Blackburn --- cloudevents/pydantic/v2/event.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cloudevents/pydantic/v2/event.py b/cloudevents/pydantic/v2/event.py index d73306c..34a9b65 100644 --- a/cloudevents/pydantic/v2/event.py +++ b/cloudevents/pydantic/v2/event.py @@ -175,6 +175,8 @@ def model_validate_json( *, strict: typing.Optional[bool] = None, context: typing.Optional[typing.Dict[str, Any]] = None, + by_alias: typing.Optional[bool] = None, + by_name: typing.Optional[bool] = None, ) -> "CloudEvent": return conversion.from_json(cls, json_data)