-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathwechat.py
154 lines (134 loc) · 5.21 KB
/
wechat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import time
import json
import hashlib
import threading
from os import getenv
from urllib import parse
import xml.etree.ElementTree as ET
from chatGPT import ChatGPT
from util import getLogger, head, make_request
logger = getLogger("wechat")
class WXRequest:
__BASE_URL = "https://api.weixin.qq.com/cgi-bin"
__access_token = None
__token_startTime = 0
__expire = 2 * 60 * 60
def __new__(cls):
if not hasattr(cls, "_instance"):
cls._instance = super(WXRequest, cls).__new__(cls)
return cls._instance
def __init__(self, appid=getenv("Appid"), secret=getenv("Secret")) -> None:
try:
self.__checkKey(appid, secret)
self.appid = appid
self.secret = secret
except ValueError as e:
logger.error(e)
# `Customer Service API - Send Message` permission is required
def sendCustomMessage(self, user, content):
# max content sizes is 2048 chars
url = f"{self.__BASE_URL}/message/custom/send?access_token={self.__token}"
data = {
"touser": user,
"msgtype": "text",
"text": {"content": content},
}
data = json.dumps(data, ensure_ascii=False).encode()
make_request(url, method="POST", data=data)
@property
def __token(self):
__token_endTime = time.time()
if __token_endTime - self.__token_startTime > self.__expire:
self.__access_token = None
if self.__access_token == None:
self.__token_startTime = time.time()
query = {
"grant_type": "client_credential",
"appid": self.appid,
"secret": self.secret,
}
url = f"{self.__BASE_URL}/token?{parse.urlencode(query)}"
data = make_request(url, method="GET")
self.__access_token = data.get("access_token")
return self.__access_token
@staticmethod
def __checkKey(appid, secret):
if not (appid and secret):
raise ValueError("WeixinAuthSign: Invalid key")
class Bot:
__answer = {}
def __init__(self, salt=getenv("Salt")) -> None:
self.salt = salt
@classmethod
def sendChatGPTMessageByApi(cls, input, user):
cls.__answer[user] = "loading"
wxReq = WXRequest()
try:
chat_gpt = ChatGPT()
result = chat_gpt.sendMessage(input)
# `custom message` max content sizes is 2048 chars
# one Chinese take up about 3 chars
split_count = 600
result_count = len(result)
if result_count < split_count:
wxReq.sendCustomMessage(user, result)
else:
split_result = [
result[i : i + split_count]
for i in range(0, result_count, split_count)
]
for item in split_result:
wxReq.sendCustomMessage(user, item)
except ValueError as e:
logger.error(e)
wxReq.sendCustomMessage(user, str(e))
except Exception as e:
logger.error(e)
wxReq.sendCustomMessage(user, "粗错了,再来一次!")
finally:
cls.__answer.pop(user, -1)
@classmethod
def receive(cls, data):
root = ET.fromstring(data)
info_from_user = {}
for child in root:
info_from_user[child.tag] = child.text
MsgType = info_from_user.get("MsgType")
FromUserName = info_from_user.get("FromUserName")
ToUserName = info_from_user.get("ToUserName")
Content = info_from_user.get("Content")
if MsgType == "text" and Content:
if cls.__answer.get(FromUserName) == None:
# must respone to wechat server in 5 seconds
# https://developers.weixin.qq.com/doc/offiaccount/Message_Management/Receiving_standard_messages.html
task = lambda: cls.sendChatGPTMessageByApi(Content, FromUserName)
chatThread = threading.Thread(target=task, name="chatThread")
chatThread.start()
else:
return cls.gen_response(FromUserName, ToUserName, "不要捉急,慢慢来...")
# return either "sucess" or ""
return cls.gen_response(FromUserName, ToUserName, "正在组织语言(10s+)...")
return "success"
@staticmethod
def gen_response(from_user, to_user, content):
return (
"<xml>"
f"<ToUserName><![CDATA[{from_user}]]>"
f"</ToUserName><FromUserName><![CDATA[{to_user}]]>"
"</FromUserName><CreateTime>{int(time.time())}</CreateTime>"
"<MsgType><![CDATA[text]]></MsgType>"
f"<Content><![CDATA[{content}]]></Content>"
"</xml>"
)
def check_token(self, qs):
signature = head(qs.get("signature"))
timestamp = head(qs.get("timestamp"))
nonce = head(qs.get("nonce"))
echostr = head(qs.get("echostr"))
sha1 = hashlib.sha1()
for param in [nonce, timestamp, self.salt]:
sha1.update(param.encode("utf-8"))
hashcode = sha1.hexdigest()
if hashcode == signature:
return echostr
raise ValueError("Auth Fail")