-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathviews.py
140 lines (97 loc) · 4.48 KB
/
views.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import logging
from typing import Type, TYPE_CHECKING
from django.http import HttpRequest, JsonResponse
from esteid.exceptions import ActionInProgress
from esteid.mixins import DjangoRestCompatibilityMixin, SessionViewMixin
from .authenticator import Authenticator
from .types import AuthenticationResult
try:
from rest_framework.exceptions import ValidationError as DRFValidationError
except ImportError:
# If rest framework is not installed, create a stub class so the isinstance check is always false
class DRFValidationError:
pass
if TYPE_CHECKING:
# Make type checkers aware of request.session attribute which is missing on the HttpRequest class
from django.contrib.sessions import base_session
class RequestType(HttpRequest):
session: base_session.AbstractBaseSession
data: dict
logger = logging.getLogger(__name__)
def get_origin(request):
origin = f"{request.scheme}://{request.get_host()}"
return origin
class AuthenticationViewMixin(SessionViewMixin):
# these come from the `url()` definition as in `View.as_view(authentication_method='...')`, either one is enough
authentication_method: str = None
authenticator: Type[Authenticator] = None
def on_auth_success(self, request, data: AuthenticationResult):
"""
A hook to make use of the authentication data once the process is complete.
May be used to store the data into session, authenticate a user etc.
"""
pass
def success_response(self, request, data: AuthenticationResult):
"""Customizable response on success"""
return JsonResponse({**data, "status": self.Status.SUCCESS})
def select_authenticator_class(self) -> Type["Authenticator"]:
if self.authenticator is not None:
return self.authenticator
return Authenticator.select_authenticator(self.authentication_method)
def start_session(self, request: "RequestType", *args, **kwargs):
"""
Initiates an authentication session.
"""
auth_class = self.select_authenticator_class()
authenticator = auth_class.start_session(request.session, request.data, origin=get_origin(request))
do_cleanup = True
try:
result = authenticator.authenticate()
except ActionInProgress as e:
do_cleanup = False
# return SUCCESS to indicate that the upstream service successfully accepted the request
return JsonResponse({"status": self.Status.SUCCESS, **e.data}, status=e.status)
else:
# Handle a theoretical case of immediate authentication
self.on_auth_success(request, result)
return JsonResponse({**result, "status": self.Status.SUCCESS})
finally:
if do_cleanup:
authenticator.cleanup()
def finish_session(self, request: "RequestType", *args, **kwargs):
"""
Checks the status of an authentication session
"""
authenticator_class = self.select_authenticator_class()
authenticator = authenticator_class.load_session(request.session, origin=get_origin(request))
do_cleanup = True
try:
result = authenticator.poll(request.data)
except ActionInProgress as e:
do_cleanup = False
return JsonResponse({"status": self.Status.PENDING, **e.data}, status=e.status)
else:
self.on_auth_success(request, result)
return self.success_response(request, result)
finally:
if do_cleanup:
authenticator.cleanup()
def handle_delete_request(self, request):
authenticator_class = self.select_authenticator_class()
authenticator = authenticator_class.load_session(request.session, origin=get_origin(request))
authenticator.cleanup()
class AuthenticationViewRestMixin(AuthenticationViewMixin):
"""
To be used with rest-framework's APIView.
"""
def delete(self, request, *args, **kwargs):
self.handle_delete_request(request)
return JsonResponse({"status": self.Status.CANCELLED})
class AuthenticationViewDjangoMixin(DjangoRestCompatibilityMixin, AuthenticationViewMixin):
"""
To be used with plain Django class-based views (No rest-framework).
Adds `data` attribute to the request with the POST or JSON data.
"""
def delete(self, request, *args, **kwargs):
self.handle_delete_request(request)
return JsonResponse({"status": self.Status.CANCELLED})