-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
249 lines (209 loc) · 10.2 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
#!venv/bin/python
import asyncio
import logging
import uuid
from os import listdir
from time import strptime, sleep

from aiogram import Bot, Dispatcher, executor, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.dispatcher import FSMContext
from aiogram.dispatcher.filters import Text
from aiogram.dispatcher.filters.state import State, StatesGroup

from utility import generate_event_text, load_data, dump_data
from notifications_manager import set_up_notification, delete_notification
from config import BOT_TOKEN
# Set up the bot client and an in-RAM storage backend that the dispatcher
# uses to keep per-user conversation (FSM) state for the handlers below.
bot = Bot(token=BOT_TOKEN)
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)
# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# The states of user interaction we need
class SetEvent(StatesGroup):
    """Conversation states used while a user creates a new event.

    The handlers in this file move from one state to the following one with
    ``SetEvent.next()``, so the declaration order below is significant and
    must not be rearranged.
    """

    # Not referenced by any handler visible in this file.
    waiting_for_eventnum = State()
    # Entered from the "add_custom" callback; the event name is expected next.
    waiting_for_event_name = State()
    # The event time ("HH:MM") is expected next.
    waiting_for_event_time = State()
    # The full weekday name is expected next.
    waiting_for_event_weekday = State()
    # The event location is expected next.
    waiting_for_event_location = State()
    # Free-form additional information is expected next.
    waiting_for_event_extra = State()
    # All data collected; waiting for the Confirm / Cancel button.
    finishing_up = State()
async def display_main_menu(message: types.Message):
    """Turn *message* into the main menu by editing it in place.

    The menu is a two-column inline keyboard offering to list the stored
    events ("display") or to start creating a new one ("add_custom").
    """
    menu = types.InlineKeyboardMarkup(row_width=2)
    menu.add(
        types.InlineKeyboardButton(text="Display my events", callback_data="display"),
        types.InlineKeyboardButton(text="Add an event", callback_data="add_custom"),
    )
    await message.edit_text(text="Please choose an option:", reply_markup=menu)
def _is_user_file(file_name: str, user_id: str) -> bool:
    """Return True if *file_name* starts with exactly *user_id*.

    A plain ``startswith`` check would also match the file of a user whose id
    merely begins with the same digits (e.g. id 123 vs. file "1234..."), so
    the character following the id must not be another digit.
    """
    if not file_name.startswith(user_id):
        return False
    return not file_name[len(user_id):len(user_id) + 1].isdigit()


# Handler for command /start (start of conversation with user)
@dp.message_handler(commands="start")
async def cmd_start(message: types.Message):
    """Register the sender on first contact and offer the schedule options.

    Looks through the "users" directory for a file belonging to the sender.
    If none exists, an empty user record is written with ``dump_data``.
    Afterwards an inline keyboard with the two schedule-creation options is
    sent.

    NOTE(review): the option keyboard is sent to returning users as well as
    to new ones - confirm that this is the intended flow.
    """
    user_id = str(message.from_user.id)

    # Check if we already have a file for this user
    for file in listdir("users"):
        if _is_user_file(file, user_id):
            await message.answer("You already have an account")
            break
    else:  # no matching file was found (the loop did not break)
        # Create the record in which all of the user's data will be stored
        data = {
            "user_id": message.from_user.id,
            "records": 0,
            "events": [],
        }
        dump_data(user_data=data)
        # Telegram usernames are optional; an f-string copes with None where
        # string concatenation would raise TypeError.
        print(f"New user: {message.from_user.username}")

    # Inline keyboard whose buttons trigger the callback handlers
    keyboard = types.InlineKeyboardMarkup()
    keyboard.add(
        types.InlineKeyboardButton(text="Create recommended schedule", callback_data="add_recommended"),
        types.InlineKeyboardButton(text="Create custom Schedule", callback_data="add_custom"),
    )
    await message.answer(text="Please choose an option....", reply_markup=keyboard)
@dp.callback_query_handler(text="add_recommended")
async def add_recommended(call: types.CallbackQuery):
    """Handle the "add_recommended" button.

    Sends a confirmation message, turns the originating message into the
    main menu and acknowledges the callback query.

    NOTE(review): this handler itself does not write any events to the
    user's record - confirm whether that is still to be implemented.
    """
    origin = call.message
    await origin.answer(text="Recommended events have been added to your schedule!")
    await display_main_menu(origin)
    await call.answer()
# Maximum number of events a single user may store, so that nobody can
# abuse the bot.  TODO: consider a higher limit for VIP users.
MAX_RECORDS_PER_USER = 8


@dp.callback_query_handler(text="add_custom")
async def add_custom(call: types.CallbackQuery):
    """Start the event-creation dialogue unless the user hit the record limit.

    Loads the caller's record; when it already holds
    ``MAX_RECORDS_PER_USER`` events the user is told so and sent back to the
    main menu, otherwise the FSM is moved to ``waiting_for_event_name`` and
    the user is asked for the event name.  The callback query is
    acknowledged in both cases.
    """
    user_data = load_data(call.from_user.id)

    if user_data["records"] >= MAX_RECORDS_PER_USER:
        # f-strings tolerate a missing (None) username; "+" would raise
        print(f"{call.from_user.username} is trying to overcome the limit")
        await call.message.answer(text="You can't create more records!")
        # Non-blocking pause: time.sleep() would freeze the event loop and
        # with it the handling of every other user's updates.
        await asyncio.sleep(0.5)
        await display_main_menu(call.message)
    else:
        print(f"{call.from_user.username} is creating new event")
        await SetEvent.waiting_for_event_name.set()
        await call.message.answer(text="Please input the name of the event:")

    # Let the Telegram servers know that the callback query was processed
    await call.answer()
@dp.message_handler(state=SetEvent.waiting_for_event_name, content_types=types.ContentTypes.TEXT)
async def get_event_name(message: types.Message, state: FSMContext):
    """Remember the event name and ask for the event time.

    The user's answers are accumulated in the FSM state data and are only
    written to the user's record once creation is confirmed.
    """
    event_name = message.text
    await state.update_data(event_name=event_name)

    prompt = "Please input the time when event occurs in following format:\nHH:MM (24-hour standart)"
    await message.answer(text=prompt)
    await SetEvent.next()
@dp.message_handler(state=SetEvent.waiting_for_event_time, content_types=types.ContentTypes.TEXT)
async def get_event_time(message: types.Message, state: FSMContext):
    """Validate the event time, store it as zero-padded "HH:MM" and ask for the weekday.

    On invalid input the user is asked to try again and the state is left
    unchanged, so the next message is handled here again.
    """
    # time.strptime() raises ValueError when the text does not match the format
    try:
        parsed = strptime(message.text.strip(), "%H:%M")
    except ValueError:
        await message.answer(text="Your input was incorrect, please try again")
        return

    # strptime() also accepts single-digit fields such as "9:5"; store a
    # canonical form so later code can rely on the fixed "HH:MM" layout.
    normalized_time = f"{parsed.tm_hour:02d}:{parsed.tm_min:02d}"
    await state.update_data(event_time=normalized_time)
    await message.answer(text="Please input the full name of the day of the week the event is going to occur:")
    await SetEvent.next()
@dp.message_handler(state=SetEvent.waiting_for_event_weekday, content_types=types.ContentTypes.TEXT)
async def get_event_weekday(message: types.Message, state: FSMContext):
    """Validate the weekday name, store it and ask for the location.

    Validation mirrors the time handler: ``time.strptime`` with the "%A"
    format raises ValueError for text it cannot parse, in which case the
    user is asked again and the state stays on the weekday step.
    """
    weekday_text = message.text
    try:
        strptime(weekday_text, "%A")
    except ValueError:
        await message.answer(text="Your input was incorrect, please try again")
        await SetEvent.waiting_for_event_weekday.set()
    else:
        await state.update_data(event_wday=weekday_text)
        await message.answer(text="Please input the location of the event")
        await SetEvent.next()
@dp.message_handler(state=SetEvent.waiting_for_event_location, content_types=types.ContentTypes.TEXT)
async def get_event_location(message: types.Message, state: FSMContext):
    """Store the event location and ask for additional information.

    The handler is registered through the decorator above, so it is never
    looked up by name; it carries its own name so that it does not redefine
    the handler that processes the event time.
    """
    await state.update_data(event_location=message.text)
    await message.answer(text="Additional information:")
    await SetEvent.next()
@dp.message_handler(state=SetEvent.waiting_for_event_extra, content_types=types.ContentTypes.TEXT)
async def get_event_extra(message: types.Message, state: FSMContext):
    """Store the final piece of event data and ask the user to confirm.

    The reply carries an inline keyboard: "Confirm" ("create_event") writes
    the collected event to the user's record, "Cancel" ("forget") discards
    everything gathered so far.  The handler is registered through the
    decorator above and has a name of its own so that it does not redefine
    the handler that processes the event time.
    """
    await state.update_data(event_extra=message.text)

    keyboard = types.InlineKeyboardMarkup(row_width=1)
    keyboard.add(
        types.InlineKeyboardButton(text="Confirm", callback_data="create_event"),
        types.InlineKeyboardButton(text="Cancel", callback_data="forget"),
    )
    await message.answer(text="Please confirm event creation:", reply_markup=keyboard)
    await SetEvent.next()
@dp.callback_query_handler(text="create_event", state=SetEvent.finishing_up)
async def create_event(call: types.CallbackQuery, state: FSMContext):
    """Persist the event collected in the FSM data and schedule its notification.

    The collected answers are taken from the state, the dialogue is ended,
    and the event is appended to the caller's record together with a unique
    ``event_id``.  ``set_up_notification`` is then called for the event, the
    record is saved with ``dump_data`` and the main menu is shown again.
    """
    event = await state.get_data()
    # finish() resets both the state and the stored data, so no further
    # reset call is required.
    await state.finish()

    user_data = load_data(call.from_user.id)

    # A random 63-bit integer: deriving the id from "user id + hour" gave
    # the same id to every event of a user within one hour (and collided
    # across users), so deleting one notification could hit another event.
    event["event_id"] = uuid.uuid4().int >> 65

    # Append the event itself rather than indexing by the "records" counter,
    # which would misplace the event should the counter ever drift from the
    # actual list length.
    user_data["events"].append(event)
    user_data["records"] += 1

    set_up_notification(chat_id=call.message.chat.id, event=event)
    event["reminder_set"] = False
    dump_data(user_data)

    # Telegram usernames are optional; an f-string copes with None
    print(f"{call.from_user.username} have created new event")
    await call.answer()
    await display_main_menu(call.message)
@dp.callback_query_handler(text="forget", state=SetEvent.finishing_up)
async def forget_event(call: types.CallbackQuery, state: FSMContext):
    """Discard the event being created and return to the main menu."""
    # End the dialogue first so that nothing that follows can leave the user
    # stuck in the confirmation state.  finish() resets both the state and
    # the stored data, so no further reset call is required.
    await state.finish()

    # Telegram usernames are optional; an f-string copes with None where
    # string concatenation would raise TypeError.
    print(f"{call.from_user.username} have canceled event creation")
    await call.answer()
    await display_main_menu(call.message)
# Display the user's events as buttons; clicking one shows that event's details
@dp.callback_query_handler(text="display")
async def display_events(call: types.CallbackQuery):
    """Replace the menu message with one button per stored event.

    Each button carries the callback data ``disp_<index>``, where the index
    is the event's position in the user's "events" list; a "<< Back" button
    on its own row leads to the main menu.
    """
    user_data = load_data(call.from_user.id)

    # Iterate over the list itself instead of trusting the "records"
    # counter, which would raise IndexError if the two ever disagreed.
    buttons = [
        types.InlineKeyboardButton(text=event["event_name"], callback_data=f"disp_{index}")
        for index, event in enumerate(user_data["events"])
    ]

    keyboard = types.InlineKeyboardMarkup(row_width=2)
    keyboard.add(*buttons)
    keyboard.row(types.InlineKeyboardButton(text="<< Back", callback_data="main_menu"))

    await call.answer()
    await call.message.edit_text(text="Please choose an event to view", reply_markup=keyboard)
# Shows the user the details of the event he clicked on
@dp.callback_query_handler(Text(startswith="disp_"))
async def show_event(call: types.CallbackQuery):
    """Show one event's details with "<< Back" and "Delete" buttons.

    The event index is read from the callback data (``disp_<index>``).  If
    the index no longer refers to a stored event - for example because the
    button belongs to an outdated message - the user is notified and sent
    back to the main menu instead of the handler failing.
    """
    event_num = int(call.data.split("_")[1])
    events = load_data(call.from_user.id)["events"]

    if not 0 <= event_num < len(events):
        await call.answer(text="This event no longer exists", show_alert=True)
        await display_main_menu(call.message)
        return

    # The text is sent with parse_mode=HTML, so generate_event_text is
    # expected to return HTML markup (presumably <b> tags for bold text).
    event_output = generate_event_text(events[event_num])

    keyboard = types.InlineKeyboardMarkup()
    keyboard.add(
        types.InlineKeyboardButton(text="<< Back", callback_data="main_menu"),
        types.InlineKeyboardButton(text="Delete", callback_data=f"del_{event_num}"),
    )

    await call.answer()
    await call.message.edit_text(text=event_output, parse_mode=types.ParseMode.HTML, reply_markup=keyboard)
@dp.callback_query_handler(text="main_menu")
async def show_main_menu(call: types.CallbackQuery):
    """Acknowledge the "main_menu" callback and show the main menu again."""
    await call.answer()
    menu_message = call.message
    await display_main_menu(menu_message)
@dp.callback_query_handler(Text(startswith="del_"))
async def delete_event(call: types.CallbackQuery):
    """Delete the event selected via ``del_<index>`` and its notification.

    If the index no longer refers to a stored event (an outdated button),
    nothing is deleted; the user is notified and sent back to the main menu.
    Otherwise the notification is removed, the event is dropped from the
    user's record, the record is saved and the main menu is shown.
    """
    event_num = int(call.data.split("_")[1])
    user_data = load_data(call.from_user.id)
    events = user_data["events"]

    if not 0 <= event_num < len(events):
        await call.answer(text="This event no longer exists", show_alert=True)
        await display_main_menu(call.message)
        return

    # Remove the notification first: if that fails, the record stays intact
    delete_notification(event_id=events[event_num]["event_id"])
    events.pop(event_num)
    user_data["records"] -= 1
    dump_data(user_data)

    # Telegram usernames are optional; an f-string copes with None
    print(f"{call.from_user.username} have deleted event")
    await call.answer()
    await display_main_menu(call.message)
# Start long polling only when this file is run as a script.
# skip_updates=True is passed to aiogram's executor; presumably it discards
# updates that queued up while the bot was offline - see the aiogram docs.
if __name__ == "__main__":
    executor.start_polling(dp, skip_updates=True)