-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient_connection.py
168 lines (142 loc) · 5.75 KB
/
client_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import asyncio
from asyncio import StreamReader, StreamWriter, wait_for
from logging import Logger
from typing import TYPE_CHECKING, List, Optional

from backend.dtn7sqlite.models import Article, Newsgroup
from config import server_config
from logger import global_logger
from status_codes import StatusCodes
from utils import get_version

if TYPE_CHECKING:
    from nntp_server import AsyncNNTPServer
class ClientConnection:
    """
    Holds all state of a client connection to the server.

    One instance wraps the stream reader/writer pair of a single client and
    keeps the per-connection session state: the command currently being
    processed and its arguments, the selected newsgroup and article, and the
    buffer that collects article lines while the connection is in post mode.
    """

    # Seconds to wait for the next line before a timeout is logged.
    # TODO: make timeout a setting
    READ_TIMEOUT_SECONDS: float = 43200.0

    def __init__(
        self, server: "AsyncNNTPServer", reader: StreamReader, writer: StreamWriter
    ) -> None:
        """
        Store the owning server and the client's streams, and reset all state.

        :param server: server object used to send responses and reach the backend
        :param reader: stream the client's lines are read from
        :param writer: stream handed to ``server.send`` for every response
        """
        self._server: "AsyncNNTPServer" = server
        self._reader: StreamReader = reader
        self._writer: StreamWriter = writer
        self.logger: Logger = global_logger()
        self._terminated: bool = False
        self._empty_token_counter: int = 0
        self._cmd_args: Optional[List[str]] = None
        self._selected_group: Optional["Newsgroup"] = None
        self._selected_article: Optional["Article"] = None
        self._post_mode: bool = False
        self._article_buffer: List[str] = []
        self._command: Optional[str] = ""

    @staticmethod
    def _decode_line(raw: bytes) -> str:
        """Decode one received line as UTF-8, replacing undecodable bytes.

        Using ``errors="replace"`` keeps a single malformed byte from raising
        ``UnicodeDecodeError`` and tearing down the whole connection handler.
        """
        return raw.decode(encoding="utf-8", errors="replace")

    @staticmethod
    def _tokenize(line: str) -> List[str]:
        """Split a command line into lower-cased tokens on single spaces."""
        return line.strip().lower().split(" ")

    def _send_greeting(self) -> None:
        """Send the initial status line matching the configured server type."""
        if server_config["server_type"] == "read-only":
            greeting = StatusCodes.STATUS_READYNOPOST
        else:
            greeting = StatusCodes.STATUS_READYOKPOST
        self._server.send(
            writer=self._writer,
            send_obj=greeting.substitute(
                url=server_config["nntp_hostname"], version=get_version()
            ),
        )

    async def _handle_post_line(self, line: str) -> None:
        """Consume one line while in post mode.

        A line consisting only of ``.`` ends the article: the buffer is handed
        to the backend, a success or failure status is sent, and post mode is
        left. Every other line is appended to the article buffer.
        """
        # only rstrip in order to preserve indentation in body
        # NOTE(review): lines are buffered as received; confirm whether the
        # backend is expected to undo dot-stuffing ("..") itself.
        text = line.rstrip()
        if text != ".":
            self._article_buffer.append(text)
            return

        try:
            await self._server.backend.save_article(article_buffer=self._article_buffer)
            self._server.send(
                writer=self._writer, send_obj=StatusCodes.STATUS_POSTSUCCESSFUL
            )
        except Exception as e:  # noqa E722
            self.logger.error(e)
            self._server.send(writer=self._writer, send_obj=StatusCodes.ERR_NOTPERFORMED)
        # Leave post mode and start with a fresh buffer whether or not saving worked.
        self._post_mode = False
        self._article_buffer = []

    async def _handle_command_line(self, line: str) -> None:
        """Parse one command line and dispatch it to the backend.

        Blank lines are counted; once ``max_empty_requests`` consecutive blank
        lines have been received the connection is marked as terminated.
        """
        tokens = self._tokenize(line)
        if all(token == "" for token in tokens):
            self._empty_token_counter += 1
            if self._empty_token_counter >= server_config["max_empty_requests"]:
                self.logger.warning(
                    "WARNING: Noping out because client is sending too many empty requests"
                )
                self._terminated = True
            return
        self._empty_token_counter = 0

        # The line was stripped before splitting, so the first token is non-empty here.
        command, *args = tokens
        self._command = command
        self._cmd_args = args

        backend = self._server.backend
        if command in backend.available_commands:
            try:
                self._server.send(
                    writer=self._writer,
                    send_obj=await backend.call_dict[command](self),
                )
            except Exception as e:
                # A failing command handler ends the session.
                self.logger.exception(e)
                self._terminated = True
        else:
            # command is not in list of implemented capabilities
            self._server.send(writer=self._writer, send_obj=StatusCodes.ERR_CMDSYNTAXERROR)

        if command == "quit":
            self._terminated = True

    async def handle_client(self) -> None:
        """
        Greet the client, then read and process lines until the session ends.

        The loop stops when the client sends ``quit``, sends too many empty
        requests, a command handler raises, or the peer closes the connection
        (end of stream). A read timeout is only logged; the loop keeps waiting.
        """
        self._terminated = False
        self._empty_token_counter = 0
        self._send_greeting()

        # main execution loop for handling a connection until it's closed
        while not self._terminated:
            try:
                incoming_data = await wait_for(
                    self._reader.readline(), timeout=self.READ_TIMEOUT_SECONDS
                )
            except asyncio.TimeoutError as e:
                # asyncio.TimeoutError is only an alias of the builtin
                # TimeoutError from Python 3.11 on, so name it explicitly.
                self.logger.error(f"ERROR: TimeoutError occurred. {e}")
                continue

            if not incoming_data:
                # readline() returns b"" at end of stream. Without this check
                # the loop would spin on empty reads (and, in post mode, grow
                # the article buffer without bound).
                self.logger.debug(
                    f"{self._writer.get_extra_info(name='peername')} closed the connection"
                )
                self._terminated = True
                break

            line = self._decode_line(incoming_data)
            self.logger.debug(
                f"{self._writer.get_extra_info(name='peername')} > {line.strip()}"
            )

            if self._post_mode:
                await self._handle_post_line(line)
            else:
                await self._handle_command_line(line)

    def stop(self) -> None:
        """Close the writer side of the client connection."""
        self._writer.close()

    @property
    def article_buffer(self) -> List[str]:
        """Lines collected so far for the article being posted."""
        return self._article_buffer

    @property
    def cmd_args(self) -> Optional[List[str]]:
        """Arguments of the most recently parsed command (``None`` before the first one)."""
        return self._cmd_args

    @property
    def command(self) -> Optional[str]:
        """Lower-cased name of the most recently parsed command."""
        return self._command

    @property
    def post_mode(self) -> bool:
        """Whether incoming lines are currently collected as article content."""
        return self._post_mode

    @post_mode.setter
    def post_mode(self, val: bool) -> None:
        self._post_mode = val

    @property
    def selected_article(self) -> Optional["Article"]:
        """Article currently selected on this connection, if any."""
        return self._selected_article

    @selected_article.setter
    def selected_article(self, val: Optional["Article"]) -> None:
        self._selected_article = val

    @property
    def selected_group(self) -> Optional["Newsgroup"]:
        """Newsgroup currently selected on this connection, if any."""
        return self._selected_group

    @selected_group.setter
    def selected_group(self, val: Optional["Newsgroup"]) -> None:
        self._selected_group = val

    @property
    def terminated(self) -> bool:
        """Whether the connection has been marked as finished."""
        return self._terminated