# CI workflow: runs the unit tests, then deploys the stack to LocalStack and
# runs the integration tests against it.
name: Test

# Pushes and pull requests that only touch files matching "*.md" are ignored.
on:
  push:
    paths-ignore:
      - "*.md"
  pull_request:
    paths-ignore:
      - "*.md"

jobs:
  test:
    runs-on: ${{ matrix.os }}
    strategy:
      # Keep the remaining matrix entries running when one of them fails.
      fail-fast: false
      matrix:
        # Versions stay quoted so that e.g. "3.10" is not read as the float 3.1.
        python-version: ["3.8"]  # "3.x", "3.7", "3.8", "3.9", "3.10"
        os: [ubuntu-latest]  # , macOS-latest, windows-latest
    steps:
      - uses: actions/checkout@v3
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          make create_virtualenv
          make install_requirements
          make install_test_requirements
      - name: Run unit tests
        run: |
          make run_unit_tests
      # Everything below is only needed by the integration tests.
      - name: Install LocalStack
        run: |
          pip install localstack
      - name: Install Serverless
        run: |
          npm install -g serverless
      - name: Install Serverless plugins
        run: |
          npm i
      - name: Deploy Serverless to LocalStack
        run: |
          make deploy_local
      - name: Run integration tests
        run: |
          make run_integration_tests
# pre-commit hooks: formatting (black, isort), linting (flake8) and type
# checking (mypy).
repos:
  - repo: https://github.com/psf/black
    rev: "23.3.0"
    hooks:
      - id: black
  - repo: https://github.com/PyCQA/isort
    rev: "5.12.0"
    hooks:
      - id: isort
        # Use isort's black-compatible profile alongside the black hook.
        args: ["--profile", "black"]
  - repo: https://github.com/PyCQA/flake8
    rev: "6.0.0"
    hooks:
      - id: flake8
  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.1.1
    hooks:
      - id: mypy
        args: ["--strict", "--ignore-missing-imports"]
        additional_dependencies: ["types-requests"]
pre-commit install
+ pre-commit run --all-files
+ @echo "Creating virtualenv..."
+ python3 -m venv "${VIRTUALENV_PATH}"
+ @echo "Done!"
+ @echo "Installing requirements..."
+ @echo "Done!"
+ @echo "Installing dev requirements..."
+ @echo "Done!"
+ @echo "Installing test requirements..."
+ @echo "Done!"
+install_all_requirements: install_requirements install_dev_requirements install_test_requirements
+ @echo "Running unit tests..."
+ @. ${VIRTUALENV_PATH}/bin/activate && python -m unittest discover -s functions -p '*_test.py'
+ @echo "Done!"
+ @echo "Running integration tests..."
+ @. ${VIRTUALENV_PATH}/bin/activate && python -m unittest discover -s integration_tests -p '*_test.py'
+ @echo "Done!"
+ localstack logs --follow
+ localstack stop || true
+ DISABLE_EVENTS=1 localstack start -d
+ sls deploy --stage local
+deploy_functions_local: deploy_generate_thumbnails_function_local deploy_retry_from_dlq_function_local
+ sls deploy function --stage local --function ${GENERATE_THUMBNAILS_FUNCTION_NAME}
+ sls deploy function --stage local --function ${RETRY_FROM_DLQ_FUNCTION_NAME}
+ ${AWS_CLI} logs tail /aws/lambda/${GENERATE_THUMBNAILS_FUNCTION_FULL_NAME} --follow
+ ${AWS_CLI} logs tail /aws/lambda/${RETRY_FROM_DLQ_FUNCTION_FULL_NAME} --follow
+ ${AWS_CLI} s3 cp ${TEST_IMAGES_FOLDER} s3://${IMAGES_BUCKET_NAME}/images/ --recursive
+ ${AWS_CLI} sqs receive-message --queue-url ${IMAGES_DLQ_URL} --max-number-of-messages 10 --output json
+ ${AWS_CLI} lambda invoke --function-name ${RETRY_FROM_DLQ_FUNCTION_FULL_NAME} --invocation-type Event response.json
+ rm response.json
-# serverless_s3_pipeline
# Serverless S3 Pipeline
+Yet another AWS service to generate thumbnails. This one is based on AWS Lambda, S3, and SQS.
+It can be deployed to AWS or [LocalStack](https://github.com/localstack/localstack) using the [Serverless Framework](https://www.serverless.com/).
+It also includes:
+ * Unit tests.
+ * Integration tests, which are executed against [LocalStack](https://github.com/localstack/localstack).
+ * [Pre-commit](https://pre-commit.com/) hooks: [Black](https://github.com/psf/black), [isort](https://pycqa.github.io/isort/), [Flake8](https://flake8.pycqa.org/en/latest/), and [mypy](https://mypy-lang.org/).
+ * A [Makefile](https://www.gnu.org/software/make/manual/make.html) with useful commands.
+## Design
+![Design](docs/diagram.png?raw=true "Design")
+1. An image is uploaded to the images bucket.
+2. The image creation event is queued in an SQS queue.
+3. The Lambda function tries to generate the thumbnails:
+ * **Success**: Thumbnails are saved to the thumbnails bucket
+ * **Error**:
+ 1. The message is retried as many times as configured in the queue
+ 2. If it continues to fail, the message is sent to a DLQ
+ 3. You can manually invoke another lambda function that dequeues from the DLQ and sends the messages back to the original queue to be retried
+## Structure
+The [serverless.yml](/serverless.yml) file contains the Serverless configuration to deploy the stack to either AWS or LocalStack.
+The Lambda functions are located in the [functions](/functions) package. Each AWS Lambda handler function is in a separate file. Common code is in the same package.
+Unit tests are in the [functions/tests](/functions/tests) package.
+Integration tests are in the [integration_tests](/integration_tests) package.
+You can find useful commands in the [Makefile](/Makefile).
+Python requirements:
+ 1. The *requirements.txt* file contains the essential Python dependencies required by the application logic to run.
+ 2. The *requirements.dev.txt* file contains the Python dependencies you need to have installed in your environment to contribute to the application logic.
+ 3. The *requirements.test.txt* file contains the Python dependencies required to run tests.
+## Setup
+### Install the Serverless Framework
+npm install -g serverless
+### Install LocalStack
+pip install localstack
+### Install Serverless Framework Plugins
+Go to the root directory of this repo and install the plugins:
+cd serverless_s3_pipeline
+npm i
+### Install and Configure the AWS CLI
+Follow [these instructions](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) to install the AWS CLI.
+To interact with LocalStack through the AWS CLI, you can create a profile with a dummy region and access key.
+Add this to your `~/.aws/config` file:
+[profile localstack]
+region = us-east-1
+output = json
+And this to your `~/.aws/credentials` file:
+aws_access_key_id = dummyaccesskey
+aws_secret_access_key = dummysecretaccesskey
+## Deploy to LocalStack
+Start LocalStack:
+localstack start
+Deploy to LocalStack:
+serverless deploy --stage local
+You should get something like the following. Notice the endpoint URL:
+✔ Service deployed to stack thumbnails-service-local
+ generate_thumbnails: thumbnails-service-local-generate_thumbnails
+ retry_from_dlq: thumbnails-service-local-retry_from_dlq
+You can alternatively start localstack as a daemon and deploy with a single command:
+make deploy_local
+## Makefile commands
+### Install GIT Hooks
+make install_git_hooks
+### Run GIT Hooks
+make run_git_hooks
+### Create Python virtualenv
+make create_virtualenv
+### Install requirements into the virtualenv
+make install_requirements
+make install_dev_requirements
+make install_test_requirements
+# Or just
+make install_all_requirements
+### Run tests
+make run_unit_tests
+make run_integration_tests
+### Deploy stack to LocalStack
+Restarts LocalStack (if running) before deploying.
+make deploy_local
+### Deploy only the Lambda functions to LocalStack
+make deploy_functions_local
+### Show logs of the Lambda function that generates the thumbnails
+make tail_generate_thumbnails_function_logs
+### Show logs of the Lambda function that enqueues failed messages to retry
+make tail_retry_from_sqs_function_logs
+### Upload a few test images to the input bucket
+make upload_test_images_to_s3
+### Show up to 10 messages in the DLQ
+make show_messages_in_dlq
+### Invoke the Lambda function that enqueues failed messages to retry
+make retry_from_dql
+from typing import Any, Dict
+import boto3
+from functions.environment import EnvironmentVariable, get_environment_variable_or_raise
class Service:
    """Service names handed to ``get_client`` to select a boto3 client."""

    # Used by the S3 download/upload helpers.
    S3: str = "s3"
    # Used by the SQS receive/send/delete helpers.
    SQS: str = "sqs"
def download_file_from_s3_bucket(
    bucket_name: str, remote_path: str, local_path: str
) -> None:
    """Download one object from an S3 bucket into a local file.

    Args:
        bucket_name: Name of the bucket to read from.
        remote_path: Path (key) of the object inside the bucket.
        local_path: Local path the object is written to.
    """
    get_client(Service.S3).download_file(bucket_name, remote_path, local_path)
def upload_file_to_s3_bucket(
    local_path: str, bucket_name: str, remote_path: str
) -> None:
    """Upload a local file to an S3 bucket.

    Args:
        local_path: Path of the local file to send.
        bucket_name: Name of the destination bucket.
        remote_path: Path (key) the object gets inside the bucket.
    """
    get_client(Service.S3).upload_file(local_path, bucket_name, remote_path)
def receive_messages_from_sqs_queue(
    queue_url: str,
    max_number_of_messages: int,
    visibility_timeout: int,
    wait_time_seconds: int,
) -> Dict[str, Any]:
    """Receive messages from an SQS queue.

    Args:
        queue_url: URL of the queue to read from.
        max_number_of_messages: Forwarded as ``MaxNumberOfMessages``.
        visibility_timeout: Forwarded as ``VisibilityTimeout``.
        wait_time_seconds: Forwarded as ``WaitTimeSeconds``.

    Returns:
        The ``receive_message`` response copied into a plain ``dict``.
    """
    sqs = get_client(Service.SQS)
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_number_of_messages,
        VisibilityTimeout=visibility_timeout,
        WaitTimeSeconds=wait_time_seconds,
    )
    return dict(response)
def send_message_to_sqs_queue(queue_url: str, message_body: str) -> None:
    """Send one message with the given body to an SQS queue.

    Args:
        queue_url: URL of the destination queue.
        message_body: Text sent as the message body.
    """
    get_client(Service.SQS).send_message(
        QueueUrl=queue_url,
        MessageBody=message_body,
    )
def delete_message_from_sqs_queue(queue_url: str, receipt_handle: str) -> None:
    """Delete a previously received message from an SQS queue.

    Args:
        queue_url: URL of the queue the message was received from.
        receipt_handle: Receipt handle identifying the message to delete.
    """
    get_client(Service.SQS).delete_message(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
    )
def get_client(name: str) -> Any:
    """Create a boto3 client for the service called ``name``.

    The client is pointed at the endpoint URL stored in the environment
    variable named by ``EnvironmentVariable.AWS_ENDPOINT_URL``; looking that
    variable up raises when it is not set.
    """
    return boto3.client(
        name,
        endpoint_url=get_environment_variable_or_raise(
            EnvironmentVariable.AWS_ENDPOINT_URL
        ),
    )
+from os import environ
+class EnvironmentVariable:
def get_environment_variable_or_raise(name: str) -> str:
    """Return the value of the environment variable ``name``.

    Args:
        name: Name of the environment variable to read.

    Returns:
        The variable's value.

    Raises:
        AssertionError: If the variable is not set. It is raised explicitly
            instead of through an ``assert`` statement so that the check and
            its message are still in effect when Python runs with ``-O``,
            which strips ``assert`` statements.
    """
    try:
        return environ[name]
    except KeyError:
        # Same exception type and message as before; ``from None`` hides the
        # internal KeyError from the traceback.
        raise AssertionError(f"{name} environment variable is not set") from None
diff --git a/functions/generate_thumbnails.py b/functions/generate_thumbnails.py
+import json
+import logging
+from collections import namedtuple
+from os.path import basename, dirname, join, splitext
+from tempfile import TemporaryDirectory
+from typing import Any, Dict, List, Tuple
+from PIL import Image
+from functions.aws import download_file_from_s3_bucket, upload_file_to_s3_bucket
+from functions.environment import EnvironmentVariable, get_environment_variable_or_raise
+from functions.settings import (
# A file stored in an S3 bucket, paired with the ID of the queue message that
# announced it.
RemoteFile = namedtuple("RemoteFile", "bucket_name file_path message_id")
def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Lambda entry point: generate thumbnails for the files in ``event``.

    Args:
        event: Event passed to the function; the files to process are
            extracted from it by ``get_files_from_event``.
        context: Lambda context object; not used.

    Returns:
        The response built by ``build_response`` from the files that failed.
    """
    return build_response(process_files(get_files_from_event(event)))
def get_files_from_event(event: Dict[str, Any]) -> List[RemoteFile]:
    """Extract the S3 files referenced by the queue messages in ``event``.

    Each entry of ``event["Records"]`` is a queue message whose ``body`` is a
    JSON document; every entry of that document's own ``Records`` list
    describes one S3 object.

    Args:
        event: Event received by the Lambda function.

    Returns:
        One ``RemoteFile`` per S3 object, carrying the ``messageId`` of the
        queue message it arrived in.
    """
    from urllib.parse import unquote_plus

    remote_files: List[RemoteFile] = []
    for record in event["Records"]:
        body = json.loads(record["body"])
        # A body without "Records" (for example the "s3:TestEvent" message S3
        # publishes when notifications are configured) references no file, so
        # it is skipped instead of failing the whole batch with a KeyError.
        for body_record in body.get("Records", []):
            s3_info = body_record["s3"]
            remote_files.append(
                RemoteFile(
                    s3_info["bucket"]["name"],
                    # S3 URL-encodes object keys in event notifications (a
                    # space arrives as "+"); decode to get the real key.
                    unquote_plus(s3_info["object"]["key"]),
                    record["messageId"],
                )
            )
    return remote_files
def process_files(remote_files: List[RemoteFile]) -> List[RemoteFile]:
    """Try to process every file and collect the ones that failed.

    Args:
        remote_files: Files to process, in order.

    Returns:
        The files that ``try_to_process_file`` recorded as failed.
    """
    failures: List[RemoteFile] = []
    for candidate in remote_files:
        # The helper appends ``candidate`` to ``failures`` when it fails.
        try_to_process_file(candidate, failures)
    return failures
def build_response(failed_files: List[RemoteFile]) -> Dict[str, List[Dict[str, str]]]:
    """Build the handler response listing the messages that failed.

    Args:
        failed_files: Files that could not be processed.

    Returns:
        A dict whose ``batchItemFailures`` list holds one
        ``{"itemIdentifier": <message id>}`` entry per failed file.
    """
    item_failures: List[Dict[str, str]] = []
    for failed_file in failed_files:
        item_failures.append({"itemIdentifier": failed_file.message_id})
    return {"batchItemFailures": item_failures}
def try_to_process_file(
    remote_file: RemoteFile, failed_files: List[RemoteFile]
) -> None:
    """Process one file, recording it in ``failed_files`` if that fails.

    Exceptions raised while processing are not propagated, so one bad file
    does not stop the rest of the batch.

    Args:
        remote_file: File to process.
        failed_files: List that ``remote_file`` is appended to on failure.
    """
    try:
        process_file(remote_file)
    except Exception:
        # exc_info=True attaches the exception and its traceback to the log
        # record; without it the cause of the failure is lost.
        logging.error("Failed to process %s", remote_file, exc_info=True)
        failed_files.append(remote_file)
    else:
        logging.info("Successfully processed %s", remote_file)
def process_file(remote_file: RemoteFile) -> None:
    """Download a file, generate its thumbnails and upload them.

    All local files live in a temporary directory that is removed when the
    function returns or raises.
    """
    with TemporaryDirectory() as workdir:
        source_path = download_file(remote_file, workdir)
        upload_thumbnails(remote_file, generate_thumbnails(source_path))
def download_file(remote_file: RemoteFile, folder_path: str) -> str:
    """Download ``remote_file`` into ``folder_path``.

    The local file keeps the base name of the remote path.

    Returns:
        Path of the downloaded local file.
    """
    destination = join(folder_path, basename(remote_file.file_path))
    download_file_from_s3_bucket(
        remote_file.bucket_name, remote_file.file_path, destination
    )
    return destination
def generate_thumbnails(local_file_path: str) -> List[str]:
    """Generate one thumbnail per entry of ``THUMBNAIL_SIZES``.

    Args:
        local_file_path: Path of the local image file.

    Returns:
        Paths of the generated thumbnail files, in ``THUMBNAIL_SIZES`` order.
    """
    # Open the image as a context manager so its file handle is released as
    # soon as the thumbnails exist, instead of staying open until the image
    # object happens to be garbage-collected.
    with Image.open(local_file_path) as image:
        return [
            generate_thumbnail(image, size, local_file_path)
            for size in THUMBNAIL_SIZES
        ]
def generate_thumbnail(
    image: Image,
    size: Tuple[int, int],
    local_file_path: str,
) -> str:
    """Write a thumbnail of ``image`` next to the original file.

    The thumbnail is named ``<width>x<height><extension>``, using the values
    of ``size`` and the extension of ``local_file_path``, and is saved in the
    same folder as ``local_file_path``.

    Args:
        image: Source image; it is copied, so the original is not modified.
        size: ``(width, height)`` passed to the copy's ``thumbnail`` method.
        local_file_path: Path of the file ``image`` was loaded from.

    Returns:
        Path of the saved thumbnail.
    """
    # NOTE(review): ``Image`` here looks like the ``PIL.Image`` module rather
    # than the ``Image.Image`` class - confirm the intended annotation.
    folder, source_name = dirname(local_file_path), basename(local_file_path)
    resized = image.copy()
    resized.thumbnail(size)
    width, height = size
    suffix = splitext(source_name)[1]
    target = join(folder, f"{width}x{height}{suffix}")
    resized.save(target)
    return target
def upload_thumbnails(
    remote_file: RemoteFile,
    thumbnails_paths: List[str],
) -> None:
    """Upload every thumbnail generated for ``remote_file``, in order.

    Args:
        remote_file: Original file the thumbnails were generated from.
        thumbnails_paths: Local paths of the thumbnails to upload.
    """
    for thumbnail_path in thumbnails_paths:
        upload_thumbnail(remote_file, thumbnail_path)
def upload_thumbnail(
    remote_file: RemoteFile,
    local_thumbnail_path: str,
) -> None:
    """Upload one thumbnail to the thumbnails bucket.

    The remote folder name is produced by formatting ``template`` with the
    base name of the original file; the thumbnail keeps its local file name
    inside that folder.

    Args:
        remote_file: Original file the thumbnail was generated from.
        local_thumbnail_path: Local path of the thumbnail to upload.
    """
    # NOTE(review): ``template`` is not defined inside this function -
    # presumably a module-level name; confirm where it comes from.
    original_name = basename(remote_file.file_path)
    destination_folder = template.format(original_name)
    destination_path = join(destination_folder, basename(local_thumbnail_path))
    bucket_name = get_environment_variable_or_raise(
        EnvironmentVariable.THUMBNAILS_BUCKET_NAME
    )
    upload_file_to_s3_bucket(local_thumbnail_path, bucket_name, destination_path)
+import logging
+from typing import Any, Dict, List
+from functions.aws import (
+ delete_message_from_sqs_queue,
+ receive_messages_from_sqs_queue,
+ send_message_to_sqs_queue,
+from functions.environment import EnvironmentVariable, get_environment_variable_or_raise
+# TODO: What if I sent it back to the queue, but fails to delete from the DLQ?
def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Lambda entry point: move messages from the DLQ back to the queue.

    Batches are fetched from the DLQ and processed until a fetch returns no
    messages.

    Args:
        event: Invocation event; not used.
        context: Lambda context object; not used.

    Returns:
        A dict with ``statusCode`` 200 and a ``body`` holding the number of
        messages handled under ``retriedMessagesCount``.
    """
    queue_url = get_environment_variable_or_raise(EnvironmentVariable.IMAGES_QUEUE_URL)
    dlq_url = get_environment_variable_or_raise(EnvironmentVariable.IMAGES_DLQ_URL)

    retried = 0
    while True:
        batch = get_messages_from_dlq(dlq_url)
        if not batch:
            break
        process_messages(batch, queue_url, dlq_url)
        retried += len(batch)

    body = {"retriedMessagesCount": retried}
    return {"statusCode": 200, "body": body}
def get_messages_from_dlq(dlq_url: str) -> List[Dict[str, Any]]:
    """Fetch the next batch of messages from the DLQ.

    Args:
        dlq_url: URL of the dead-letter queue.

    Returns:
        The ``Messages`` entry of the receive response, or an empty list when
        the response has no such entry.
    """
    # NOTE(review): only ``dlq_url`` is passed here, while the helper's
    # visible signature also requires max_number_of_messages,
    # visibility_timeout and wait_time_seconds - confirm the remaining
    # arguments.
    received = receive_messages_from_sqs_queue(dlq_url)
    return received.get("Messages", [])
def process_messages(
    messages: List[Dict[str, Any]],
    queue_url: str,
    dlq_url: str,
) -> None:
    """Process every message of a DLQ batch, one at a time and in order.

    Args:
        messages: Messages received from the DLQ.
        queue_url: URL of the queue the messages are sent back to.
        dlq_url: URL of the DLQ the messages are deleted from.
    """
    for dlq_message in messages:
        process_message(dlq_message, queue_url, dlq_url)
def process_message(
    message: Dict[str, Any],
    queue_url: str,
    dlq_url: str,
) -> None:
    """Send one DLQ message back to the queue, then delete it from the DLQ.

    The message is sent before it is deleted, so an error while sending
    leaves it in the DLQ.

    Args:
        message: Message received from the DLQ; its ``MessageId``, ``Body``
            and ``ReceiptHandle`` entries are read.
        queue_url: URL of the queue the body is sent to.
        dlq_url: URL of the DLQ the message is deleted from.
    """
    identifier = message["MessageId"]

    logging.info("[%s] Sending message to queue", identifier)
    send_message_to_sqs_queue(queue_url, message["Body"])

    logging.info("[%s] Deleting message from DLQ", identifier)
    delete_message_from_sqs_queue(dlq_url, message["ReceiptHandle"])

    logging.info("[%s] Successfully processed message", identifier)
+ (75, 75),
+ (125, 125),
+ (1280, 720),
+# Template to generate the name of the folder that will contain
+# the thumbnails of the file in the thumbnails bucket
+ "devDependencies": {
+ "serverless-localstack": "^1.0.6",
+ "serverless-python-requirements": "^6.0.0"
+ }
diff --git a/requirements.test.txt b/requirements.test.txt
+service: thumbnails-service
+ - serverless-localstack
+ - serverless-python-requirements
+ localstack:
+ stages:
+ - local
+ debug: false
+ pythonRequirements:
+ dockerizePip: false
+ useDownloadCache: true
+ useStaticCache: true
+ slim: true
+ individually: true
+ patterns:
+ - "!test"
+ - "!functions/__pycache__"
+ - "!node_modules"
+ - "!venv"
+ - "!.mypy_cache"
+ - "!.idea"
+ - "!.vscode"
+ - "!.pre-commit-config.yaml"
+ - "!package-lock.json"
+ - "!tox.ini"
+ - "!Makefile"
+ name: aws
+ runtime: python3.8
+ region: us-east-1
+ environment:
+ IMAGES_BUCKET_NAME: images-bucket
+ THUMBNAILS_BUCKET_NAME: thumbnails-bucket
+ IMAGES_QUEUE_NAME: images-queue
+ IMAGES_QUEUE_URL: { Ref: ImagesQueue }
+ IMAGES_DLQ_NAME: images-dlq
+ IMAGES_DLQ_URL: { Ref: ImagesDLQ }
# Consumes the images queue in batches of up to 10 messages and reports
# failures per message (ReportBatchItemFailures).
generate_thumbnails:
  handler: functions/generate_thumbnails.handler
  events:
    - sqs:
        arn:
          Fn::GetAtt: [ImagesQueue, Arn]
        batchSize: 10
        functionResponseType: ReportBatchItemFailures
# No event source is configured for this function in this definition.
retry_from_dlq:
  handler: functions/retry_from_dlq.handler
  # Timeout in seconds.
  timeout: 300
+ Resources:
# Input bucket: objects created through a PUT whose key ends in .png or .jpg
# are announced on the images queue.
ImagesBucket:
  Type: "AWS::S3::Bucket"
  Properties:
    BucketName: ${self:provider.environment.IMAGES_BUCKET_NAME}
    NotificationConfiguration:
      QueueConfigurations:
        - Event: "s3:ObjectCreated:Put"
          Queue:
            Fn::GetAtt: [ImagesQueue, Arn]
          Filter:
            S3Key:
              Rules:
                - Name: suffix
                  Value: ".png"
        - Event: "s3:ObjectCreated:Put"
          Queue:
            Fn::GetAtt: [ImagesQueue, Arn]
          Filter:
            S3Key:
              Rules:
                - Name: suffix
                  Value: ".jpg"
# Main queue; its redrive policy names ImagesDLQ as the dead-letter target
# with a maxReceiveCount of 2.
ImagesQueue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: ${self:provider.environment.IMAGES_QUEUE_NAME}
    RedrivePolicy:
      deadLetterTargetArn: !GetAtt ImagesDLQ.Arn
      maxReceiveCount: 2
# Dead-letter queue referenced by the ImagesQueue redrive policy.
ImagesDLQ:
  Type: "AWS::SQS::Queue"
  Properties:
    QueueName: ${self:provider.environment.IMAGES_DLQ_NAME}
# Bucket named by THUMBNAILS_BUCKET_NAME.
ThumbnailsBucket:
  Type: "AWS::S3::Bucket"
  Properties:
    BucketName: ${self:provider.environment.THUMBNAILS_BUCKET_NAME}
# Allows the images bucket to publish its notifications to the images queue.
ImagesQueuePolicy:
  Type: "AWS::SQS::QueuePolicy"
  Properties:
    PolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: "Allow"
          Principal:
            Service: "s3.amazonaws.com"
          Action: "sqs:SendMessage"
          # A policy resource must be an ARN; `Ref` on an SQS queue returns
          # the queue URL, so the ARN is read with GetAtt instead.
          Resource: !GetAtt ImagesQueue.Arn
          Condition:
            ArnLike:
              # The notification source is the bucket itself, so the pattern
              # is the bucket ARN without an object suffix ("/*").
              aws:SourceArn: "arn:aws:s3:::${self:provider.environment.IMAGES_BUCKET_NAME}"
    Queues:
      - !Ref ImagesQueue
# Role that Lambda can assume: basic execution permissions plus read access
# to the images bucket and write access to the thumbnails bucket.
GenerateThumbnailsLambdaExecutionRole:
  Type: "AWS::IAM::Role"
  Properties:
    RoleName: "GenerateThumbnailsLambdaExecutionRole"
    AssumeRolePolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: "Allow"
          Principal:
            Service: "lambda.amazonaws.com"
          Action: "sts:AssumeRole"
    ManagedPolicyArns:
      - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
    Policies:
      # Inline policy names must be unique within a role, so both S3
      # statements live in a single "s3Access" policy instead of two
      # policies sharing that name.
      - PolicyName: "s3Access"
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
            - Effect: "Allow"
              Action:
                - "s3:GetObject"
              Resource:
                - "arn:aws:s3:::${self:provider.environment.IMAGES_BUCKET_NAME}/*"
            - Effect: "Allow"
              Action:
                - "s3:PutObject"
              Resource:
                - "arn:aws:s3:::${self:provider.environment.THUMBNAILS_BUCKET_NAME}/*"
max-line-length = 88