Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added docstrings in functions in capture.py and inmem_capture.py #721

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 143 additions & 61 deletions src/pyshark/capture/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,14 @@
from pyshark.tshark.output_parser import tshark_ek
from pyshark.tshark.output_parser import tshark_json
from pyshark.tshark.output_parser import tshark_xml
from pyshark.tshark.tshark import get_process_path, get_tshark_display_filter_flag, \
tshark_supports_json, TSharkVersionException, get_tshark_version, tshark_supports_duplicate_keys
from pyshark.tshark.tshark import (
get_process_path,
get_tshark_display_filter_flag,
tshark_supports_json,
TSharkVersionException,
get_tshark_version,
tshark_supports_duplicate_keys,
)


if sys.version_info < (3, 8):
Expand All @@ -38,20 +44,36 @@ class RawMustUseJsonException(Exception):

class StopCapture(Exception):
"""Exception that the user can throw anywhere in packet-handling to stop the capture process."""

pass


class Capture:
"""Base class for packet captures."""

SUMMARIES_BATCH_SIZE = 64
DEFAULT_LOG_LEVEL = logging.CRITICAL
SUPPORTED_ENCRYPTION_STANDARDS = ["wep", "wpa-pwk", "wpa-pwd", "wpa-psk"]

def __init__(self, display_filter=None, only_summaries=False, eventloop=None,
decryption_key=None, encryption_type="wpa-pwd", output_file=None,
decode_as=None, disable_protocol=None, tshark_path=None,
override_prefs=None, capture_filter=None, use_json=False, include_raw=False,
use_ek=False, custom_parameters=None, debug=False):
def __init__(
self,
display_filter=None,
only_summaries=False,
eventloop=None,
decryption_key=None,
encryption_type="wpa-pwd",
output_file=None,
decode_as=None,
disable_protocol=None,
tshark_path=None,
override_prefs=None,
capture_filter=None,
use_json=False,
include_raw=False,
use_ek=False,
custom_parameters=None,
debug=False,
):

self.loaded = False
self.tshark_path = tshark_path
Expand All @@ -69,8 +91,7 @@ def __init__(self, display_filter=None, only_summaries=False, eventloop=None,
self._running_processes = set()
self._decode_as = decode_as
self._disable_protocol = disable_protocol
self._log = logging.Logger(
self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL)
self._log = logging.Logger(self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL)
self._closed = False
self._custom_parameters = custom_parameters
self._eof_reached = False
Expand All @@ -79,8 +100,7 @@ def __init__(self, display_filter=None, only_summaries=False, eventloop=None,
self.__tshark_version = None

if include_raw and not (use_json or use_ek):
raise RawMustUseJsonException(
"use_json/use_ek must be True if include_raw")
raise RawMustUseJsonException("use_json/use_ek must be True if include_raw")

if self.debug:
self.set_debug()
Expand All @@ -92,7 +112,9 @@ def __init__(self, display_filter=None, only_summaries=False, eventloop=None,
self.encryption = (decryption_key, encryption_type.lower())
else:
standards = ", ".join(self.SUPPORTED_ENCRYPTION_STANDARDS)
raise UnknownEncyptionStandardException(f"Only the following standards are supported: {standards}.")
raise UnknownEncyptionStandardException(
f"Only the following standards are supported: {standards}."
)

def __getitem__(self, item):
"""Gets the packet in the given index.
Expand Down Expand Up @@ -143,8 +165,7 @@ def keep_packet(pkt):
raise StopCapture()

try:
self.apply_on_packets(
keep_packet, timeout=timeout, packet_count=packet_count)
self.apply_on_packets(keep_packet, timeout=timeout, packet_count=packet_count)
self.loaded = True
except asyncTimeoutError:
pass
Expand All @@ -153,8 +174,7 @@ def set_debug(self, set_to=True, log_level=logging.DEBUG):
"""Sets the capture to debug mode (or turns it off if specified)."""
if set_to:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
self._log.addHandler(handler)
self._log.level = log_level
self.debug = set_to
Expand All @@ -176,8 +196,10 @@ def _setup_eventloop(self):
# On Python before 3.8, Proactor is not the default eventloop type, so we have to create a new one.
# If there was an existing eventloop this can create issues, since we effectively disable it here.
if asyncio.all_tasks():
warnings.warn("The running eventloop has tasks but pyshark must set a new eventloop to continue. "
"Existing tasks may not run.")
warnings.warn(
"The running eventloop has tasks but pyshark must set a new eventloop to continue. "
"Existing tasks may not run."
)
self.eventloop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(self.eventloop)
else:
Expand Down Expand Up @@ -211,8 +233,7 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None):
:param packet_count: If given, stops after this amount of packets is captured.
"""
# NOTE: This has code duplication with the async version, think about how to solve this
tshark_process = existing_process or self.eventloop.run_until_complete(
self._get_tshark_process())
tshark_process = existing_process or self.eventloop.run_until_complete(self._get_tshark_process())
parser = self._setup_tshark_output_parser()
packets_captured = 0

Expand All @@ -221,8 +242,12 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None):
while True:
try:
packet, data = self.eventloop.run_until_complete(
parser.get_packets_from_stream(tshark_process.stdout, data,
got_first_packet=packets_captured > 0))
parser.get_packets_from_stream(
tshark_process.stdout,
data,
got_first_packet=packets_captured > 0,
)
)

except EOFError:
self._log.debug("EOF reached (sync)")
Expand All @@ -236,8 +261,7 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None):
break
finally:
if tshark_process in self._running_processes:
self.eventloop.run_until_complete(
self._cleanup_subprocess(tshark_process))
self.eventloop.run_until_complete(self._cleanup_subprocess(tshark_process))

def apply_on_packets(self, callback, timeout=None, packet_count=None):
"""Runs through all packets and calls the given callback (a function) with each one as it is read.
Expand Down Expand Up @@ -266,7 +290,9 @@ async def packets_from_tshark(self, packet_callback, packet_count=None, close_ts
"""
tshark_process = await self._get_tshark_process(packet_count=packet_count)
try:
await self._go_through_packets_from_fd(tshark_process.stdout, packet_callback, packet_count=packet_count)
await self._go_through_packets_from_fd(
tshark_process.stdout, packet_callback, packet_count=packet_count
)
except StopCapture:
pass
finally:
Expand All @@ -283,8 +309,9 @@ async def _go_through_packets_from_fd(self, fd, packet_callback, packet_count=No

while True:
try:
packet, data = await parser.get_packets_from_stream(fd, data,
got_first_packet=packets_captured > 0)
packet, data = await parser.get_packets_from_stream(
fd, data, got_first_packet=packets_captured > 0
)
except EOFError:
self._log.debug("EOF reached")
self._eof_reached = True
Expand Down Expand Up @@ -331,8 +358,7 @@ async def _get_tshark_process(self, packet_count=None, stdin=None):
output_parameters = []
if self.use_json or self._use_ek:
if not tshark_supports_json(self._get_tshark_version()):
raise TSharkVersionException(
"JSON only supported on Wireshark >= 2.2.0")
raise TSharkVersionException("JSON only supported on Wireshark >= 2.2.0")

if self.use_json:
output_type = "json"
Expand All @@ -342,62 +368,93 @@ async def _get_tshark_process(self, packet_count=None, stdin=None):
output_type = "ek"
else:
output_type = "psml" if self._only_summaries else "pdml"
parameters = [self._get_tshark_path(), "-l", "-n", "-T", output_type] + \
self.get_parameters(packet_count=packet_count) + output_parameters
parameters = (
[self._get_tshark_path(), "-l", "-n", "-T", output_type]
+ self.get_parameters(packet_count=packet_count)
+ output_parameters
)

self._log.debug(
"Creating TShark subprocess with parameters: " + " ".join(parameters))
self._log.debug("Creating TShark subprocess with parameters: " + " ".join(parameters))
self._log.debug("Executable: %s", parameters[0])
tshark_process = await asyncio.create_subprocess_exec(*parameters,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=stdin)
tshark_process = await asyncio.create_subprocess_exec(
*parameters, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=stdin
)
self._create_stderr_handling_task(tshark_process.stderr)
self._created_new_process(parameters, tshark_process)
return tshark_process

def _created_new_process(self, parameters, process, process_name="TShark"):
self._log.debug(
process_name + f" subprocess (pid {process.pid}) created")
"""Logs the creation of a new process and monitors if it has crashed.

Args:
parameters (list): Parameters used to create the process.
process (subprocess): Newly created process.
process_name (str, optional): Name of the process that, defaults to "TShark".

Raises:
TSharkCrashException: Displayed if the process returns a code != 0.
"""
self._log.debug(process_name + f" subprocess (pid {process.pid}) created")
if process.returncode is not None and process.returncode != 0:
raise TSharkCrashException(
f"{process_name} seems to have crashed. Try updating it. (command ran: '{' '.join(parameters)}')")
f"{process_name} seems to have crashed. Try updating it. (command ran: '{' '.join(parameters)}')"
)
self._running_processes.add(process)

async def _cleanup_subprocess(self, process):
"""Kill the given process and properly closes any pipes connected to it."""
"""Kill the given process and properly closes any pipes connected to it.

Args:
process (subprocess): Process to be killed in the event of cleanup.

Raises:
TSharkCrashException: If the subprocess has failed/crashed or returned a non-zero code.

Returns:
_type_: _description_
"""
self._log.debug(f"Cleanup Subprocess (pid {process.pid})")
if process.returncode is None:
try:
process.kill()
return await asyncio.wait_for(process.wait(), 1)
except asyncTimeoutError:
self._log.debug(
"Waiting for process to close failed, may have zombie process.")
self._log.debug("Waiting for process to close failed, may have zombie process.")
except ProcessLookupError:
pass
except OSError:
if os.name != "nt":
raise
elif process.returncode > 0:
if process.returncode != 1 or self._eof_reached:
raise TSharkCrashException(f"TShark (pid {process.pid}) seems to have crashed (retcode: {process.returncode}).\n"
f"Last error line: {self._last_error_line}\n"
"Try rerunning in debug mode [ capture_obj.set_debug() ] or try updating tshark.")
raise TSharkCrashException(
f"TShark (pid {process.pid}) seems to have crashed (retcode: {process.returncode}).\n"
f"Last error line: {self._last_error_line}\n"
"Try rerunning in debug mode [ capture_obj.set_debug() ] or try updating tshark."
)

def _setup_tshark_output_parser(self):
"""Returns a TShark output parser based on the configurations.

Returns:
TSharkOutputParser: (options): JSON | EK | XML
"""
if self.use_json:
return tshark_json.TsharkJsonParser(self._get_tshark_version())
if self._use_ek:
ek_field_mapping.MAPPING.load_mapping(str(self._get_tshark_version()),
tshark_path=self.tshark_path)
ek_field_mapping.MAPPING.load_mapping(
str(self._get_tshark_version()), tshark_path=self.tshark_path
)
return tshark_ek.TsharkEkJsonParser()
return tshark_xml.TsharkXmlParser(parse_summaries=self._only_summaries)

def close(self):
self.eventloop.run_until_complete(self.close_async())

async def close_async(self):
"""Close async process after cleaning the subprocess and after stderr handling
has finished.
"""
for process in self._running_processes.copy():
await self._cleanup_subprocess(process)
self._running_processes.clear()
Expand All @@ -412,21 +469,40 @@ def __del__(self):
if self._running_processes:
self.close()

def __enter__(self): return self
async def __aenter__(self): return self
def __exit__(self, exc_type, exc_val, exc_tb): self.close()
def __enter__(self):
return self

async def __aenter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

async def __aexit__(self, exc_type, exc_val,
exc_tb): await self.close_async()
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close_async()

def get_parameters(self, packet_count=None):
"""Returns the special tshark parameters to be used according to the configuration of this class."""
"""Returns the special tshark parameters to be used according to the configuration of this class.

Args:
packet_count (int, optional): The number of packets to capture, defaults to None.

Raises:
TypeError: When custom params are of an unsupported type.

Returns:
list: TShark Params
"""
params = []
if self._capture_filter:
params += ["-f", self._capture_filter]
if self._display_filter:
params += [get_tshark_display_filter_flag(self._get_tshark_version(),),
self._display_filter]
params += [
get_tshark_display_filter_flag(
self._get_tshark_version(),
),
self._display_filter,
]
# Raw is only enabled when JSON is also enabled.
if self.include_raw:
params += ["-x"]
Expand All @@ -443,11 +519,18 @@ def get_parameters(self, packet_count=None):
raise TypeError("Custom parameters type not supported.")

if all(self.encryption):
params += ["-o", "wlan.enable_decryption:TRUE", "-o", 'uat:80211_keys:"' + self.encryption[1] + '","' +
self.encryption[0] + '"']
params += [
"-o",
"wlan.enable_decryption:TRUE",
"-o",
'uat:80211_keys:"' + self.encryption[1] + '","' + self.encryption[0] + '"',
]
if self._override_prefs:
for preference_name, preference_value in self._override_prefs.items():
if all(self.encryption) and preference_name in ("wlan.enable_decryption", "uat:80211_keys"):
if all(self.encryption) and preference_name in (
"wlan.enable_decryption",
"uat:80211_keys",
):
continue # skip if override preferences also given via --encryption options
params += ["-o", f"{preference_name}:{preference_value}"]

Expand All @@ -456,8 +539,7 @@ def get_parameters(self, packet_count=None):

if self._decode_as:
for criterion, decode_as_proto in self._decode_as.items():
params += ["-d",
",".join([criterion.strip(), decode_as_proto.strip()])]
params += ["-d", ",".join([criterion.strip(), decode_as_proto.strip()])]

if self._disable_protocol:
params += ["--disable-protocol", self._disable_protocol.strip()]
Expand Down
Loading