Skip to content

chore(aap): ensure ctypes dynamic loading is unpatched #13386

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions ddtrace/appsec/_ddwaf/ddwaf_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from ddtrace.appsec._ddwaf.waf_stubs import ddwaf_context_capsule
from ddtrace.appsec._ddwaf.waf_stubs import ddwaf_handle_capsule
from ddtrace.appsec._utils import _observator
from ddtrace.appsec._utils import unpatching_popen
from ddtrace.internal.logger import get_logger
from ddtrace.settings.asm import config as asm_config

Expand All @@ -26,18 +27,17 @@

if system() == "Linux":
try:
asm_config._bypass_instrumentation_for_waf = True
ctypes.CDLL(ctypes.util.find_library("rt"), mode=ctypes.RTLD_GLOBAL)
with unpatching_popen():
ctypes.CDLL(ctypes.util.find_library("rt"), mode=ctypes.RTLD_GLOBAL)
except Exception: # nosec
pass
finally:
asm_config._bypass_instrumentation_for_waf = False

ARCHI = machine().lower()

# 32-bit-Python on 64-bit-Windows

ddwaf = ctypes.CDLL(asm_config._asm_libddwaf)
with unpatching_popen():
ddwaf = ctypes.CDLL(asm_config._asm_libddwaf)
#
# Constants
#
Expand Down
2 changes: 1 addition & 1 deletion ddtrace/appsec/_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def delayed_init(self) -> None:
self.metrics._set_waf_init_metric(self._ddwaf.info, self._ddwaf.initialized)
except Exception:
# Partial of DDAS-0005-00
log.warning("[DDAS-0005-00] WAF initialization failed")
log.warning("[DDAS-0005-00] WAF initialization failed", exc_info=True)

self._update_required()

Expand Down
21 changes: 21 additions & 0 deletions ddtrace/appsec/_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# this module must not load any other unsafe appsec module directly

import collections
import contextlib
import json
import logging
import typing
Expand Down Expand Up @@ -330,3 +331,23 @@ def get_triggers(span) -> Any:
def add_context_log(logger: logging.Logger, msg: str, offset: int = 0) -> str:
filename, line_number, function_name, _stack_info = logger.findCaller(False, 3 + offset)
return f"{msg}[{filename}, line {line_number}, in {function_name}]"


@contextlib.contextmanager
def unpatching_popen():
"""
Context manager to temporarily unpatch `subprocess.Popen` for testing purposes.
This is useful to ensure that the original `Popen` behavior is restored after the context.
"""
import subprocess # nosec B404

from ddtrace.internal._unpatched import unpatched_Popen

original_popen = subprocess.Popen
subprocess.Popen = unpatched_Popen
asm_config._bypass_instrumentation_for_waf = True
try:
yield
finally:
subprocess.Popen = original_popen
asm_config._bypass_instrumentation_for_waf = False
9 changes: 9 additions & 0 deletions ddtrace/internal/_unpatched.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,12 @@
# to get a reference to the right threading module.
import threading as _threading # noqa
import gc as _gc # noqa

import sys

previous_loaded_modules = frozenset(sys.modules.keys())
from subprocess import Popen as unpatched_Popen # noqa # nosec B404

loaded_modules = frozenset(sys.modules.keys())
for module in previous_loaded_modules - loaded_modules:
del sys.modules[module]
Loading