Skip to content

Commit

Permalink
Merge pull request #301 from slincoln-aiq/feature/not_equals_expressions
Browse files Browse the repository at this point in the history
Not Equals (!=) Expressions
  • Loading branch information
thomaspatzke authored Nov 10, 2024
2 parents ebbe618 + 856ab0d commit f5cb2bf
Show file tree
Hide file tree
Showing 2 changed files with 276 additions and 5 deletions.
106 changes: 101 additions & 5 deletions sigma/conversion/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import ABC, abstractmethod
from collections import ChainMap, defaultdict
from contextlib import contextmanager
import re

from sigma.correlations import (
Expand Down Expand Up @@ -139,6 +140,9 @@ class Backend(ABC):
# not exists: convert as "not exists-expression" or as dedicated expression
explicit_not_exists_expression: ClassVar[bool] = False

# use not_eq_token, not_eq_expression, etc. to implement != as a separate expression instead of not_token in ConditionNOT
convert_not_as_not_eq: ClassVar[bool] = False

def __init__(
self,
processing_pipeline: Optional[ProcessingPipeline] = None,
Expand Down Expand Up @@ -745,9 +749,15 @@ class variables. If this is not sufficient, the respective methods can be implem
eq_token: ClassVar[Optional[str]] = (
None # Token inserted between field and value (without separator)
)
not_eq_token: ClassVar[Optional[str]] = (
None # Token inserted between field and value (without separator) if using not_eq_expression over not_token
)
eq_expression: ClassVar[str] = (
"{field}{backend.eq_token}{value}" # Expression for field = value
)
not_eq_expression: ClassVar[str] = (
"{field}{backend.not_eq_token}{value}" # Expression for field != value
)

# Query structure
# The generated query can be embedded into further structures. One common example are data
Expand Down Expand Up @@ -812,12 +822,15 @@ class variables. If this is not sufficient, the respective methods can be implem
}
)

# String matching operators. if none is appropriate eq_token is used.
# String matching operators. if none is appropriate eq_token (or not_eq_token) is used.
startswith_expression: ClassVar[Optional[str]] = None
not_startswith_expression: ClassVar[Optional[str]] = None
startswith_expression_allow_special: ClassVar[bool] = False
endswith_expression: ClassVar[Optional[str]] = None
not_endswith_expression: ClassVar[Optional[str]] = None
endswith_expression_allow_special: ClassVar[bool] = False
contains_expression: ClassVar[Optional[str]] = None
not_contains_expression: ClassVar[Optional[str]] = None
contains_expression_allow_special: ClassVar[bool] = False
wildcard_match_expression: ClassVar[Optional[str]] = (
None # Special expression if wildcards can't be matched with the eq_token operator.
Expand All @@ -828,6 +841,7 @@ class variables. If this is not sufficient, the respective methods can be implem
# is one of the flags shortcuts supported by Sigma (currently i, m and s) and refers to the
# token stored in the class variable re_flags.
re_expression: ClassVar[Optional[str]] = None
not_re_expression: ClassVar[Optional[str]] = None
re_escape_char: ClassVar[Optional[str]] = (
None # Character used for escaping in regular expressions
)
Expand All @@ -849,17 +863,21 @@ class variables. If this is not sufficient, the respective methods can be implem
# Case sensitive string matching operators similar to standard string matching. If not provided,
# case_sensitive_match_expression is used.
case_sensitive_startswith_expression: ClassVar[Optional[str]] = None
case_sensitive_not_startswith_expression: ClassVar[Optional[str]] = None
case_sensitive_startswith_expression_allow_special: ClassVar[bool] = False
case_sensitive_endswith_expression: ClassVar[Optional[str]] = None
case_sensitive_not_endswith_expression: ClassVar[Optional[str]] = None
case_sensitive_endswith_expression_allow_special: ClassVar[bool] = False
case_sensitive_contains_expression: ClassVar[Optional[str]] = None
case_sensitive_not_contains_expression: ClassVar[Optional[str]] = None
case_sensitive_contains_expression_allow_special: ClassVar[bool] = False

# CIDR expressions: define CIDR matching if backend has native support. Else pySigma expands
# CIDR values into string wildcard matches.
cidr_expression: ClassVar[Optional[str]] = (
None # CIDR expression query as format string with placeholders {field}, {value} (the whole CIDR value), {network} (network part only), {prefixlen} (length of network mask prefix) and {netmask} (CIDR network mask only)
)
not_cidr_expression: ClassVar[Optional[str]] = None

# Numeric comparison operators
compare_op_expression: ClassVar[Optional[str]] = (
Expand Down Expand Up @@ -1087,6 +1105,58 @@ def __new__(cls, *args, **kwargs):
c.explicit_not_exists_expression = c.field_not_exists_expression is not None
return c

@contextmanager
def not_equals_context_manager(self, use_negated_expressions: bool = False):
"""Context manager to temporarily swap expressions with their negated versions."""
if not use_negated_expressions:
yield
return

# Store original expressions
original_expressions = {
"eq_expression": self.eq_expression,
"re_expression": self.re_expression,
"cidr_expression": self.cidr_expression,
"startswith_expression": self.startswith_expression,
"case_sensitive_startswith_expression": self.case_sensitive_startswith_expression,
"endswith_expression": self.endswith_expression,
"case_sensitive_endswith_expression": self.case_sensitive_endswith_expression,
"contains_expression": self.contains_expression,
"case_sensitive_contains_expression": self.case_sensitive_contains_expression,
}

# Swap to negated versions
try:
self.eq_expression = self.not_eq_expression
self.re_expression = self.not_re_expression
self.cidr_expression = self.not_cidr_expression
self.startswith_expression = self.not_startswith_expression
self.case_sensitive_startswith_expression = (
self.case_sensitive_not_startswith_expression
)
self.endswith_expression = self.not_endswith_expression
self.case_sensitive_endswith_expression = self.case_sensitive_not_endswith_expression
self.contains_expression = self.not_contains_expression
self.case_sensitive_contains_expression = self.case_sensitive_not_contains_expression
yield
finally:
# Restore original expressions
self.eq_expression = original_expressions["eq_expression"]
self.re_expression = original_expressions["re_expression"]
self.cidr_expression = original_expressions["cidr_expression"]
self.startswith_expression = original_expressions["startswith_expression"]
self.case_sensitive_startswith_expression = original_expressions[
"case_sensitive_startswith_expression"
]
self.endswith_expression = original_expressions["endswith_expression"]
self.case_sensitive_endswith_expression = original_expressions[
"case_sensitive_endswith_expression"
]
self.contains_expression = original_expressions["contains_expression"]
self.case_sensitive_contains_expression = original_expressions[
"case_sensitive_contains_expression"
]

def compare_precedence(self, outer: ConditionItem, inner: ConditionItem) -> bool:
"""
Compare precedence of outer and inner condition items. Return True if precedence of
Expand Down Expand Up @@ -1209,17 +1279,22 @@ def convert_condition_not(
arg = cond.args[0]
try:
if arg.__class__ in self.precedence: # group if AND or OR condition is negated
return (
self.not_token + self.token_separator + self.convert_condition_group(arg, state)
)
converted_group = self.convert_condition_group(arg, state)
if self.convert_not_as_not_eq:
return converted_group
else:
return self.not_token + self.token_separator + converted_group
else:
expr = self.convert_condition(arg, state)
if isinstance(
expr, DeferredQueryExpression
): # negate deferred expression and pass it to parent
return expr.negate()
else: # convert negated expression to string
return self.not_token + self.token_separator + expr
if self.convert_not_as_not_eq:
return expr
else:
return self.not_token + self.token_separator + expr
except TypeError: # pragma: no cover
raise NotImplementedError("Operator 'not' not supported by the backend")

Expand Down Expand Up @@ -1313,6 +1388,27 @@ def convert_value_str(self, s: SigmaString, state: ConversionState) -> str:
else:
return converted

def convert_condition_field_eq_val(
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
) -> Union[str, DeferredQueryExpression]:
"""Uses context manager with parent class method to swap expressions with their negated versions
if convert_not_as_not_eq is set and the parent of the condition is a ConditionNOT."""

# Determine if negation is needed

def is_parent_not(cond):
if cond.parent is None:
return False
if isinstance(cond.parent, ConditionNOT):
return True
return is_parent_not(cond.parent)

negation = is_parent_not(cond) and self.convert_not_as_not_eq

# Use context manager to handle negation
with self.not_equals_context_manager(use_negated_expressions=negation):
return super().convert_condition_field_eq_val(cond, state)

def convert_condition_field_eq_val_str(
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
) -> Union[str, DeferredQueryExpression]:
Expand Down
175 changes: 175 additions & 0 deletions tests/test_conversion_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2803,3 +2803,178 @@ def test_convert_without_output(test_backend):

assert rule._conversion_result == ['mappedA="value" and \'field A\'="value"']
assert result == []


def test_convert_not_as_not_eq(test_backend, monkeypatch):
"""Test that NOT conditions are converted using not_eq expressions when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_eq_token", "!=")
monkeypatch.setattr(test_backend, "not_eq_expression", "{field}{backend.not_eq_token}{value}")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA: value1
condition: not sel
"""
)
)
== ['mappedA!="value1"']
)


def test_convert_not_startswith(test_backend, monkeypatch):
"""Test negated startswith expression when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_startswith_expression", "{field} not_startswith {value}")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|startswith: "val"
condition: not sel
"""
)
)
== ['mappedA not_startswith "val"']
)


def test_convert_not_contains(test_backend, monkeypatch):
"""Test negated contains expression when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_contains_expression", "{field} not_contains {value}")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|contains: "val"
condition: not sel
"""
)
)
== ['mappedA not_contains "val"']
)


def test_convert_not_re(test_backend, monkeypatch):
"""Test negated regular expression when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_re_expression", "{field}!=/{regex}/")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|re: "val.*"
condition: not sel
"""
)
)
== ["mappedA!=/val.*/"]
)


def test_convert_not_cidr(test_backend, monkeypatch):
"""Test negated CIDR expression when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_cidr_expression", "cidrnotmatch('{field}', \"{value}\")")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|cidr: "192.168.1.0/24"
condition: not sel
"""
)
)
== ["cidrnotmatch('mappedA', \"192.168.1.0/24\")"]
)


def test_convert_not_and_group(test_backend, monkeypatch):
"""Test that NOT with AND group is handled correctly when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_eq_token", "!=")
monkeypatch.setattr(test_backend, "not_eq_expression", "{field}{backend.not_eq_token}{value}")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel1:
fieldA: value1
sel2:
fieldB: value2
condition: not (sel1 and sel2)
"""
)
)
== ['(mappedA!="value1" and mappedB!="value2")']
)


def test_convert_not_or_group(test_backend, monkeypatch):
"""Test that NOT with OR group is handled correctly when convert_not_as_not_eq is True"""
monkeypatch.setattr(test_backend, "convert_not_as_not_eq", True)
monkeypatch.setattr(test_backend, "not_eq_token", "!=")
monkeypatch.setattr(test_backend, "not_eq_expression", "{field}{backend.not_eq_token}{value}")
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel1:
fieldA: value1
sel2:
fieldB: value2
condition: not (sel1 or sel2)
"""
)
)
== ['(mappedA!="value1" or mappedB!="value2")']
)

0 comments on commit f5cb2bf

Please sign in to comment.