generated from specklesystems/speckle_automate_python_example
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmain.py
273 lines (218 loc) · 10.1 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
"""This module contains the business logic of the function.
use the automation_context module to wrap your function in an Autamate context helper
"""
import os
import subprocess
from archaea.geometry.vector3d import Vector3d
from archaea.geometry.mesh import Mesh
from archaea.geometry.point3d import Point3d
from archaea_simulation.simulation_objects.domain import Domain
from archaea_simulation.speckle.vtk_to_speckle import vtk_to_speckle, Text
from archaea_simulation.cfd.utils.path import get_cfd_export_path
from pydantic import Field
from speckle_automate import (
AutomateBase,
AutomationContext,
execute_automate_function,
)
from specklepy.api import operations
from specklepy.objects.base import Base
from specklepy.objects.other import DisplayStyle
from specklepy.objects.geometry import Brep, Line, Polyline, Point, Plane
from specklepy.transports.server import ServerTransport
from flatten import flatten_base
# # TODO: Use speckle text object when specklepy has new release
class FunctionInputs(AutomateBase):
"""These are function author defined values.
Automate will make sure to supply them matching the types specified here.
Please use the pydantic model schema to define your inputs:
https://docs.pydantic.dev/latest/usage/models/
"""
wind_direction: float = Field(
title="Wind Direction",
description=(
"Wind direction represents as azimuth angle is like a compass direction"
"with North = 0°, East = 90°, South = 180°, West = 270°."
),
)
wind_speed: float = Field(
title="Wind Speed",
description="Wind speed (m/s) in XY plane."
)
reference_height: float = Field(
title="Reference Wind Speed Height",
description="Altitude (m) of the wind speed is measured.",
default=15,
)
number_of_cpus: int = Field(
title="Number of CPUs",
description="Number of CPUs to run simulation parallelly.",
default=4,
)
tunnel_width_scale: float = Field(
title="Tunnel width scale",
description=("Scale value of the tunnel width according to context objects bounding box. "
"It scales the inlet surface which we send wind into tunnel."),
default=5,
)
tunnel_depth_scale: float = Field(
title="Tunnel depth scale",
description="Scale value of the tunnel depth according to context objects bounding box.",
default=5,
)
tunnel_height_scale: float = Field(
title="Tunnel height scale",
description=("Scale value of the tunnel height according to context objects bounding box. "
"It scales from zGround."),
default=3,
)
def automate_function(
automate_context: AutomationContext,
function_inputs: FunctionInputs,
) -> None:
"""This is an example Speckle Automate function.
Args:
automate_context: A context helper object, that carries relevant information
about the runtime context of this function.
It gives access to the Speckle project data, that triggered this run.
It also has conveniece methods attach result data to the Speckle model.
function_inputs: An instance object matching the defined schema.
"""
print("number of cpus", os.cpu_count())
subprocess.run("/bin/bash -c 'source /opt/openfoam9/etc/bashrc'", shell=True)
# the context provides a conveniet way, to receive the triggering version
version_root_object = automate_context.receive_version()
accepted_types = [Brep.speckle_type]
objects_to_create_stl = []
flatten_base_objects = flatten_base(version_root_object)
count = 0
for b in flatten_base_objects:
if b.speckle_type in accepted_types:
if not b.id:
raise ValueError("Cannot operate on objects without their id's.")
objects_to_create_stl.append(b)
count += 1
speckle_meshes = []
for speckle_mesh in objects_to_create_stl:
speckle_meshes += speckle_mesh.displayValue
archaea_meshes = []
for speckle_mesh in speckle_meshes:
archaea_mesh = Mesh.from_ngon_mesh(speckle_mesh.vertices, speckle_mesh.faces)
archaea_meshes.append(archaea_mesh)
x_scale = function_inputs.tunnel_width_scale
y_scale = function_inputs.tunnel_depth_scale
z_scale = function_inputs.tunnel_height_scale
# Init domain
domain = Domain.from_meshes(archaea_meshes, x_scale=x_scale, y_scale=y_scale, z_scale=z_scale, wind_direction=function_inputs.wind_direction, wind_speed=function_inputs.wind_speed)
# Get folder to copy cases
archaea_folder = get_cfd_export_path()
if not os.path.exists(archaea_folder):
os.makedirs(archaea_folder)
# Get case folder
case_folder = os.path.join(archaea_folder, version_root_object.id)
domain.create_case(case_folder, function_inputs.number_of_cpus)
cmd_path = os.path.join(case_folder, './Allrun')
cmd = "/bin/bash -c '{cmd_path}'".format(cmd_path=cmd_path)
# retcode = subprocess.call(cmd, shell=True, stdout=pipefile)
completed_process = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
print(completed_process.stdout)
blockMesh_log = os.path.join(case_folder, 'log.blockMesh')
add_to_store_if_exist(automate_context, blockMesh_log)
decomposePar_log = os.path.join(case_folder, 'log.decomposePar')
add_to_store_if_exist(automate_context, decomposePar_log)
patchSummary_log = os.path.join(case_folder, 'log.patchSummary')
add_to_store_if_exist(automate_context, patchSummary_log)
reconstructPar_log = os.path.join(case_folder, 'log.reconstructPar')
add_to_store_if_exist(automate_context, reconstructPar_log)
reconstructParMesh_log = os.path.join(case_folder, 'log.reconstructParMesh')
add_to_store_if_exist(automate_context, reconstructParMesh_log)
simpleFoam_log = os.path.join(case_folder, 'log.simpleFoam')
add_to_store_if_exist(automate_context, simpleFoam_log)
snappyHexMesh_log = os.path.join(case_folder, 'log.snappyHexMesh')
add_to_store_if_exist(automate_context, snappyHexMesh_log)
surfaceFeatures_log = os.path.join(case_folder, 'log.surfaceFeatures')
add_to_store_if_exist(automate_context, surfaceFeatures_log)
vtk_file = os.path.join(case_folder, 'postProcessing',
'cutPlaneSurface', '400', 'U_cutPlane.vtk')
result_mesh = vtk_to_speckle(vtk_file, domain.center.move(Vector3d(domain.x / 2 + 2, -domain.y / 2 - 2, 0)))
domain_corner_lines = domain_lines(domain.corners)
subdomain_corner_lines = domain_lines(domain.subdomain_corners)
arrow_line, arrow, text = wind_direction_arrow(domain)
result = Base()
result.data = [result_mesh, domain_corner_lines, subdomain_corner_lines, [arrow_line, arrow, text]]
automate_context.create_new_version_in_project(
result,
"automate_result"
)
if count == 0:
# this is how a run is marked with a failure cause
automate_context.mark_run_failed(
"Automation failed: "
"Not found appropriate object to run CFD simulation."
)
else:
automate_context.mark_run_success("Object found to run simulation!")
def wind_direction_arrow(domain: Domain):
wind_vector = Vector3d.from_azimuth_angle(domain.wind_direction)
vector = wind_vector.reverse()
mid = domain.center.move(vector.scale(domain.y / 2))
mid = mid.move(vector.scale(5))
point_left = mid.move(vector.scale(2)).rotate(Vector3d(0, 0, 1), 45, mid)
point_right = mid.move(vector.scale(2)).rotate(Vector3d(0, 0, 1), -45, mid)
offset_mid_2 = mid.move(vector.scale(2 ** 0.5))
offset_mid_10 = mid.move(vector.scale(10))
arrow_line = Polyline.from_points([Point.from_coords(p.x, p.y, p.z) for p in [offset_mid_2, offset_mid_10]])
arrow = Polyline.from_points([Point.from_coords(p.x, p.y, p.z) for p in [mid, point_left, point_right, mid]])
x_dir = vector.reverse()
y_dir = x_dir.cross_product(Vector3d(0,0,1)).reverse()
text_v = point_left.move(vector.scale(20))
plane = Plane.from_list([text_v.x, text_v.y, text_v.z,
0, 0, 1,
x_dir.x, x_dir.y, 0,
y_dir.x, y_dir.y, 0,
3])
text = Text()
text.height = 2.5
text.value = f"{domain.wind_speed} m/s"
text.plane = plane
text.units = "m"
display_style = DisplayStyle()
display_style.color = -16777216
display_style.linetype = "Continuous"
display_style.units = "m"
display_style.lineweight = 0
text.displayStyle = display_style
return arrow_line, arrow, text
def domain_lines(corners):
speckle_domain_points = [Point.from_coords(corner.x, corner.y, corner.z) for corner in corners]
floor_polyline = Polyline.from_points(speckle_domain_points[0:4])
floor_polyline.closed = True
ceiling_polyline = Polyline.from_points(speckle_domain_points[-4:])
ceiling_polyline.closed = True
line_1 = Line()
line_1.units = 'm'
line_1.start = speckle_domain_points[0]
line_1.end = speckle_domain_points[4]
line_2 = Line()
line_2.units = 'm'
line_2.start = speckle_domain_points[1]
line_2.end = speckle_domain_points[5]
line_3 = Line()
line_3.units = 'm'
line_3.start = speckle_domain_points[2]
line_3.end = speckle_domain_points[6]
line_4 = Line()
line_4.units = 'm'
line_4.start = speckle_domain_points[3]
line_4.end = speckle_domain_points[7]
return [floor_polyline,ceiling_polyline,line_1,line_2,line_3,line_4]
def add_to_store_if_exist(automate_context: AutomationContext, path):
if os.path.exists(path):
automate_context.store_file_result(path)
# make sure to call the function with the executor
if __name__ == "__main__":
# NOTE: always pass in the automate function by its reference, do not invoke it!
# pass in the function reference with the inputs schema to the executor
execute_automate_function(automate_function, FunctionInputs)
# if the function has no arguments, the executor can handle it like so
# execute_automate_function(automate_function_without_inputs)