Skip to content

Commit

Permalink
Implement email ban check, test
Browse files Browse the repository at this point in the history
  • Loading branch information
jdavcs committed Jan 17, 2025
1 parent ad9f1bf commit 9daaa39
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 7 deletions.
56 changes: 49 additions & 7 deletions lib/galaxy/security/validate_user_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@

import logging
import re
from typing import Optional
from typing import (
List,
Optional,
)

import dns.resolver
from dns.exception import DNSException
Expand Down Expand Up @@ -73,7 +76,9 @@ def validate_publicname_str(publicname):

def validate_email(trans, email, user=None, check_dup=True, allow_empty=False, validate_domain=False):
"""
Validates the email format, also checks whether the domain is blocklisted in the disposable domains configuration.
Validates the email format.
Checks whether the domain is blocklisted in the disposable domains configuration.
Checks whether the email address is banned.
"""
if (user and user.email == email) or (email == "" and allow_empty):
return ""
Expand All @@ -82,21 +87,26 @@ def validate_email(trans, email, user=None, check_dup=True, allow_empty=False, v
domain = extract_domain(email)
message = validate_email_domain_name(domain)

stmt = select(trans.app.model.User).filter(func.lower(trans.app.model.User.email) == email.lower()).limit(1)
if not message and check_dup and trans.sa_session.scalars(stmt).first():
message = f"User with email '{email}' already exists."
if not message:
if is_email_banned(email, trans.app.config.email_ban_file):
return "This email address has been banned."

if not message and check_dup:
stmt = select(trans.app.model.User).filter(func.lower(trans.app.model.User.email) == email.lower()).limit(1)
if trans.sa_session.scalars(stmt).first():
return f"User with email '{email}' already exists."

if not message:
# If the allowlist is not empty filter out any domain not in the list and ignore blocklist.
if trans.app.config.email_domain_allowlist_content is not None:
domain = extract_domain(email)
if domain not in trans.app.config.email_domain_allowlist_content:
message = "Please enter an allowed domain email address for this server."
return "Please enter an allowed domain email address for this server."
# If the blocklist is not empty filter out the disposable domains.
elif trans.app.config.email_domain_blocklist_content is not None:
domain = extract_domain(email, base_only=True)
if domain in trans.app.config.email_domain_blocklist_content:
message = "Please enter your permanent email address."
return "Please enter your permanent email address."

return message

Expand Down Expand Up @@ -164,3 +174,35 @@ def validate_preferred_object_store_id(
trans, object_store: ObjectStore, preferred_object_store_id: Optional[str]
) -> str:
return object_store.validate_selected_object_store_id(trans.user, preferred_object_store_id) or ""


def is_email_banned(email: str, filepath: Optional[str]) -> bool:
if not filepath:
return False
email = _make_canonical_email(email)
banned_emails = _read_email_ban_list(filepath)
for address in banned_emails:
if email == _make_canonical_email(address):
return True
return False


def _make_canonical_email(email: str) -> str:
"""
Transform to canonical representation:
- lowercase
- gmail: drop periods in local-part
- gmail: drop plus suffixes in local-part
"""
email = email.lower()
localpart, domain = email.split("@")
if domain == "gmail.com":
localpart = localpart.replace(".", "")
if localpart.find("+") > -1:
localpart = localpart[: localpart.index("+")]
return f"{localpart}@{domain}"


def _read_email_ban_list(filepath: str) -> List[str]:
with open(filepath) as f:
return [line.strip() for line in f if not line.startswith("#")]
16 changes: 16 additions & 0 deletions test/unit/data/security/test_validate_user_input.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from galaxy.security import validate_user_input
from galaxy.security.validate_user_input import (
extract_domain,
is_email_banned,
validate_email_domain_name,
validate_email_str,
validate_publicname_str,
Expand Down Expand Up @@ -46,3 +48,17 @@ def test_validate_email_str():
assert validate_email_str('"i-like-to-break-email-valid@tors"@foo.com') != ""
too_long_email = "N" * 255 + "@foo.com"
assert validate_email_str(too_long_email) != ""


def test_is_email_banned(monkeypatch):
mock_ban_list = ["[email protected]", "[email protected]", "[email protected]"]
monkeypatch.setattr(validate_user_input, "_read_email_ban_list", lambda a: mock_ban_list)

assert is_email_banned("[email protected]", "_")
assert is_email_banned("[email protected]", "_")
assert is_email_banned("[email protected]", "_")
assert is_email_banned("[email protected]", "_")
assert is_email_banned("[email protected]", "_")
assert is_email_banned("[email protected]", "_")
assert not is_email_banned("[email protected]", "_")
assert not is_email_banned("[email protected]", "_")

0 comments on commit 9daaa39

Please sign in to comment.