-
Notifications
You must be signed in to change notification settings - Fork 0
/
model.py
119 lines (88 loc) · 4.5 KB
/
model.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import xarray as xr
import cv2
import numpy as np
import matplotlib.pyplot as plt
import cartopy.crs as ccrs # CRS stands for "coordinate reference system"
from skimage import metrics
from cloudcasting.models import AbstractModel
import ocf_optical_flow_tvl1
# We define a new class that inherits from AbstractModel
class OpticalFlowTVL1(AbstractModel):
"""An optical flow model which predicts the next image given the prior two images using the TVL1 method"""
def __init__(self, history_steps: int) -> None:
# All models must include `history_steps` as a parameter. This is the number of previous
# frames that the model uses to makes its predictions. This should not be more than 25, i.e.
# 6 hours (inclusive of end points) of 15 minutely data
# You also must inlude the following line in your init function
super().__init__(history_steps)
###### YOUR CODE HERE ######
# Here you can add any other parameters that you need to initialize your model
# You might load your trained ML model or set up an optical flow method here
############################
def forward(self, X):
# This is where you will make predictions with your model
# The input X is a numpy array with shape (batch_size, channels, time, height, width)
###### YOUR CODE HERE ######
# Grab the most recent frame from the input data
samples_list = []
for b in range (X.shape[0]):
channel_list = []
for c in range (X.shape[1]):
previous_image = X[b, c, -2, :, :]
current_image = X[b, c, -1, :, :]
new_image_list = []
for i in range (12):
# convert to uint8 data type
image1 = (previous_image * 255).astype(np.uint8)
image2 = (current_image * 255).astype(np.uint8)
optical_flow = cv2.optflow.createOptFlow_DualTVL1()
#optimise parameters (taken from Urbich et al. 2018 cloud imagery use case)
optical_flow.setTau(0.1)
optical_flow.setGamma(0.1)
optical_flow.setLambda(0.03) #Urbich et al. 2018
optical_flow.setTheta(0.3) #default
optical_flow.setEpsilon(0.01)
optical_flow.setOuterIterations(2) #Urbich et al. 2018
optical_flow.setInnerIterations(10) #Urbich et al. 2018
optical_flow.setScalesNumber(3) #Urbich et al. 2018
optical_flow.setWarpingsNumber(3) #Urbich et al. 2018
optical_flow.setScaleStep(0.5) #Urbich et al. 2018
#calculate TVL1 optical flow
flow = optical_flow.calc(image1, image2, None)
# extracting flow components
u = flow[..., 0] # width
v = flow[..., 1] # height
h = flow.shape[0]
w = flow.shape[1]
flow2 = -flow.copy()
flow2[:,:,0] += np.arange(w)
flow2[:,:,1] += np.arange(h)[:,np.newaxis]
new_image = cv2.remap(image2, flow2, None, cv2.INTER_LINEAR)
new_image_list.append(new_image.astype(np.float32)/255)
previous_image = image2.astype(float)/255
current_image = new_image.astype(float) / 255
############################
image_stack = np.stack(new_image_list)
channel_list.append(image_stack)
channel_stack = np.stack(channel_list)
samples_list.append(channel_stack)
samples_stack = np.stack(samples_list)
return samples_stack
def hyperparameters_dict(self):
# This function should return a dictionary of hyperparameters for the model
# This is just for your own reference and will be saved with the model scores to wandb
###### YOUR CODE HERE ######
params_dict = {
"setTau": 0.1,
"setGamma": 0.1,
"setLambda": 0.03,
"setTheta" : 0.3,
"setEpsilon" : 0.01,
"setOuterIterations" : 2,
"setInnerIterations" : 10,
"setScalesNumber": 3,
"setWarpingsNumber" : 3,
"setScaleStep": 0.5
}
############################
return params_dict