forked from RedHatProductSecurity/osidb
api.py
"""
OSIM API endpoints
"""
import logging
from drf_spectacular.utils import OpenApiParameter, extend_schema
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.views import APIView
from .helpers import get_flaw_or_404, str2bool
from .serializers import ClassificationWorkflowSerializer, WorkflowSerializer
from .workflow import WorkflowFramework
# Module-level logger named after this module; the views in this file log through it.
logger = logging.getLogger(__name__)
class index(APIView):
    """index API endpoint"""

    # No permission_classes override: access control for this view comes from
    # the project-wide DRF defaults, which are not visible in this file.
    # NOTE(review): confirm DEFAULT_PERMISSION_CLASSES requires authentication,
    # because the response enumerates every route registered in .urls.

    def get(self, request, *args, **kwargs):
        """index API endpoint listing available API endpoints"""
        logger.info("getting index")

        # Imported at call time rather than at module level; presumably .urls
        # imports this module, so a top-level import would be circular.
        from .urls import urlpatterns

        # One "/<pattern>" string per registered URL pattern, in urlpatterns order.
        endpoints = [f"/{entry.pattern}" for entry in urlpatterns]
        return Response({"index": endpoints})
# TODO do we need this when OSIM is baked into OSIDB service ?
class healthy(APIView):
    """unauthenticated health check API endpoint"""

    # AllowAny is set explicitly: this is the only view in this file that
    # overrides permission_classes, so it is reachable without credentials.
    # Keep the response free of version, host or dependency details for that
    # reason.
    permission_classes = [AllowAny]

    def get(self, request, *args, **kwargs):
        """
        unauthenticated health check API endpoint
        """
        # Each probe writes one INFO record. NOTE(review): anonymous callers can
        # therefore grow the log at will unless the endpoint is rate limited
        # upstream - confirm against the deployment.
        logger.info("getting status")
        # Response() with no arguments: no payload is passed, so nothing about
        # the service state is disclosed beyond reachability.
        return Response()
class adjust(APIView):
    """workflow adjustion API endpoint"""

    # State-changing endpoint with no permission_classes override: who may call
    # it is decided by the project-wide DRF defaults, not shown in this file.
    # NOTE(review): confirm those defaults restrict POST to authorised users.

    def post(self, request, pk):
        """
        workflow adjustion API endpoint
        adjust workflow classification of flaw identified by UUID or CVE
        and return its workflow:state classification (new if changed and old otherwise)
        adjust operation is idempotent so when the classification
        is already adjusted running it results in no operation
        """
        # NOTE(review): pk is caller-supplied and is written to the log before
        # get_flaw_or_404 has looked it up. If the URL pattern (not shown here)
        # accepts arbitrary strings, consider logging it with %r or only after
        # the lookup, so control characters cannot forge extra log lines.
        logger.info(f"adjusting flaw {pk} workflow classification")

        # get_flaw_or_404 lives in .helpers; presumably it answers 404 when no
        # flaw matches the UUID or CVE - verify against the helper.
        target = get_flaw_or_404(pk)
        target.adjust_classification()

        # The classification is read after adjust_classification() has run, so
        # the reply carries whatever value the flaw holds at that point.
        payload = {
            "flaw": target.pk,
            "classification": target.classification,
        }
        return Response(payload)
class classification(APIView):
    """workflow classification API endpoint"""

    # No permission_classes override: read access is governed by the
    # project-wide DRF defaults, which are not visible in this file.
    # The docstrings are left untouched because drf-spectacular builds the
    # operation description from them, so editing them changes the schema.

    @extend_schema(
        parameters=[
            OpenApiParameter(
                "verbose",
                type={"type": "boolean"},
                location=OpenApiParameter.QUERY,
                description=(
                    "Return also workflows with flaw classification "
                    "which represents the reasoning of the result."
                ),
            ),
        ],
    )
    def get(self, request, pk):
        """
        workflow classification API endpoint
        for flaw identified by UUID or CVE returns its workflow:state classification
        params:
        verbose - return also workflows with flaw classification
        which represents the reasoning of the result
        """
        # NOTE(review): pk is caller-supplied and is written to the log before
        # get_flaw_or_404 has looked it up. If the URL pattern (not shown here)
        # accepts arbitrary strings, consider logging it with %r or only after
        # the lookup, so control characters cannot forge extra log lines.
        logger.info(f"getting flaw {pk} workflow classification")

        flaw = get_flaw_or_404(pk)
        workflow, state = WorkflowFramework().classify(flaw)

        verdict = {
            "workflow": workflow.name,
            "state": state.name,
        }
        response = {
            "flaw": flaw.pk,
            "classification": verdict,
        }

        # optional verbose classification context
        # The flag is parsed only when the query parameter is present. str2bool
        # comes from .helpers; presumably it rejects non-boolean text with a
        # client error rather than an unhandled exception - verify there.
        verbose = request.GET.get("verbose")
        if verbose is not None and str2bool(verbose, "verbose"):
            # Adds every workflow, serialized with this flaw as context.
            # NOTE(review): this exposes more of the workflow definitions than
            # the default reply; confirm all permitted callers may see it.
            detail = ClassificationWorkflowSerializer(
                WorkflowFramework().workflows,
                context={"flaw": flaw},
                many=True,
            )
            response["workflows"] = detail.data

        return Response(response)
class workflows(APIView):
    """workflow info API endpoint"""

    # No permission_classes override: access is governed by the project-wide
    # DRF defaults, which are not visible in this file.
    # NOTE(review): the reply serializes every workflow the framework holds;
    # confirm that all callers allowed by the defaults may read them.

    def get(self, request, *args, **kwargs):
        """workflow info API endpoint"""
        logger.info("getting workflows")

        # many=True: the framework's workflows are serialized as a collection.
        serialized = WorkflowSerializer(WorkflowFramework().workflows, many=True)
        return Response({"workflows": serialized.data})