forked from pix0r/pushover
-
Notifications
You must be signed in to change notification settings - Fork 14
/
pushover.py
142 lines (112 loc) · 3.77 KB
/
pushover.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python
from http.client import HTTPSConnection
from urllib.parse import urlencode
import json
import os
class PushoverError(Exception):
    """
    Error raised by the Pushover client in this module.

    Used for a missing application token, a wrong argument type passed to
    Pushover.send(), and an API response whose status is not 1.
    """
class PushoverMessage:
    """
    Container for the fields of a single message.

    All fields live in the ``vars`` dictionary; the ``message`` field is
    always present because it is set by the constructor.
    """

    def __init__(self, message):
        """
        Start a new field dictionary holding only the message text.
        """
        self.vars = {'message': message}

    def set(self, key, value):
        """
        Store "value" under the field name "key".

        A value of None is ignored: the field is left exactly as it was.
        """
        if value is None:
            return
        self.vars[key] = value

    def get(self):
        """
        Return the field dictionary itself (not a copy), so changes made
        to the returned dictionary are visible on this message.
        """
        return self.vars

    def user(self, user_token, user_device=None):
        """
        Address this message to the user "user_token" and, when given,
        to that user's device "user_device".
        """
        for field, value in (('user', user_token), ('device', user_device)):
            self.set(field, value)

    def __str__(self):
        return "PushoverMessage: {}".format(self.vars)
class Pushover:
    """
    Creates a Pushover handler.

    Usage:
        po = Pushover("My App Token")
        po.user("My User Token", "My User Device Name")
        msg = po.msg("Hello, World!")
        po.send(msg)
    """

    PUSHOVER_SERVER = "api.pushover.net:443"
    PUSHOVER_ENDPOINT = "/1/messages.json"
    PUSHOVER_CONTENT_TYPE = {"Content-type": "application/x-www-form-urlencoded"}

    def __init__(self, token=None):
        """
        Creates a Pushover object for the application token "token".

        Raises PushoverError when no token is supplied.
        """
        if token is None:
            raise PushoverError("No token supplied.")
        self.token = token
        self.user_token = None
        self.user_device = None
        self.messages = []

    def msg(self, message):
        """
        Creates a PushoverMessage object for the text "message", remembers
        it in self.messages (so sendall() will send it) and returns it.
        """
        message = PushoverMessage(message)
        self.messages.append(message)
        return message

    def send(self, message):
        """
        Sends a single PushoverMessage (or subclass) instance.

        Returns True on success. Raises PushoverError when "message" is not
        a PushoverMessage or when the request is rejected.
        """
        # isinstance rather than an exact type match, so subclasses of
        # PushoverMessage are accepted as well.
        if not isinstance(message, PushoverMessage):
            raise PushoverError("Wrong type passed to Pushover.send()!")
        return self._send(message)

    def sendall(self):
        """
        Sends every PushoverMessage created through msg(), in creation order.

        Returns a list with one result per message. Messages are not removed
        from self.messages afterwards, so calling sendall() again sends them
        again. The first failing message raises and stops the loop.
        """
        return [self._send(message) for message in self.messages]

    def user(self, user_token, user_device=None):
        """
        Sets a single user (and optionally one device) to be the default
        recipient of messages sent with this Pushover object.
        """
        self.user_token = user_token
        self.user_device = user_device

    def _build_payload(self, message):
        """
        Returns a new dictionary with the form fields to post for "message".

        Recipient resolution, in order:
          1. a 'user' field already set on the message itself;
          2. the user (and device, if any) set with Pushover.user();
          3. the PUSHOVER_USER environment variable.

        Raises PushoverError when a required field cannot be determined.
        """
        # Work on a copy: message.get() hands back the message's own
        # dictionary, and writing token/user/device into it would make the
        # message keep a stale recipient on a later re-send.
        payload = dict(message.get())
        payload['token'] = self.token

        # Explicit checks instead of assert, which is stripped under -O.
        if 'message' not in payload:
            raise PushoverError("No message supplied.")
        if self.token is None:
            raise PushoverError("No token supplied.")

        if 'user' not in payload:
            # Test user_token, not self.user: the latter is the bound method
            # and is never None, which made the fallback below unreachable.
            if self.user_token is not None:
                payload['user'] = self.user_token
                if self.user_device is not None:
                    payload['device'] = self.user_device
            else:
                try:
                    payload['user'] = os.environ['PUSHOVER_USER']
                except KeyError:
                    raise PushoverError(
                        "No user supplied and PUSHOVER_USER is not set."
                    ) from None
        return payload

    def _send(self, message):
        """
        Sends the specified PushoverMessage object via the Pushover API.

        Posts the url-encoded fields to PUSHOVER_ENDPOINT on PUSHOVER_SERVER
        and parses the response body as JSON. Returns True when the
        response's 'status' field equals 1; otherwise raises PushoverError
        carrying the raw response text.
        """
        data = urlencode(self._build_payload(message))
        conn = HTTPSConnection(Pushover.PUSHOVER_SERVER)
        try:
            conn.request("POST", Pushover.PUSHOVER_ENDPOINT, data, Pushover.PUSHOVER_CONTENT_TYPE)
            output = conn.getresponse().read().decode('utf-8')
        finally:
            # Always release the socket, including when the request fails.
            conn.close()

        result = json.loads(output)
        if result['status'] != 1:
            raise PushoverError(output)
        return True