From 32f7c475a58d7431411a8065953e3b46152c0e07 Mon Sep 17 00:00:00 2001 From: Jeremy Hicks Date: Tue, 3 Dec 2024 19:44:28 -0500 Subject: [PATCH 1/2] added docstrings in functions in capture.py and inmem_capture.py --- src/pyshark/capture/capture.py | 234 ++++++++++++++++++++------- src/pyshark/capture/inmem_capture.py | 124 ++++++++++---- 2 files changed, 266 insertions(+), 92 deletions(-) diff --git a/src/pyshark/capture/capture.py b/src/pyshark/capture/capture.py index b6c8b4e3..73030e53 100644 --- a/src/pyshark/capture/capture.py +++ b/src/pyshark/capture/capture.py @@ -14,8 +14,14 @@ from pyshark.tshark.output_parser import tshark_ek from pyshark.tshark.output_parser import tshark_json from pyshark.tshark.output_parser import tshark_xml -from pyshark.tshark.tshark import get_process_path, get_tshark_display_filter_flag, \ - tshark_supports_json, TSharkVersionException, get_tshark_version, tshark_supports_duplicate_keys +from pyshark.tshark.tshark import ( + get_process_path, + get_tshark_display_filter_flag, + tshark_supports_json, + TSharkVersionException, + get_tshark_version, + tshark_supports_duplicate_keys, +) if sys.version_info < (3, 8): @@ -38,20 +44,36 @@ class RawMustUseJsonException(Exception): class StopCapture(Exception): """Exception that the user can throw anywhere in packet-handling to stop the capture process.""" + pass class Capture: """Base class for packet captures.""" + SUMMARIES_BATCH_SIZE = 64 DEFAULT_LOG_LEVEL = logging.CRITICAL SUPPORTED_ENCRYPTION_STANDARDS = ["wep", "wpa-pwk", "wpa-pwd", "wpa-psk"] - def __init__(self, display_filter=None, only_summaries=False, eventloop=None, - decryption_key=None, encryption_type="wpa-pwd", output_file=None, - decode_as=None, disable_protocol=None, tshark_path=None, - override_prefs=None, capture_filter=None, use_json=False, include_raw=False, - use_ek=False, custom_parameters=None, debug=False): + def __init__( + self, + display_filter=None, + only_summaries=False, + eventloop=None, + decryption_key=None, + encryption_type="wpa-pwd", + output_file=None, + decode_as=None, + disable_protocol=None, + tshark_path=None, + override_prefs=None, + capture_filter=None, + use_json=False, + include_raw=False, + use_ek=False, + custom_parameters=None, + debug=False, + ): self.loaded = False self.tshark_path = tshark_path @@ -70,7 +92,8 @@ def __init__(self, display_filter=None, only_summaries=False, eventloop=None, self._decode_as = decode_as self._disable_protocol = disable_protocol self._log = logging.Logger( - self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL) + self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL + ) self._closed = False self._custom_parameters = custom_parameters self._eof_reached = False @@ -79,8 +102,7 @@ def __init__(self, display_filter=None, only_summaries=False, eventloop=None, self.__tshark_version = None if include_raw and not (use_json or use_ek): - raise RawMustUseJsonException( - "use_json/use_ek must be True if include_raw") + raise RawMustUseJsonException("use_json/use_ek must be True if include_raw") if self.debug: self.set_debug() @@ -88,11 +110,16 @@ def __init__(self, display_filter=None, only_summaries=False, eventloop=None, self.eventloop = eventloop if self.eventloop is None: self._setup_eventloop() - if encryption_type and encryption_type.lower() in self.SUPPORTED_ENCRYPTION_STANDARDS: + if ( + encryption_type + and encryption_type.lower() in self.SUPPORTED_ENCRYPTION_STANDARDS + ): self.encryption = (decryption_key, encryption_type.lower()) else: standards = ", ".join(self.SUPPORTED_ENCRYPTION_STANDARDS) - raise UnknownEncyptionStandardException(f"Only the following standards are supported: {standards}.") + raise UnknownEncyptionStandardException( + f"Only the following standards are supported: {standards}." + ) def __getitem__(self, item): """Gets the packet in the given index. @@ -139,12 +166,16 @@ def load_packets(self, packet_count=0, timeout=None): def keep_packet(pkt): self._packets.append(pkt) - if packet_count != 0 and len(self._packets) - initial_packet_amount >= packet_count: + if ( + packet_count != 0 + and len(self._packets) - initial_packet_amount >= packet_count + ): raise StopCapture() try: self.apply_on_packets( - keep_packet, timeout=timeout, packet_count=packet_count) + keep_packet, timeout=timeout, packet_count=packet_count + ) self.loaded = True except asyncTimeoutError: pass @@ -153,8 +184,11 @@ def set_debug(self, set_to=True, log_level=logging.DEBUG): """Sets the capture to debug mode (or turns it off if specified).""" if set_to: handler = logging.StreamHandler(sys.stdout) - handler.setFormatter(logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s")) + handler.setFormatter( + logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + ) self._log.addHandler(handler) self._log.level = log_level self.debug = set_to @@ -176,8 +210,10 @@ def _setup_eventloop(self): # On Python before 3.8, Proactor is not the default eventloop type, so we have to create a new one. # If there was an existing eventloop this can create issues, since we effectively disable it here. if asyncio.all_tasks(): - warnings.warn("The running eventloop has tasks but pyshark must set a new eventloop to continue. " - "Existing tasks may not run.") + warnings.warn( + "The running eventloop has tasks but pyshark must set a new eventloop to continue. " + "Existing tasks may not run." + ) self.eventloop = asyncio.ProactorEventLoop() asyncio.set_event_loop(self.eventloop) else: @@ -190,7 +226,9 @@ def _setup_eventloop(self): asyncio.set_event_loop(self.eventloop) else: raise - if os.name == "posix" and isinstance(threading.current_thread(), threading._MainThread): + if os.name == "posix" and isinstance( + threading.current_thread(), threading._MainThread + ): # The default child watchers (ThreadedChildWatcher) attach_loop method is empty! # While using pyshark with ThreadedChildWatcher, asyncio could raise a ChildProcessError # "Unknown child process pid %d, will report returncode 255" @@ -212,7 +250,8 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None): """ # NOTE: This has code duplication with the async version, think about how to solve this tshark_process = existing_process or self.eventloop.run_until_complete( - self._get_tshark_process()) + self._get_tshark_process() + ) parser = self._setup_tshark_output_parser() packets_captured = 0 @@ -221,8 +260,12 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None): while True: try: packet, data = self.eventloop.run_until_complete( - parser.get_packets_from_stream(tshark_process.stdout, data, - got_first_packet=packets_captured > 0)) + parser.get_packets_from_stream( + tshark_process.stdout, + data, + got_first_packet=packets_captured > 0, + ) + ) except EOFError: self._log.debug("EOF reached (sync)") @@ -237,7 +280,8 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None): finally: if tshark_process in self._running_processes: self.eventloop.run_until_complete( - self._cleanup_subprocess(tshark_process)) + self._cleanup_subprocess(tshark_process) + ) def apply_on_packets(self, callback, timeout=None, packet_count=None): """Runs through all packets and calls the given callback (a function) with each one as it is read. @@ -257,7 +301,9 @@ def print_callback(pkt): coro = asyncio.wait_for(coro, timeout) return self.eventloop.run_until_complete(coro) - async def packets_from_tshark(self, packet_callback, packet_count=None, close_tshark=True): + async def packets_from_tshark( + self, packet_callback, packet_count=None, close_tshark=True + ): """ A coroutine which creates a tshark process, runs the given callback on each packet that is received from it and closes the process when it is done. @@ -266,7 +312,9 @@ async def packets_from_tshark(self, packet_callback, packet_count=None, close_ts """ tshark_process = await self._get_tshark_process(packet_count=packet_count) try: - await self._go_through_packets_from_fd(tshark_process.stdout, packet_callback, packet_count=packet_count) + await self._go_through_packets_from_fd( + tshark_process.stdout, packet_callback, packet_count=packet_count + ) except StopCapture: pass finally: @@ -283,8 +331,9 @@ async def _go_through_packets_from_fd(self, fd, packet_callback, packet_count=No while True: try: - packet, data = await parser.get_packets_from_stream(fd, data, - got_first_packet=packets_captured > 0) + packet, data = await parser.get_packets_from_stream( + fd, data, got_first_packet=packets_captured > 0 + ) except EOFError: self._log.debug("EOF reached") self._eof_reached = True @@ -305,7 +354,9 @@ async def _go_through_packets_from_fd(self, fd, packet_callback, packet_count=No break def _create_stderr_handling_task(self, stderr): - self._stderr_handling_tasks.append(asyncio.ensure_future(self._handle_process_stderr_forever(stderr))) + self._stderr_handling_tasks.append( + asyncio.ensure_future(self._handle_process_stderr_forever(stderr)) + ) async def _handle_process_stderr_forever(self, stderr): while True: @@ -332,7 +383,8 @@ async def _get_tshark_process(self, packet_count=None, stdin=None): if self.use_json or self._use_ek: if not tshark_supports_json(self._get_tshark_version()): raise TSharkVersionException( - "JSON only supported on Wireshark >= 2.2.0") + "JSON only supported on Wireshark >= 2.2.0" + ) if self.use_json: output_type = "json" @@ -342,30 +394,53 @@ async def _get_tshark_process(self, packet_count=None, stdin=None): output_type = "ek" else: output_type = "psml" if self._only_summaries else "pdml" - parameters = [self._get_tshark_path(), "-l", "-n", "-T", output_type] + \ - self.get_parameters(packet_count=packet_count) + output_parameters + parameters = ( + [self._get_tshark_path(), "-l", "-n", "-T", output_type] + + self.get_parameters(packet_count=packet_count) + + output_parameters + ) self._log.debug( - "Creating TShark subprocess with parameters: " + " ".join(parameters)) + "Creating TShark subprocess with parameters: " + " ".join(parameters) + ) self._log.debug("Executable: %s", parameters[0]) - tshark_process = await asyncio.create_subprocess_exec(*parameters, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - stdin=stdin) + tshark_process = await asyncio.create_subprocess_exec( + *parameters, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=stdin + ) self._create_stderr_handling_task(tshark_process.stderr) self._created_new_process(parameters, tshark_process) return tshark_process def _created_new_process(self, parameters, process, process_name="TShark"): - self._log.debug( - process_name + f" subprocess (pid {process.pid}) created") + """Logs the creation of a new process and monitors if it has crashed. + + Args: + parameters (list): Parameters used to create the process. + process (subprocess): Newly created process. + process_name (str, optional): Name of the process that, defaults to "TShark". + + Raises: + TSharkCrashException: Displayed if the process returns a code != 0. + """ + self._log.debug(process_name + f" subprocess (pid {process.pid}) created") if process.returncode is not None and process.returncode != 0: raise TSharkCrashException( - f"{process_name} seems to have crashed. Try updating it. (command ran: '{' '.join(parameters)}')") + f"{process_name} seems to have crashed. Try updating it. (command ran: '{' '.join(parameters)}')" + ) self._running_processes.add(process) async def _cleanup_subprocess(self, process): - """Kill the given process and properly closes any pipes connected to it.""" + """Kill the given process and properly closes any pipes connected to it. + + Args: + process (subprocess): Process to be killed in the event of cleanup. + + Raises: + TSharkCrashException: If the subprocess has failed/crashed or returned a non-zero code. + + Returns: + _type_: _description_ + """ self._log.debug(f"Cleanup Subprocess (pid {process.pid})") if process.returncode is None: try: @@ -373,7 +448,8 @@ async def _cleanup_subprocess(self, process): return await asyncio.wait_for(process.wait(), 1) except asyncTimeoutError: self._log.debug( - "Waiting for process to close failed, may have zombie process.") + "Waiting for process to close failed, may have zombie process." + ) except ProcessLookupError: pass except OSError: @@ -381,16 +457,24 @@ async def _cleanup_subprocess(self, process): raise elif process.returncode > 0: if process.returncode != 1 or self._eof_reached: - raise TSharkCrashException(f"TShark (pid {process.pid}) seems to have crashed (retcode: {process.returncode}).\n" - f"Last error line: {self._last_error_line}\n" - "Try rerunning in debug mode [ capture_obj.set_debug() ] or try updating tshark.") + raise TSharkCrashException( + f"TShark (pid {process.pid}) seems to have crashed (retcode: {process.returncode}).\n" + f"Last error line: {self._last_error_line}\n" + "Try rerunning in debug mode [ capture_obj.set_debug() ] or try updating tshark." + ) def _setup_tshark_output_parser(self): + """Returns a TShark output parser based on the configurations. + + Returns: + TSharkOutputParser: (options): JSON | EK | XML + """ if self.use_json: return tshark_json.TsharkJsonParser(self._get_tshark_version()) if self._use_ek: - ek_field_mapping.MAPPING.load_mapping(str(self._get_tshark_version()), - tshark_path=self.tshark_path) + ek_field_mapping.MAPPING.load_mapping( + str(self._get_tshark_version()), tshark_path=self.tshark_path + ) return tshark_ek.TsharkEkJsonParser() return tshark_xml.TsharkXmlParser(parse_summaries=self._only_summaries) @@ -398,6 +482,9 @@ def close(self): self.eventloop.run_until_complete(self.close_async()) async def close_async(self): + """Close async process after cleaning the subprocess and after stderr handling + has finished. + """ for process in self._running_processes.copy(): await self._cleanup_subprocess(process) self._running_processes.clear() @@ -412,21 +499,40 @@ def __del__(self): if self._running_processes: self.close() - def __enter__(self): return self - async def __aenter__(self): return self - def __exit__(self, exc_type, exc_val, exc_tb): self.close() + def __enter__(self): + return self + + async def __aenter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() - async def __aexit__(self, exc_type, exc_val, - exc_tb): await self.close_async() + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close_async() def get_parameters(self, packet_count=None): - """Returns the special tshark parameters to be used according to the configuration of this class.""" + """Returns the special tshark parameters to be used according to the configuration of this class. + + Args: + packet_count (int, optional): The number of packets to capture, defaults to None. + + Raises: + TypeError: When custom params are of an unsupported type. + + Returns: + list: TShark Params + """ params = [] if self._capture_filter: params += ["-f", self._capture_filter] if self._display_filter: - params += [get_tshark_display_filter_flag(self._get_tshark_version(),), - self._display_filter] + params += [ + get_tshark_display_filter_flag( + self._get_tshark_version(), + ), + self._display_filter, + ] # Raw is only enabled when JSON is also enabled. if self.include_raw: params += ["-x"] @@ -443,11 +549,22 @@ def get_parameters(self, packet_count=None): raise TypeError("Custom parameters type not supported.") if all(self.encryption): - params += ["-o", "wlan.enable_decryption:TRUE", "-o", 'uat:80211_keys:"' + self.encryption[1] + '","' + - self.encryption[0] + '"'] + params += [ + "-o", + "wlan.enable_decryption:TRUE", + "-o", + 'uat:80211_keys:"' + + self.encryption[1] + + '","' + + self.encryption[0] + + '"', + ] if self._override_prefs: for preference_name, preference_value in self._override_prefs.items(): - if all(self.encryption) and preference_name in ("wlan.enable_decryption", "uat:80211_keys"): + if all(self.encryption) and preference_name in ( + "wlan.enable_decryption", + "uat:80211_keys", + ): continue # skip if override preferences also given via --encryption options params += ["-o", f"{preference_name}:{preference_value}"] @@ -456,8 +573,7 @@ def get_parameters(self, packet_count=None): if self._decode_as: for criterion, decode_as_proto in self._decode_as.items(): - params += ["-d", - ",".join([criterion.strip(), decode_as_proto.strip()])] + params += ["-d", ",".join([criterion.strip(), decode_as_proto.strip()])] if self._disable_protocol: params += ["--disable-protocol", self._disable_protocol.strip()] diff --git a/src/pyshark/capture/inmem_capture.py b/src/pyshark/capture/inmem_capture.py index 0fc5d0da..e1fbd5f6 100644 --- a/src/pyshark/capture/inmem_capture.py +++ b/src/pyshark/capture/inmem_capture.py @@ -23,11 +23,25 @@ class LinkTypes(object): class InMemCapture(Capture): - def __init__(self, bpf_filter=None, display_filter=None, only_summaries=False, - decryption_key=None, encryption_type='wpa-pwk', decode_as=None, - disable_protocol=None, tshark_path=None, override_prefs=None, use_json=False, use_ek=False, - linktype=LinkTypes.ETHERNET, include_raw=False, eventloop=None, custom_parameters=None, - debug=False): + def __init__( + self, + bpf_filter=None, + display_filter=None, + only_summaries=False, + decryption_key=None, + encryption_type="wpa-pwk", + decode_as=None, + disable_protocol=None, + tshark_path=None, + override_prefs=None, + use_json=False, + use_ek=False, + linktype=LinkTypes.ETHERNET, + include_raw=False, + eventloop=None, + custom_parameters=None, + debug=False, + ): """Creates a new in-mem capture, a capture capable of receiving binary packets and parsing them using tshark. Significantly faster if packets are added in a batch. @@ -47,13 +61,22 @@ def __init__(self, bpf_filter=None, display_filter=None, only_summaries=False, :param custom_parameters: A dict of custom parameters to pass to tshark, i.e. {"--param": "value"} or else a list of parameters in the format ["--foo", "bar", "--baz", "foo"]. """ - super(InMemCapture, self).__init__(display_filter=display_filter, only_summaries=only_summaries, - decryption_key=decryption_key, encryption_type=encryption_type, - decode_as=decode_as, disable_protocol=disable_protocol, - tshark_path=tshark_path, override_prefs=override_prefs, - use_json=use_json, use_ek=use_ek, - include_raw=include_raw, eventloop=eventloop, - custom_parameters=custom_parameters, debug=debug) + super(InMemCapture, self).__init__( + display_filter=display_filter, + only_summaries=only_summaries, + decryption_key=decryption_key, + encryption_type=encryption_type, + decode_as=decode_as, + disable_protocol=disable_protocol, + tshark_path=tshark_path, + override_prefs=override_prefs, + use_json=use_json, + use_ek=use_ek, + include_raw=include_raw, + eventloop=eventloop, + custom_parameters=custom_parameters, + debug=debug, + ) self.bpf_filter = bpf_filter self._packets_to_write = None self._current_linktype = linktype @@ -61,26 +84,37 @@ def __init__(self, bpf_filter=None, display_filter=None, only_summaries=False, def get_parameters(self, packet_count=None): """Returns the special tshark parameters to be used according to the configuration of this class.""" - params = super(InMemCapture, self).get_parameters( - packet_count=packet_count) - params += ['-i', '-'] + params = super(InMemCapture, self).get_parameters(packet_count=packet_count) + params += ["-i", "-"] return params async def _get_tshark_process(self, packet_count=None): + """Returns a new TShark process for packet capture unless one already exists and then + it returns that current process. + + Args: + packet_count (int, optional): Number of packets to capture, defaults to None. + + Returns: + subprocess: TShark process + """ if self._current_tshark: return self._current_tshark - proc = await super(InMemCapture, self)._get_tshark_process(packet_count=packet_count, stdin=subprocess.PIPE) + proc = await super(InMemCapture, self)._get_tshark_process( + packet_count=packet_count, stdin=subprocess.PIPE + ) self._current_tshark = proc # Create PCAP header - header = struct.pack("IHHIIII", 0xa1b2c3d4, 2, 4, - 0, 0, 0x7fff, self._current_linktype) + header = struct.pack( + "IHHIIII", 0xA1B2C3D4, 2, 4, 0, 0, 0x7FFF, self._current_linktype + ) proc.stdin.write(header) return proc def _get_json_separators(self): - """"Returns the separators between packets in a JSON output + """ "Returns the separators between packets in a JSON output Returns a tuple of (packet_separator, end_of_file_separator, characters_to_disregard). The latter variable being the number of characters to ignore in order to pass the packet (i.e. extra newlines, @@ -89,9 +123,19 @@ def _get_json_separators(self): if self._get_tshark_version() >= version.parse("2.6.7"): return f"{os.linesep} }}".encode(), f"}}{os.linesep}]".encode(), 0 else: - return f'}}{os.linesep}{os.linesep}'.encode(), f"}}{os.linesep}{os.linesep}]", 1 + return ( + f"}}{os.linesep}{os.linesep}".encode(), + f"}}{os.linesep}{os.linesep}]", + 1, + ) def _write_packet(self, packet, sniff_time): + """Writes a packet to the TShark process STDIN for parsing. + + Args: + packet (bytes): Packet data being written. + sniff_time (float | datetime.datetime | None): Packet's sniff time - current time if None. + """ if sniff_time is None: now = time.time() elif isinstance(sniff_time, datetime.datetime): @@ -101,8 +145,9 @@ def _write_packet(self, packet, sniff_time): secs = int(now) usecs = int((now * 1000000) % 1000000) # Write packet header - self._current_tshark.stdin.write(struct.pack( - "IIII", secs, usecs, len(packet), len(packet))) + self._current_tshark.stdin.write( + struct.pack("IIII", secs, usecs, len(packet), len(packet)) + ) self._current_tshark.stdin.write(packet) def parse_packet(self, binary_packet, sniff_time=None, timeout=DEFAULT_TIMEOUT): @@ -124,9 +169,13 @@ def parse_packets(self, binary_packets, sniff_times=None, timeout=DEFAULT_TIMEOU """ if self.eventloop is None: self._setup_eventloop() - return self.eventloop.run_until_complete(self.parse_packets_async(binary_packets, sniff_times, timeout)) + return self.eventloop.run_until_complete( + self.parse_packets_async(binary_packets, sniff_times, timeout) + ) - async def parse_packets_async(self, binary_packets, sniff_times=None, timeout=DEFAULT_TIMEOUT): + async def parse_packets_async( + self, binary_packets, sniff_times=None, timeout=DEFAULT_TIMEOUT + ): """A coroutine which parses binary packets and return a list of parsed packets. DOES NOT CLOSE tshark. It must be closed manually by calling close() when you're done @@ -137,7 +186,9 @@ async def parse_packets_async(self, binary_packets, sniff_times=None, timeout=DE sniff_times = [] if not self._current_tshark: await self._get_tshark_process() - for binary_packet, sniff_time in itertools.zip_longest(binary_packets, sniff_times): + for binary_packet, sniff_time in itertools.zip_longest( + binary_packets, sniff_times + ): self._write_packet(binary_packet, sniff_time) def callback(pkt): @@ -151,18 +202,24 @@ def callback(pkt): async def _get_parsed_packet_from_tshark(self, callback, timeout): await self._current_tshark.stdin.drain() try: - await asyncio.wait_for(self.packets_from_tshark(callback, close_tshark=False), timeout) + await asyncio.wait_for( + self.packets_from_tshark(callback, close_tshark=False), timeout + ) except asyncio.TimeoutError: await self.close_async() - raise asyncio.TimeoutError("Timed out while waiting for tshark to parse packet. " - "Try rerunning with cap.set_debug() to see tshark errors. " - "Closing tshark..") + raise asyncio.TimeoutError( + "Timed out while waiting for tshark to parse packet. " + "Try rerunning with cap.set_debug() to see tshark errors. " + "Closing tshark.." + ) async def close_async(self): self._current_tshark = None await super(InMemCapture, self).close_async() - def feed_packet(self, binary_packet, linktype=LinkTypes.ETHERNET, timeout=DEFAULT_TIMEOUT): + def feed_packet( + self, binary_packet, linktype=LinkTypes.ETHERNET, timeout=DEFAULT_TIMEOUT + ): """ DEPRECATED. Use parse_packet instead. This function adds the packet to the packets list, and also closes and reopens tshark for @@ -177,15 +234,16 @@ def feed_packet(self, binary_packet, linktype=LinkTypes.ETHERNET, timeout=DEFAUL By default, assumes the packet is an ethernet packet. For another link type, supply the linktype argument (most can be found in the class LinkTypes) """ - warnings.warn( - "Deprecated method. Use InMemCapture.parse_packet() instead.") + warnings.warn("Deprecated method. Use InMemCapture.parse_packet() instead.") self._current_linktype = linktype pkt = self.parse_packet(binary_packet, timeout=timeout) self.close() self._packets.append(pkt) return pkt - def feed_packets(self, binary_packets, linktype=LinkTypes.ETHERNET, timeout=DEFAULT_TIMEOUT): + def feed_packets( + self, binary_packets, linktype=LinkTypes.ETHERNET, timeout=DEFAULT_TIMEOUT + ): """Gets a list of binary packets, parses them using tshark and returns their parsed values. Keeps the packets in the internal packet list as well. From c499f60df065dfa7ae967dd5a19578a4a5efc5cc Mon Sep 17 00:00:00 2001 From: Jeremy Hicks Date: Wed, 4 Dec 2024 14:39:44 -0500 Subject: [PATCH 2/2] changed line length from 79 -> 110 --- src/pyshark/capture/capture.py | 62 +++++++--------------------- src/pyshark/capture/inmem_capture.py | 28 ++++--------- 2 files changed, 21 insertions(+), 69 deletions(-) diff --git a/src/pyshark/capture/capture.py b/src/pyshark/capture/capture.py index 73030e53..47436687 100644 --- a/src/pyshark/capture/capture.py +++ b/src/pyshark/capture/capture.py @@ -91,9 +91,7 @@ def __init__( self._running_processes = set() self._decode_as = decode_as self._disable_protocol = disable_protocol - self._log = logging.Logger( - self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL - ) + self._log = logging.Logger(self.__class__.__name__, level=self.DEFAULT_LOG_LEVEL) self._closed = False self._custom_parameters = custom_parameters self._eof_reached = False @@ -110,10 +108,7 @@ def __init__( self.eventloop = eventloop if self.eventloop is None: self._setup_eventloop() - if ( - encryption_type - and encryption_type.lower() in self.SUPPORTED_ENCRYPTION_STANDARDS - ): + if encryption_type and encryption_type.lower() in self.SUPPORTED_ENCRYPTION_STANDARDS: self.encryption = (decryption_key, encryption_type.lower()) else: standards = ", ".join(self.SUPPORTED_ENCRYPTION_STANDARDS) @@ -166,16 +161,11 @@ def load_packets(self, packet_count=0, timeout=None): def keep_packet(pkt): self._packets.append(pkt) - if ( - packet_count != 0 - and len(self._packets) - initial_packet_amount >= packet_count - ): + if packet_count != 0 and len(self._packets) - initial_packet_amount >= packet_count: raise StopCapture() try: - self.apply_on_packets( - keep_packet, timeout=timeout, packet_count=packet_count - ) + self.apply_on_packets(keep_packet, timeout=timeout, packet_count=packet_count) self.loaded = True except asyncTimeoutError: pass @@ -184,11 +174,7 @@ def set_debug(self, set_to=True, log_level=logging.DEBUG): """Sets the capture to debug mode (or turns it off if specified).""" if set_to: handler = logging.StreamHandler(sys.stdout) - handler.setFormatter( - logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s" - ) - ) + handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")) self._log.addHandler(handler) self._log.level = log_level self.debug = set_to @@ -226,9 +212,7 @@ def _setup_eventloop(self): asyncio.set_event_loop(self.eventloop) else: raise - if os.name == "posix" and isinstance( - threading.current_thread(), threading._MainThread - ): + if os.name == "posix" and isinstance(threading.current_thread(), threading._MainThread): # The default child watchers (ThreadedChildWatcher) attach_loop method is empty! # While using pyshark with ThreadedChildWatcher, asyncio could raise a ChildProcessError # "Unknown child process pid %d, will report returncode 255" @@ -249,9 +233,7 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None): :param packet_count: If given, stops after this amount of packets is captured. """ # NOTE: This has code duplication with the async version, think about how to solve this - tshark_process = existing_process or self.eventloop.run_until_complete( - self._get_tshark_process() - ) + tshark_process = existing_process or self.eventloop.run_until_complete(self._get_tshark_process()) parser = self._setup_tshark_output_parser() packets_captured = 0 @@ -279,9 +261,7 @@ def _packets_from_tshark_sync(self, packet_count=None, existing_process=None): break finally: if tshark_process in self._running_processes: - self.eventloop.run_until_complete( - self._cleanup_subprocess(tshark_process) - ) + self.eventloop.run_until_complete(self._cleanup_subprocess(tshark_process)) def apply_on_packets(self, callback, timeout=None, packet_count=None): """Runs through all packets and calls the given callback (a function) with each one as it is read. @@ -301,9 +281,7 @@ def print_callback(pkt): coro = asyncio.wait_for(coro, timeout) return self.eventloop.run_until_complete(coro) - async def packets_from_tshark( - self, packet_callback, packet_count=None, close_tshark=True - ): + async def packets_from_tshark(self, packet_callback, packet_count=None, close_tshark=True): """ A coroutine which creates a tshark process, runs the given callback on each packet that is received from it and closes the process when it is done. @@ -354,9 +332,7 @@ async def _go_through_packets_from_fd(self, fd, packet_callback, packet_count=No break def _create_stderr_handling_task(self, stderr): - self._stderr_handling_tasks.append( - asyncio.ensure_future(self._handle_process_stderr_forever(stderr)) - ) + self._stderr_handling_tasks.append(asyncio.ensure_future(self._handle_process_stderr_forever(stderr))) async def _handle_process_stderr_forever(self, stderr): while True: @@ -382,9 +358,7 @@ async def _get_tshark_process(self, packet_count=None, stdin=None): output_parameters = [] if self.use_json or self._use_ek: if not tshark_supports_json(self._get_tshark_version()): - raise TSharkVersionException( - "JSON only supported on Wireshark >= 2.2.0" - ) + raise TSharkVersionException("JSON only supported on Wireshark >= 2.2.0") if self.use_json: output_type = "json" @@ -400,9 +374,7 @@ async def _get_tshark_process(self, packet_count=None, stdin=None): + output_parameters ) - self._log.debug( - "Creating TShark subprocess with parameters: " + " ".join(parameters) - ) + self._log.debug("Creating TShark subprocess with parameters: " + " ".join(parameters)) self._log.debug("Executable: %s", parameters[0]) tshark_process = await asyncio.create_subprocess_exec( *parameters, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=stdin @@ -447,9 +419,7 @@ async def _cleanup_subprocess(self, process): process.kill() return await asyncio.wait_for(process.wait(), 1) except asyncTimeoutError: - self._log.debug( - "Waiting for process to close failed, may have zombie process." - ) + self._log.debug("Waiting for process to close failed, may have zombie process.") except ProcessLookupError: pass except OSError: @@ -553,11 +523,7 @@ def get_parameters(self, packet_count=None): "-o", "wlan.enable_decryption:TRUE", "-o", - 'uat:80211_keys:"' - + self.encryption[1] - + '","' - + self.encryption[0] - + '"', + 'uat:80211_keys:"' + self.encryption[1] + '","' + self.encryption[0] + '"', ] if self._override_prefs: for preference_name, preference_value in self._override_prefs.items(): diff --git a/src/pyshark/capture/inmem_capture.py b/src/pyshark/capture/inmem_capture.py index e1fbd5f6..24c3e51c 100644 --- a/src/pyshark/capture/inmem_capture.py +++ b/src/pyshark/capture/inmem_capture.py @@ -106,9 +106,7 @@ async def _get_tshark_process(self, packet_count=None): self._current_tshark = proc # Create PCAP header - header = struct.pack( - "IHHIIII", 0xA1B2C3D4, 2, 4, 0, 0, 0x7FFF, self._current_linktype - ) + header = struct.pack("IHHIIII", 0xA1B2C3D4, 2, 4, 0, 0, 0x7FFF, self._current_linktype) proc.stdin.write(header) return proc @@ -145,9 +143,7 @@ def _write_packet(self, packet, sniff_time): secs = int(now) usecs = int((now * 1000000) % 1000000) # Write packet header - self._current_tshark.stdin.write( - struct.pack("IIII", secs, usecs, len(packet), len(packet)) - ) + self._current_tshark.stdin.write(struct.pack("IIII", secs, usecs, len(packet), len(packet))) self._current_tshark.stdin.write(packet) def parse_packet(self, binary_packet, sniff_time=None, timeout=DEFAULT_TIMEOUT): @@ -173,9 +169,7 @@ def parse_packets(self, binary_packets, sniff_times=None, timeout=DEFAULT_TIMEOU self.parse_packets_async(binary_packets, sniff_times, timeout) ) - async def parse_packets_async( - self, binary_packets, sniff_times=None, timeout=DEFAULT_TIMEOUT - ): + async def parse_packets_async(self, binary_packets, sniff_times=None, timeout=DEFAULT_TIMEOUT): """A coroutine which parses binary packets and return a list of parsed packets. DOES NOT CLOSE tshark. It must be closed manually by calling close() when you're done @@ -186,9 +180,7 @@ async def parse_packets_async( sniff_times = [] if not self._current_tshark: await self._get_tshark_process() - for binary_packet, sniff_time in itertools.zip_longest( - binary_packets, sniff_times - ): + for binary_packet, sniff_time in itertools.zip_longest(binary_packets, sniff_times): self._write_packet(binary_packet, sniff_time) def callback(pkt): @@ -202,9 +194,7 @@ def callback(pkt): async def _get_parsed_packet_from_tshark(self, callback, timeout): await self._current_tshark.stdin.drain() try: - await asyncio.wait_for( - self.packets_from_tshark(callback, close_tshark=False), timeout - ) + await asyncio.wait_for(self.packets_from_tshark(callback, close_tshark=False), timeout) except asyncio.TimeoutError: await self.close_async() raise asyncio.TimeoutError( @@ -217,9 +207,7 @@ async def close_async(self): self._current_tshark = None await super(InMemCapture, self).close_async() - def feed_packet( - self, binary_packet, linktype=LinkTypes.ETHERNET, timeout=DEFAULT_TIMEOUT - ): + def feed_packet(self, binary_packet, linktype=LinkTypes.ETHERNET, timeout=DEFAULT_TIMEOUT): """ DEPRECATED. Use parse_packet instead. This function adds the packet to the packets list, and also closes and reopens tshark for @@ -241,9 +229,7 @@ def feed_packet( self._packets.append(pkt) return pkt - def feed_packets( - self, binary_packets, linktype=LinkTypes.ETHERNET, timeout=DEFAULT_TIMEOUT - ): + def feed_packets(self, binary_packets, linktype=LinkTypes.ETHERNET, timeout=DEFAULT_TIMEOUT): """Gets a list of binary packets, parses them using tshark and returns their parsed values. Keeps the packets in the internal packet list as well.