Skip to content

Commit

Permalink
new: Allow user accessible MISP servers
Browse files Browse the repository at this point in the history
Related  #1021
  • Loading branch information
Rafiot committed Dec 23, 2024
1 parent e189d6f commit 416eba5
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 91 deletions.
2 changes: 1 addition & 1 deletion bin/background_build_captures.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __auto_report(self, path: Path) -> None:
# could be an empty file.
settings = json.loads(ar)
try:
self.lookyloo.send_mail(capture_uuid, email=settings.get('email', ''),
self.lookyloo.send_mail(capture_uuid, as_admin=True, email=settings.get('email', ''),
comment=settings.get('comment'))
(path / 'auto_report').unlink()
except Exception as e:
Expand Down
11 changes: 7 additions & 4 deletions lookyloo/lookyloo.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ def modules_filtered(self, capture_uuid: str, /) -> str | None:

return f"Malicious capture according to {len(modules)} module(s): {', '.join(modules)}"

def send_mail(self, capture_uuid: str, /, email: str | None=None, comment: str | None=None) -> bool | dict[str, Any]:
def send_mail(self, capture_uuid: str, /, as_admin: bool, email: str | None=None, comment: str | None=None) -> bool | dict[str, Any]:
'''Send an email notification regarding a specific capture'''
if not get_config('generic', 'enable_mail_notification'):
return {"error": "Unable to send mail: mail notification disabled"}
Expand Down Expand Up @@ -916,7 +916,9 @@ def send_mail(self, capture_uuid: str, /, email: str | None=None, comment: str |
self.logger.info('There are no MISP instances available for a lookup.')
else:
for instance_name in self.misps.keys():
if occurrences := self.get_misp_occurrences(capture_uuid, instance_name=instance_name):
if occurrences := self.get_misp_occurrences(capture_uuid,
as_admin=as_admin,
instance_name=instance_name):
elements, misp_url = occurrences
for event_id, attributes in elements.items():
for value, ts in attributes:
Expand Down Expand Up @@ -1225,7 +1227,8 @@ def misp_export(self, capture_uuid: str, /, with_parent: bool=False, *, as_admin

return [event]

def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: str | None=None) -> tuple[dict[int, set[tuple[str, datetime]]], str] | None:
def get_misp_occurrences(self, capture_uuid: str, /, as_admin: bool,
*, instance_name: str | None=None) -> tuple[dict[int, set[tuple[str, datetime]]], str] | None:
if instance_name is None:
misp = self.misps.default_misp
elif self.misps.get(instance_name) is not None:
Expand All @@ -1244,7 +1247,7 @@ def get_misp_occurrences(self, capture_uuid: str, /, *, instance_name: str | Non
nodes_to_lookup = ct.root_hartree.rendered_node.get_ancestors() + [ct.root_hartree.rendered_node]
to_return: dict[int, set[tuple[str, datetime]]] = defaultdict(set)
for node in nodes_to_lookup:
hits = misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid))
hits = misp.lookup(node, ct.root_hartree.get_host_node_by_uuid(node.hostnode_uuid), as_admin=as_admin)
for event_id, values in hits.items():
if not isinstance(event_id, int) or not isinstance(values, set):
continue
Expand Down
152 changes: 88 additions & 64 deletions lookyloo/modules/misp.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,20 @@ def module_init(self) -> bool:

return True

@property
def has_public_misp(self) -> bool:
return not all(misp.admin_only for misp in self.__misps.values())

def has_lookup(self, as_admin: bool) -> bool:
if as_admin:
return any(misp.enable_lookup for misp in self.__misps.values())
return any(misp.enable_lookup and not misp.admin_only for misp in self.__misps.values())

def has_push(self, as_admin: bool) -> bool:
if as_admin:
return any(misp.enable_push for misp in self.__misps.values())
return any(misp.enable_push and not misp.admin_only for misp in self.__misps.values())

def __getitem__(self, name: str) -> MISP:
return self.__misps[name]

Expand Down Expand Up @@ -200,7 +214,7 @@ def _prepare_push(self, to_push: list[MISPEvent] | MISPEvent, allow_duplicates:
existing_uuid_to_extend = None
for event in events:
if not allow_duplicates:
existing_event = self.get_existing_event(event.attributes[0].value)
existing_event = self.__get_existing_event(event.attributes[0].value)
if existing_event:
existing_uuid_to_extend = existing_event.uuid
continue
Expand All @@ -215,38 +229,44 @@ def _prepare_push(self, to_push: list[MISPEvent] | MISPEvent, allow_duplicates:
events_to_push.append(event)
return events_to_push

def push(self, to_push: list[MISPEvent] | MISPEvent, allow_duplicates: bool=False, auto_publish: bool | None=None) -> list[MISPEvent] | dict[Any, Any]:
def push(self, to_push: list[MISPEvent] | MISPEvent, as_admin: bool, *, allow_duplicates: bool=False,
auto_publish: bool | None=None) -> list[MISPEvent] | dict[Any, Any]:
if not self.available:
return {'error': 'Module not available.'}
if not self.enable_push:
return {'error': 'Push not enabled.'}
if self.admin_only and not as_admin:
return {'error': 'Admin only module, cannot push.'}

if auto_publish is None:
auto_publish = self.auto_publish
if self.available and self.enable_push:
events = self._prepare_push(to_push, allow_duplicates, auto_publish)
if not events:
return {'error': 'All the events are already on the MISP instance.'}
if isinstance(events, dict):
return {'error': events}
to_return = []
for event in events:
try:
# NOTE: POST the event as published publishes inline, which can tak a long time.
# Here, we POST as not published, and trigger the publishing in a second call.
if hasattr(event, 'published'):
background_publish = event.published
else:
background_publish = False
if background_publish:
event.published = False
new_event = self.client.add_event(event, pythonify=True)
if background_publish and isinstance(new_event, MISPEvent):
self.client.publish(new_event)
except requests.exceptions.ReadTimeout:
return {'error': 'The connection to MISP timed out, try increasing the timeout in the config.'}
if isinstance(new_event, MISPEvent):
to_return.append(new_event)

events = self._prepare_push(to_push, allow_duplicates, auto_publish)
if not events:
return {'error': 'All the events are already on the MISP instance.'}
if isinstance(events, dict):
return {'error': events}
to_return = []
for event in events:
try:
# NOTE: POST the event as published publishes inline, which can tak a long time.
# Here, we POST as not published, and trigger the publishing in a second call.
if hasattr(event, 'published'):
background_publish = event.published
else:
return {'error': new_event}
return to_return
else:
return {'error': 'Module not available or push not enabled.'}
background_publish = False
if background_publish:
event.published = False
new_event = self.client.add_event(event, pythonify=True)
if background_publish and isinstance(new_event, MISPEvent):
self.client.publish(new_event)
except requests.exceptions.ReadTimeout:
return {'error': 'The connection to MISP timed out, try increasing the timeout in the config.'}
if isinstance(new_event, MISPEvent):
to_return.append(new_event)
else:
return {'error': new_event}
return to_return

def get_existing_event_url(self, permaurl: str) -> str | None:
attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True)
Expand All @@ -255,7 +275,7 @@ def get_existing_event_url(self, permaurl: str) -> str | None:
url = f'{self.client.root_url}/events/{attributes[0].event_id}'
return url

def get_existing_event(self, permaurl: str) -> MISPEvent | None:
def __get_existing_event(self, permaurl: str) -> MISPEvent | None:
attributes = self.client.search('attributes', value=permaurl, limit=1, page=1, pythonify=True)
if not attributes or not isinstance(attributes, list) or not isinstance(attributes[0], MISPAttribute):
return None
Expand All @@ -264,36 +284,40 @@ def get_existing_event(self, permaurl: str) -> MISPEvent | None:
return event
return None

def lookup(self, node: URLNode, hostnode: HostNode) -> dict[int | str, str | set[tuple[str, datetime]]]:
if self.available and self.enable_lookup:
tld = self.psl.publicsuffix(hostnode.name)
domain = re.sub(f'.{tld}$', '', hostnode.name).split('.')[-1]
to_lookup = [node.name, hostnode.name, f'{domain}.{tld}']
if hasattr(hostnode, 'resolved_ips'):
if 'v4' in hostnode.resolved_ips:
to_lookup += hostnode.resolved_ips['v4']
if 'v6' in hostnode.resolved_ips:
to_lookup += hostnode.resolved_ips['v6']
if hasattr(hostnode, 'cnames'):
to_lookup += hostnode.cnames
if not node.empty_response:
to_lookup.append(node.body_hash)
if attributes := self.client.search(controller='attributes', value=to_lookup,
enforce_warninglist=True, pythonify=True):
if isinstance(attributes, list):
to_return: dict[int, set[tuple[str, datetime]]] = defaultdict(set)
a: MISPAttribute
for a in attributes: # type: ignore[assignment]
if isinstance(a.value, str):
# a.timestamp is always a datetime in this situation
to_return[a.event_id].add((a.value, a.timestamp)) # type: ignore[arg-type]
else:
# This shouldn't happen (?)
self.logger.warning(f'Unexpected value type in MISP lookup: {type(a.value)}')
return to_return # type: ignore[return-value]
else:
# The request returned an error
return attributes # type: ignore[return-value]
return {'info': 'No hits.'}
else:
return {'error': 'Module not available or lookup not enabled.'}
def lookup(self, node: URLNode, hostnode: HostNode, as_admin: bool) -> dict[int | str, str | set[tuple[str, datetime]]]:
if not self.available:
return {'error': 'Module not available.'}
if not self.enable_lookup:
return {'error': 'Lookup not enabled.'}
if self.admin_only and not as_admin:
return {'error': 'Admin only module, cannot lookup.'}

tld = self.psl.publicsuffix(hostnode.name)
domain = re.sub(f'.{tld}$', '', hostnode.name).split('.')[-1]
to_lookup = [node.name, hostnode.name, f'{domain}.{tld}']
if hasattr(hostnode, 'resolved_ips'):
if 'v4' in hostnode.resolved_ips:
to_lookup += hostnode.resolved_ips['v4']
if 'v6' in hostnode.resolved_ips:
to_lookup += hostnode.resolved_ips['v6']
if hasattr(hostnode, 'cnames'):
to_lookup += hostnode.cnames
if not node.empty_response:
to_lookup.append(node.body_hash)
if attributes := self.client.search(controller='attributes', value=to_lookup,
enforce_warninglist=True, pythonify=True):
if isinstance(attributes, list):
to_return: dict[int, set[tuple[str, datetime]]] = defaultdict(set)
a: MISPAttribute
for a in attributes: # type: ignore[assignment]
if isinstance(a.value, str):
# a.timestamp is always a datetime in this situation
to_return[a.event_id].add((a.value, a.timestamp)) # type: ignore[arg-type]
else:
# This shouldn't happen (?)
self.logger.warning(f'Unexpected value type in MISP lookup: {type(a.value)}')
return to_return # type: ignore[return-value]
else:
# The request returned an error
return attributes # type: ignore[return-value]
return {'info': 'No hits.'}
Loading

0 comments on commit 416eba5

Please sign in to comment.