diff --git a/.doc_gen/metadata/controltower_metadata.yaml b/.doc_gen/metadata/controltower_metadata.yaml new file mode 100644 index 00000000000..3cf49427046 --- /dev/null +++ b/.doc_gen/metadata/controltower_metadata.yaml @@ -0,0 +1,176 @@ +controltower_Hello: + title: Hello &CTowerlong; + title_abbrev: Hello &CTower; + synopsis: get started using &CTower;. + category: Hello + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + excerpts: + - description: + snippet_tags: + - python.example_code.controltower.Hello + services: + controltower: {ListBaselines} + +controltower_ListBaselines: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + excerpts: + - description: + snippet_tags: + - python.example_code.controltower.ControlTowerWrapper.decl + - python.example_code.controltower.ListBaselines + services: + controltower: {ListBaselines} + +controltower_ListEnabledBaselines: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + excerpts: + - description: + snippet_tags: + - python.example_code.controltower.ControlTowerWrapper.decl + - python.example_code.controltower.ListEnabledBaselines + services: + controltower: {ListEnabledBaselines} + +controltower_EnableBaseline: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + excerpts: + - description: + snippet_tags: + - python.example_code.controltower.ControlTowerWrapper.decl + - python.example_code.controltower.EnableBaseline + services: + controltower: {EnableBaseline} + +controltower_ResetEnabledBaseline: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + excerpts: + - description: + snippet_tags: + - python.example_code.controltower.ControlTowerWrapper.decl + - python.example_code.controltower.ResetEnabledBaseline + services: + controltower: {ResetEnabledBaseline} + +controltower_DisableBaseline: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + excerpts: + - description: + snippet_tags: + - python.example_code.controltower.ControlTowerWrapper.decl + - python.example_code.controltower.DisableBaseline + services: + controltower: {DisableBaseline} + +controltower_ListEnabledControls: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + excerpts: + - description: + snippet_tags: + - python.example_code.controltower.ControlTowerWrapper.decl + - python.example_code.controltower.ListEnabledControls + services: + controltower: {ListEnabledControls} + +controltower_EnableControl: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + excerpts: + - description: + snippet_tags: + - python.example_code.controltower.ControlTowerWrapper.decl + - python.example_code.controltower.EnableControl + services: + controltower: {EnableControl} + +controltower_GetControlOperation: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + excerpts: + - description: + snippet_tags: + - python.example_code.controltower.ControlTowerWrapper.decl + - python.example_code.controltower.GetControlOperation + services: + controltower: {GetControlOperation} + +controltower_DisableControl: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + excerpts: + - description: + snippet_tags: + - python.example_code.controltower.ControlTowerWrapper.decl + - python.example_code.controltower.DisableControl + services: + controltower: {DisableControl} + +controltower_ListLandingZones: + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + excerpts: + - description: + snippet_tags: + - python.example_code.controltower.ControlTowerWrapper.decl + - python.example_code.controltower.ListLandingZones + services: + controltower: {ListLandingZones} + +controltower_Scenario: + synopsis_list: + - List landing zones. + - List, enable, get, reset, and disable baselines. + - List, enable, get, and disable controls. + category: Basics + languages: + Python: + versions: + - sdk_version: 3 + github: python/example_code/controltower + sdkguide: + excerpts: + - description: Run an interactive scenario demonstrating &CTowerlong; features. + snippet_tags: + - python.example_code.controltower.ControlTowerScenario + - python.example_code.controltower.ControlTowerWrapper.class + services: + controltower: {CreateLandingZone, DeleteLandingZone, ListBaselines, ListEnabledBaselines, EnableBaseline, ResetEnabledBaseline, DisableBaseline, EnableControl, GetControlOperation, DisableControl, GetLandingZoneOperation, ListLandingZones, ListEnabledControls} diff --git a/python/example_code/controltower/README.md b/python/example_code/controltower/README.md new file mode 100644 index 00000000000..0246c301ca9 --- /dev/null +++ b/python/example_code/controltower/README.md @@ -0,0 +1,134 @@ +# AWS Control Tower code examples for the SDK for Python + +## Overview + +Shows how to use the AWS SDK for Python (Boto3) to work with AWS Control Tower. + + + + +_AWS Control Tower enables you to enforce and manage governance rules for security, operations, and compliance at scale across all your organizations and accounts._ + +## ⚠ Important + +* Running this code might result in charges to your AWS account. For more details, see [AWS Pricing](https://aws.amazon.com/pricing/) and [Free Tier](https://aws.amazon.com/free/). +* Running the tests might result in charges to your AWS account. +* We recommend that you grant your code least privilege. At most, grant only the minimum permissions required to perform the task. For more information, see [Grant least privilege](https://docs.aws.amazon.com/IAM/latest/UserGuide/best-practices.html#grant-least-privilege). +* This code is not tested in every AWS Region. For more information, see [AWS Regional Services](https://aws.amazon.com/about-aws/global-infrastructure/regional-product-services). + + + + +## Code examples + +### Prerequisites + +For prerequisites, see the [README](../../README.md#Prerequisites) in the `python` folder. + +Install the packages required by these examples by running the following in a virtual environment: + +``` +python -m pip install -r requirements.txt +``` + + +Before running the example, set up a landing zone in order to run the baseline and control management sections. +Follow the instructions provided by the [quick start](https://docs.aws.amazon.com/controltower/latest/userguide/quick-start.html) guide. + + +### Get started + +- [Hello AWS Control Tower](hello/hello_controltower.py#L4) (`ListBaselines`) + + +### Basics + +Code examples that show you how to perform the essential operations within a service. + +- [Learn the basics](scenario_controltower.py) + + +### Single actions + +Code excerpts that show you how to call individual service functions. + +- [DisableBaseline](controltower_wrapper.py#L365) +- [DisableControl](controltower_wrapper.py#L240) +- [EnableBaseline](controltower_wrapper.py#L64) +- [EnableControl](controltower_wrapper.py#L143) +- [GetControlOperation](controltower_wrapper.py#L186) +- [ListBaselines](controltower_wrapper.py#L36) +- [ListEnabledBaselines](controltower_wrapper.py#L305) +- [ListEnabledControls](controltower_wrapper.py#L401) +- [ListLandingZones](controltower_wrapper.py#L278) +- [ResetEnabledBaseline](controltower_wrapper.py#L332) + + + + + +## Run the examples + +### Instructions + + + + + +#### Hello AWS Control Tower + +This example shows you how to get started using AWS Control Tower. + +``` +python hello/hello_controltower.py +``` + +#### Learn the basics + +This example shows you how to do the following: + +- List landing zones. +- List, enable, get, reset, and disable baselines. +- List, enable, get, and disable controls. + + + + +Start the example by running the following at a command prompt: + +``` +python scenario_controltower.py +``` + + + + + + +### Tests + +⚠ Running tests might result in charges to your AWS account. + + +To find instructions for running these tests, see the [README](../../README.md#Tests) +in the `python` folder. + + + + + + +## Additional resources + +- [AWS Control Tower User Guide](https://docs.aws.amazon.com/controltower/latest/userguide/what-is-control-tower.html) +- [AWS Control Tower API Reference](https://docs.aws.amazon.com/controltower/latest/APIReference/Welcome.html) +- [SDK for Python AWS Control Tower reference](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp.html) + + + + +--- + +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 diff --git a/python/example_code/controltower/controltower_wrapper.py b/python/example_code/controltower/controltower_wrapper.py new file mode 100644 index 00000000000..8034b5ebb14 --- /dev/null +++ b/python/example_code/controltower/controltower_wrapper.py @@ -0,0 +1,453 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import logging +import boto3 +import time + +from botocore.exceptions import ClientError + +logger = logging.getLogger(__name__) + + +# snippet-start:[python.example_code.controltower.ControlTowerWrapper.class] +# snippet-start:[python.example_code.controltower.ControlTowerWrapper.decl] + + +class ControlTowerWrapper: + """Encapsulates AWS Control Tower and Control Catalog functionality.""" + + def __init__(self, controltower_client, controlcatalog_client): + """ + :param controltower_client: A Boto3 Amazon ControlTower client. + :param controlcatalog_client: A Boto3 Amazon ControlCatalog client. + """ + self.controltower_client = controltower_client + self.controlcatalog_client = controlcatalog_client + + @classmethod + def from_client(cls): + controltower_client = boto3.client("controltower") + controlcatalog_client = boto3.client("controlcatalog") + return cls(controltower_client, controlcatalog_client) + + # snippet-end:[python.example_code.controltower.ControlTowerWrapper.decl] + + # snippet-start:[python.example_code.controltower.ListBaselines] + def list_baselines(self): + """ + Lists all baselines. + + :return: List of baselines. + :raises ClientError: If the listing operation fails. + """ + try: + paginator = self.controltower_client.get_paginator("list_baselines") + baselines = [] + for page in paginator.paginate(): + baselines.extend(page["baselines"]) + return baselines + + except ClientError as err: + if err.response["Error"]["Code"] == "AccessDeniedException": + logger.error( + "Access denied. Please ensure you have the necessary permissions." + ) + else: + logger.error( + "Couldn't list baselines. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.ListBaselines] + + # snippet-start:[python.example_code.controltower.EnableBaseline] + def enable_baseline( + self, + target_identifier, + identity_center_baseline, + baseline_identifier, + baseline_version, + ): + """ + Enables a baseline for the specified target if it's not already enabled. + + :param target_identifier: The ARN of the target. + :param baseline_identifier: The identifier of baseline to enable. + :param identity_center_baseline: The identifier of identity center baseline if it is enabled. + :param baseline_version: The version of baseline to enable. + :return: The enabled baseline ARN or None if already enabled. + :raises ClientError: If enabling the baseline fails for reasons other than it being already enabled. + """ + try: + response = self.controltower_client.enable_baseline( + baselineIdentifier=baseline_identifier, + baselineVersion=baseline_version, + targetIdentifier=target_identifier, + parameters=[ + { + "key": "IdentityCenterEnabledBaselineArn", + "value": identity_center_baseline, + } + ], + ) + + operation_id = response["operationIdentifier"] + while True: + status = self.get_baseline_operation(operation_id) + print(f"Baseline operation status: {status}") + if status in ["SUCCEEDED", "FAILED"]: + break + time.sleep(30) + + return response["arn"] + except ClientError as err: + if err.response["Error"]["Code"] == "ValidationException": + if "already enabled" in err.response["Error"]["Message"]: + print("Baseline is already enabled for this target") + return None + else: + print( + "Unable to enable baseline due to validation exception: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + logger.error( + "Couldn't enable baseline. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.EnableBaseline] + + # snippet-start:[python.example_code.controltower.ListControls] + def list_controls(self): + """ + Lists all controls in the Control Tower control catalog. + + :return: List of controls. + :raises ClientError: If the listing operation fails. + """ + try: + paginator = self.controlcatalog_client.get_paginator("list_controls") + controls = [] + for page in paginator.paginate(): + controls.extend(page["Controls"]) + return controls + + except ClientError as err: + if err.response["Error"]["Code"] == "AccessDeniedException": + logger.error( + "Access denied. Please ensure you have the necessary permissions." + ) + else: + logger.error( + "Couldn't list controls. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.ListControls] + + # snippet-start:[python.example_code.controltower.EnableControl] + def enable_control(self, control_arn, target_identifier): + """ + Enables a control for a specified target. + + :param control_arn: The ARN of the control to enable. + :param target_identifier: The identifier of the target (e.g., OU ARN). + :return: The operation ID. + :raises ClientError: If enabling the control fails. + """ + try: + print(control_arn) + print(target_identifier) + response = self.controltower_client.enable_control( + controlIdentifier=control_arn, targetIdentifier=target_identifier + ) + + operation_id = response["operationIdentifier"] + while True: + status = self.get_control_operation(operation_id) + print(f"Control operation status: {status}") + if status in ["SUCCEEDED", "FAILED"]: + break + time.sleep(30) + + return operation_id + + except ClientError as err: + if ( + err.response["Error"]["Code"] == "ValidationException" + and "already enabled" in err.response["Error"]["Message"] + ): + logger.info("Control is already enabled for this target") + return None + logger.error( + "Couldn't enable control. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.EnableControl] + + # snippet-start:[python.example_code.controltower.GetControlOperation] + def get_control_operation(self, operation_id): + """ + Gets the status of a control operation. + + :param operation_id: The ID of the control operation. + :return: The operation status. + :raises ClientError: If getting the operation status fails. + """ + try: + response = self.controltower_client.get_control_operation( + operationIdentifier=operation_id + ) + return response["controlOperation"]["status"] + except ClientError as err: + if err.response["Error"]["Code"] == "ResourceNotFoundException": + logger.error("Operation not found.") + else: + logger.error( + "Couldn't get control operation status. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.GetControlOperation] + + # snippet-start:[python.example_code.controltower.GetBaselineOperation] + def get_baseline_operation(self, operation_id): + """ + Gets the status of a baseline operation. + + :param operation_id: The ID of the baseline operation. + :return: The operation status. + :raises ClientError: If getting the operation status fails. + """ + try: + response = self.controltower_client.get_baseline_operation( + operationIdentifier=operation_id + ) + return response["baselineOperation"]["status"] + except ClientError as err: + if err.response["Error"]["Code"] == "ResourceNotFoundException": + logger.error("Operation not found.") + else: + logger.error( + "Couldn't get baseline operation status. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.GetBaselineOperation] + + # snippet-start:[python.example_code.controltower.DisableControl] + def disable_control(self, control_arn, target_identifier): + """ + Disables a control for a specified target. + + :param control_arn: The ARN of the control to disable. + :param target_identifier: The identifier of the target (e.g., OU ARN). + :return: The operation ID. + :raises ClientError: If disabling the control fails. + """ + try: + response = self.controltower_client.disable_control( + controlIdentifier=control_arn, targetIdentifier=target_identifier + ) + + operation_id = response["operationIdentifier"] + while True: + status = self.get_control_operation(operation_id) + print(f"Control operation status: {status}") + if status in ["SUCCEEDED", "FAILED"]: + break + time.sleep(30) + + return operation_id + except ClientError as err: + if err.response["Error"]["Code"] == "ResourceNotFoundException": + logger.error("Control not found.") + else: + logger.error( + "Couldn't disable control. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.DisableControl] + + # snippet-start:[python.example_code.controltower.ListLandingZones] + def list_landing_zones(self): + """ + Lists all landing zones. + + :return: List of landing zones. + :raises ClientError: If the listing operation fails. + """ + try: + paginator = self.controltower_client.get_paginator("list_landing_zones") + landing_zones = [] + for page in paginator.paginate(): + landing_zones.extend(page["landingZones"]) + return landing_zones + + except ClientError as err: + if err.response["Error"]["Code"] == "AccessDeniedException": + logger.error( + "Access denied. Please ensure you have the necessary permissions." + ) + else: + logger.error( + "Couldn't list landing zones. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.ListLandingZones] + + # snippet-start:[python.example_code.controltower.ListEnabledBaselines] + def list_enabled_baselines(self): + """ + Lists all enabled baselines. + + :return: List of enabled baselines. + :raises ClientError: If the listing operation fails. + """ + try: + paginator = self.controltower_client.get_paginator("list_enabled_baselines") + enabled_baselines = [] + for page in paginator.paginate(): + enabled_baselines.extend(page["enabledBaselines"]) + return enabled_baselines + + except ClientError as err: + if err.response["Error"]["Code"] == "ResourceNotFoundException": + logger.error("Target not found.") + else: + logger.error( + "Couldn't list enabled baselines. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.ListEnabledBaselines] + + # snippet-start:[python.example_code.controltower.ResetEnabledBaseline] + def reset_enabled_baseline(self, enabled_baseline_identifier): + """ + Resets an enabled baseline for a specific target. + + :param enabled_baseline_identifier: The identifier of the enabled baseline to reset. + :return: The operation ID. + :raises ClientError: If resetting the baseline fails. + """ + try: + response = self.controltower_client.reset_enabled_baseline( + enabledBaselineIdentifier=enabled_baseline_identifier + ) + operation_id = response["operationIdentifier"] + while True: + status = self.get_baseline_operation(operation_id) + print(f"Baseline operation status: {status}") + if status in ["SUCCEEDED", "FAILED"]: + break + time.sleep(30) + return operation_id + except ClientError as err: + if err.response["Error"]["Code"] == "ResourceNotFoundException": + logger.error("Target not found.") + else: + logger.error( + "Couldn't reset enabled baseline. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.ResetEnabledBaseline] + + # snippet-start:[python.example_code.controltower.DisableBaseline] + def disable_baseline(self, enabled_baseline_identifier): + """ + Disables a baseline for a specific target and waits for the operation to complete. + + :param enabled_baseline_identifier: The identifier of the baseline to disable. + :return: The operation ID. + :raises ClientError: If disabling the baseline fails. + """ + try: + response = self.controltower_client.disable_baseline( + enabledBaselineIdentifier=enabled_baseline_identifier + ) + + operation_id = response["operationIdentifier"] + while True: + status = self.get_baseline_operation(operation_id) + print(f"Baseline operation status: {status}") + if status in ["SUCCEEDED", "FAILED"]: + break + time.sleep(30) + + return response["operationIdentifier"] + except ClientError as err: + if err.response["Error"]["Code"] == "ConflictException": + print( + f"Conflict disabling baseline: {err.response['Error']['Message']}. Skipping disable step." + ) + return None + else: + logger.error( + "Couldn't disable baseline. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.DisableBaseline] + + # snippet-start:[python.example_code.controltower.ListEnabledControls] + def list_enabled_controls(self, target_identifier): + """ + Lists all enabled controls for a specific target. + + :param target_identifier: The identifier of the target (e.g., OU ARN). + :return: List of enabled controls. + :raises ClientError: If the listing operation fails. + """ + try: + paginator = self.controltower_client.get_paginator("list_enabled_controls") + enabled_controls = [] + for page in paginator.paginate(targetIdentifier=target_identifier): + enabled_controls.extend(page["enabledControls"]) + return enabled_controls + + except ClientError as err: + if err.response["Error"]["Code"] == "AccessDeniedException": + logger.error( + "Access denied. Please ensure you have the necessary permissions." + ) + else: + logger.error( + "Couldn't list enabled controls. Here's why: %s: %s", + err.response["Error"]["Code"], + err.response["Error"]["Message"], + ) + raise + + # snippet-end:[python.example_code.controltower.ListEnabledControls] + + +# snippet-end:[python.example_code.controltower.ControlTowerWrapper.class] diff --git a/python/example_code/controltower/hello/hello_controltower.py b/python/example_code/controltower/hello/hello_controltower.py new file mode 100644 index 00000000000..6a4d0dc3c0e --- /dev/null +++ b/python/example_code/controltower/hello/hello_controltower.py @@ -0,0 +1,40 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +# snippet-start:[python.example_code.controltower.Hello] +import boto3 + + +def hello_controltower(controltower_client): + """ + Use the AWS SDK for Python (Boto3) to create an AWS Control Tower client + and list all available baselines. + This example uses the default settings specified in your shared credentials + and config files. + + :param controltower_client: A Boto3 AWS Control Tower Client object. This object wraps + the low-level AWS Control Tower service API. + """ + print("Hello, AWS Control Tower! Let's list available baselines:\n") + paginator = controltower_client.get_paginator("list_baselines") + page_iterator = paginator.paginate() + + baseline_names: [str] = [] + try: + for page in page_iterator: + for baseline in page["baselines"]: + baseline_names.append(baseline["name"]) + + print(f"{len(baseline_names)} baseline(s) retrieved.") + for baseline_name in baseline_names: + print(f"\t{baseline_name}") + + except controltower_client.exceptions.AccessDeniedException: + print("Access denied. Please ensure you have the necessary permissions.") + except Exception as e: + print(f"An error occurred: {str(e)}") + + +if __name__ == "__main__": + hello_controltower(boto3.client("controltower")) +# snippet-end:[python.example_code.controltower.Hello] diff --git a/python/example_code/controltower/requirements.txt b/python/example_code/controltower/requirements.txt new file mode 100644 index 00000000000..e74f0c584b9 --- /dev/null +++ b/python/example_code/controltower/requirements.txt @@ -0,0 +1,4 @@ +boto3>=1.26.79 +pytest>=7.2.1 +qrcode>=7.4.2 +pycognito>=2022.12.0 diff --git a/python/example_code/controltower/scenario_controltower.py b/python/example_code/controltower/scenario_controltower.py new file mode 100644 index 00000000000..83804892c52 --- /dev/null +++ b/python/example_code/controltower/scenario_controltower.py @@ -0,0 +1,315 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + + +import logging +import sys +import time + +import boto3 +from botocore.exceptions import ClientError + +from controltower_wrapper import ControlTowerWrapper + +# Add relative path to include demo_tools in this code example without need for setup. +sys.path.append("../..") +import demo_tools.question as q # noqa + +logger = logging.getLogger(__name__) + + +# snippet-start:[python.example_code.controltower.ControlTowerScenario] +class ControlTowerScenario: + stack_name = "" + + def __init__(self, controltower_wrapper, org_client): + """ + :param controltower_wrapper: An instance of the ControlTowerWrapper class. + :param org_client: A Boto3 Organization client. + """ + self.controltower_wrapper = controltower_wrapper + self.org_client = org_client + self.stack = None + self.ou_id = None + self.ou_arn = None + self.account_id = None + self.landing_zone_id = None + self.use_landing_zone = False + + def run_scenario(self): + print("-" * 88) + print( + "\tWelcome to the AWS Control Tower with ControlCatalog example scenario." + ) + print("-" * 88) + + print( + "This demo will walk you through working with AWS Control Tower for landing zones," + ) + print("managing baselines, and working with controls.") + + self.account_id = boto3.client("sts").get_caller_identity()["Account"] + + print( + "Some demo operations require the use of a landing zone. " + "\nYou can use an existing landing zone or opt out of these operations in the demo." + "\nFor instructions on how to set up a landing zone, " + "\nsee https://docs.aws.amazon.com/controltower/latest/userguide/getting-started-from-console.html" + ) + # List available landing zones + landing_zones = self.controltower_wrapper.list_landing_zones() + if landing_zones: + print("\nAvailable Landing Zones:") + for i, lz in enumerate(landing_zones, 1): + print(f"{i} {lz['arn']})") + + # Ask if user wants to use the first landing zone in the list + if q.ask( + f"Do you want to use the first landing zone in the list ({landing_zones[0]['arn']})? (y/n) ", + q.is_yesno, + ): + self.use_landing_zone = True + self.landing_zone_id = landing_zones[0]["arn"] + print(f"Using landing zone ID: {self.landing_zone_id})") + # Set up organization and get Sandbox OU ID. + sandbox_ou_id = self.setup_organization() + # Store the OU ID for use in the CloudFormation template. + self.ou_id = sandbox_ou_id + elif q.ask( + f"Do you want to use a different existing Landing Zone for this demo? (y/n) ", + q.is_yesno, + ): + self.use_landing_zone = True + self.landing_zone_id = q.ask("Enter landing zone id: ", q.non_empty) + # Set up organization and get Sandbox OU ID. + sandbox_ou_id = self.setup_organization() + # Store the OU ID for use in the CloudFormation template. + self.ou_id = sandbox_ou_id + + # List and Enable Baseline. + print("\nManaging Baselines:") + control_tower_baseline = None + identity_center_baseline = None + baselines = self.controltower_wrapper.list_baselines() + print("\nListing available Baselines:") + for baseline in baselines: + if baseline["name"] == "AWSControlTowerBaseline": + control_tower_baseline = baseline + print(f"{baseline['name']}") + + if self.use_landing_zone: + print("\nListing enabled baselines:") + enabled_baselines = self.controltower_wrapper.list_enabled_baselines() + for baseline in enabled_baselines: + # If the Identity Center baseline is enabled, the identifier must be used for other baselines. + if "baseline/LN25R72TTG6IGPTQ" in baseline["baselineIdentifier"]: + identity_center_baseline = baseline + print(f"{baseline['baselineIdentifier']}") + + if q.ask( + f"Do you want to enable the Control Tower Baseline? (y/n) ", + q.is_yesno, + ): + print("\nEnabling Control Tower Baseline.") + ic_baseline_arn = ( + identity_center_baseline["arn"] + if identity_center_baseline + else None + ) + baseline_arn = self.controltower_wrapper.enable_baseline( + self.ou_arn, ic_baseline_arn, control_tower_baseline["arn"], "4.0" + ) + if baseline_arn: + print(f"Enabled baseline ARN: {baseline_arn}") + else: + # Find the enabled baseline so we can reset it. + for enabled_baseline in enabled_baselines: + if ( + enabled_baseline["baselineIdentifier"] + == control_tower_baseline["arn"] + ): + baseline_arn = enabled_baseline["arn"] + print("No change, the selected baseline was already enabled.") + + if q.ask( + f"Do you want to reset the Control Tower Baseline? (y/n) ", + q.is_yesno, + ): + print(f"\nResetting Control Tower Baseline. {baseline_arn}") + operation_id = self.controltower_wrapper.reset_enabled_baseline( + baseline_arn + ) + print(f"\nReset baseline operation id {operation_id}.") + + if baseline_arn and q.ask( + f"Do you want to disable the Control Tower Baseline? (y/n) ", + q.is_yesno, + ): + print(f"Disabling baseline ARN: {baseline_arn}") + operation_id = self.controltower_wrapper.disable_baseline( + baseline_arn + ) + print(f"\nDisabled baseline operation id {operation_id}.") + + # List and Enable Controls. + print("\nManaging Controls:") + controls = self.controltower_wrapper.list_controls() + print("\nListing first 5 available Controls:") + for i, control in enumerate(controls[:5], 1): + print(f"{i}. {control['Name']} - {control['Arn']}") + + if self.use_landing_zone: + target_ou = self.ou_arn + enabled_controls = self.controltower_wrapper.list_enabled_controls( + target_ou + ) + print("\nListing enabled controls:") + for i, control in enumerate(enabled_controls, 1): + print(f"{i}. {control['controlIdentifier']}") + + # Enable first non-enabled control as an example. + enabled_control_arns = [control["arn"] for control in enabled_controls] + control_arn = next( + control["Arn"] + for control in controls + if control["Arn"] not in enabled_control_arns + ) + + if control_arn and q.ask( + f"Do you want to enable the control {control_arn}? (y/n) ", + q.is_yesno, + ): + print(f"\nEnabling control: {control_arn}") + operation_id = self.controltower_wrapper.enable_control( + control_arn, target_ou + ) + + if operation_id: + print(f"Enabled control with operation id {operation_id}") + else: + print("Control is already enabled for this target") + + if q.ask( + f"Do you want to disable the control? (y/n) ", + q.is_yesno, + ): + print("\nDisabling the control...") + operation_id = self.controltower_wrapper.disable_control( + control_arn, target_ou + ) + print(f"Disable operation ID: {operation_id}") + + print("\nThis concludes the example scenario.") + + print("Thanks for watching!") + print("-" * 88) + + def setup_organization(self): + """ + Checks if the current account is part of an organization and creates one if needed. + Also ensures a Sandbox OU exists and returns its ID. + + :return: The ID of the Sandbox OU + """ + print("\nChecking organization status...") + + try: + # Check if account is part of an organization + org_response = self.org_client.describe_organization() + org_id = org_response["Organization"]["Id"] + print(f"Account is part of organization: {org_id}") + + except ClientError as error: + if error.response["Error"]["Code"] == "AWSOrganizationsNotInUseException": + print("No organization found. Creating a new organization...") + try: + create_response = self.org_client.create_organization( + FeatureSet="ALL" + ) + org_id = create_response["Organization"]["Id"] + print(f"Created new organization: {org_id}") + + # Wait for organization to be available. + waiter = self.org_client.get_waiter("organization_active") + waiter.wait( + Organization=org_id, + WaiterConfig={"Delay": 5, "MaxAttempts": 12}, + ) + + except ClientError as create_error: + logger.error( + "Couldn't create organization. Here's why: %s: %s", + create_error.response["Error"]["Code"], + create_error.response["Error"]["Message"], + ) + raise + else: + logger.error( + "Couldn't describe organization. Here's why: %s: %s", + error.response["Error"]["Code"], + error.response["Error"]["Message"], + ) + raise + + # Look for Sandbox OU. + sandbox_ou_id = None + paginator = self.org_client.get_paginator( + "list_organizational_units_for_parent" + ) + + try: + # Get root ID first. + roots = self.org_client.list_roots()["Roots"] + if not roots: + raise ValueError("No root found in organization") + root_id = roots[0]["Id"] + + # Search for existing Sandbox OU. + print("Checking for Sandbox OU...") + for page in paginator.paginate(ParentId=root_id): + for ou in page["OrganizationalUnits"]: + if ou["Name"] == "Sandbox": + sandbox_ou_id = ou["Id"] + self.ou_arn = ou["Arn"] + print(f"Found existing Sandbox OU: {sandbox_ou_id}") + break + if sandbox_ou_id: + break + + # Create Sandbox OU if it doesn't exist. + if not sandbox_ou_id: + print("Creating Sandbox OU...") + create_ou_response = self.org_client.create_organizational_unit( + ParentId=root_id, Name="Sandbox" + ) + sandbox_ou_id = create_ou_response["OrganizationalUnit"]["Id"] + print(f"Created new Sandbox OU: {sandbox_ou_id}") + + # Wait for OU to be available. + waiter = self.org_client.get_waiter("organizational_unit_active") + waiter.wait( + OrganizationalUnitId=sandbox_ou_id, + WaiterConfig={"Delay": 5, "MaxAttempts": 12}, + ) + + except ClientError as error: + logger.error( + "Couldn't set up Sandbox OU. Here's why: %s: %s", + error.response["Error"]["Code"], + error.response["Error"]["Message"], + ) + raise + + return sandbox_ou_id + + +if __name__ == "__main__": + try: + org = boto3.client("organizations") + control_tower_wrapper = ControlTowerWrapper.from_client() + + scenario = ControlTowerScenario(control_tower_wrapper, org) + scenario.run_scenario() + except Exception: + logging.exception("Something went wrong with the scenario.") +# snippet-end:[python.example_code.controltower.ControlTowerScenario] diff --git a/python/example_code/controltower/test/conftest.py b/python/example_code/controltower/test/conftest.py new file mode 100644 index 00000000000..7307c5c8313 --- /dev/null +++ b/python/example_code/controltower/test/conftest.py @@ -0,0 +1,73 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Contains common test fixtures used to run unit tests. +""" + +import sys +import os +import boto3 +import pytest + +script_dir = os.path.dirname(os.path.abspath(__file__)) + +# Add relative path to include ControlTowerWrapper. +sys.path.append(script_dir) +sys.path.append(os.path.dirname(script_dir)) +import scenario_controltower +from controltower_wrapper import ControlTowerWrapper + +# Add relative path to include demo_tools in this code example without need for setup. +sys.path.append(os.path.join(script_dir, "../..")) + +from test_tools.fixtures.common import * + + +class ScenarioData: + def __init__( + self, + controltower_client, + controlcatalog_client, + organizations_client, + controltower_stubber, + controlcatalog_stubber, + organizations_stubber, + ): + self.controltower_client = controltower_client + self.controlcatalog_client = controlcatalog_client + self.organizations_client = organizations_client + self.controltower_stubber = controltower_stubber + self.controlcatalog_stubber = controlcatalog_stubber + self.organizations_stubber = organizations_stubber + self.scenario = scenario_controltower.ControlTowerScenario( + controltower_wrapper=ControlTowerWrapper( + self.controltower_client, self.controlcatalog_client + ), + org_client=self.organizations_client, + ) + + +@pytest.fixture +def scenario_data(make_stubber): + controltower_client = boto3.client("controltower") + controlcatalog_client = boto3.client("controlcatalog") + organizations_client = boto3.client("organizations") + + controltower_stubber = make_stubber(controltower_client) + controlcatalog_stubber = make_stubber(controlcatalog_client) + organizations_stubber = make_stubber(organizations_client) + + return ScenarioData( + controltower_client, + controlcatalog_client, + organizations_client, + controltower_stubber, + controlcatalog_stubber, + organizations_stubber, + ) + + +@pytest.fixture +def mock_wait(monkeypatch): + return diff --git a/python/example_code/controltower/test/test_scenario_run.py b/python/example_code/controltower/test/test_scenario_run.py new file mode 100644 index 00000000000..5b54e68809d --- /dev/null +++ b/python/example_code/controltower/test/test_scenario_run.py @@ -0,0 +1,266 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Unit tests for the run_scenario method in scenario_controltower.py. +""" + +import pytest +from botocore.exceptions import ClientError +import datetime +import boto3 + +from example_code.controltower.controltower_wrapper import ControlTowerWrapper +from example_code.controltower.scenario_controltower import ControlTowerScenario + + +class MockManager: + def __init__(self, stub_runner, scenario_data, input_mocker): + self.scenario_data = scenario_data + self.account_id = "123456789012" + self.org_id = "o-exampleorgid" + self.root_id = "r-examplerootid" + self.sandbox_ou_id = "ou-exampleouid123456" + self.sandbox_ou_arn = ( + "arn:aws:organizations::123456789012:ou/o-exampleorgid/ou-exampleouid" + ) + self.landing_zone_arn = ( + "arn:aws:controltower:us-east-1:123456789012:landingzone/lz-example" + ) + self.operation_id = "op-1234567890abcdef01234567890abcdef" + self.baseline_operation_id = "op-1234567890abcdef01234567890abcdef" + self.stack_id = ( + "arn:aws:cloudformation:us-east-1:123456789012:stack/test-stack/abcdef" + ) + self.baseline_arn = "arn:aws:controltower:us-east-1:123456789012:baseline/AWSControlTowerBaseline" + self.enabled_baseline_arn = "arn:aws:controltower:us-east-1:123456789012:baseline/AWSControlTowerBaseline/isenabled" + self.control_arn = ( + "arn:aws:controlcatalog:us-east-1:123456789012:control/aws-control-1234" + ) + self.control_arn_enabled = ( + "arn:aws:controlcatalog:us-east-1:123456789012:control/aws-control-5678" + ) + + self.landing_zones = [{"arn": self.landing_zone_arn}] + + self.baselines = [{"name": "AWSControlTowerBaseline", "arn": self.baseline_arn}] + + self.enabled_baselines = [ + { + "targetIdentifier": self.sandbox_ou_arn, + "baselineIdentifier": self.enabled_baseline_arn, + "arn": self.baseline_arn, + "statusSummary": { + "status": "SUCCEEDED", + "lastOperationIdentifier": self.baseline_operation_id, + }, + } + ] + + self.controls = [ + { + "Arn": self.control_arn, + "Name": "TestControl1", + "Description": "Test control description", + } + ] + + self.enabled_controls = [ + { + "arn": self.control_arn_enabled, + "controlIdentifier": self.control_arn_enabled, + "statusSummary": { + "status": "SUCCEEDED", + "lastOperationIdentifier": self.baseline_operation_id, + }, + "targetIdentifier": self.sandbox_ou_id, + } + ] + + self.stub_runner = stub_runner + self.input_mocker = input_mocker + + def setup_stubs(self, error, stop_on, monkeypatch): + """Setup stubs for the scenario""" + # Mock user inputs + answers = [ + "y", # Use first landing zone in the list. + "y", # Enable baseline. + "y", # Reset baseline. + "y", # Disable baseline. + "y", # Enable control. + "y", # Disable control. + ] + self.input_mocker.mock_answers(answers) + + # Mock STS get_caller_identity + def mock_get_caller_identity(): + return {"Account": self.account_id} + + monkeypatch.setattr( + boto3.client("sts"), "get_caller_identity", mock_get_caller_identity + ) + + with self.stub_runner(error, stop_on) as runner: + # List landing zones + runner.add( + self.scenario_data.controltower_stubber.stub_list_landing_zones, + self.landing_zones, + ) + + # Organization setup + runner.add( + self.scenario_data.organizations_stubber.stub_describe_organization, + self.org_id, + ) + runner.add( + self.scenario_data.organizations_stubber.stub_list_roots, + [{"Id": self.root_id, "Name": "Root"}], + ) + runner.add( + self.scenario_data.organizations_stubber.stub_list_organizational_units_for_parent, + self.root_id, + [ + { + "Id": self.sandbox_ou_id, + "Name": "Sandbox", + "Arn": self.sandbox_ou_arn, + } + ], + ) + + # List and enable baselines + runner.add( + self.scenario_data.controltower_stubber.stub_list_baselines, + self.baselines, + ) + runner.add( + self.scenario_data.controltower_stubber.stub_list_enabled_baselines, + self.enabled_baselines, + ) + runner.add( + self.scenario_data.controltower_stubber.stub_enable_baseline, + self.baseline_arn, + "4.0", + self.sandbox_ou_arn, + self.enabled_baseline_arn, + self.baseline_operation_id, + ) + runner.add( + self.scenario_data.controltower_stubber.stub_get_baseline_operation, + self.baseline_operation_id, + "SUCCEEDED", + ) + runner.add( + self.scenario_data.controltower_stubber.stub_reset_enabled_baseline, + self.enabled_baseline_arn, + self.baseline_operation_id, + ) + runner.add( + self.scenario_data.controltower_stubber.stub_get_baseline_operation, + self.baseline_operation_id, + "SUCCEEDED", + ) + runner.add( + self.scenario_data.controltower_stubber.stub_disable_baseline, + self.enabled_baseline_arn, + self.baseline_operation_id, + ) + runner.add( + self.scenario_data.controltower_stubber.stub_get_baseline_operation, + self.baseline_operation_id, + "SUCCEEDED", + ) + + # List and enable controls + runner.add( + self.scenario_data.controlcatalog_stubber.stub_list_controls, + self.controls, + ) + runner.add( + self.scenario_data.controltower_stubber.stub_list_enabled_controls, + self.sandbox_ou_arn, + self.enabled_controls, + ) + runner.add( + self.scenario_data.controltower_stubber.stub_enable_control, + self.control_arn, + self.sandbox_ou_arn, + self.operation_id, + ) + runner.add( + self.scenario_data.controltower_stubber.stub_get_control_operation, + self.operation_id, + "SUCCEEDED", + ) + runner.add( + self.scenario_data.controltower_stubber.stub_disable_control, + self.control_arn, + self.sandbox_ou_arn, + self.operation_id, + ) + runner.add( + self.scenario_data.controltower_stubber.stub_get_control_operation, + self.operation_id, + "SUCCEEDED", + ) + + def setup_integ(self, error, stop_on): + """Set up the scenario for an integration test.""" + # Mock user inputs for using the suggested landing zone + answers = [ + "n", # Use first landing zone in the list. + "n", # Enable baseline. + ] + self.stub_runner = None + self.input_mocker.mock_answers(answers) + + +@pytest.fixture +def mock_mgr(stub_runner, scenario_data, input_mocker): + return MockManager(stub_runner, scenario_data, input_mocker) + + +# Define ANY constant for template body matching +ANY = object() + + +def test_run_scenario(mock_mgr, capsys, monkeypatch): + """Test the scenario that uses the suggested landing zone.""" + mock_mgr.setup_stubs(None, None, monkeypatch) + + # Run the scenario + mock_mgr.scenario_data + mock_mgr.scenario_data.scenario.run_scenario() + + # Verify the scenario completed successfully + captured = capsys.readouterr() + assert "This concludes the example scenario." in captured.out + + +@pytest.mark.integ +def test_run_scenario_integ(input_mocker, capsys): + """Test the scenario with an integration test.""" + answers = [ + "n", # Run the sections that don't require a landing zone. + "n", + ] + + input_mocker.mock_answers(answers) + controltower_client = boto3.client("controltower") + controlcatalog_client = boto3.client("controlcatalog") + organizations_client = boto3.client("organizations") + + scenario = ControlTowerScenario( + controltower_wrapper=ControlTowerWrapper( + controltower_client, controlcatalog_client + ), + org_client=organizations_client, + ) + + # Run the scenario + scenario.run_scenario() + + # Verify the scenario completed successfully + captured = capsys.readouterr() + assert "This concludes the example scenario." in captured.out diff --git a/python/test_tools/controlcatalog_stubber.py b/python/test_tools/controlcatalog_stubber.py new file mode 100644 index 00000000000..03a7aa5c14b --- /dev/null +++ b/python/test_tools/controlcatalog_stubber.py @@ -0,0 +1,66 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Stub functions that are used by the AWS Control Catalog unit tests. + +When tests are run against an actual AWS account, the stubber class does not +set up stubs and passes all calls through to the Boto 3 client. +""" + +from botocore.stub import ANY +from boto3 import client + +from test_tools.example_stubber import ExampleStubber + + +class ControlCatalogStubber(ExampleStubber): + """ + A class that implements stub functions used by AWS Control Catalog unit tests. + + The stubbed functions expect certain parameters to be passed to them as + part of the tests, and will raise errors when the actual parameters differ from + the expected. + """ + + def __init__(self, controlcatalog_client: client, use_stubs=True) -> None: + """ + Initializes the object with a specific client and configures it for + stubbing or AWS passthrough. + + :param controlcatalog_client: A Boto 3 AWS Control Catalog client. + :param use_stubs: When True, use stubs to intercept requests. Otherwise, + pass requests through to AWS. + """ + super().__init__(controlcatalog_client, use_stubs) + + def stub_list_controls(self, controls: list, error_code: str = None) -> None: + """ + Stub the list_controls function. + + :param controls: List of controls to return. + :param error_code: Simulated error code to raise. + """ + expected_params = {} + response = { + "Controls": controls + } + self._stub_bifurcator( + "list_controls", expected_params, response, error_code=error_code + ) + + def stub_get_control(self, control_arn: str, control_details: dict, error_code: str = None) -> None: + """ + Stub the get_control function. + + :param control_arn: The ARN of the control. + :param control_details: The details of the control. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "ControlArn": control_arn + } + response = control_details + self._stub_bifurcator( + "get_control", expected_params, response, error_code=error_code + ) \ No newline at end of file diff --git a/python/test_tools/controltower_stubber.py b/python/test_tools/controltower_stubber.py new file mode 100644 index 00000000000..2462864c58e --- /dev/null +++ b/python/test_tools/controltower_stubber.py @@ -0,0 +1,239 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Stub functions that are used by the AWS Control Tower unit tests. + +When tests are run against an actual AWS account, the stubber class does not +set up stubs and passes all calls through to the Boto 3 client. +""" + +from botocore.stub import ANY +from boto3 import client + +from test_tools.example_stubber import ExampleStubber + + +class ControlTowerStubber(ExampleStubber): + """ + A class that implements stub functions used by AWS Control Tower unit tests. + + The stubbed functions expect certain parameters to be passed to them as + part of the tests, and will raise errors when the actual parameters differ from + the expected. + """ + + def __init__(self, controltower_client: client, use_stubs=True) -> None: + """ + Initializes the object with a specific client and configures it for + stubbing or AWS passthrough. + + :param controltower_client: A Boto 3 AWS Control Tower client. + :param use_stubs: When True, use stubs to intercept requests. Otherwise, + pass requests through to AWS. + """ + super().__init__(controltower_client, use_stubs) + + def stub_list_landing_zones(self, landing_zones: list, error_code: str = None) -> None: + """ + Stub the list_landing_zones function. + + :param landing_zones: List of landing zones to return. + :param error_code: Simulated error code to raise. + """ + expected_params = {} + response = { + "landingZones": landing_zones + } + self._stub_bifurcator( + "list_landing_zones", expected_params, response, error_code=error_code + ) + + def stub_list_baselines(self, baselines: list, error_code: str = None) -> None: + """ + Stub the list_baselines function. + + :param baselines: List of baselines to return. + :param error_code: Simulated error code to raise. + """ + expected_params = {} + response = { + "baselines": baselines + } + self._stub_bifurcator( + "list_baselines", expected_params, response, error_code=error_code + ) + + def stub_list_enabled_baselines(self, enabled_baselines: list, error_code: str = None) -> None: + """ + Stub the list_enabled_baselines function. + + :param enabled_baselines: List of enabled baselines to return. + :param error_code: Simulated error code to raise. + """ + expected_params = { + } + response = { + "enabledBaselines": enabled_baselines + } + self._stub_bifurcator( + "list_enabled_baselines", expected_params, response, error_code=error_code + ) + + def stub_reset_enabled_baseline(self, baseline_identifier: str, operation_identifier: str, error_code: str = None) -> None: + """ + Stub the reset_enabled_baseline function. + + :param baseline_identifier: The identifier of the baseline to reset. + :param operation_identifier: The identifier of the operation. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "enabledBaselineIdentifier": baseline_identifier + } + response = { + "operationIdentifier": operation_identifier + } + self._stub_bifurcator( + "reset_enabled_baseline", expected_params, response, error_code=error_code + ) + + def stub_disable_baseline(self, baseline_identifier: str, operation_identifier: str, error_code: str = None) -> None: + """ + Stub the disable_baseline function. + + :param baseline_identifier: The identifier of the baseline to disable. + :param operation_identifier: The identifier of the operation. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "enabledBaselineIdentifier": baseline_identifier + } + response = { + "operationIdentifier": operation_identifier + } + self._stub_bifurcator( + "disable_baseline", expected_params, response, error_code=error_code + ) + + def stub_list_enabled_controls(self, target_identifier: str, enabled_controls: list, error_code: str = None) -> None: + """ + Stub the list_enabled_controls function. + + :param target_identifier: The identifier of the target. + :param enabled_controls: List of enabled controls to return. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "targetIdentifier": target_identifier + } + response = { + "enabledControls": enabled_controls + } + self._stub_bifurcator( + "list_enabled_controls", expected_params, response, error_code=error_code + ) + + def stub_enable_baseline(self, baseline_identifier: str, baseline_version: str, target_identifier: str, arn: str, operation_identifier: str, error_code: str = None) -> None: + """ + Stub the enable_baseline function. + + :param baseline_identifier: The identifier of the baseline. + :param baseline_version: The version of the baseline. + :param target_identifier: The identifier of the target. + :param arn: The ARN of the enabled baseline. + :param operation_identifier: The operation identifier of the enable operation. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "baselineIdentifier": baseline_identifier, + "baselineVersion": baseline_version, + "targetIdentifier": target_identifier, + 'parameters': [{'key': 'IdentityCenterEnabledBaselineArn', 'value': None}], + } + response = { + "arn": arn, + "operationIdentifier": operation_identifier, + } + self._stub_bifurcator( + "enable_baseline", expected_params, response, error_code=error_code + ) + def stub_get_baseline_operation(self, operation_identifier: str, status: str, error_code: str = None) -> None: + """ + Stub the get_baseline_operation function. + + :param operation_identifier: The identifier of the operation. + :param status: The status of the operation. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "operationIdentifier": operation_identifier + } + response = { + "baselineOperation": { + "status": status, + } + } + self._stub_bifurcator( + "get_baseline_operation", expected_params, response, error_code=error_code + ) + + def stub_enable_control(self, control_identifier: str, target_identifier: str, operation_identifier: str, error_code: str = None) -> None: + """ + Stub the enable_control function. + + :param control_identifier: The identifier of the control. + :param target_identifier: The identifier of the target. + :param operation_identifier: The identifier of the operation. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "controlIdentifier": control_identifier, + "targetIdentifier": target_identifier + } + response = { + "operationIdentifier": operation_identifier + } + self._stub_bifurcator( + "enable_control", expected_params, response, error_code=error_code + ) + + def stub_disable_control(self, control_identifier: str, target_identifier: str, operation_id: str, error_code: str = None) -> None: + """ + Stub the disable_control function. + + :param control_identifier: The identifier of the control. + :param target_identifier: The identifier of the target. + :param operation_id: The ID of the operation. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "controlIdentifier": control_identifier, + "targetIdentifier": target_identifier + } + response = { + "operationIdentifier": operation_id + } + self._stub_bifurcator( + "disable_control", expected_params, response, error_code=error_code + ) + + def stub_get_control_operation(self, operation_identifier: str, status: str, error_code: str = None) -> None: + """ + Stub the get_control_operation function. + + :param operation_identifier: The identifier of the operation. + :param status: The status of the operation. + :param error_code: Simulated error code to raise. + """ + expected_params = { + "operationIdentifier": operation_identifier + } + response = { + "controlOperation": { + "status": status, + } + } + self._stub_bifurcator( + "get_control_operation", expected_params, response, error_code=error_code + ) \ No newline at end of file diff --git a/python/test_tools/organizations_stubber.py b/python/test_tools/organizations_stubber.py index 30fbbfd881c..8fc80685429 100644 --- a/python/test_tools/organizations_stubber.py +++ b/python/test_tools/organizations_stubber.py @@ -98,3 +98,95 @@ def stub_delete_policy(self, policy_id, error_code=None): self._stub_bifurcator( "delete_policy", expected_parameters, error_code=error_code ) + + def stub_describe_organization(self, org_id, error_code=None): + """ + Stub for the describe_organization function. + + :param org_id: The ID of the organization. + :param error_code: The error code to raise, if any. + """ + response = { + "Organization": { + "Id": org_id, + "Arn": f"arn:aws:organizations::123456789012:organization/{org_id}", + "FeatureSet": "ALL", + "MasterAccountArn": "arn:aws:organizations::123456789012:account/o-exampleorgid/123456789012", + "MasterAccountId": "123456789012", + "MasterAccountEmail": "admin@example.org" + } + } + self._stub_bifurcator( + "describe_organization", {}, response, error_code=error_code + ) + + def stub_create_organization(self, feature_set, org_id, error_code=None): + """ + Stub for the create_organization function. + + :param feature_set: The feature set for the organization. + :param org_id: The ID of the created organization. + :param error_code: The error code to raise, if any. + """ + expected_parameters = {"FeatureSet": feature_set} + response = { + "Organization": { + "Id": org_id, + "Arn": f"arn:aws:organizations::123456789012:organization/{org_id}", + "FeatureSet": feature_set, + "MasterAccountArn": "arn:aws:organizations::123456789012:account/o-exampleorgid/123456789012", + "MasterAccountId": "123456789012", + "MasterAccountEmail": "admin@example.org" + } + } + self._stub_bifurcator( + "create_organization", expected_parameters, response, error_code=error_code + ) + + def stub_list_roots(self, roots, error_code=None): + """ + Stub for the list_roots function. + + :param roots: List of root objects to return. + :param error_code: The error code to raise, if any. + """ + response = {"Roots": roots} + self._stub_bifurcator( + "list_roots", {}, response, error_code=error_code + ) + + def stub_list_organizational_units_for_parent(self, parent_id, ous, error_code=None): + """ + Stub for the list_organizational_units_for_parent function. + + :param parent_id: The ID of the parent organizational unit or root. + :param ous: List of organizational unit objects to return. + :param error_code: The error code to raise, if any. + """ + expected_parameters = {"ParentId": parent_id} + response = {"OrganizationalUnits": ous} + self._stub_bifurcator( + "list_organizational_units_for_parent", expected_parameters, response, error_code=error_code + ) + + def stub_create_organizational_unit(self, parent_id, name, ou_id, ou_arn, error_code=None): + """ + Stub for the create_organizational_unit function. + + :param parent_id: The ID of the parent organizational unit or root. + :param name: The name of the organizational unit. + :param ou_id: The ID of the created organizational unit. + :param ou_arn: The ARN of the created organizational unit. + :param error_code: The error code to raise, if any. + """ + expected_parameters = {"ParentId": parent_id, "Name": name} + response = { + "OrganizationalUnit": { + "Id": ou_id, + "Arn": ou_arn, + "Name": name + } + } + self._stub_bifurcator( + "create_organizational_unit", expected_parameters, response, error_code=error_code + ) diff --git a/python/test_tools/stubber_factory.py b/python/test_tools/stubber_factory.py index a761d3ce545..d43bd83e24a 100644 --- a/python/test_tools/stubber_factory.py +++ b/python/test_tools/stubber_factory.py @@ -25,6 +25,8 @@ from test_tools.cognito_idp_stubber import CognitoIdpStubber from test_tools.comprehend_stubber import ComprehendStubber from test_tools.config_stubber import ConfigStubber +from test_tools.controltower_stubber import ControlTowerStubber +from test_tools.controlcatalog_stubber import ControlCatalogStubber from test_tools.dynamodb_stubber import DynamoStubber from test_tools.ec2_stubber import Ec2Stubber from test_tools.ecr_stubber import EcrStubber @@ -108,6 +110,18 @@ def stubber_factory(service_name): return ComprehendStubber elif service_name == "config": return ConfigStubber + elif service_name == "controltower": + return ControlTowerStubber + elif service_name == "controlcatalog": + return ControlCatalogStubber + elif service_name == "dynamodb": + return DynamoStubber + elif service_name == "ec2": + return Ec2Stubber + elif service_name == "ecr": + return EcrStubber + elif service_name == "config": + return ConfigStubber elif service_name == "dynamodb": return DynamoStubber elif service_name == "ec2": diff --git a/scenarios/basics/controltower/README.md b/scenarios/basics/controltower/README.md new file mode 100644 index 00000000000..4fd0fc029ab --- /dev/null +++ b/scenarios/basics/controltower/README.md @@ -0,0 +1,56 @@ +# AWS Control Tower Basics Scenario + +## Overview + +This example shows how to use AWS SDKs to work with AWS Control Tower and Control Catalog services. The scenario demonstrates how to manage baselines, controls, and landing zones in AWS Control Tower. + +[AWS Control Tower](https://docs.aws.amazon.com/controltower/latest/userguide/what-is-control-tower.html) helps you set up and govern a secure, multi-account AWS environment based on best practices. + +This example illustrates typical interactions with AWS Control Tower, including: + +1. Listing available baselines and controls. +2. Managing baselines (enabling, disabling, and resetting). +3. Working with controls (enabling, disabling, and checking operation status). +4. Interacting with landing zones. + +The scenario follows these steps: + +### Hello +- Set up the service client. +- List available baselines by name. + +### Scenario +#### Setup +- List available landing zones and prompt the user if they would like to use an existing landing zone. +- If no landing zones exist, provide [information about setting up a landing zone](https://docs.aws.amazon.com/controltower/latest/userguide/quick-start.html). + +#### Baselines +- List available baselines. +- If a landing zone exists: + - List enabled baselines. + - Enable a baseline. + - Get the operational status of the baseline operation. + - Reset the baseline. + - Disable the baseline. + +#### Controls +- List controls in Control Catalog. +- If a landing zone exists: + - Enable a control. + - Get the operational status of the control. + - List enabled controls. + - Disable the control. + +## Implementations + +This example is implemented in the following languages: + +- [Python](../../../python/example_code/controltower/README.md) + +## Additional resources + +- [Documentation: AWS Control Tower User Guide](https://docs.aws.amazon.com/controltower/latest/userguide/what-is-control-tower.html) +- [Documentation: AWS Control Tower API Reference](https://docs.aws.amazon.com/controltower/latest/APIReference/Welcome.html) +--- + +Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: Apache-2.0 \ No newline at end of file diff --git a/scenarios/basics/controltower/SPECIFICATION.md b/scenarios/basics/controltower/SPECIFICATION.md new file mode 100644 index 00000000000..643b17b592e --- /dev/null +++ b/scenarios/basics/controltower/SPECIFICATION.md @@ -0,0 +1,228 @@ +# AWS Control Tower Basics Scenario - Technical specification + +This document contains the technical specifications for _AWS Control Tower Basics Scenario_, +a basics scenario that showcases AWS services and SDKs. It is primarily intended for the AWS code +examples team to use while developing this example in additional languages. + +This document explains the following: + +- Architecture and features of the example scenario. +- Metadata information for the scenario. +- Sample reference output. + +For an introduction, see the [README.md](README.md). + +--- + +### Table of contents + +- [Resources and User Input](#resources-and-user-input) +- [Hello](#hello) +- [Scenario](#scenario) +- [Errors](#errors) +- [Metadata](#metadata) + +## Resources and User Input + +- This example can run with no additional resources, or can use an existing landing zone. Since landing zone creation +- requires multiple AWS accounts (which cannot be deleted for 7 days), this example does not support creating new +- landing zones. The example will prompt to use a current landing zone, or run only that portion that doesn't +- require landing zone identifiers. To set up a landing zone, follow the [QuickStart guide](https://docs.aws.amazon.com/controltower/latest/userguide/quick-start.html), and create new accounts when prompted. + +### Hello +The Hello example is a separate runnable example. + +- Set up the service client. +- List available Baselines by name. + +Example +``` +Hello, AWS Control Tower! Let's list available baselines: + +7 baseline(s) retrieved. + AuditBaseline + LogArchiveBaseline + IdentityCenterBaseline + BackupCentralVaultBaseline + BackupAdminBaseline + BackupBaseline + +``` +## Scenario + +#### Setup +- List available landing zones, and prompt the user if they would like to use the first or other landing zone. +- If no landing zones, provide a link to set up a landing zone and only use the list operations that do not require a target id. +- For the selected landing zone, the control tower actions may require the arn of the target organizational id. To get it, find the +- Sandbox organizational unit inside the root organization, or create it, and store the id of that OU for the other calls. + +Example +``` +---------------------------------------------------------------------------------------- + Welcome to the AWS Control Tower with ControlCatalog example scenario. +---------------------------------------------------------------------------------------- +This demo will walk you through working with AWS Control Tower for landing zones, +managing baselines, and working with controls. +Some demo operations require the use of a landing zone. +You can use an existing landing zone or opt out of these operations in the demo. +For instructions on how to set up a landing zone, +see https://docs.aws.amazon.com/controltower/latest/userguide/getting-started-from-console.html + +Available Landing Zones: +1 arn:aws:controltower:us-east-1:478643181688:landingzone/MBQZZWORLOCC8WZ7) +Do you want to use the first landing zone in the list (arn:aws:controltower:us-east-1:478643181688:landingzone/MBQZZWORLOCC8WZ7)? (y/n) y +Using landing zone ID: arn:aws:controltower:us-east-1:478643181688:landingzone/MBQZZWORLOCC8WZ7) + +Checking organization status... +Account is part of organization: o-phmlq23w1e +Checking for Sandbox OU... +Found existing Sandbox OU: ou-spdo-e3mtcidv + + +``` + +#### Baselines +- List available baselines. +- If a landing zone exists: + - List enabled baselines. + - Prompt the user if they would like to enable another baseline from the list. + - Get the operational status of the baseline operation. + - Reset the baseline. + - Disable the baseline. + +Example +``` +Managing Baselines: + +Listing available Baselines: +AuditBaseline +LogArchiveBaseline +IdentityCenterBaseline +AWSControlTowerBaseline +BackupCentralVaultBaseline +BackupAdminBaseline +BackupBaseline + +Listing enabled baselines: +arn:aws:controltower:us-east-1::baseline/LN25R72TTG6IGPTQ +arn:aws:controltower:us-east-1::baseline/4T4HA1KMO10S6311 +arn:aws:controltower:us-east-1::baseline/J8HX46AHS5MIKQPD +arn:aws:controltower:us-east-1::baseline/17BSJV3IGJ2QSGA2 +Do you want to enable the Control Tower Baseline? (y/n) y + +Enabling Control Tower Baseline. +Baseline is already enabled for this target +No change, the selected baseline was already enabled. +Do you want to reset the Control Tower Baseline? (y/n) y + +Resetting Control Tower Baseline. arn:aws:controltower:us-east-1:478643181688:enabledbaseline/XOVK3ATZUCD5A04QV +Baseline operation status: IN_PROGRESS +Baseline operation status: IN_PROGRESS +Baseline operation status: IN_PROGRESS +Baseline operation status: IN_PROGRESS +Baseline operation status: IN_PROGRESS +Baseline operation status: SUCCEEDED + +Reset baseline operation id 64f9c26e-c2d4-46c1-8863-f2b6382c2b4d. + +Do you want to disable the Control Tower Baseline? (y/n) y +Disabling baseline ARN: arn:aws:controltower:us-east-1:478643181688:enabledbaseline/XOVK3ATZUCD5A04QV +Conflict disabling baseline: AWS Control Tower cannot perform a DisableBaseline operation on a target OU with enabled optional controls.. Skipping disable step. + +Disabled baseline operation id None. + + +``` + +#### Controls +Some control operations require the use of the ControlCatalog client. This client does not have it's own documentation, +and so is included as part of this example. + +- List Controls in Control Catalog. +- If a landing zone exists: + - Enable a control. + - Get the operational status of the control. + - List enabled controls. + - Disable the control. + +Example +``` +Managing Controls: + +Listing first 5 available Controls: +1. Checks if a recovery point expires no earlier than after the specified period +2. Require any AWS CodeBuild project environment to have logging configured +3. Checks if AWS AppConfig configuration profiles have tags +4. ECS containers should run as non-privileged +5. Disallow changes to Amazon CloudWatch Logs log groups set up by AWS Control Tower + +Listing enabled controls: +1. arn:aws:controltower:us-east-1::control/AWS-GR_CLOUDTRAIL_CHANGE_PROHIBITED +2. arn:aws:controltower:us-east-1::control/AWS-GR_CLOUDTRAIL_CLOUDWATCH_LOGS_ENABLED +3. arn:aws:controltower:us-east-1::control/AWS-GR_CLOUDTRAIL_ENABLED +4. arn:aws:controltower:us-east-1::control/AWS-GR_CLOUDTRAIL_VALIDATION_ENABLED +5. arn:aws:controltower:us-east-1::control/AWS-GR_CLOUDWATCH_EVENTS_CHANGE_PROHIBITED +6. arn:aws:controltower:us-east-1::control/AWS-GR_CONFIG_AGGREGATION_AUTHORIZATION_POLICY +7. arn:aws:controltower:us-east-1::control/AWS-GR_CONFIG_AGGREGATION_CHANGE_PROHIBITED +8. arn:aws:controltower:us-east-1::control/AWS-GR_CONFIG_CHANGE_PROHIBITED +9. arn:aws:controltower:us-east-1::control/AWS-GR_CONFIG_ENABLED +10. arn:aws:controltower:us-east-1::control/AWS-GR_CONFIG_RULE_CHANGE_PROHIBITED +11. arn:aws:controltower:us-east-1::control/AWS-GR_IAM_ROLE_CHANGE_PROHIBITED +12. arn:aws:controltower:us-east-1::control/AWS-GR_LAMBDA_CHANGE_PROHIBITED +13. arn:aws:controltower:us-east-1::control/AWS-GR_LOG_GROUP_POLICY +15. arn:aws:controltower:us-east-1::control/AWS-GR_SNS_SUBSCRIPTION_CHANGE_PROHIBITED +16. arn:aws:controlcatalog:::control/m7a5gbdf08wg2o0en010mkng +Do you want to enable the control arn:aws:controlcatalog:::control/m7a5gbdf08wg2o0en010mkng? (y/n) y + +Enabling control: arn:aws:controlcatalog:::control/m7a5gbdf08wg2o0en010mkng +arn:aws:controlcatalog:::control/m7a5gbdf08wg2o0en010mkng +arn:aws:organizations::478643181688:ou/o-phmlq23w1e/ou-spdo-e3mtcidv +Control is already enabled for this target +Do you want to disable the control? (y/n) y + +Disabling the control... +Control operation status: IN_PROGRESS +Control operation status: SUCCEEDED +Disable operation ID: c9c24ab0-9988-48fa-a8f3-1c5daf979176 +This concludes the control tower scenario. +Thanks for watching! + + +``` + + +--- + +## Errors +The following errors are handled in the Control Tower wrapper class: + +| action | Error | Handling | +|------------------------|-----------------------|------------------------------------------------------------------------| +| `ListBaselines` | AccessDeniedException | Notify the user of insufficient permissions and exit. | +| `ListEnabledBaselines` | AccessDeniedException | Notify the user of insufficient permissions and exit. | +| `EnableBaseline` | ValidationException | Handle case where baseline is already enabled and return None. | +| `DisableBaseline` | ConflictException | Notify the user that the baseline could not be disabled, and continue. | +| `ListControls` | AccessDeniedException | Notify the user of insufficient permissions and exit. | +| `EnableControl` | ValidationException | Handle case where control is already enabled and return None. | +| `GetControlOperation` | ResourceNotFound | Notify the user that the control operation was not found. | +| `DisableControl` | ResourceNotFound | Notify the user that the control was not found. | +| `ListLandingZones` | AccessDeniedException | Notify the user of insufficient permissions and exit. | + + +--- + +## Metadata + +| action / scenario | metadata file | metadata key | +|---------------------------------|----------------------------|-----------------------------------| +| `ListBaselines` | controltower_metadata.yaml | controltower_Hello | +| `ListBaselines` | controltower_metadata.yaml | controltower_ListBaselines | +| `ListEnabledBaselines` | controltower_metadata.yaml | controltower_ListEnabledBaselines | +| `EnableBaseline` | controltower_metadata.yaml | controltower_EnableBaseline | +| `DisableBaseline` | controltower_metadata.yaml | controltower_DisableBaseline | +| `EnableControl` | controltower_metadata.yaml | controltower_EnableControl | +| `GetControlOperation` | controltower_metadata.yaml | controltower_GetControlOperation | +| `DisableControl` | controltower_metadata.yaml | controltower_DisableControl | +| `ListLandingZones` | controltower_metadata.yaml | controltower_ListLandingZones | +| `Control Tower Basics Scenario` | controltower_metadata.yaml | controltower_Scenario | + diff --git a/scenarios/basics/controltower/resources/cfn_template.yaml b/scenarios/basics/controltower/resources/cfn_template.yaml new file mode 100644 index 00000000000..402a33f159f --- /dev/null +++ b/scenarios/basics/controltower/resources/cfn_template.yaml @@ -0,0 +1,105 @@ +Parameters: + ParentOrganizationId: + Type: String + Description: Parent organization ID +Resources: + InfrastructureOU2: + Type: AWS::Organizations::OrganizationalUnit + Properties: + Name: Infrastructure2 + ParentId: !Ref ParentOrganizationId + SecurityOU2: + Type: AWS::Organizations::OrganizationalUnit + Properties: + Name: Security2 + ParentId: !Ref ParentOrganizationId + AWSControlTowerAdmin: + Type: 'AWS::IAM::Role' + Properties: + RoleName: AWSControlTowerAdmin + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: controltower.amazonaws.com + Action: 'sts:AssumeRole' + Path: '/service-role/' + ManagedPolicyArns: + - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSControlTowerServiceRolePolicy + AWSControlTowerAdminPolicy: + Type: 'AWS::IAM::Policy' + Properties: + PolicyName: AWSControlTowerAdminPolicy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Action: 'ec2:DescribeAvailabilityZones' + Resource: '*' + Roles: + - !Ref AWSControlTowerAdmin + AWSControlTowerCloudTrailRole: + Type: 'AWS::IAM::Role' + Properties: + RoleName: AWSControlTowerCloudTrailRole + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: cloudtrail.amazonaws.com + Action: 'sts:AssumeRole' + Path: '/service-role/' + AWSControlTowerCloudTrailRolePolicy: + Type: 'AWS::IAM::Policy' + Properties: + PolicyName: AWSControlTowerCloudTrailRolePolicy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Action: + - 'logs:CreateLogStream' + - 'logs:PutLogEvents' + Resource: !Sub arn:${AWS::Partition}:logs:*:*:log-group:aws-controltower/CloudTrailLogs:* + Effect: Allow + Roles: + - !Ref AWSControlTowerCloudTrailRole + AWSControlTowerConfigAggregatorRoleForOrganizations: + Type: 'AWS::IAM::Role' + Properties: + RoleName: AWSControlTowerConfigAggregatorRoleForOrganizations + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: config.amazonaws.com + Action: 'sts:AssumeRole' + Path: '/service-role/' + ManagedPolicyArns: + - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSConfigRoleForOrganizations + AWSControlTowerStackSetRole: + Type: 'AWS::IAM::Role' + Properties: + RoleName: AWSControlTowerStackSetRole + AssumeRolePolicyDocument: + Version: 2012-10-17 + Statement: + - Effect: Allow + Principal: + Service: cloudformation.amazonaws.com + Action: 'sts:AssumeRole' + Path: '/service-role/' + AWSControlTowerStackSetRolePolicy: + Type: 'AWS::IAM::Policy' + Properties: + PolicyName: AWSControlTowerStackSetRolePolicy + PolicyDocument: + Version: 2012-10-17 + Statement: + - Action: 'sts:AssumeRole' + Resource: !Sub 'arn:${AWS::Partition}:iam::*:role/AWSControlTowerExecution' + Effect: Allow + Roles: + - !Ref AWSControlTowerStackSetRole \ No newline at end of file