Skip to content

feat(heuristics): add Fake Email analyzer to validate maintainer email domain #1106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ dependencies = [
"problog >= 2.2.6,<3.0.0",
"cryptography >=44.0.0,<45.0.0",
"semgrep == 1.113.0",
"dnspython >=2.7.0,<3.0.0",
]
keywords = []
# https://pypi.org/classifiers/
Expand Down
3 changes: 3 additions & 0 deletions src/macaron/malware_analyzer/pypi_heuristics/heuristics.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class Heuristics(str, Enum):
#: Indicates that the package source code contains suspicious code patterns.
SUSPICIOUS_PATTERNS = "suspicious_patterns"

#: Indicates that the package maintainer's email address is suspicious or invalid.
FAKE_EMAIL = "fake_email"


class HeuristicResult(str, Enum):
"""Result type indicating the outcome of a heuristic."""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""The heuristic analyzer to check the email address of the package maintainers."""

import logging
import re

import dns.resolver as dns_resolver

from macaron.errors import HeuristicAnalyzerValueError
from macaron.json_tools import JsonType
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset

# Module-level logger, named after this module via ``__name__``.
logger: logging.Logger = logging.getLogger(__name__)


class FakeEmailAnalyzer(BaseHeuristicAnalyzer):
    """Analyze the email address of the package maintainers."""

    def __init__(self) -> None:
        super().__init__(
            name="fake_email_analyzer",
            heuristic=Heuristics.FAKE_EMAIL,
            depends_on=None,
        )

    def is_valid_email(self, email: str) -> bool:
        """Check if the email format is valid and the domain has MX records.

        Parameters
        ----------
        email: str
            The email address to check.

        Returns
        -------
        bool:
            ``True`` if the address is well-formed and its domain publishes at least
            one MX record. ``False`` if the format is invalid, the domain does not
            exist, or the domain has no MX records.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the DNS lookup itself fails (for any reason other than the domain
            being absent or having no MX records).
        """
        # The whole string must match; a prefix match would accept e.g. "a@b.c@d".
        if not re.fullmatch(r"[^@]+@[^@]+\.[^@]+", email):
            return False

        domain = email.split("@")[1]
        try:
            records = dns_resolver.resolve(domain, "MX")
        except (dns_resolver.NXDOMAIN, dns_resolver.NoAnswer):
            # The resolver reports a non-existent domain or a missing MX answer by
            # raising; both mean the address cannot receive mail, which is a
            # definitive "invalid" result rather than an analysis error.
            return False
        except Exception as err:
            err_message = f"Failed to resolve domain {domain}: {err}"
            raise HeuristicAnalyzerValueError(err_message) from err
        return bool(records)

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis.
            ``FAIL`` with a ``message`` if the package JSON cannot be downloaded,
            ``SKIP`` if neither an author nor a maintainer email is present,
            ``FAIL`` with the offending ``email`` if one of them is not valid,
            and ``PASS`` otherwise.

        Raises
        ------
        HeuristicAnalyzerValueError
            if the analysis fails.
        """
        response = pypi_package_json.download("")
        if not response:
            error_message = "Failed to download package JSON "
            return HeuristicResult.FAIL, {"message": error_message}

        info = pypi_package_json.package_json.get("info", {})
        author_email = info.get("author_email", None)
        maintainer_email = info.get("maintainer_email", None)
        if maintainer_email is None and author_email is None:
            message = "No maintainers are available"
            return HeuristicResult.SKIP, {"message": message}

        # The author is checked before the maintainer; the first invalid address is reported.
        for candidate in (author_email, maintainer_email):
            if candidate is not None and not self.is_valid_email(candidate):
                return HeuristicResult.FAIL, {"email": candidate}

        return HeuristicResult.PASS, {}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from macaron.malware_analyzer.pypi_heuristics.metadata.anomalous_version import AnomalousVersionAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.closer_release_join_date import CloserReleaseJoinDateAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.empty_project_link import EmptyProjectLinkAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.high_release_frequency import HighReleaseFrequencyAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.one_release import OneReleaseAnalyzer
from macaron.malware_analyzer.pypi_heuristics.metadata.source_code_repo import SourceCodeRepoAnalyzer
Expand Down Expand Up @@ -357,6 +358,7 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
WheelAbsenceAnalyzer,
AnomalousVersionAnalyzer,
TyposquattingPresenceAnalyzer,
FakeEmailAnalyzer,
]

# name used to query the result of all problog rules, so it can be accessed outside the model.
Expand Down Expand Up @@ -424,13 +426,18 @@ def run_check(self, ctx: AnalyzeContext) -> CheckResultData:
failed({Heuristics.ONE_RELEASE.value}),
failed({Heuristics.ANOMALOUS_VERSION.value}).

% Package released recently with a maintainer email address that is not valid.
{Confidence.MEDIUM.value}::trigger(malware_medium_confidence_3) :-
quickUndetailed,
failed({Heuristics.FAKE_EMAIL.value}).
% ----- Evaluation -----

% Aggregate result
{problog_result_access} :- trigger(malware_high_confidence_1).
{problog_result_access} :- trigger(malware_high_confidence_2).
{problog_result_access} :- trigger(malware_high_confidence_3).
{problog_result_access} :- trigger(malware_high_confidence_4).
{problog_result_access} :- trigger(malware_medium_confidence_3).
{problog_result_access} :- trigger(malware_medium_confidence_2).
{problog_result_access} :- trigger(malware_medium_confidence_1).
query({problog_result_access}).
Expand Down
166 changes: 166 additions & 0 deletions tests/malware_analyzer/pypi/test_fake_email.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/.

"""Tests for the FakeEmailAnalyzer heuristic."""


from collections.abc import Generator
from unittest.mock import MagicMock, patch

import pytest

from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult
from macaron.malware_analyzer.pypi_heuristics.metadata.fake_email import FakeEmailAnalyzer
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset


@pytest.fixture(name="analyzer")
def analyzer_fixture() -> FakeEmailAnalyzer:
    """Provide a freshly constructed FakeEmailAnalyzer to each test."""
    analyzer_under_test = FakeEmailAnalyzer()
    return analyzer_under_test


@pytest.fixture(name="pypi_package_json_asset_mock")
def pypi_package_json_asset_mock_fixture() -> MagicMock:
    """Provide a MagicMock that stands in for a PyPIPackageJsonAsset.

    The mock reports a successful download and carries an empty ``package_json``;
    individual tests override either attribute to simulate other PyPI responses.
    """
    asset = MagicMock(spec=PyPIPackageJsonAsset)
    # Empty payload by default; each test supplies its own package JSON.
    asset.package_json = {}
    # Downloads succeed unless a test says otherwise.
    asset.download = MagicMock(return_value=True)
    return asset


@pytest.fixture(name="mock_dns_resolve")
def mock_dns_resolve_fixture() -> Generator[MagicMock]:
    """Patch ``dns_resolver.resolve`` as seen by the module under test.

    By default the patched resolver returns a single MX record, i.e. it simulates
    a successful lookup. Tests may replace ``return_value`` or ``side_effect``.
    """
    default_record = MagicMock()
    default_record.exchange = "mail.default-domain.com"

    target = "macaron.malware_analyzer.pypi_heuristics.metadata.fake_email.dns_resolver.resolve"
    with patch(target) as resolve_mock:
        resolve_mock.return_value = [default_record]
        yield resolve_mock


# Tests for the analyze method
def test_analyze_download_failure(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that a failed package JSON download yields FAIL with an explanatory message."""
    pypi_package_json_asset_mock.download.return_value = False

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert "message" in details
    reported = details["message"]
    assert isinstance(reported, str)
    assert "Failed to download package JSON" in reported
    pypi_package_json_asset_mock.download.assert_called_once_with("")


def test_analyze_skip_no_emails_present(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the analyzer skips when both author_email and maintainer_email are None."""
    package_info = {"author_email": None, "maintainer_email": None}
    pypi_package_json_asset_mock.package_json = {"info": package_info}

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert details["message"] == "No maintainers are available"


def test_analyze_skip_no_info_key(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that the analyzer skips when the PyPI data has no 'info' key at all."""
    payload_without_info: dict = {}
    pypi_package_json_asset_mock.package_json = payload_without_info

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.SKIP
    assert details["message"] == "No maintainers are available"


def test_analyze_fail_empty_author_email(analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock) -> None:
    """Check that an empty author_email string fails when maintainer_email is None."""
    package_info = {"author_email": "", "maintainer_email": None}
    pypi_package_json_asset_mock.package_json = {"info": package_info}

    outcome, details = analyzer.analyze(pypi_package_json_asset_mock)

    assert outcome == HeuristicResult.FAIL
    assert details["email"] == ""


def test_analyze_pass_only_maintainer_email_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """Test analyzer passes when only maintainer_email is present and valid."""
    mock_mx_record = MagicMock()
    mock_mx_record.exchange = "mail.example.net"
    mock_dns_resolve.return_value = [mock_mx_record]

    # The address must be on example.net: that is the domain the lookup assertion below expects.
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": None, "maintainer_email": "maintainer@example.net"}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.PASS
    assert info == {}
    mock_dns_resolve.assert_called_once_with("example.net", "MX")


def test_analyze_pass_both_emails_valid(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """Test the analyzer passes when both emails are present and valid."""

    def side_effect_dns_resolve(domain: str, record_type: str = "MX") -> list[MagicMock]:
        """Return one MX record for the expected domains and fail the test for any other."""
        mock_mx = MagicMock()
        domains = {
            "MX": {"example.com", "example.net"},
        }
        if domain not in domains.get(record_type, set()):
            pytest.fail(f"Unexpected domain for DNS resolve: {domain}")
        mock_mx.exchange = f"mail.{domain}"
        return [mock_mx]

    mock_dns_resolve.side_effect = side_effect_dns_resolve

    # One address per expected domain, so that both lookups asserted below take place.
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": "author@example.com", "maintainer_email": "maintainer@example.net"}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.PASS
    assert info == {}
    assert mock_dns_resolve.call_count == 2
    mock_dns_resolve.assert_any_call("example.com", "MX")
    mock_dns_resolve.assert_any_call("example.net", "MX")


def test_analyze_fail_author_email_invalid_format(
    analyzer: FakeEmailAnalyzer, pypi_package_json_asset_mock: MagicMock, mock_dns_resolve: MagicMock
) -> None:
    """Test analyzer fails when author_email has an invalid format."""
    # The maintainer address is well-formed, so the failure can only come from the author address.
    pypi_package_json_asset_mock.package_json = {
        "info": {"author_email": "bad_email_format", "maintainer_email": "maintainer@example.net"}
    }
    result, info = analyzer.analyze(pypi_package_json_asset_mock)
    assert result == HeuristicResult.FAIL
    assert info["email"] == "bad_email_format"
    mock_dns_resolve.assert_not_called()  # Regex check fails before DNS lookup


# Tests for the is_valid_email method
def test_is_valid_email_valid_email_with_mx(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """Test is_valid_email returns True for a valid email with MX records."""
    mock_mx_record = MagicMock()
    mock_mx_record.exchange = "mail.example.com"
    mock_dns_resolve.return_value = [mock_mx_record]
    # The address is on example.com, the domain the lookup assertion below expects.
    assert analyzer.is_valid_email("test@example.com") is True
    mock_dns_resolve.assert_called_once_with("example.com", "MX")


def test_is_valid_email_invalid_format(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """Check that malformed addresses are rejected without any DNS lookup."""
    malformed_addresses = (
        "not_an_email",
        "test@",
        "@example.com",
        "test@example",
        "",
    )
    for candidate in malformed_addresses:
        assert not analyzer.is_valid_email(candidate)
    mock_dns_resolve.assert_not_called()


def test_is_valid_email_no_mx_records_returned(analyzer: FakeEmailAnalyzer, mock_dns_resolve: MagicMock) -> None:
    """Test is_valid_email returns False if DNS resolve returns no MX records."""
    mock_dns_resolve.return_value = []  # Simulate no MX records found
    # The address is on no-mx-domain.com, the domain the lookup assertion below expects.
    assert analyzer.is_valid_email("test@no-mx-domain.com") is False
    mock_dns_resolve.assert_called_once_with("no-mx-domain.com", "MX")
Loading