-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtransformationconfig.py
80 lines (63 loc) · 2.22 KB
/
transformationconfig.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""Pydantic Transformation Configuration Data Model.
A transformation status data model is provided as well.
This data model represents what should be returned from the strategy's `status()`
method.
"""
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
from oteapi.models.genericconfig import GenericConfig
from oteapi.models.secretconfig import SecretConfig
class ProcessPriority(str, Enum):
"""Defining process priority enumerators.
Process priorities:
- Low
- Medium
- High
"""
LOW = "Low"
MEDIUM = "Medium"
HIGH = "High"
class TransformationConfig(GenericConfig, SecretConfig):
"""Transformation Strategy Data Configuration."""
transformationType: str = Field(
...,
description=(
"Type of registered transformation strategy. E.g., `celery/remote`."
),
)
name: Optional[str] = Field(
None, description="Human-readable name of the transformation strategy."
)
due: Optional[datetime] = Field(
None,
description=(
"Optional field to indicate a due data/time for when a transformation "
"should finish."
),
)
priority: Optional[ProcessPriority] = Field(
ProcessPriority.MEDIUM,
description="Define the process priority of the transformation execution.",
)
class TransformationStatus(BaseModel):
"""Return from transformation status."""
id: str = Field(..., description="ID for the given transformation process.")
status: Optional[str] = Field(
None, description="Status for the transformation process."
)
messages: Optional[list[str]] = Field(
None, description="Messages related to the transformation process."
)
created: Optional[datetime] = Field(
None,
description="Time of creation for the transformation process. Given in UTC.",
)
startTime: Optional[datetime] = Field(
None, description="Time when the transformation process started. Given in UTC."
)
finishTime: Optional[datetime] = Field(
None, description="Time when the tranformation process finished. Given in UTC."
)