Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modifying Firewall rules to provide Internet Access to T0/T1 #2327

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
Open
1 change: 1 addition & 0 deletions data_safe_haven/config/config_sections.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class ConfigSectionSRE(BaseModel, validate_assignment=True):
# https://docs.pydantic.dev/latest/concepts/models/#fields-with-non-hashable-default-values
admin_email_address: EmailAddress
admin_ip_addresses: list[IpAddress] = []
allow_workspace_internet: bool = False
databases: UniqueList[DatabaseSystem] = []
data_provider_ip_addresses: list[IpAddress] | AzureServiceTag = []
remote_desktop: ConfigSubsectionRemoteDesktopOpts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def __call__(self) -> None:
"sre_firewall",
self.stack_name,
SREFirewallProps(
allow_workspace_internet=self.config.sre.allow_workspace_internet,
location=self.config.azure.location,
resource_group_name=resource_group.name,
route_table_name=networking.route_table_name,
Expand Down
414 changes: 214 additions & 200 deletions data_safe_haven/infrastructure/programs/sre/firewall.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions tests/config/test_config_sections.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def test_constructor_defaults(
)
assert sre_config.admin_email_address == "[email protected]"
assert sre_config.admin_ip_addresses == []
assert not sre_config.allow_workspace_internet
assert sre_config.databases == []
assert sre_config.data_provider_ip_addresses == []
assert sre_config.remote_desktop == config_subsection_remote_desktop
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ def sre_config_yaml(request):
admin_email_address: [email protected]
admin_ip_addresses:
- 1.2.3.4/32
allow_workspace_internet: false
data_provider_ip_addresses: []
databases: []
remote_desktop:
Expand Down
56 changes: 56 additions & 0 deletions tests/infrastructure/programs/sre/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,59 @@ def subnet_guacamole_containers() -> network.GetSubnetResult:
address_prefix=SREIpRanges.guacamole_containers.prefix,
id="subnet_guacamole_containers_id",
)


@fixture
def subnet_apt_proxy_server() -> network.GetSubnetResult:
return network.GetSubnetResult(
address_prefix=SREIpRanges.apt_proxy_server.prefix,
id="subnet_apt_proxy_server_id",
)


@fixture
def subnet_clamav_mirror() -> network.GetSubnetResult:
return network.GetSubnetResult(
address_prefix=SREIpRanges.clamav_mirror.prefix,
id="subnet_clamav_mirror_id",
)


@fixture
def subnet_firewall() -> network.GetSubnetResult:
return network.GetSubnetResult(
address_prefix=SREIpRanges.firewall.prefix,
id="subnet_firewall_id",
)


@fixture
def subnet_firewall_management() -> network.GetSubnetResult:
return network.GetSubnetResult(
address_prefix=SREIpRanges.firewall_management.prefix,
id="subnet_firewall_management_id",
)


@fixture
def subnet_identity_containers() -> network.GetSubnetResult:
return network.GetSubnetResult(
address_prefix=SREIpRanges.identity_containers.prefix,
id="subnet_identity_containers_id",
)


@fixture
def subnet_user_services_software_repositories() -> network.GetSubnetResult:
return network.GetSubnetResult(
address_prefix=SREIpRanges.user_services_software_repositories.prefix,
id="subnet_user_services_software_repositories_id",
)


@fixture
def subnet_workspaces() -> network.GetSubnetResult:
return network.GetSubnetResult(
address_prefix=SREIpRanges.workspaces.prefix,
id="subnet_workspaces_id",
)
175 changes: 175 additions & 0 deletions tests/infrastructure/programs/sre/test_firewall.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
from collections.abc import Callable
from enum import Enum
from functools import partial

import pulumi
import pulumi.runtime
import pytest
from pulumi_azure_native import network

from data_safe_haven.infrastructure.programs.sre.firewall import (
SREFirewallComponent,
SREFirewallProps,
)

from ..resource_assertions import assert_equal


class MyMocks(pulumi.runtime.Mocks):
def new_resource(self, args: pulumi.runtime.MockResourceArgs):
resources = [args.name + "_id", args.inputs]
return resources

def call(self, _: pulumi.runtime.MockCallArgs):
return {}


# TODO: These breaks many other tests!
pulumi.runtime.set_mocks(
MyMocks(),
preview=False,
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Luckily, you don't need these :). See the test_application_gateway.py for an example of Pulumi runtime tests without set_mocks.



@pytest.fixture
def allow_internet_props_setter(
location: str,
resource_group_name: str,
subnet_apt_proxy_server: network.GetSubnetResult,
subnet_clamav_mirror: network.GetSubnetResult,
subnet_firewall: network.GetSubnetResult,
subnet_firewall_management: network.GetSubnetResult,
subnet_guacamole_containers: network.GetSubnetResult,
subnet_identity_containers: network.GetSubnetResult,
subnet_user_services_software_repositories: network.GetSubnetResult,
subnet_workspaces: network.GetSubnetResult,
) -> Callable[[bool], SREFirewallProps]:

def set_allow_workspace_internet(
allow_workspace_internet: bool, # noqa: FBT001
) -> SREFirewallProps:
return SREFirewallProps(
allow_workspace_internet=allow_workspace_internet,
location=location,
resource_group_name=resource_group_name,
route_table_name="test-route-table", # TODO: Move to fixture if works.
subnet_apt_proxy_server=subnet_apt_proxy_server,
subnet_clamav_mirror=subnet_clamav_mirror,
subnet_firewall=subnet_firewall,
subnet_firewall_management=subnet_firewall_management,
subnet_guacamole_containers=subnet_guacamole_containers,
subnet_identity_containers=subnet_identity_containers,
subnet_user_services_software_repositories=subnet_user_services_software_repositories,
subnet_workspaces=subnet_workspaces,
)

return set_allow_workspace_internet


@pytest.fixture
def allow_internet_component_setter(
stack_name: str,
allow_internet_props_setter: Callable[[bool], SREFirewallProps],
tags: dict[str, str],
) -> Callable[[bool], SREFirewallComponent]:

def set_allow_workspace_internet(allow_workspace_internet) -> SREFirewallComponent:
firewall_props: SREFirewallProps = allow_internet_props_setter(
allow_workspace_internet
)

return SREFirewallComponent(
name="firewall-name",
stack_name=stack_name,
props=firewall_props,
tags=tags,
)

return set_allow_workspace_internet


class InternetAccess(Enum):
ENABLED = True
DISABLED = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we need this enum, it's probably useful outside just testing. Should it live in types/enums.py?



class TestSREFirewallProps:

@pulumi.runtime.test
def test_props_allow_workspace_internet_enabled(
self, allow_internet_props_setter: Callable[[bool], SREFirewallProps]
):
firewall_props: SREFirewallProps = allow_internet_props_setter(
allow_workspace_internet=True
)
pulumi.Output.from_input(firewall_props.allow_workspace_internet).apply(
partial(assert_equal, True), run_with_unknowns=True # noqa: FBT003
)

@pulumi.runtime.test
def test_props_allow_workspace_internet_disabled(
self, allow_internet_props_setter: Callable[[bool], SREFirewallProps]
):
firewall_props: SREFirewallProps = allow_internet_props_setter(
allow_workspace_internet=False
)
pulumi.Output.from_input(firewall_props.allow_workspace_internet).apply(
partial(assert_equal, False), run_with_unknowns=True # noqa: FBT003
)


class TestSREFirewallComponent:

@pulumi.runtime.test
def test_component_allow_workspace_internet_enabled(
self,
allow_internet_component_setter: Callable[[bool], SREFirewallComponent],
):
firewall_component: SREFirewallComponent = allow_internet_component_setter(
allow_workspace_internet=True
)

firewall_component.firewall.application_rule_collections.apply(
partial(TestSREFirewallComponent.assert_allow_internet_access, InternetAccess.ENABLED), # type: ignore
run_with_unknowns=True,
)

@pulumi.runtime.test
def test_component_allow_workspace_internet_disabled(
self,
allow_internet_component_setter: Callable[[bool], SREFirewallComponent],
):
firewall_component: SREFirewallComponent = allow_internet_component_setter(
allow_workspace_internet=False
)

firewall_component.firewall.application_rule_collections.apply(
partial(TestSREFirewallComponent.assert_allow_internet_access, InternetAccess.DISABLED), # type: ignore
run_with_unknowns=True,
)

@staticmethod
def assert_allow_internet_access(
internet_access: InternetAccess,
application_rule_collections: (
list[network.outputs.AzureFirewallApplicationRuleCollectionResponse] | None
),
):

if application_rule_collections is not None:

workspace_deny_collection: list[
network.outputs.AzureFirewallApplicationRuleCollectionResponse
] = [
rule_collection
for rule_collection in application_rule_collections
if rule_collection.name == "workspaces-deny"
]

if internet_access == InternetAccess.ENABLED:
assert not workspace_deny_collection
else:
assert len(workspace_deny_collection) == 1

else:
raise AssertionError()
Loading