-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathleaksAnalyzer.py
executable file
·184 lines (147 loc) · 5.12 KB
/
leaksAnalyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/python
import sys, getopt
import sqlite3
import requests, json
contacts_file = '' # contact file (fo analyze mode)
leaks_file = '' # plain text leak file (when initializing the DB)
mode = '' # 2 modes possible: "init" or "analyze"
modules = '' # modules to use when in analyze mode (currently only '' or 'adobe')
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
def error(err):
print 'leakAnalyzer.py [-h] --mode=<command> [--contacts=<filename>] [--leak=<filename>] [--modules=<list_of_modules>]'
print 'where <command> is one of "init" or "analyze"'
sys.exit(err)
# Initialize a sqlite DB of the leaks
def init(conn, cur):
global leak_file
# Initialize sqlite DB
print bcolors.HEADER + "Initialzing sqlite leaks DB..." + bcolors.ENDC
c.execute('PRAGMA cache_size = -1024000') # 1GB
c.execute('PRAGMA journal_mode = OFF')
c.execute('CREATE TABLE emails (email text, pwd text)')
c.execute('CREATE TABLE hashes (pwd text, use int, hints text)')
# Digest Adobe leak DB for faster lookups
print bcolors.HEADER + "Digesting leak file into sqlite leaks DB..." + bcolors.ENDC
counter = 1
pwd = {}
with open(leaks_file) as f:
for l in f:
# Output status
print "Processing line " + str(counter) + '\r',
counter += 1
# Skip empty lines
if not l.strip():
continue
# We don't need the 4 last characters; They are always equal to "|--\n"
l = l[:-4]
# Digest useful emails info
try:
# Extract info
email = l.split('-|-')[2]
pwd_hash = l.split('-|-')[3]
pwd_hint = l.split('-|-')[4]
except:
continue
# Insert email infos into DB
info = [ email, pwd_hash ]
cur.execute('INSERT INTO emails VALUES (?, ?)', info)
# Keep track of password info for later insertion in DB
if not pwd.get(pwd_hash):
pwd[pwd_hash] = { 'Use' : 1, 'Hints' : [pwd_hint] }
else:
pwd[pwd_hash]['Use'] += 1
if pwd_hint:
pwd[pwd_hash]['Hints'].append(pwd_hint)
# Update hashes table
print ''
print bcolors.HEADER + 'Digesting password informations...' + bcolors.ENDC
for p,info in pwd.items():
row = [ p, info['Use'], ','.join(info['Hints']) ]
cur.execute('INSERT INTO hashes VALUES (?, ?, ?)', row)
# Commit changes
conn.commit()
print bcolors.HEADER + "Done initializing sqlite leak DB..." + bcolors.ENDC
def analyze(cur):
global contacts_file, modules
# Parse contacts list and find out which email address was leaked
print bcolors.HEADER + "Looking up haveibeenpwned.com for leaks for the provided email addresses" + bcolors.ENDC
with open(contacts_file) as f:
for l in f:
# Get email address
email = l.rstrip('\n')
print bcolors.BOLD + "Looking up " + email + "..." + bcolors.ENDC
# Ask haveibeenpwned.com domain
url = 'https://haveibeenpwned.com/api/v2/breachedaccount/' + email + '?truncateResponse=true'
resp = requests.get(url, verify=False)
# Analyze results
leaks= []
if resp.status_code == 200:
# Found a leak?
leaks = json.loads(resp.content)
print bcolors.FAIL + "Found leaks for " + email + " : " + str(leaks) + bcolors.ENDC
# Analyze email addresses against leaks db if requested
if "adobe" in modules:
for leak in leaks:
if leak['Name'] == 'Adobe':
analyze_adobe_leak(email, cur)
def analyze_adobe_leak(email, cur):
# lookup emails in adobe leak
print bcolors.HEADER + "Looking up for additional info in the adobe leak..." + bcolors.ENDC
cur.execute('SELECT pwd FROM emails WHERE email=? LIMIT 1', [email] )
pwd = cur.fetchone()
cur.execute('SELECT use, hints FROM hashes WHERE pwd=? LIMIT 1', pwd )
r = cur.fetchone()
print email + ' uses a password used by ' + str( r[0]-1 ) + ' other user(s).'
if r[1]:
print 'collected password hints are: ' + r[1]
def main(argv):
global contacts_file, leaks_file, mode, modules
# process arguments
try:
opts, args = getopt.getopt(argv, 'h:', ['contacts=','leak_file=', 'mode=', 'modules='])
except getopt.GetoptError:
error(2)
for opt, arg in opts:
# print help
if opt == '-h':
error(1)
# mode
elif opt == '--mode':
mode = arg
# plain text leak file
elif opt == '--leak_file':
leaks_file = arg
# contacts file
elif opt == '--contacts':
contacts_file = arg
# db
elif opt == '--modules':
modules = arg
# Open leaks sqlite DB
conn = sqlite3.connect('./leaks.db')
cur = conn.cursor()
# Run requested mode
if mode == 'init':
if leaks_file:
init(conn, cur)
else:
print 'The "leak_file" argument is mandatory in "init" mode'
error(2)
elif mode == 'analyze':
if contacts_file:
analyze(cur)
else:
print 'The "contacts" argument is mandatory in "analyze" mode'
error(2)
else:
error(2)
if __name__ == "__main__":
main(sys.argv[1:])