-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
146 lines (134 loc) · 7.5 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import argparse
import json
import pickle
import os
import re
import sys
from typing import List
from datetime import datetime
from pathlib import Path
from concurrent.futures import as_completed, ThreadPoolExecutor
from itertools import zip_longest
# Apache Tika Python Client Library (Downloads Tika Server in Code) - https://github.com/chrismattmann/tika-python
os.environ['PYTHONIOENCODING'] = 'utf8'
from tika import parser
# Flags shared by both patterns below. Neither pattern contains ^, $ or a "."
# outside a character class, so MULTILINE and DOTALL do not change what they match.
regex_flags = re.MULTILINE | re.DOTALL
# Candidate US Social Security number: groups of 3, 2 and 4 digits on word
# boundaries, each pair of groups separated by exactly one hyphen, space or dot
# (the two separators do not have to be the same character).
# Regex String Sourced from https://github.com/microsoft/presidio/blob/main/presidio-analyzer/presidio_analyzer/predefined_recognizers/us_ssn_recognizer.py
social_security_regex = re.compile(r"\b([0-9]{3})[- .]([0-9]{2})[- .]([0-9]{4})\b", regex_flags)
# Candidate credit card number: a four-digit group starting with 4, 50-55, 6, 1 or 3,
# followed by groups of 3-4, 3-4 and 3-5 digits, each optionally preceded by one
# hyphen or space (13 to 17 digits in total). The pattern is "weak" because it only
# checks the shape; pii_threaded_content_scan confirms a match with luhn_checksum.
# Regex String Sourced from https://github.com/microsoft/presidio/blob/main/presidio-analyzer/presidio_analyzer/predefined_recognizers/credit_card_recognizer.py
weak_credit_card_regex = re.compile(r"\b((4\d{3})|(5[0-5]\d{2})|(6\d{3})|(1\d{3})|(3\d{3}))[- ]?(\d{3,4})[- ]?(\d{3,4})[- ]?(\d{3,5})\b", regex_flags)
def luhn_checksum(sanitized_value: str) -> bool:
    '''Return True when the digits of sanitized_value pass the Luhn (mod 10) check.

    Algorithm sourced from https://github.com/microsoft/presidio/blob/main/presidio-analyzer/presidio_analyzer/predefined_recognizers/credit_card_recognizer.py

    sanitized_value: string made up of digit characters only (separators already
        removed). Any character int() cannot convert raises ValueError.
    '''
    total = 0
    # Walk the digits from right to left. Digits at even offsets are added as-is;
    # digits at odd offsets are doubled and the decimal digits of the product
    # (at most 18, so a tens digit plus a units digit) are added instead.
    for offset, char in enumerate(reversed(str(sanitized_value))):
        digit = int(char)
        if offset % 2:
            tens, units = divmod(digit * 2, 10)
            total += tens + units
        else:
            total += digit
    return total % 10 == 0
def scan_directory_for_files(scan_dir):
    '''Return the path of every file found anywhere under scan_dir.

    The tree is walked with os.walk and each file name is joined onto the
    directory it was found in with os.path.join, so a scan_dir that ends in a
    path separator does not yield doubled separators and the platform's own
    separator is used.

    scan_dir: directory to walk (string or path-like).
    Returns a list of file paths, in the order os.walk reports them; the list is
    empty when no files are found.
    '''
    return [
        os.path.join(root, file_name)
        for root, _subdirectories, files_list in os.walk(scan_dir)
        for file_name in files_list
    ]
def scan_files_and_chunk(scan_dir):
    '''Find every file under scan_dir and group the paths into scan batches.

    Prints how many files were found, then returns a list of dicts. Each dict
    holds at most 5000 file paths, mapping a path to its initial scan record
    ('scanned' empty, both PII flags False). The list is empty when no files
    were found.
    '''
    batch_size = 5000
    discovered_paths = scan_directory_for_files(scan_dir)
    print(f"Found {len(discovered_paths)} files. Commencing content scan!")

    batches = []
    # Consecutive slices give the same batches as padding-based grouping,
    # without having to filter out fill values afterwards.
    for start in range(0, len(discovered_paths), batch_size):
        batch = {}
        for path in discovered_paths[start:start + batch_size]:
            batch[path] = {'scanned': "", 'credit_card_found': False, 'social_security_found': False}
        batches.append(batch)
    return batches
def content_scan(file_path):
    '''Uses Apache Tika to extract the text of one file and classify the outcome.

    file_path: path of the file handed to tika's parser.from_file.

    Returns a (parse_status, parsed_content) tuple where parse_status is
        "Y"  - status 200 and non-empty content was returned,
        "N"  - the file was parsed but has no content (status 204, or status
               200 with empty content),
        "NA" - any other status, or the parse attempt raised an exception,
    and parsed_content is the extracted text for "Y" and "" otherwise.

    An "NA" result is never downgraded to "N" just because an unparseable file
    also has no content.
    '''
    try:
        parse_result = parser.from_file(file_path)
        status = parse_result.get('status')
        parsed_content = parse_result.get('content')
    except Exception:
        # Best effort: one unreadable file must not abort the whole scan.
        return "NA", ""

    # https://cwiki.apache.org/confluence/display/TIKA/TikaServer#TikaServer-TikaServerServices
    if status == 204:  # Parsed but no content found
        return "N", ""
    if status != 200:  # All other statuses imply inability to parse
        return "NA", ""
    if not parsed_content:
        return "N", ""
    return "Y", parsed_content
def _text_has_valid_credit_card(text):
    '''Return True if any credit-card-shaped match in text passes the Luhn check.'''
    for candidate in weak_credit_card_regex.finditer(text):
        # Drop the optional "-" / " " group separators before the checksum.
        digits_only = candidate[0].replace("-", "").replace(" ", "")
        if luhn_checksum(digits_only):
            return True
    return False


def pii_threaded_content_scan(file_paths_dict):
    '''Scan the content of every file in file_paths_dict for PII using a thread pool.

    file_paths_dict: maps file path -> scan record with the keys 'scanned',
        'credit_card_found' and 'social_security_found'. The records are
        updated in place as results arrive.

    Each file is parsed by content_scan on a ThreadPoolExecutor worker. Text
    that parsed successfully ("Y") is searched for a Social Security number
    pattern and for any credit-card-shaped number that passes luhn_checksum.

    Results are collected with a 180 second limit for the whole batch. If that
    limit (or any other error in the collection loop) is hit, the jobs that have
    not started yet are cancelled, the number of unprocessed files is printed,
    and the files flagged so far are still returned.

    Returns a dict mapping each flagged file path to
    {'credit_card_found': "Yes"/"No", 'social_security_found': "Yes"/"No"}.
    '''
    flagged_files_report_dict = {}
    print("Scanning file content...")
    with ThreadPoolExecutor() as executor:
        file_scan_jobs = {executor.submit(content_scan, file_path): file_path for file_path in file_paths_dict}
        files_processed = 0
        try:
            for file_scan_job_future in as_completed(file_scan_jobs, 180):
                file_path = file_scan_jobs[file_scan_job_future]
                try:
                    content_scan_status, file_content = file_scan_job_future.result()
                except Exception:
                    # A failed parse job is recorded as unparseable, not fatal.
                    content_scan_status, file_content = "NA", ""
                file_info = file_paths_dict[file_path]
                file_info['scanned'] = content_scan_status
                if content_scan_status == "Y":
                    if social_security_regex.search(file_content):
                        file_info['social_security_found'] = True
                    # Check every candidate, not just the first: an invalid
                    # look-alike may precede a real card number.
                    if _text_has_valid_credit_card(file_content):
                        file_info['credit_card_found'] = True
                    if file_info['social_security_found'] or file_info['credit_card_found']:
                        flagged_files_report_dict[file_path] = {
                            'credit_card_found': "Yes" if file_info['credit_card_found'] else "No",
                            'social_security_found': "Yes" if file_info['social_security_found'] else "No"
                        }
                files_processed += 1
        except Exception as e:
            skipped_files = len(file_scan_jobs) - files_processed
            # Cancel queued jobs so leaving the executor does not run them all
            # anyway; jobs that are already running cannot be cancelled.
            for pending_job in file_scan_jobs:
                pending_job.cancel()
            print(f"Skipping scan of {skipped_files} files due to error: {e}")
    return flagged_files_report_dict
def main_file_scan_interface(scan_dir):
    '''Scan every file under scan_dir and return one merged report of flagged files.

    The files are split into batches by scan_files_and_chunk; each batch is
    scanned by pii_threaded_content_scan and its report is merged into the
    returned dict (file path -> found flags).
    '''
    combined_report = {}
    for batch in scan_files_and_chunk(scan_dir):
        batch_report = pii_threaded_content_scan(batch)
        combined_report.update(batch_report)
    return combined_report
if __name__ == '__main__':
    # Command line entry point: validate the arguments, run the scan, and write
    # a JSON report when at least one file was flagged.
    description = 'Please input a file directory to scan files for any text containing PII (Social Security and Credit Card Numbers)'
    cli = argparse.ArgumentParser(prog='piifilescan', description=description)
    cli.add_argument('scan_directory', type=str, default=None, help='Directory containing files or folders')
    cli.add_argument(
        '-o',
        dest='output_directory',
        type=str,
        required=False,
        default=os.getcwd(),
        help='Where to output the report file. Report file is output to the current directory scanner is run from by default',
    )
    options = cli.parse_args()
    scan_directory = options.scan_directory
    output_directory = options.output_directory

    if not (scan_directory and os.path.isdir(scan_directory)):
        sys.exit(f'Invalid scan directory: {scan_directory}')
    if output_directory and not os.path.isdir(output_directory):
        sys.exit(f'Output directory is invalid, please set a proper output directory path!: {output_directory}')

    print(f"Scanning Directory: {scan_directory}")
    report = main_file_scan_interface(Path(scan_directory))
    finished_at = datetime.now()
    print(f"Finished scan of {scan_directory} at {finished_at.isoformat()}!")

    if not report:
        print("Found no PII in any files!")
    else:
        # The report name carries the finish time as month_year_day__hour_minute_second.
        timestamp = finished_at.strftime('%m_%y_%d__%H_%M_%S')
        report_path = Path(output_directory) / f"pii_files_found_report_{timestamp}.json"
        print(f"Found PII in files. Outputting Report in JSON format to: {report_path}")
        with report_path.open('w') as report_file:
            json.dump(report, report_file, indent=4)