From b849399315ef6273e0369e006077141b3354ebfa Mon Sep 17 00:00:00 2001 From: Nicklas Warming Jacobsen Date: Sun, 17 Nov 2024 17:23:33 +0100 Subject: [PATCH 1/7] Add module `aws_lambda_events::event::appsync_events` --- lambda-events/Cargo.toml | 2 + lambda-events/src/custom_serde/mod.rs | 24 +++ lambda-events/src/event/appsync_events/mod.rs | 178 ++++++++++++++++++ lambda-events/src/event/mod.rs | 4 + 4 files changed, 208 insertions(+) create mode 100644 lambda-events/src/event/appsync_events/mod.rs diff --git a/lambda-events/Cargo.toml b/lambda-events/Cargo.toml index d9774104..fb5c8afd 100644 --- a/lambda-events/Cargo.toml +++ b/lambda-events/Cargo.toml @@ -38,6 +38,7 @@ default = [ "alb", "apigw", "appsync", + "appsync_events", "autoscaling", "bedrock_agent_runtime", "chime_bot", @@ -83,6 +84,7 @@ activemq = [] alb = ["bytes", "http", "http-body", "http-serde", "query_map"] apigw = ["bytes", "http", "http-body", "http-serde", "iam", "query_map"] appsync = [] +appsync_events = [] autoscaling = ["chrono"] bedrock_agent_runtime = [] chime_bot = ["chrono"] diff --git a/lambda-events/src/custom_serde/mod.rs b/lambda-events/src/custom_serde/mod.rs index 729dee3d..c4db502e 100644 --- a/lambda-events/src/custom_serde/mod.rs +++ b/lambda-events/src/custom_serde/mod.rs @@ -1,8 +1,11 @@ use base64::Engine; +#[cfg(any(feature = "appsync_events"))] +use serde::{de::DeserializeOwned, ser::Error as SerError, Serialize}; use serde::{ de::{Deserialize, Deserializer, Error as DeError}, ser::Serializer, }; + use std::collections::HashMap; #[cfg(feature = "codebuild")] @@ -94,6 +97,27 @@ where Ok(opt.unwrap_or_default()) } +#[cfg(any(feature = "appsync_events"))] +pub(crate) fn serialize_stringified_json(value: &T, serializer: S) -> Result +where + T: Serialize, + S: Serializer, +{ + let json_str = serde_json::to_string(value).map_err(|e| S::Error::custom(e))?; + + json_str.serialize(serializer) +} + +pub(crate) fn deserialize_stringified_json<'de, T, D>(deserializer: D) -> Result +where + T: DeserializeOwned, + D: Deserializer<'de>, +{ + let json_str = String::deserialize(deserializer)?; + + serde_json::from_str(&json_str).map_err(|e| D::Error::custom(e)) +} + #[cfg(test)] #[allow(deprecated)] mod test { diff --git a/lambda-events/src/event/appsync_events/mod.rs b/lambda-events/src/event/appsync_events/mod.rs new file mode 100644 index 00000000..357f6b81 --- /dev/null +++ b/lambda-events/src/event/appsync_events/mod.rs @@ -0,0 +1,178 @@ +use std::{collections::HashMap, fmt}; + +use crate::custom_serde::{ + deserialize_headers, deserialize_lambda_map, deserialize_stringified_json, serialize_headers, + serialize_stringified_json, +}; +use http::HeaderMap; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde_json::Value; + +/// `AppSyncEventsLambdaAuthorizerRequest` contains an authorization request from AppSync Events. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppSyncEventsLambdaAuthorizerRequest { + #[serde(default)] + pub authorization_token: Option, + pub request_context: AppSyncEventsLambdaAuthorizerRequestContext, + #[serde(deserialize_with = "deserialize_headers", default)] + #[serde(serialize_with = "serialize_headers")] + pub headers: HeaderMap, +} + +/// `AppSyncEventsLambdaAuthorizerRequestContext` contains the parameters of the AppSync Events invocation which triggered +/// this authorization request. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppSyncEventsLambdaAuthorizerRequestContext { + #[serde(default)] + pub account_id: Option, + #[serde(default)] + pub api_id: Option, + #[serde(default)] + pub operation: Option, + #[serde(default)] + pub request_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub channel_namespace_name: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub channel: Option, +} + +/// `AppSyncEventsOperation` represent all the possible operations which +/// triggered this authorization request. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum AppSyncEventsOperation { + EventConnect, + EventSubscribe, + EventPublish, +} + +impl fmt::Display for AppSyncEventsOperation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let val = match self { + AppSyncEventsOperation::EventConnect => "EVENT_CONNECT", + AppSyncEventsOperation::EventSubscribe => "EVENT_SUBSCRIBE", + AppSyncEventsOperation::EventPublish => "EVENT_PUBLISH", + }; + + write!(f, "{val}") + } +} + +/// `AppSyncEventsLambdaAuthorizerResponse` represents the expected format of an authorization response to AppSync Events. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppSyncEventsLambdaAuthorizerResponse +where + T1: DeserializeOwned + Serialize, +{ + pub is_authorized: bool, + #[serde(deserialize_with = "deserialize_lambda_map")] + #[serde(default)] + #[serde(bound = "")] + pub handler_context: HashMap, + pub ttl_override: Option, +} + +/// `AppSyncEventsWebscoketMessage` represents all possible messages which can be sent between +/// AppSync Events and a connected websocket client. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum AppSyncEventsWebsocketMessage +where + T1: DeserializeOwned + Serialize, +{ + ConnectionInit, + ConnectionAck(AppSyncEventsConnectionAckMessage), + #[serde(rename = "ka")] + KeepAlive, + #[serde(bound = "")] + Subscribe(AppSyncEventsSubscribeMessage), + SubscribeSuccess(AppSyncEventsSubscribeSuccessMessage), + SubscribeError(AppSyncEventsErrorMessage), + #[serde(bound = "")] + Data(AppSyncEventsDataMessage), + BroadcastError(AppSyncEventsErrorMessage), + Unsubscribe(AppSyncEventsUnsubscribeMessage), + UnsubscribeSuccess(AppSyncEventsUnsubscribeSuccessMessage), + UnsubscribeError(AppSyncEventsErrorMessage), +} + +/// `AppSyncEventsConnectionAckMessage` contains the connection paramters for this acknowledged +/// connection. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppSyncEventsConnectionAckMessage { + pub connection_timeout_ms: Option, +} + +/// `AppSyncEventsSubscribeMessage` contains the parameters to subscribe to an AppSync Events channel. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppSyncEventsSubscribeMessage { + pub id: Option, + pub channel: Option, + #[serde(deserialize_with = "deserialize_headers", default)] + #[serde(serialize_with = "serialize_headers")] + pub authorization: HeaderMap, +} + +/// `AppSyncEventsSubscribeSuccessMessage` contains the subscription parameters for this +/// successful subscription. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppSyncEventsSubscribeSuccessMessage { + pub id: Option, +} + +/// `AppSyncEventsErrorMessage` contains one or more AppSync Events errors. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppSyncEventsErrorMessage { + pub id: Option, + #[serde(default)] + pub errors: Option>, +} + +/// `AppSyncEventSubscribeErrorDescription` contains information about an error. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppSyncEventSubscribeErrorDescription { + #[serde(default)] + pub error_type: Option, + #[serde(default)] + pub message: Option, +} + +/// `AppSyncEventsDataMessage` represents an incoming event on a subscribed AppSync Events channel. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppSyncEventsDataMessage +where + T1: DeserializeOwned + Serialize, +{ + pub id: Option, + #[serde( + bound = "", + deserialize_with = "deserialize_stringified_json", + serialize_with = "serialize_stringified_json" + )] + pub event: T1, +} + +/// `AppSyncEventsUnsubscribeMessage` contains the parameters to unsubscribe to an AppSync Events channel. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppSyncEventsUnsubscribeMessage { + pub id: Option, +} + +/// `AppSyncEventsUnsubscribeSuccessMessage` contains the unsubscription parameters for this +/// successful unsubscription. +#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct AppSyncEventsUnsubscribeSuccessMessage { + pub id: Option, +} diff --git a/lambda-events/src/event/mod.rs b/lambda-events/src/event/mod.rs index d63acc4d..aca06e08 100644 --- a/lambda-events/src/event/mod.rs +++ b/lambda-events/src/event/mod.rs @@ -13,6 +13,10 @@ pub mod apigw; #[cfg(feature = "appsync")] pub mod appsync; +/// AWS Lambda event definitions for appsync. +#[cfg(feature = "appsync_events")] +pub mod appsync_events; + /// AWS Lambda event definitions for autoscaling. #[cfg(feature = "autoscaling")] pub mod autoscaling; From 5a1e196fd7cab820294e5a649eb8c1b286950a9a Mon Sep 17 00:00:00 2001 From: Nicklas Warming Jacobsen Date: Tue, 19 Nov 2024 12:58:35 +0100 Subject: [PATCH 2/7] add `pub use` for module `appsync_events` --- lambda-events/src/lib.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lambda-events/src/lib.rs b/lambda-events/src/lib.rs index e21cdc13..10536640 100644 --- a/lambda-events/src/lib.rs +++ b/lambda-events/src/lib.rs @@ -29,6 +29,10 @@ pub use event::apigw; #[cfg(feature = "appsync")] pub use event::appsync; +/// AWS Lambda event definitions for appsync events. +#[cfg(feature = "appsync_events")] +pub use event::appsync_events; + /// AWS Lambda event definitions for autoscaling. #[cfg(feature = "autoscaling")] pub use event::autoscaling; From 9203023c063f09eaef65c579def0f643c2186cf5 Mon Sep 17 00:00:00 2001 From: Nicklas Warming Jacobsen Date: Tue, 19 Nov 2024 13:01:02 +0100 Subject: [PATCH 3/7] Fix clippy lints --- lambda-events/src/custom_serde/mod.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lambda-events/src/custom_serde/mod.rs b/lambda-events/src/custom_serde/mod.rs index c4db502e..581740be 100644 --- a/lambda-events/src/custom_serde/mod.rs +++ b/lambda-events/src/custom_serde/mod.rs @@ -1,5 +1,5 @@ use base64::Engine; -#[cfg(any(feature = "appsync_events"))] +#[cfg(feature = "appsync_events")] use serde::{de::DeserializeOwned, ser::Error as SerError, Serialize}; use serde::{ de::{Deserialize, Deserializer, Error as DeError}, @@ -97,17 +97,18 @@ where Ok(opt.unwrap_or_default()) } -#[cfg(any(feature = "appsync_events"))] +#[cfg(feature = "appsync_events")] pub(crate) fn serialize_stringified_json(value: &T, serializer: S) -> Result where T: Serialize, S: Serializer, { - let json_str = serde_json::to_string(value).map_err(|e| S::Error::custom(e))?; + let json_str = serde_json::to_string(value).map_err(S::Error::custom)?; json_str.serialize(serializer) } +#[cfg(feature = "appsync_events")] pub(crate) fn deserialize_stringified_json<'de, T, D>(deserializer: D) -> Result where T: DeserializeOwned, @@ -115,7 +116,7 @@ where { let json_str = String::deserialize(deserializer)?; - serde_json::from_str(&json_str).map_err(|e| D::Error::custom(e)) + serde_json::from_str(&json_str).map_err(D::Error::custom) } #[cfg(test)] From 0e161f332b9181942bb795949ac070db0a63c0d3 Mon Sep 17 00:00:00 2001 From: Nicklas Warming Jacobsen Date: Tue, 19 Nov 2024 14:13:57 +0100 Subject: [PATCH 4/7] Fix build error when building with `--no-default-features --features appsync_events`. --- lambda-events/Cargo.toml | 2 +- lambda-events/src/custom_serde/mod.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lambda-events/Cargo.toml b/lambda-events/Cargo.toml index fb5c8afd..f6ecd3a6 100644 --- a/lambda-events/Cargo.toml +++ b/lambda-events/Cargo.toml @@ -84,7 +84,7 @@ activemq = [] alb = ["bytes", "http", "http-body", "http-serde", "query_map"] apigw = ["bytes", "http", "http-body", "http-serde", "iam", "query_map"] appsync = [] -appsync_events = [] +appsync_events = ["bytes", "http", "http-body"] autoscaling = ["chrono"] bedrock_agent_runtime = [] chime_bot = ["chrono"] diff --git a/lambda-events/src/custom_serde/mod.rs b/lambda-events/src/custom_serde/mod.rs index 581740be..983c77cd 100644 --- a/lambda-events/src/custom_serde/mod.rs +++ b/lambda-events/src/custom_serde/mod.rs @@ -18,7 +18,8 @@ pub type CodeBuildNumber = f32; feature = "apigw", feature = "s3", feature = "iot", - feature = "lambda_function_urls" + feature = "lambda_function_urls", + feature = "appsync_events" ))] mod headers; #[cfg(any( @@ -26,7 +27,8 @@ mod headers; feature = "apigw", feature = "s3", feature = "iot", - feature = "lambda_function_urls" + feature = "lambda_function_urls", + feature = "appsync_events" ))] pub(crate) use self::headers::*; From dce7327ce2dbdbe69055c6eef14ffe1ba25ac6f4 Mon Sep 17 00:00:00 2001 From: Nicklas Warming Jacobsen Date: Thu, 21 Nov 2024 12:26:18 +0100 Subject: [PATCH 5/7] Rename `AppSyncEventSubscribeErrorDescription` to `AppSyncEventErrorDescription` --- lambda-events/src/event/appsync_events/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lambda-events/src/event/appsync_events/mod.rs b/lambda-events/src/event/appsync_events/mod.rs index 357f6b81..890d7f75 100644 --- a/lambda-events/src/event/appsync_events/mod.rs +++ b/lambda-events/src/event/appsync_events/mod.rs @@ -133,13 +133,13 @@ pub struct AppSyncEventsSubscribeSuccessMessage { pub struct AppSyncEventsErrorMessage { pub id: Option, #[serde(default)] - pub errors: Option>, + pub errors: Option>, } /// `AppSyncEventSubscribeErrorDescription` contains information about an error. #[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] -pub struct AppSyncEventSubscribeErrorDescription { +pub struct AppSyncEventErrorDescription { #[serde(default)] pub error_type: Option, #[serde(default)] From 629364c11752b78b14cffba76150695552451f65 Mon Sep 17 00:00:00 2001 From: Nicklas Warming Jacobsen Date: Thu, 21 Nov 2024 12:32:31 +0100 Subject: [PATCH 6/7] Add serde attribute `skip_serializing_if = "Option::is_none"` on `AppSyncEventsErrorMessage::errors` --- lambda-events/src/event/appsync_events/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda-events/src/event/appsync_events/mod.rs b/lambda-events/src/event/appsync_events/mod.rs index 890d7f75..f9682b33 100644 --- a/lambda-events/src/event/appsync_events/mod.rs +++ b/lambda-events/src/event/appsync_events/mod.rs @@ -132,7 +132,7 @@ pub struct AppSyncEventsSubscribeSuccessMessage { #[serde(rename_all = "camelCase")] pub struct AppSyncEventsErrorMessage { pub id: Option, - #[serde(default)] + #[serde(default, skip_serializing_if = "Option::is_none")] pub errors: Option>, } From 5b6d13095cd353d7e26b4eb07db0abcbd844ac6f Mon Sep 17 00:00:00 2001 From: Nicklas Warming Jacobsen Date: Fri, 29 Nov 2024 20:15:19 +0100 Subject: [PATCH 7/7] Used tagged serialization for `AppSyncEventsWebsocketMessage` --- lambda-events/src/event/appsync_events/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda-events/src/event/appsync_events/mod.rs b/lambda-events/src/event/appsync_events/mod.rs index f9682b33..7c9536d8 100644 --- a/lambda-events/src/event/appsync_events/mod.rs +++ b/lambda-events/src/event/appsync_events/mod.rs @@ -79,7 +79,7 @@ where /// `AppSyncEventsWebscoketMessage` represents all possible messages which can be sent between /// AppSync Events and a connected websocket client. #[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)] -#[serde(rename_all = "snake_case")] +#[serde(rename_all = "snake_case", tag = "type")] pub enum AppSyncEventsWebsocketMessage where T1: DeserializeOwned + Serialize,