-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfirehose.py
108 lines (96 loc) · 3.48 KB
/
firehose.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from analysis.malice_watcher import MaliceWatcher
from collections import defaultdict
from scrapers.search_requests import Requests
from threading import Thread
from time import sleep
class Firehose(Thread):
    """Background thread that repeatedly scrapes tweets matching the
    user-selected query options, classifies them as malicious or not,
    and keeps a bounded queue of recent results for the frontend.
    """

    def __init__(self, query_interval=1, max_tweets=50):
        """
        :param query_interval: seconds to wait between scraper queries
        :param max_tweets: maximum number of tweets retained in the queue
        """
        Thread.__init__(self)
        self.scraper = Requests()
        # queue of tweets to be pushed to the frontend (newest at the front)
        self.queue = []
        # time, in seconds, to wait between querying the scraper
        self.query_interval = query_interval
        self.max_tweets = max_tweets
        # query types, e.g. username, location, ...
        self.options = {}
        # pairs query types with corresponding scraper functions
        self.scraper_funcs = {'username': self.scraper.search_user,
                              'nearest_location': self.scraper.search_location,
                              'any_words': self.scraper.search_partial_keywords,
                              'all_words': self.scraper.search_exact_keywords,
                              'exact_phrase': self.scraper.search_exact_phrase}
        self.mw = MaliceWatcher()

    def hash_tweet(self, tweet: dict):
        """Derive a unique id for ``tweet`` from the trailing path segment
        of its URL, store it under ``tweet['id']``, and return it.
        """
        url = tweet['url']
        # the status id is the final path component of the tweet URL
        tweet['id'] = url[url.rfind('/') + 1:]
        return tweet['id']

    def malice_tweet(self, tweet: dict):
        """Classify ``tweet`` as malicious (1) or non-malicious (0),
        recording the verdict under ``tweet['malicious']``.

        Tweets with empty/falsy content are labelled non-malicious
        without consulting the model.
        """
        is_m = 0
        if tweet['content']:
            is_m = self.mw.predict(tweet['content'])
        tweet['malicious'] = is_m
        return tweet

    def classify(self, queue: list):
        """Classify every tweet in ``queue`` as [non]-malicious and
        return the re-labelled list.
        """
        return [self.malice_tweet(tweet) for tweet in queue]

    def set_options(self, options: dict):
        """Replace the active query options with the non-empty entries
        of ``options``, discarding tweets gathered for the old query.
        """
        del self.queue[:]  # clear queue: old results no longer match
        self.options.clear()
        for option, value in options.items():
            if value != '':
                self.options[option] = value

    def get_tweets(self, num_tweets=10):
        """Pop and return the tweet at the back of the queue as a
        one-element list (empty list when the queue is empty).

        NOTE(review): ``num_tweets`` is currently unused -- the method
        delivers one tweet per call; the parameter is kept for
        interface compatibility with callers.
        """
        tweet = self.queue[-1:]
        del self.queue[-1:]
        return tweet

    def unduplicate(self, ls: list):
        """Remove duplicate tweets (dictionaries) from ``ls``.

        Two tweets are duplicates when they share the same id, as
        derived from the URL by :meth:`hash_tweet`.  The *last*
        occurrence of each id is kept (new data is prepended to the
        queue, so this drops the freshly-fetched copy of a tweet that
        is already queued), and the relative order of the survivors is
        preserved.
        """
        seen = set()
        result = []
        # walk backwards so the last occurrence of each id wins;
        # building a new list avoids mutating ls while iterating it
        for tweet in reversed(ls):
            tweet_id = self.hash_tweet(tweet)
            if tweet_id not in seen:
                seen.add(tweet_id)
                result.append(tweet)
        result.reverse()
        return result

    def run(self):
        """Main loop: poll each active query, merge new tweets into the
        queue, de-duplicate, classify, and trim to ``max_tweets``.
        """
        while True:
            try:
                for option, value in self.options.items():
                    # call the corresponding scraper function, passing
                    # in the user-defined parameter for that query type
                    data = self.scraper_funcs[option](value)
                    self.queue = data + self.queue  # append to front of queue
                self.queue = self.unduplicate(self.queue)
                self.queue = self.classify(self.queue)
                print('FETCHED %d TWEETS' % len(self.queue))
                del self.queue[self.max_tweets:]  # pop old elements
            except RuntimeError:
                # self.options was mutated by set_options() from another
                # thread mid-iteration; skip this cycle and retry
                pass
            sleep(self.query_interval)