-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrealtime-beacons.py
94 lines (68 loc) · 3.57 KB
/
realtime-beacons.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#!flask/bin/python
import requests
from flask import Flask
from flask import request
from config import validator, devices, access_points, secret
# Variables to use with telegram
from config import telegram_chatid, telegram_token, enable_telegram
# Variables to use with webex
from config import webex_chatid, webex_token, enable_webex
def _rounded_xy(location):
    """Return the rounded ``(x, y)`` pair of a location dict, or ``None``.

    ``None`` is returned when the location is missing, lacks the ``'x'`` /
    ``'y'`` entries, or when those sequences are empty, so callers can skip
    the observation instead of crashing.
    """
    try:
        return round(location['x'][0]), round(location['y'][0])
    except (KeyError, IndexError, TypeError):
        return None


def location_analytics(data):
    """Compare each tracked device's reported position with the stored one.

    For every entry of ``devices`` whose ``'MAC'`` equals the ``'clientMac'``
    of an observation in ``data['data']['observations']``:

    * on the first sighting (``'initialiser' == 0``) the position and the
      reporting access point are stored as the baseline;
    * afterwards the rounded first ``x``/``y`` values are compared with the
      stored ones, and a change triggers the Telegram and/or Webex
      notifications that are enabled in the configuration.

    Args:
        data: Decoded request payload. The code reads ``data['data']['apMac']``
            and ``data['data']['observations']``; each observation must carry
            ``'clientMac'`` and ``'location'``.

    Raises:
        KeyError: If ``data`` lacks the ``'data'``, ``'apMac'`` or
            ``'observations'`` entries, or an observation lacks
            ``'clientMac'`` / ``'location'``.
    """
    payload = data['data']
    ap_mac = str(payload['apMac'])
    # An access point that is not listed in the configuration must not abort
    # the whole request; fall back to showing its raw MAC address.
    try:
        current_ap = access_points[ap_mac]
    except KeyError:
        current_ap = ap_mac

    for device in devices:
        for observation in payload['observations']:
            if observation['clientMac'] != device['MAC']:
                continue

            new_location = observation['location']
            new_xy = _rounded_xy(new_location)
            if new_xy is None:
                # Nothing to compare against, and storing it as a baseline
                # would break the next comparison.
                print('Sensor: ' + device['MAC_alias']
                      + ' reported no usable x/y position, skipping')
                continue
            device['location_new'] = new_location

            if device['initialiser'] == 0:
                # First sighting: remember where the device started.
                device['location'] = new_location
                device['initialiser'] = 1
                device['location_ap'] = current_ap
                continue

            print('Sensor: ' + device['MAC_alias'])
            if _rounded_xy(device['location']) == new_xy:
                print('The Sensor ' + device['MAC_alias'] + ' did not move')
                continue

            print('The Sensor: ' + device['MAC_alias'] + ' moved')
            if enable_telegram:
                # Both bold markers are closed; an unterminated '*' is not
                # valid Markdown for the Telegram parser.
                telegram_bot_sendtext(
                    'The Bluetooth Device: {0} Moved! Current Position: {1}'
                    ' AP of origin was: *{2}* Current AP: *{3}*'.format(
                        device['MAC_alias'], new_location,
                        device['location_ap'], current_ap))
            if enable_webex:
                webex_sendmessage(
                    'The Bluetooth Device: *{0}* Moved! Current Position: {1}'
                    ' AP of origin was: *{2}* Current AP: *{3}*'.format(
                        device['MAC_alias'], new_location,
                        device['location_ap'], current_ap))

            # NOTE(review): the stored baseline is refreshed only after a
            # detected move - confirm this matches the intended placement of
            # these two assignments.
            device['location'] = new_location
            device['location_ap'] = current_ap
def telegram_bot_sendtext(bot_message):
    """Send ``bot_message`` to the configured Telegram chat.

    The chat id, parse mode and text are handed to ``requests`` as query
    parameters so that they are URL-encoded; characters such as ``&``, ``#``
    or ``+`` inside the message would otherwise corrupt the query string.

    Args:
        bot_message: Text to send, interpreted by Telegram as Markdown.

    Returns:
        The decoded JSON body of the Telegram API response.

    Raises:
        requests.exceptions.RequestException: On connection failure or when
            the API does not answer within the timeout.
    """
    endpoint = 'https://api.telegram.org/bot' + telegram_token + '/sendMessage'
    query = {
        'chat_id': telegram_chatid,
        'parse_mode': 'Markdown',
        'text': bot_message,
    }
    # A bounded wait keeps a stalled API call from blocking the caller forever.
    response = requests.get(endpoint, params=query, timeout=10)
    return response.json()
def webex_sendmessage(bot_message):
    """Post ``bot_message`` to the configured Webex room.

    Args:
        bot_message: Text to post; it is sent in the ``text`` field of the
            request body.

    Returns:
        The decoded JSON body of the Webex API response.

    Raises:
        requests.exceptions.RequestException: On connection failure or when
            the API does not answer within the timeout.
    """
    api_url = 'https://webexapis.com/v1/messages'
    http_headers = {
        'Content-type': 'application/json',
        'Authorization': 'Bearer ' + webex_token,
    }
    body = {'roomId': webex_chatid, 'text': bot_message}
    # A bounded wait keeps a stalled API call from blocking the caller forever.
    response = requests.post(url=api_url, json=body, headers=http_headers,
                             timeout=10)
    # Call json() so the decoded body is returned, not the bound method.
    return response.json()
# Flask application object; the route decorators below register their handlers on it.
app = Flask(__name__)
@app.route('/', methods=['GET'])
def send_validator():
    """Answer a GET on ``/`` with the configured ``validator`` value.

    The address of the requesting peer is printed before responding.
    """
    caller = request.environ['REMOTE_ADDR']
    print("validator sent to: ", caller)
    return validator
@app.route('/', methods=['POST'])
def receive_data():
    """Accept a POSTed JSON payload, check its secret and process it.

    Returns:
        ``("Invalid data", 400)`` when the body is not a JSON object or has
        no ``'data'`` entry, ``("invalid secret", 403)`` when the ``'secret'``
        entry is missing or differs from the configured one, and
        ``"Location Received"`` once the payload was handed to
        ``location_analytics``.
    """
    # silent=True yields None for a missing or unparsable JSON body, so every
    # malformed request ends up in the same 400 branch.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'data' not in data:
        return ("Invalid data", 400)

    print("Received POST from ", request.environ['REMOTE_ADDR'])

    # .get() turns a missing secret into a clean 403 instead of a KeyError.
    # The secret itself is deliberately kept out of the log output.
    if data.get('secret') != secret:
        print("secret invalid")
        return ("invalid secret", 403)

    print("secret verified")
    location_analytics(data)
    return "Location Received"
# Start Flask's built-in server (debugger off) only when this file is run
# directly rather than imported.
if __name__ == '__main__':
    app.run(debug=False)