Skip to content

Commit

Permalink
Merge pull request #294 from slincoln-aiq/feature/hashes_fields_trans…
Browse files Browse the repository at this point in the history
…formation

Added new transformation for creating new fields from Hashes field
  • Loading branch information
thomaspatzke authored Oct 21, 2024
2 parents 274c5bf + 7d6bb40 commit f178cb2
Show file tree
Hide file tree
Showing 2 changed files with 273 additions and 0 deletions.
151 changes: 151 additions & 0 deletions sigma/processing/transformations.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from abc import ABC, abstractmethod
from collections import defaultdict
from functools import partial
from sigma.conditions import ConditionOR, SigmaCondition
from typing import (
Any,
ClassVar,
Iterable,
List,
Dict,
Literal,
Optional,
Set,
Tuple,
Union,
Pattern,
Iterator,
Expand Down Expand Up @@ -299,6 +302,153 @@ def apply_value(
"""


@dataclass
class HashesFieldsDetectionItemTransformation(DetectionItemTransformation):
    """
    Transforms the 'Hashes' field in Sigma rules by creating separate detection items for each hash type.

    This transformation replaces the generic 'Hashes' field with specific fields for each hash algorithm,
    optionally prefixing the field names. A value may name its algorithm explicitly (``ALGO=value`` or
    ``ALGO|value``); a value without separator has its algorithm derived from its length.

    Attributes:
        valid_hash_algos (List[str]): List of supported hash algorithms. Explicitly named algorithms are
            upper-cased before they are looked up in this list, so entries should be upper case.
        field_prefix (str): Prefix to add to the new field names.
        drop_algo_prefix (bool): If True, omits the algorithm name from the new field name.
        hash_lengths (Dict[int, str]): Mapping of hash lengths to their corresponding algorithms.

    Example (with ``field_prefix="File"``):
        Input:
            Hashes:
                - 'SHA1=5F1CBC3D99558307BC1250D084FA968521482025'
                - 'MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7'
        Output:
            FileSHA1: '5F1CBC3D99558307BC1250D084FA968521482025'
            FileMD5: '987B65CD9B9F4E9A1AFD8F8B48CF64A7'
    """

    valid_hash_algos: List[str]
    field_prefix: str = ""
    drop_algo_prefix: bool = False
    hash_lengths: ClassVar[Dict[int, str]] = {32: "MD5", 40: "SHA1", 64: "SHA256", 128: "SHA512"}

    def apply_detection_item(
        self, detection_item: SigmaDetectionItem
    ) -> Optional[Union[SigmaDetection, SigmaDetectionItem]]:
        """
        Applies the transformation to a single detection item.

        Args:
            detection_item (SigmaDetectionItem): The detection item to transform.

        Returns:
            Optional[Union[SigmaDetection, SigmaDetectionItem]]: A new SigmaDetection object containing
            one detection item per generated field name.

        Raises:
            Exception: If none of the values of the detection item yields a valid hash algorithm.
        """
        algo_dict = self._parse_hash_values(detection_item.value)

        # Values with unknown algorithms are skipped silently; only fail if nothing usable remains.
        if not algo_dict:
            raise Exception(
                f"No valid hash algo found in Hashes field. Please use one of the following: {', '.join(self.valid_hash_algos)}"
            )

        return self._create_new_detection_items(algo_dict)

    def _parse_hash_values(
        self, values: Union[SigmaString, List[SigmaString]]
    ) -> Dict[str, List[str]]:
        """
        Parses the hash values from the detection item.

        Args:
            values (Union[SigmaString, List[SigmaString]]): The hash values to parse.

        Returns:
            Dict[str, List[str]]: A dictionary mapping field names to lists of hash values. Values for
            which no valid algorithm could be determined are left out.
        """
        algo_dict = defaultdict(list)
        if not isinstance(values, list):
            values = [values]

        for value in values:
            hash_algo, hash_value = self._extract_hash_algo_and_value(value.to_plain())
            if hash_algo:
                field_name = self._get_field_name(hash_algo)
                algo_dict[field_name].append(hash_value)

        return algo_dict

    def _extract_hash_algo_and_value(self, value: str) -> Tuple[str, str]:
        """
        Extracts the hash algorithm and value from a string.

        Args:
            value (str): The string containing the hash algorithm and value.

        Returns:
            Tuple[str, str]: A tuple containing the hash algorithm and value. The algorithm is an empty
            string if it is not contained in ``valid_hash_algos``.
        """
        # "|" takes precedence over "=" as separator when both occur in the value.
        parts = value.split("|") if "|" in value else value.split("=")
        if len(parts) == 2:
            hash_algo, hash_value = parts
            # Tolerate a leading wildcard in front of the algorithm name.
            hash_algo = hash_algo.lstrip("*").upper()
        else:
            # No (or more than one) separator: treat the first part as a bare hash.
            hash_value = parts[0]
            hash_algo = self._determine_hash_algo_by_length(hash_value)

        return (hash_algo, hash_value) if hash_algo in self.valid_hash_algos else ("", hash_value)

    def _determine_hash_algo_by_length(self, hash_value: str) -> str:
        """
        Determines the hash algorithm based on the length of the hash value.

        Leading and trailing ``*`` wildcards are not counted, in line with the explicit ``ALGO=value``
        form where a leading wildcard is tolerated as well. The hash value itself is not modified.

        Args:
            hash_value (str): The hash value to analyze.

        Returns:
            str: The determined hash algorithm, or an empty string if not recognized.
        """
        return self.hash_lengths.get(len(hash_value.strip("*")), "")

    def _get_field_name(self, hash_algo: str) -> str:
        """
        Generates the field name for a given hash algorithm.

        Args:
            hash_algo (str): The hash algorithm.

        Returns:
            str: ``field_prefix`` followed by the algorithm name, or ``field_prefix`` alone if
            ``drop_algo_prefix`` is set.
        """
        return f"{self.field_prefix}{'' if self.drop_algo_prefix else hash_algo}"

    def _create_new_detection_items(self, algo_dict: Dict[str, List[str]]) -> SigmaDetection:
        """
        Creates new detection items based on the parsed hash values.

        Args:
            algo_dict (Dict[str, List[str]]): A dictionary mapping field names to lists of hash values.

        Returns:
            SigmaDetection: A new SigmaDetection object containing the created detection items, linked
            with ConditionOR.
        """
        return SigmaDetection(
            detection_items=[
                SigmaDetectionItem(
                    # The field name "keyword" results in an item without field.
                    field=k if k != "keyword" else None,
                    modifiers=[],
                    value=[SigmaString(x) for x in v],
                )
                for k, v in algo_dict.items()
                if k  # entries with an empty field name are skipped
            ],
            item_linking=ConditionOR,
        )


class StringValueTransformation(ValueTransformation):
"""
Base class for transformations that operate on SigmaString values.
Expand Down Expand Up @@ -1052,6 +1202,7 @@ def apply(
"field_name_prefix_mapping": FieldPrefixMappingTransformation,
"field_name_transform": FieldFunctionTransformation,
"drop_detection_item": DropDetectionItemTransformation,
"hashes_fields": HashesFieldsDetectionItemTransformation,
"field_name_suffix": AddFieldnameSuffixTransformation,
"field_name_prefix": AddFieldnamePrefixTransformation,
"wildcard_placeholders": WildcardPlaceholderTransformation,
Expand Down
122 changes: 122 additions & 0 deletions tests/test_processing_transformations.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
ValueListPlaceholderTransformation,
QueryExpressionPlaceholderTransformation,
ReplaceStringTransformation,
HashesFieldsDetectionItemTransformation,
)
from sigma.processing.pipeline import ProcessingPipeline, ProcessingItem
from sigma.processing.conditions import (
Expand Down Expand Up @@ -1787,3 +1788,124 @@ def class_filter(c):

for cls in inspect.getmembers(transformations_module, class_filter):
assert cls[1] in classes_with_identifiers


@pytest.fixture
def hashes_transformation():
    """Provide a transformation that accepts MD5/SHA1/SHA256/SHA512 and prefixes fields with ``File``."""
    settings = {
        "valid_hash_algos": ["MD5", "SHA1", "SHA256", "SHA512"],
        "field_prefix": "File",
        "drop_algo_prefix": False,
    }
    return HashesFieldsDetectionItemTransformation(**settings)


def test_hashes_transformation_single_hash(hashes_transformation):
    """A single ``SHA1=<hash>`` value is turned into exactly one ``FileSHA1`` item."""
    raw_values = [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")]
    source_item = SigmaDetectionItem("Hashes", [], raw_values)

    transformed = hashes_transformation.apply_detection_item(source_item)

    assert isinstance(transformed, SigmaDetection)
    assert len(transformed.detection_items) == 1
    produced = transformed.detection_items[0]
    assert produced.field == "FileSHA1"
    assert produced.value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")]


def test_hashes_transformation_multiple_hashes(hashes_transformation):
    """Two values with different algorithms yield two OR-linked items in input order."""
    raw_values = [
        SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"),
        SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7"),
    ]
    source_item = SigmaDetectionItem("Hashes", [], raw_values)

    transformed = hashes_transformation.apply_detection_item(source_item)

    assert isinstance(transformed, SigmaDetection)
    assert len(transformed.detection_items) == 2
    sha1_item, md5_item = transformed.detection_items[0], transformed.detection_items[1]
    assert sha1_item.field == "FileSHA1"
    assert sha1_item.value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")]
    assert md5_item.field == "FileMD5"
    assert md5_item.value == [SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7")]
    assert transformed.item_linking == ConditionOR


def test_hashes_transformation_drop_algo_prefix():
    """With ``drop_algo_prefix`` set, the generated field name consists of the prefix only."""
    prefix_only = HashesFieldsDetectionItemTransformation(
        valid_hash_algos=["MD5", "SHA1", "SHA256", "SHA512"],
        field_prefix="File",
        drop_algo_prefix=True,
    )
    raw_values = [SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025")]
    source_item = SigmaDetectionItem("Hashes", [], raw_values)

    transformed = prefix_only.apply_detection_item(source_item)

    assert isinstance(transformed, SigmaDetection)
    assert len(transformed.detection_items) == 1
    produced = transformed.detection_items[0]
    assert produced.field == "File"
    assert produced.value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")]


def test_hashes_transformation_invalid_hash(hashes_transformation):
    """A value whose algorithm is not in the valid list makes the transformation raise."""
    unsupported = [SigmaString("INVALID=123456")]
    source_item = SigmaDetectionItem("Hashes", [], unsupported)

    with pytest.raises(Exception, match="No valid hash algo found"):
        hashes_transformation.apply_detection_item(source_item)


def test_hashes_transformation_mixed_valid_invalid(hashes_transformation):
    """Values with an unsupported algorithm are skipped while the valid ones are kept."""
    raw_values = [
        SigmaString("SHA1=5F1CBC3D99558307BC1250D084FA968521482025"),
        SigmaString("INVALID=123456"),
        SigmaString("MD5=987B65CD9B9F4E9A1AFD8F8B48CF64A7"),
    ]
    source_item = SigmaDetectionItem("Hashes", [], raw_values)

    transformed = hashes_transformation.apply_detection_item(source_item)

    assert isinstance(transformed, SigmaDetection)
    assert len(transformed.detection_items) == 2
    sha1_item, md5_item = transformed.detection_items[0], transformed.detection_items[1]
    assert sha1_item.field == "FileSHA1"
    assert sha1_item.value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")]
    assert md5_item.field == "FileMD5"
    assert md5_item.value == [SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7")]


def test_hashes_transformation_auto_detect_hash_type(hashes_transformation):
    """Bare hashes without algorithm are assigned to a field by their length, keeping their value."""
    detection_item = SigmaDetectionItem(
        "Hashes",
        [],
        [
            SigmaString("5F1CBC3D99558307BC1250D084FA968521482025"),  # SHA1
            SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7"),  # MD5
            SigmaString("A" * 64),  # SHA256
            SigmaString("B" * 128),  # SHA512
        ],
    )
    result = hashes_transformation.apply_detection_item(detection_item)
    assert isinstance(result, SigmaDetection)
    assert len(result.detection_items) == 4
    assert result.detection_items[0].field == "FileSHA1"
    assert result.detection_items[1].field == "FileMD5"
    assert result.detection_items[2].field == "FileSHA256"
    assert result.detection_items[3].field == "FileSHA512"
    # Also verify that every hash landed in the field matching its length, not just the field names.
    assert result.detection_items[0].value == [
        SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")
    ]
    assert result.detection_items[1].value == [SigmaString("987B65CD9B9F4E9A1AFD8F8B48CF64A7")]
    assert result.detection_items[2].value == [SigmaString("A" * 64)]
    assert result.detection_items[3].value == [SigmaString("B" * 128)]


def test_hashes_transformation_pipe_separator(hashes_transformation):
    """The ``ALGO|hash`` notation is handled like the ``ALGO=hash`` notation."""
    raw_values = [SigmaString("SHA1|5F1CBC3D99558307BC1250D084FA968521482025")]
    source_item = SigmaDetectionItem("Hashes", [], raw_values)

    transformed = hashes_transformation.apply_detection_item(source_item)

    assert isinstance(transformed, SigmaDetection)
    assert len(transformed.detection_items) == 1
    produced = transformed.detection_items[0]
    assert produced.field == "FileSHA1"
    assert produced.value == [SigmaString("5F1CBC3D99558307BC1250D084FA968521482025")]

0 comments on commit f178cb2

Please sign in to comment.