Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft for Appsync Events module described in issue #940 #944

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lambda-events/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ default = [
"alb",
"apigw",
"appsync",
"appsync_events",
"autoscaling",
"bedrock_agent_runtime",
"chime_bot",
Expand Down Expand Up @@ -83,6 +84,7 @@ activemq = []
alb = ["bytes", "http", "http-body", "http-serde", "query_map"]
apigw = ["bytes", "http", "http-body", "http-serde", "iam", "query_map"]
appsync = []
appsync_events = ["bytes", "http", "http-body"]
autoscaling = ["chrono"]
bedrock_agent_runtime = []
chime_bot = ["chrono"]
Expand Down
31 changes: 29 additions & 2 deletions lambda-events/src/custom_serde/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use base64::Engine;
#[cfg(feature = "appsync_events")]
use serde::{de::DeserializeOwned, ser::Error as SerError, Serialize};
use serde::{
de::{Deserialize, Deserializer, Error as DeError},
ser::Serializer,
};

use std::collections::HashMap;

#[cfg(feature = "codebuild")]
Expand All @@ -15,15 +18,17 @@ pub type CodeBuildNumber = f32;
feature = "apigw",
feature = "s3",
feature = "iot",
feature = "lambda_function_urls"
feature = "lambda_function_urls",
feature = "appsync_events"
))]
mod headers;
#[cfg(any(
feature = "alb",
feature = "apigw",
feature = "s3",
feature = "iot",
feature = "lambda_function_urls"
feature = "lambda_function_urls",
feature = "appsync_events"
))]
pub(crate) use self::headers::*;

Expand Down Expand Up @@ -94,6 +99,28 @@ where
Ok(opt.unwrap_or_default())
}

#[cfg(feature = "appsync_events")]
pub(crate) fn serialize_stringified_json<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
where
T: Serialize,
S: Serializer,
{
let json_str = serde_json::to_string(value).map_err(S::Error::custom)?;

json_str.serialize(serializer)
}

#[cfg(feature = "appsync_events")]
pub(crate) fn deserialize_stringified_json<'de, T, D>(deserializer: D) -> Result<T, D::Error>
where
T: DeserializeOwned,
D: Deserializer<'de>,
{
let json_str = String::deserialize(deserializer)?;

serde_json::from_str(&json_str).map_err(D::Error::custom)
}

#[cfg(test)]
#[allow(deprecated)]
mod test {
Expand Down
178 changes: 178 additions & 0 deletions lambda-events/src/event/appsync_events/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use std::{collections::HashMap, fmt};

use crate::custom_serde::{
deserialize_headers, deserialize_lambda_map, deserialize_stringified_json, serialize_headers,
serialize_stringified_json,
};
use http::HeaderMap;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;

/// `AppSyncEventsLambdaAuthorizerRequest` contains an authorization request from AppSync Events.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSyncEventsLambdaAuthorizerRequest {
#[serde(default)]
pub authorization_token: Option<String>,
pub request_context: AppSyncEventsLambdaAuthorizerRequestContext,
#[serde(deserialize_with = "deserialize_headers", default)]
#[serde(serialize_with = "serialize_headers")]
pub headers: HeaderMap,
}

/// `AppSyncEventsLambdaAuthorizerRequestContext` contains the parameters of the AppSync Events invocation which triggered
/// this authorization request.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSyncEventsLambdaAuthorizerRequestContext {
#[serde(default)]
pub account_id: Option<String>,
#[serde(default)]
pub api_id: Option<String>,
#[serde(default)]
pub operation: Option<AppSyncEventsOperation>,
#[serde(default)]
pub request_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub channel_namespace_name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub channel: Option<String>,
}

/// `AppSyncEventsOperation` represent all the possible operations which
/// triggered this authorization request.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum AppSyncEventsOperation {
EventConnect,
EventSubscribe,
EventPublish,
}

impl fmt::Display for AppSyncEventsOperation {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let val = match self {
AppSyncEventsOperation::EventConnect => "EVENT_CONNECT",
AppSyncEventsOperation::EventSubscribe => "EVENT_SUBSCRIBE",
AppSyncEventsOperation::EventPublish => "EVENT_PUBLISH",
};

write!(f, "{val}")
}
}

/// `AppSyncEventsLambdaAuthorizerResponse` represents the expected format of an authorization response to AppSync Events.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSyncEventsLambdaAuthorizerResponse<T1 = Value>
where
T1: DeserializeOwned + Serialize,
{
pub is_authorized: bool,
#[serde(deserialize_with = "deserialize_lambda_map")]
#[serde(default)]
#[serde(bound = "")]
pub handler_context: HashMap<String, T1>,
pub ttl_override: Option<i64>,
}

/// `AppSyncEventsWebscoketMessage` represents all possible messages which can be sent between
/// AppSync Events and a connected websocket client.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum AppSyncEventsWebsocketMessage<T1 = Value>
where
T1: DeserializeOwned + Serialize,
{
ConnectionInit,
ConnectionAck(AppSyncEventsConnectionAckMessage),
#[serde(rename = "ka")]
KeepAlive,
#[serde(bound = "")]
Subscribe(AppSyncEventsSubscribeMessage),
SubscribeSuccess(AppSyncEventsSubscribeSuccessMessage),
SubscribeError(AppSyncEventsErrorMessage),
#[serde(bound = "")]
Data(AppSyncEventsDataMessage<T1>),
BroadcastError(AppSyncEventsErrorMessage),
Unsubscribe(AppSyncEventsUnsubscribeMessage),
UnsubscribeSuccess(AppSyncEventsUnsubscribeSuccessMessage),
UnsubscribeError(AppSyncEventsErrorMessage),
}

/// `AppSyncEventsConnectionAckMessage` contains the connection paramters for this acknowledged
/// connection.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSyncEventsConnectionAckMessage {
pub connection_timeout_ms: Option<i64>,
}

/// `AppSyncEventsSubscribeMessage` contains the parameters to subscribe to an AppSync Events channel.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSyncEventsSubscribeMessage {
pub id: Option<String>,
pub channel: Option<String>,
#[serde(deserialize_with = "deserialize_headers", default)]
#[serde(serialize_with = "serialize_headers")]
pub authorization: HeaderMap,
}

/// `AppSyncEventsSubscribeSuccessMessage` contains the subscription parameters for this
/// successful subscription.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSyncEventsSubscribeSuccessMessage {
pub id: Option<String>,
}

/// `AppSyncEventsErrorMessage` contains one or more AppSync Events errors.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSyncEventsErrorMessage {
pub id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub errors: Option<Vec<AppSyncEventErrorDescription>>,
}

/// `AppSyncEventSubscribeErrorDescription` contains information about an error.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSyncEventErrorDescription {
#[serde(default)]
pub error_type: Option<String>,
#[serde(default)]
pub message: Option<String>,
}

/// `AppSyncEventsDataMessage` represents an incoming event on a subscribed AppSync Events channel.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSyncEventsDataMessage<T1 = Value>
where
T1: DeserializeOwned + Serialize,
{
pub id: Option<String>,
#[serde(
bound = "",
deserialize_with = "deserialize_stringified_json",
serialize_with = "serialize_stringified_json"
)]
pub event: T1,
}

/// `AppSyncEventsUnsubscribeMessage` contains the parameters to unsubscribe to an AppSync Events channel.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSyncEventsUnsubscribeMessage {
pub id: Option<String>,
}

/// `AppSyncEventsUnsubscribeSuccessMessage` contains the unsubscription parameters for this
/// successful unsubscription.
#[derive(Debug, Clone, Eq, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AppSyncEventsUnsubscribeSuccessMessage {
pub id: Option<String>,
}
4 changes: 4 additions & 0 deletions lambda-events/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ pub mod apigw;
#[cfg(feature = "appsync")]
pub mod appsync;

/// AWS Lambda event definitions for appsync.
#[cfg(feature = "appsync_events")]
pub mod appsync_events;

/// AWS Lambda event definitions for autoscaling.
#[cfg(feature = "autoscaling")]
pub mod autoscaling;
Expand Down
4 changes: 4 additions & 0 deletions lambda-events/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub use event::apigw;
#[cfg(feature = "appsync")]
pub use event::appsync;

/// AWS Lambda event definitions for appsync events.
#[cfg(feature = "appsync_events")]
pub use event::appsync_events;

/// AWS Lambda event definitions for autoscaling.
#[cfg(feature = "autoscaling")]
pub use event::autoscaling;
Expand Down