Skip to content

Commit

Permalink
Merge pull request #308 from SigmaHQ:fieldref-conversion
Browse files Browse the repository at this point in the history
Integration of fieldref into base backend
  • Loading branch information
thomaspatzke authored Nov 17, 2024
2 parents 626fd2a + 419d6b3 commit 6b99c35
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 18 deletions.
5 changes: 5 additions & 0 deletions sigma/backends/test/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class TextQueryTestBackend(TextQueryBackend):
}

field_equals_field_expression: ClassVar[str] = "{field1}=fieldref({field2})"
field_equals_field_startswith_expression: ClassVar[str] = (
"{field1}=fieldref_startswith({field2})"
)
field_equals_field_endswith_expression: ClassVar[str] = "{field1}=fieldref_endswith({field2})"
field_equals_field_contains_expression: ClassVar[str] = "{field1}=fieldref_contains({field2})"

field_null_expression: ClassVar[str] = "{field} is null"

Expand Down
50 changes: 42 additions & 8 deletions sigma/conversion/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,9 +889,12 @@ class variables. If this is not sufficient, the respective methods can be implem
)

# Expression for comparing two event fields
field_equals_field_expression: ClassVar[Optional[str]] = (
None # Field comparison expression with the placeholders {field1} and {field2} corresponding to left field and right value side of Sigma detection item
)
# Field comparison expression with the placeholders {field1} and {field2} corresponding to left field and right value side of Sigma detection item
field_equals_field_expression: ClassVar[Optional[str]] = None
field_equals_field_startswith_expression: ClassVar[Optional[str]] = None
field_equals_field_endswith_expression: ClassVar[Optional[str]] = None
field_equals_field_contains_expression: ClassVar[Optional[str]] = None

field_equals_field_escaping_quoting: Tuple[bool, bool] = (
True,
True,
Expand Down Expand Up @@ -1650,16 +1653,47 @@ def convert_condition_field_eq_field_escape_and_quote(
)

def convert_condition_field_eq_field(
self, cond: SigmaFieldReference, state: ConversionState
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
) -> Union[str, DeferredQueryExpression]:
"""Conversion of comparision of two fields."""
field1, field2 = self.convert_condition_field_eq_field_escape_and_quote(
cond.field, cond.value.field
)
return self.field_equals_field_expression.format(
field1=field1,
field2=field2,
)
value = cond.value
if value.starts_with and value.ends_with:
if self.field_equals_field_contains_expression is None:
raise NotImplementedError(
"Field reference contains expression is not supported by backend."
)
return self.field_equals_field_contains_expression.format(
field1=field1,
field2=field2,
)
elif value.starts_with:
if self.field_equals_field_startswith_expression is None:
raise NotImplementedError(
"Field reference startswith expression is not supported by backend."
)
return self.field_equals_field_startswith_expression.format(
field1=field1,
field2=field2,
)
elif value.ends_with:
if self.field_equals_field_endswith_expression is None:
raise NotImplementedError(
"Field reference endswith expression is not supported by backend."
)
return self.field_equals_field_endswith_expression.format(
field1=field1,
field2=field2,
)
else:
if self.field_equals_field_expression is None:
raise NotImplementedError("Field reference expression is not supported by backend.")
return self.field_equals_field_expression.format(
field1=field1,
field2=field2,
)

def convert_condition_field_eq_val_null(
self, cond: ConditionFieldEqualsValueExpression, state: ConversionState
Expand Down
8 changes: 4 additions & 4 deletions sigma/modifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ def modify(
val.regexp = val.regexp + ".*"
val.compile()
elif isinstance(val, SigmaFieldReference):
val.wildcard_start = SpecialChars.WILDCARD_MULTI
val.wildcard_end = SpecialChars.WILDCARD_MULTI
val.starts_with = True
val.ends_with = True
return val


Expand All @@ -151,7 +151,7 @@ def modify(
val.regexp = val.regexp + ".*"
val.compile()
elif isinstance(val, SigmaFieldReference):
val.wildcard_end = SpecialChars.WILDCARD_MULTI
val.starts_with = True
return val


Expand All @@ -169,7 +169,7 @@ def modify(
val.regexp = ".*" + val.regexp
val.compile()
elif isinstance(val, SigmaFieldReference):
val.wildcard_start = SpecialChars.WILDCARD_MULTI
val.ends_with = True
return val


Expand Down
2 changes: 1 addition & 1 deletion sigma/processing/transformations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ def apply_detection_item(
):
new_values.extend(
(
SigmaFieldReference(mapped_field)
SigmaFieldReference(mapped_field, value.starts_with, value.ends_with)
for mapped_field in self._apply_field_name(value.field)
)
)
Expand Down
4 changes: 2 additions & 2 deletions sigma/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,8 +828,8 @@ class SigmaFieldReference(NoPlainConversionMixin, SigmaType):
"""Type for referencing to other fields for comparison between them."""

field: str
wildcard_start: Union[SpecialChars, None] = None
wildcard_end: Union[SpecialChars, None] = None
starts_with: bool = False
ends_with: bool = False


@dataclass
Expand Down
144 changes: 144 additions & 0 deletions tests/test_conversion_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,27 @@ def test_convert_compare_fields(test_backend):
)


def test_convert_compare_fields_unsupported(test_backend, monkeypatch):
monkeypatch.setattr(test_backend, "field_equals_field_expression", None)
with pytest.raises(NotImplementedError):
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|fieldref: "field B"
field A|fieldref: fieldB
condition: sel
"""
)
)


def test_convert_compare_fields_noquote(test_backend: TextQueryTestBackend):
test_backend.field_equals_field_expression = "`{field1}`=`{field2}`"
test_backend.field_equals_field_escaping_quoting = (False, False)
Expand Down Expand Up @@ -1668,6 +1689,129 @@ def test_convert_compare_fields_differentiation_prefix(test_backend):
)


def test_convert_compare_fields_startswith(test_backend):
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|fieldref|startswith: fieldB
condition: sel
"""
)
)
== ["mappedA=fieldref_startswith(mappedB)"]
)


def test_convert_compare_fields_startswith_unsupported(test_backend, monkeypatch):
monkeypatch.setattr(test_backend, "field_equals_field_startswith_expression", None)
with pytest.raises(NotImplementedError):
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|fieldref|startswith: fieldB
condition: sel
"""
)
)


def test_convert_compare_fields_endswith(test_backend):
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|fieldref|endswith: fieldB
condition: sel
"""
)
)
== ["mappedA=fieldref_endswith(mappedB)"]
)


def test_convert_compare_fields_endswith_unsupported(test_backend, monkeypatch):
monkeypatch.setattr(test_backend, "field_equals_field_endswith_expression", None)
with pytest.raises(NotImplementedError):
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|fieldref|endswith: fieldB
condition: sel
"""
)
)


def test_convert_compare_fields_contains(test_backend):
assert (
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|fieldref|contains: fieldB
condition: sel
"""
)
)
== ["mappedA=fieldref_contains(mappedB)"]
)


def test_convert_compare_fields_contains_unsupported(test_backend, monkeypatch):
monkeypatch.setattr(test_backend, "field_equals_field_contains_expression", None)
with pytest.raises(NotImplementedError):
test_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|fieldref|contains: fieldB
condition: sel
"""
)
)


def test_convert_compare_fields_wrong_type(test_backend):
with pytest.raises(SigmaTypeError):
assert test_backend.convert(
Expand Down
6 changes: 3 additions & 3 deletions tests/test_modifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def test_fieldref_contains(dummy_detection_item):
fieldref = SigmaFieldReferenceModifier(dummy_detection_item, []).modify(SigmaString("field"))
assert (
SigmaContainsModifier(dummy_detection_item, [SigmaFieldReferenceModifier]).modify(fieldref)
) == SigmaFieldReference("field", SpecialChars.WILDCARD_MULTI, SpecialChars.WILDCARD_MULTI)
) == SigmaFieldReference("field", True, True)


def test_fieldref_startswith(dummy_detection_item):
Expand All @@ -430,14 +430,14 @@ def test_fieldref_startswith(dummy_detection_item):
SigmaStartswithModifier(dummy_detection_item, [SigmaFieldReferenceModifier]).modify(
fieldref
)
) == SigmaFieldReference("field", None, SpecialChars.WILDCARD_MULTI)
) == SigmaFieldReference("field", True, False)


def test_fieldref_endswith(dummy_detection_item):
fieldref = SigmaFieldReferenceModifier(dummy_detection_item, []).modify(SigmaString("field"))
assert (
SigmaEndswithModifier(dummy_detection_item, [SigmaFieldReferenceModifier]).modify(fieldref)
) == SigmaFieldReference("field", SpecialChars.WILDCARD_MULTI, None)
) == SigmaFieldReference("field", False, True)


def test_fieldref_wildcard(dummy_detection_item):
Expand Down

0 comments on commit 6b99c35

Please sign in to comment.