diff --git a/pyproject.toml b/pyproject.toml index 74705364b..4b8cf02ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "problog >= 2.2.6,<3.0.0", "cryptography >=44.0.0,<45.0.0", "semgrep == 1.113.0", + "dnspython >=2.7.0,<3.0.0", ] keywords = [] # https://pypi.org/classifiers/ diff --git a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py index eebce5764..c37f763a5 100644 --- a/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py +++ b/src/macaron/malware_analyzer/pypi_heuristics/heuristics.py @@ -43,6 +43,9 @@ class Heuristics(str, Enum): #: Indicates that the package source code contains suspicious code patterns. SUSPICIOUS_PATTERNS = "suspicious_patterns" + #: Indicates that the package maintainer's email address is suspicious or invalid. + FAKE_EMAIL = "fake_email" + class HeuristicResult(str, Enum): """Result type indicating the outcome of a heuristic.""" diff --git a/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py b/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py new file mode 100644 index 000000000..e29fe123e --- /dev/null +++ b/src/macaron/malware_analyzer/pypi_heuristics/metadata/fake_email.py @@ -0,0 +1,96 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
"""The heuristic analyzer to check the email address of the package maintainers."""

import logging
import re
from email.utils import getaddresses

import dns.resolver as dns_resolver

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

logger: logging.Logger = logging.getLogger(__name__)

# Deliberately loose ``local@domain.tld`` shape check: exactly one "@", no whitespace and at least
# one dot in the domain part. It is applied with ``fullmatch`` so trailing junk is rejected.
# Deliverability is checked separately through a DNS MX lookup.
_EMAIL_PATTERN: re.Pattern[str] = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email address of the package maintainers."""

    def __init__(self) -> None:
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )

    @staticmethod
    def _extract_addresses(field: str) -> list[str]:
        """Split a metadata email field into bare addresses.

        The ``author_email`` and ``maintainer_email`` metadata fields may hold a display name
        (``Jane Doe <jane@example.com>``) and/or several comma-separated entries, so the raw field
        cannot be validated as a single address.

        Parameters
        ----------
        field: str
            The raw value of the metadata field.

        Returns
        -------
        list[str]:
            The bare addresses found in the field. If nothing can be extracted (e.g. the field is
            empty), the raw value is returned as the only element so that it is still validated.
        """
        addresses = [address for _, address in getaddresses([field]) if address]
        return addresses or [field]

    def is_valid_email(self, email: str) -> bool:
        """Check if the email format is valid and the domain has MX records.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        bool:
            ``True`` if the email address is well-formed and its domain publishes MX records,
            ``False`` if it is malformed, the domain does not exist, or the domain has no MX records.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the DNS lookup itself could not be completed (e.g. timeout or unreachable name servers).
        """
        if not _EMAIL_PATTERN.fullmatch(email):
            return False

        # The pattern guarantees exactly one "@", so everything after it is the domain.
        domain = email.rpartition("@")[2]
        try:
            records = dns_resolver.resolve(domain, "MX")
        except (dns_resolver.NXDOMAIN, dns_resolver.NoAnswer):
            # dnspython reports "domain does not exist" and "no MX records" by raising. Both are a
            # definitive negative answer, not a failure of the analysis.
            logger.debug("No MX records found for domain %s.", domain)
            return False
        except Exception as err:
            # Anything else (timeouts, no reachable name server, malformed names, ...) means the
            # question could not be answered, so surface it as an analysis error.
            err_message = f"Failed to resolve domain {domain}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        return bool(records)

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis. The result is
            ``SKIP`` when neither ``author_email`` nor ``maintainer_email`` is present, ``FAIL``
            (with the offending address under the ``email`` key) as soon as one address is invalid,
            and ``PASS`` otherwise.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the analysis fails because a DNS lookup could not be completed.
        """
        if not pypi_package_json.download(""):
            error_message = "Failed to download package JSON "
            return HeuristicResult.FAIL, {"message": error_message}

        # ``info`` may be missing or explicitly null in the registry response.
        info = pypi_package_json.package_json.get("info") or {}

        # The author field is checked before the maintainer field; absent (None) fields are ignored.
        email_fields = [
            value for value in (info.get("author_email"), info.get("maintainer_email")) if value is not None
        ]
        if not email_fields:
            message = "No maintainers are available"
            return HeuristicResult.SKIP, {"message": message}

        for email_field in email_fields:
            for address in self._extract_addresses(email_field):
                if not self.is_valid_email(address):
                    return HeuristicResult.FAIL, {"email": address}

        return HeuristicResult.PASS, {}
run_check(self, ctx: AnalyzeContext) -> CheckResultData: WheelAbsenceAnalyzer, AnomalousVersionAnalyzer, TyposquattingPresenceAnalyzer, + FakeEmailAnalyzer, ] # name used to query the result of all problog rules, so it can be accessed outside the model. @@ -424,6 +426,10 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: failed({Heuristics.ONE_RELEASE.value}), failed({Heuristics.ANOMALOUS_VERSION.value}). + % Package released recently with the a maintainer email address that is not valid. + {Confidence.MEDIUM.value}::trigger(malware_medium_confidence_3) :- + quickUndetailed, + failed({Heuristics.FAKE_EMAIL.value}). % ----- Evaluation ----- % Aggregate result @@ -431,6 +437,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData: {problog_result_access} :- trigger(malware_high_confidence_2). {problog_result_access} :- trigger(malware_high_confidence_3). {problog_result_access} :- trigger(malware_high_confidence_4). + {problog_result_access} :- trigger(malware_medium_confidence_3). {problog_result_access} :- trigger(malware_medium_confidence_2). {problog_result_access} :- trigger(malware_medium_confidence_1). query({problog_result_access}). diff --git a/tests/malware_analyzer/pypi/test_fake_email.py b/tests/malware_analyzer/pypi/test_fake_email.py new file mode 100644 index 000000000..aa080a6b8 --- /dev/null +++ b/tests/malware_analyzer/pypi/test_fake_email.py @@ -0,0 +1,166 @@ +# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. 
"""Tests for the FakeEmailAnalyzer heuristic."""


from collections.abc import Generator
from unittest.mock import MagicMock, patch

import pytest

from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset


def _make_mx_record(exchange: str) -> MagicMock:
    """Build a stand-in MX record whose ``exchange`` attribute is the given host name."""
    record = MagicMock()
    record.exchange = exchange
    return record


@pytest.fixture(name="analyzer")
def analyzer_fixture() -> FakeEmailAnalyzer:
    """Provide a fresh FakeEmailAnalyzer instance for each test."""
    return FakeEmailAnalyzer()


@pytest.fixture(name="pypi_package_json_asset_mock")
def pypi_package_json_asset_mock_fixture() -> MagicMock:
    """Provide a mocked PyPIPackageJsonAsset.

    The download succeeds by default and ``package_json`` starts out empty; individual tests
    override either of them to simulate a particular PyPI response.
    """
    asset = MagicMock(spec=PyPIPackageJsonAsset)
    asset.download = MagicMock(return_value=True)
    asset.package_json = {}
    return asset


@pytest.fixture(name="mock_dns_resolve")
def mock_dns_resolve_fixture() -> Generator[MagicMock]:
    """Replace dns.resolver.resolve, as seen from the module under test, with a mock.

    Unless a test reconfigures it, the mock answers every lookup with a single MX record.
    """
    target = "macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.dns_resolver.resolve"
    with patch(target) as resolve_mock:
        resolve_mock.return_value = [_make_mx_record("mail.default-domain.com")]
        yield resolve_mock


# Tests for the analyze method
def test_analyze_download_failure(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """A failed package JSON download makes the heuristic fail with an explanatory message."""
    pypi_package_json_asset_mock.download.return_value = False

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert "message" in details
    message = details["message"]
    assert isinstance(message, str)
    assert "Failed to download package JSON" in message
    pypi_package_json_asset_mock.download.assert_called_once_with("")


def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """The heuristic is skipped when both author_email and maintainer_email are None."""
    pypi_package_json_asset_mock.package_json = {"info": {"author_email": None, "maintainer_email": None}}

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert details["message"] == "No maintainers are available"


def test_analyze_skip_no_info_key(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """The heuristic is skipped when the PyPI data has no 'info' key at all."""
    pypi_package_json_asset_mock.package_json = {}

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert details["message"] == "No maintainers are available"


def test_analyze_fail_empty_author_email(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """An empty author_email string (with maintainer_email None) makes the heuristic fail."""
    pypi_package_json_asset_mock.package_json = {"info": {"author_email": "", "maintainer_email": None}}

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert details["email"] == ""


def test_analyze_pass_only_maintainer_email_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """The heuristic passes when maintainer_email is the only address and it is valid."""
    mock_dns_resolve.return_value = [_make_mx_record("mail.example.net")]
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": None, "maintainer_email": "maintainer@example.net"}
    }

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.PASS
    assert details == {}
    mock_dns_resolve.assert_called_once_with("example.net", "MX")


def test_analyze_pass_both_emails_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """The heuristic passes when author_email and maintainer_email are both valid."""
    known_mx_domains = {"example.com", "example.net"}

    def fake_resolve(domain: str, record_type: str = "MX") -> list[MagicMock]:
        # Only MX lookups for the two expected domains are legitimate in this test.
        if record_type != "MX" or domain not in known_mx_domains:
            pytest.fail(f"Unexpected domain for DNS resolve: {domain}")
        return [_make_mx_record(f"mail.{domain}")]

    mock_dns_resolve.side_effect = fake_resolve
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": "author@example.com", "maintainer_email": "maintainer@example.net"}
    }

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.PASS
    assert details == {}
    assert mock_dns_resolve.call_count == 2
    mock_dns_resolve.assert_any_call("example.com", "MX")
    mock_dns_resolve.assert_any_call("example.net", "MX")


def test_analyze_fail_author_email_invalid_format(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """A malformed author_email makes the heuristic fail before any DNS lookup happens."""
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": "bad_email_format", "maintainer_email": "maintainer@example.net"}
    }

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert details["email"] == "bad_email_format"
    # The format check rejects the address, so the resolver must never be consulted.
    mock_dns_resolve.assert_not_called()


# Tests for the is_valid_email method
def test_is_valid_email_valid_email_with_mx(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """is_valid_email returns True for a well-formed address whose domain has MX records."""
    mock_dns_resolve.return_value = [_make_mx_record("mail.example.com")]

    assert analyzer.is_valid_email("test@example.com") is True
    mock_dns_resolve.assert_called_once_with("example.com", "MX")


def test_is_valid_email_invalid_format(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """is_valid_email rejects a range of malformed addresses without touching DNS."""
    for candidate in ("not_an_email", "test@", "@example.com", "test@example", ""):
        assert not analyzer.is_valid_email(candidate)
    mock_dns_resolve.assert_not_called()


def test_is_valid_email_no_mx_records_returned(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """is_valid_email returns False when the resolver answers with an empty record list."""
    mock_dns_resolve.return_value = []

    assert analyzer.is_valid_email("test@no-mx-domain.com") is False
    mock_dns_resolve.assert_called_once_with("no-mx-domain.com", "MX")