3
3
4
4
import logging
5
5
import boto3
6
+ import time
6
7
7
8
from botocore .exceptions import ClientError
8
9
@@ -32,61 +33,6 @@ def from_client(cls):
32
33
33
34
# snippet-end:[python.example_code.controltower.ControlTowerWrapper.decl]
34
35
35
- # snippet-start:[python.example_code.controltower.SetupLandingZone]
36
- def create_landing_zone (self , manifest ):
37
- """
38
- Sets up a landing zone using the provided manifest.
39
-
40
- :param manifest: The landing zone manifest containing configuration details.
41
- :return: Dictionary containing the landing zone ARN and operation ID.
42
- :raises ClientError: If the landing zone setup fails.
43
-
44
- """
45
- try :
46
- response = self .controltower_client .create_landing_zone (
47
- manifest = manifest ,
48
- version = '3.3'
49
- )
50
- return response
51
- except ClientError as err :
52
- if err .response ["Error" ]["Code" ] == "AccessDeniedException" :
53
- logger .error ("Access denied. Please ensure you have the necessary permissions." )
54
- else :
55
- logger .error (
56
- "Couldn't set up landing zone. Here's why: %s: %s" ,
57
- err .response ["Error" ]["Code" ],
58
- err .response ["Error" ]["Message" ]
59
- )
60
- raise
61
-
62
- # snippet-end:[python.example_code.controltower.SetupLandingZone]
63
-
64
- # snippet-start:[python.example_code.controltower.DeleteLandingZone]
65
- def delete_landing_zone (self , landing_zone_identifier ):
66
- """
67
- Deletes a landing zone by its identifier.
68
-
69
- :param landing_zone_identifier: The landing zone identifier to delete.
70
- :raises ClientError: If the landing zone delete fails.
71
-
72
- """
73
- try :
74
- self .controltower_client .delete_landing_zone (
75
- landingZoneIdentifier = landing_zone_identifier
76
- )
77
- except ClientError as err :
78
- if err .response ["Error" ]["Code" ] == "ResourceNotFoundException" :
79
- logger .error ("Landing zone not found." )
80
- else :
81
- logger .error (
82
- "Couldn't delete landing zone. Here's why: %s: %s" ,
83
- err .response ["Error" ]["Code" ],
84
- err .response ["Error" ]["Message" ]
85
- )
86
- raise
87
-
88
- # snippet-end:[python.example_code.controltower.DeleteLandingZone]
89
-
90
36
# snippet-start:[python.example_code.controltower.ListBaselines]
91
37
def list_baselines (self ):
92
38
"""
@@ -116,12 +62,13 @@ def list_baselines(self):
116
62
# snippet-end:[python.example_code.controltower.ListBaselines]
117
63
118
64
# snippet-start:[python.example_code.controltower.EnableBaseline]
119
- def enable_baseline (self , target_identifier , baseline_identifier , baseline_version ):
65
+ def enable_baseline (self , target_identifier , identity_center_baseline , baseline_identifier , baseline_version ):
120
66
"""
121
67
Enables a baseline for the specified target if it's not already enabled.
122
68
123
69
:param target_identifier: The ARN of the target.
124
70
:param baseline_identifier: The identifier of baseline to enable.
71
+ :param identity_center_baseline: The identifier of identity center baseline if it is enabled.
125
72
:param baseline_version: The version of baseline to enable.
126
73
:return: The enabled baseline ARN or None if already enabled.
127
74
:raises ClientError: If enabling the baseline fails for reasons other than it being already enabled.
@@ -130,13 +77,29 @@ def enable_baseline(self, target_identifier, baseline_identifier, baseline_versi
130
77
response = self .controltower_client .enable_baseline (
131
78
baselineIdentifier = baseline_identifier ,
132
79
baselineVersion = baseline_version ,
133
- targetIdentifier = target_identifier
80
+ targetIdentifier = target_identifier ,
81
+ parameters = [
82
+ {
83
+ "key" : "IdentityCenterEnabledBaselineArn" ,
84
+ "value" : identity_center_baseline
85
+ }
86
+ ]
134
87
)
88
+
89
+ operation_id = response ['operationIdentifier' ]
90
+ while True :
91
+ status = self .get_baseline_operation (operation_id )
92
+ print (f"Baseline operation status: { status } " )
93
+ if status in ['SUCCEEDED' , 'FAILED' ]:
94
+ break
95
+ time .sleep (30 )
96
+
135
97
return response ['arn' ]
136
98
except ClientError as err :
137
99
if err .response ["Error" ]["Code" ] == "ValidationException" :
138
100
if "already enabled" in err .response ["Error" ]["Message" ]:
139
101
print ("Baseline is already enabled for this target" )
102
+ return None
140
103
else :
141
104
print ("Unable to enable baseline due to validation exception: %s: %s" ,
142
105
err .response ["Error" ]["Code" ],
@@ -194,7 +157,16 @@ def enable_control(self, control_arn, target_identifier):
194
157
controlIdentifier = control_arn ,
195
158
targetIdentifier = target_identifier
196
159
)
197
- return response ['operationIdentifier' ]
160
+
161
+ operation_id = response ['operationIdentifier' ]
162
+ while True :
163
+ status = self .get_control_operation (operation_id )
164
+ print (f"Control operation status: { status } " )
165
+ if status in ['SUCCEEDED' , 'FAILED' ]:
166
+ break
167
+ time .sleep (30 )
168
+
169
+ return operation_id
198
170
199
171
except ClientError as err :
200
172
if (err .response ["Error" ]["Code" ] == "ValidationException" and
@@ -227,7 +199,7 @@ def get_control_operation(self, operation_id):
227
199
return response ['controlOperation' ]['status' ]
228
200
except ClientError as err :
229
201
if err .response ["Error" ]["Code" ] == "ResourceNotFoundException" :
230
- logger .error ("Control not found." )
202
+ logger .error ("Operation not found." )
231
203
else :
232
204
logger .error (
233
205
"Couldn't get control operation status. Here's why: %s: %s" ,
@@ -238,61 +210,70 @@ def get_control_operation(self, operation_id):
238
210
239
211
# snippet-end:[python.example_code.controltower.GetControlOperation]
240
212
241
- # snippet-start:[python.example_code.controltower.DisableControl ]
242
- def disable_control (self , control_arn , target_identifier ):
213
+ # snippet-start:[python.example_code.controltower.GetBaselineOperation ]
214
+ def get_baseline_operation (self , operation_id ):
243
215
"""
244
- Disables a control for a specified target .
216
+ Gets the status of a baseline operation .
245
217
246
- :param control_arn: The ARN of the control to disable.
247
- :param target_identifier: The identifier of the target (e.g., OU ARN).
248
- :return: The operation ID.
249
- :raises ClientError: If disabling the control fails.
218
+ :param operation_id: The ID of the baseline operation.
219
+ :return: The operation status.
220
+ :raises ClientError: If getting the operation status fails.
250
221
"""
251
222
try :
252
- response = self .controltower_client .disable_control (
253
- controlIdentifier = control_arn ,
254
- targetIdentifier = target_identifier
223
+ response = self .controltower_client .get_baseline_operation (
224
+ operationIdentifier = operation_id
255
225
)
256
- return response ['operationIdentifier ' ]
226
+ return response ['baselineOperation' ][ 'status ' ]
257
227
except ClientError as err :
258
228
if err .response ["Error" ]["Code" ] == "ResourceNotFoundException" :
259
- logger .error ("Control not found." )
229
+ logger .error ("Operation not found." )
260
230
else :
261
231
logger .error (
262
- "Couldn't disable control . Here's why: %s: %s" ,
232
+ "Couldn't get baseline operation status . Here's why: %s: %s" ,
263
233
err .response ["Error" ]["Code" ],
264
234
err .response ["Error" ]["Message" ]
265
235
)
266
236
raise
267
237
268
- # snippet-end:[python.example_code.controltower.DisableControl ]
238
+ # snippet-end:[python.example_code.controltower.GetBaselineOperation ]
269
239
270
- # snippet-start:[python.example_code.controltower.GetLandingZoneOperation ]
271
- def get_landing_zone_operation (self , operation_id ):
240
+ # snippet-start:[python.example_code.controltower.DisableControl ]
241
+ def disable_control (self , control_arn , target_identifier ):
272
242
"""
273
- Gets the status of a landing zone operation .
243
+ Disables a control for a specified target .
274
244
275
- :param operation_id: The ID of the landing zone operation.
276
- :return: The operation status.
277
- :raises ClientError: If getting the operation status fails.
245
+ :param control_arn: The ARN of the control to disable.
246
+ :param target_identifier: The identifier of the target (e.g., OU ARN).
247
+ :return: The operation ID.
248
+ :raises ClientError: If disabling the control fails.
278
249
"""
279
250
try :
280
- response = self .controltower_client .get_landing_zone_operation (
281
- operationIdentifier = operation_id
251
+ response = self .controltower_client .disable_control (
252
+ controlIdentifier = control_arn ,
253
+ targetIdentifier = target_identifier
282
254
)
283
- return response ['operationDetails' ]['status' ]
255
+
256
+ operation_id = response ['operationIdentifier' ]
257
+ while True :
258
+ status = self .get_control_operation (operation_id )
259
+ print (f"Control operation status: { status } " )
260
+ if status in ['SUCCEEDED' , 'FAILED' ]:
261
+ break
262
+ time .sleep (30 )
263
+
264
+ return operation_id
284
265
except ClientError as err :
285
266
if err .response ["Error" ]["Code" ] == "ResourceNotFoundException" :
286
- logger .error ("Landing zone not found." )
267
+ logger .error ("Control not found." )
287
268
else :
288
269
logger .error (
289
- "Couldn't get landing zone operation status . Here's why: %s: %s" ,
270
+ "Couldn't disable control . Here's why: %s: %s" ,
290
271
err .response ["Error" ]["Code" ],
291
272
err .response ["Error" ]["Message" ]
292
273
)
293
274
raise
294
275
295
- # snippet-end:[python.example_code.controltower.GetLandingZoneOperation ]
276
+ # snippet-end:[python.example_code.controltower.DisableControl ]
296
277
297
278
# snippet-start:[python.example_code.controltower.ListLandingZones]
298
279
def list_landing_zones (self ):
@@ -333,66 +314,87 @@ def list_enabled_baselines(self, target_identifier):
333
314
try :
334
315
paginator = self .controltower_client .get_paginator ('list_enabled_baselines' )
335
316
enabled_baselines = []
336
- for page in paginator .paginate (targetIdentifier = target_identifier ):
317
+ for page in paginator .paginate ():
337
318
enabled_baselines .extend (page ['enabledBaselines' ])
338
319
return enabled_baselines
339
320
340
321
except ClientError as err :
341
- logger .error (
342
- "Couldn't list enabled baselines. Here's why: %s: %s" ,
343
- err .response ["Error" ]["Code" ],
344
- err .response ["Error" ]["Message" ]
345
- )
322
+ if err .response ["Error" ]["Code" ] == "ResourceNotFoundException" :
323
+ logger .error ("Target not found." )
324
+ else :
325
+ logger .error (
326
+ "Couldn't list enabled baselines. Here's why: %s: %s" ,
327
+ err .response ["Error" ]["Code" ],
328
+ err .response ["Error" ]["Message" ]
329
+ )
346
330
raise
347
331
# snippet-end:[python.example_code.controltower.ListEnabledBaselines]
348
332
349
333
# snippet-start:[python.example_code.controltower.ResetEnabledBaseline]
350
- def reset_enabled_baseline (self , target_identifier , baseline_identifier ):
334
+ def reset_enabled_baseline (self , enabled_baseline_identifier ):
351
335
"""
352
336
Resets an enabled baseline for a specific target.
353
337
354
- :param target_identifier: The identifier of the target (e.g., OU ARN).
355
- :param baseline_identifier: The identifier of the baseline to reset.
338
+ :param enabled_baseline_identifier: The identifier of the enabled baseline to reset.
356
339
:return: The operation ID.
357
340
:raises ClientError: If resetting the baseline fails.
358
341
"""
359
342
try :
360
343
response = self .controltower_client .reset_enabled_baseline (
361
- targetIdentifier = target_identifier ,
362
- baselineIdentifier = baseline_identifier
344
+ enabledBaselineIdentifier = enabled_baseline_identifier
363
345
)
364
- return response ['operationIdentifier' ]
346
+ operation_id = response ['operationIdentifier' ]
347
+ while True :
348
+ status = self .get_baseline_operation (operation_id )
349
+ print (f"Baseline operation status: { status } " )
350
+ if status in ['SUCCEEDED' , 'FAILED' ]:
351
+ break
352
+ time .sleep (30 )
353
+ return operation_id
365
354
except ClientError as err :
366
- logger .error (
367
- "Couldn't reset enabled baseline. Here's why: %s: %s" ,
368
- err .response ["Error" ]["Code" ],
369
- err .response ["Error" ]["Message" ]
370
- )
355
+ if err .response ["Error" ]["Code" ] == "ResourceNotFoundException" :
356
+ logger .error ("Target not found." )
357
+ else :
358
+ logger .error (
359
+ "Couldn't reset enabled baseline. Here's why: %s: %s" ,
360
+ err .response ["Error" ]["Code" ],
361
+ err .response ["Error" ]["Message" ]
362
+ )
371
363
raise
372
364
# snippet-end:[python.example_code.controltower.ResetEnabledBaseline]
373
365
374
366
# snippet-start:[python.example_code.controltower.DisableBaseline]
375
- def disable_baseline (self , target_identifier , baseline_identifier ):
367
+ def disable_baseline (self , enabled_baseline_identifier ):
376
368
"""
377
- Disables a baseline for a specific target.
369
+ Disables a baseline for a specific target and waits for the operation to complete .
378
370
379
- :param target_identifier: The identifier of the target (e.g., OU ARN).
380
- :param baseline_identifier: The identifier of the baseline to disable.
371
+ :param enabled_baseline_identifier: The identifier of the baseline to disable.
381
372
:return: The operation ID.
382
373
:raises ClientError: If disabling the baseline fails.
383
374
"""
384
375
try :
385
376
response = self .controltower_client .disable_baseline (
386
- targetIdentifier = target_identifier ,
387
- baselineIdentifier = baseline_identifier
377
+ enabledBaselineIdentifier = enabled_baseline_identifier
388
378
)
379
+
380
+ operation_id = response ['operationIdentifier' ]
381
+ while True :
382
+ status = self .get_baseline_operation (operation_id )
383
+ print (f"Baseline operation status: { status } " )
384
+ if status in ['SUCCEEDED' , 'FAILED' ]:
385
+ break
386
+ time .sleep (30 )
387
+
389
388
return response ['operationIdentifier' ]
390
389
except ClientError as err :
391
- logger .error (
392
- "Couldn't disable baseline. Here's why: %s: %s" ,
393
- err .response ["Error" ]["Code" ],
394
- err .response ["Error" ]["Message" ]
395
- )
390
+ if err .response ["Error" ]["Code" ] == "ResourceNotFoundException" :
391
+ logger .error ("Target not found." )
392
+ else :
393
+ logger .error (
394
+ "Couldn't disable baseline. Here's why: %s: %s" ,
395
+ err .response ["Error" ]["Code" ],
396
+ err .response ["Error" ]["Message" ]
397
+ )
396
398
raise
397
399
# snippet-end:[python.example_code.controltower.DisableBaseline]
398
400
@@ -413,11 +415,14 @@ def list_enabled_controls(self, target_identifier):
413
415
return enabled_controls
414
416
415
417
except ClientError as err :
416
- logger .error (
417
- "Couldn't list enabled controls. Here's why: %s: %s" ,
418
- err .response ["Error" ]["Code" ],
419
- err .response ["Error" ]["Message" ]
420
- )
418
+ if err .response ["Error" ]["Code" ] == "AccessDeniedException" :
419
+ logger .error ("Access denied. Please ensure you have the necessary permissions." )
420
+ else :
421
+ logger .error (
422
+ "Couldn't list enabled controls. Here's why: %s: %s" ,
423
+ err .response ["Error" ]["Code" ],
424
+ err .response ["Error" ]["Message" ]
425
+ )
421
426
raise
422
427
# snippet-end:[python.example_code.controltower.ListEnabledControls]
423
428
0 commit comments