Merge pull request #12 from vincentclaes/filter-on-tags
Filter on tags
vincentclaes authored May 23, 2022
2 parents 971ea55 + 069a792 commit 23bf2aa
Showing 6 changed files with 199 additions and 98 deletions.
17 changes: 10 additions & 7 deletions README.md
@@ -21,17 +21,20 @@ Follow up on AWS Stepfunctions over different accounts, over different regions,
Usage: stepview [OPTIONS]
Options:
--profile TEXT specify the aws profile you want to use as a comma
--profile TEXT Specify the aws profile you want to use as a comma
separated string. For example '--profile
profile1,profile2,profile3,...' [default: default]
--period TEXT specify the time period for which you wish to look

--period TEXT Specify the time period for which you wish to look
back. You can choose from the values: minute, hour,
today, day, week, month, year [default: day]
--verbose use --verbose to set verbose logging.
--help Show this message and exit.

--tags TEXT Specify the tags you want to filter your stepfunctions
statemachines on. Provide your tags as comma-separated
key=value pairs: --tags foo=bar,baz=qux

--verbose Use --verbose to set verbose logging.



## Example
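As a point of reference for this diff, here is a minimal sketch of the new option end to end, equivalent to running `stepview --profile default --period day --tags team=data`. The profile name and the `team=data` tag are placeholders, the `stepview` package is assumed to be installed, and AWS credentials must exist for the chosen profile:

```python
# Illustrative only: resolve the state machines tagged team=data under the
# "default" profile and print the same Rich table the CLI renders.
from rich.console import Console

from stepview.data import main

table, rows = main(
    aws_profiles=["default"],   # --profile default
    period="day",               # --period day
    tags=[("team", "data")],    # --tags team=data, passed as (key, value) tuples
)
Console().print(table)
```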
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "stepview"
version = "0.3.0"
version = "0.4.0"
description = "1 global view of all your stepfunctions statemachines"
authors = ["vincent <[email protected]>"]
license = "GNU GPLv3"
199 changes: 133 additions & 66 deletions stepview/data.py
@@ -1,3 +1,6 @@
from collections import defaultdict
from typing import List, Optional

import boto3
import botocore.client
import pendulum
@@ -25,15 +28,16 @@ class State:

@dataclass
class Row:
state_machine: str
state_machine_name: str
state_machine_name_with_url: str
profile_name: str
account: str
region: str
state: State

def get_values(self):
return (
self.state_machine,
self.state_machine_name_with_url,
self.profile_name,
self.account,
self.region,
@@ -44,12 +48,7 @@ def get_values(self):
f"{self.state.failed:,.0f}/"
f"{self.state.aborted:,.0f}/"
f"{self.state.timed_out:,.0f}/"
f"{self.state.throttled:,.0f}"
# f"{self.state.failed:,.0f}",
# f"{self.state.aborted:,.0f}",
# f"{self.state.timed_out:,.0f}",
# f"{self.state.throttled:,.0f}"

f"{self.state.throttled:,.0f}",
)


@@ -92,37 +91,50 @@ class Time:

@classmethod
def get_time_variables(cls):
return [v for k, v in cls.__dict__.items()
if not k.startswith("__")
and not k.endswith('__')
and not "method" in str(v)
and not "function" in str(v)]
return [
v
for k, v in cls.__dict__.items()
if not k.startswith("__")
and not k.endswith("__")
and not "method" in str(v)
and not "function" in str(v)
]


NOW = pendulum.now()
MAX_POOL_CONNECTIONS = 10
MAX_RUNNING_RESULTS = 3
MAX_RETRIES = 10


def main(aws_profiles: list, period: str):
def main(aws_profiles: List[str], period: str, tags: Optional[List[tuple]]):
period = get_period_objects(period=period)

progress_viz = (TextColumn("[progress.description]{task.description}"), BarColumn())

with Progress(*progress_viz) as progress:
progress.add_task("[green]Getting Data...", start=False)
profile_generator = run_all_profiles(aws_profiles=aws_profiles, period=period)
try:
profile_generator = run_all_profiles(
aws_profiles=aws_profiles, period=period, tags=tags
)
except botocore.client.ClientError as e:
if e.response["Error"]["Code"] == "ThrottlingException":
logger.info(
"Throttling exception. Please reduce the number of AWS profiles "
"or filter on tags."
)
else:
raise e

table = Table()
table.add_column("StateMachine", justify="left", overflow="fold")
table.add_column("Profile", overflow="fold")
table.add_column("Account", overflow="fold")
table.add_column("Region", overflow="fold")
table.add_column("Total", overflow="fold")
table.add_column("Started", overflow="fold")
table.add_column("Succeed (%)", overflow="fold")
table.add_column("Running", overflow="fold")
table.add_column("Failed/Aborted/TimedOut/Throttled", overflow="fold")
# table.add_column("Aborted")
# table.add_column("TimedOut")
# table.add_column("Throttled")

all_rows = []
for profile in profile_generator:
@@ -135,9 +147,11 @@ def main(aws_profiles: list, period: str):
return table, all_rows


def run_all_profiles(aws_profiles: list, period: Periods):
def run_all_profiles(
aws_profiles: List[str], period: Periods, tags: Optional[List[tuple]]
):
def _run_for_profile(aws_profile: str):
return run_for_profile(profile_name=aws_profile, period=period)
return run_for_profile(profile_name=aws_profile, period=period, tags=tags)

with ThreadPoolExecutor(len(aws_profiles)) as thread:
profile_generator = thread.map(_run_for_profile, aws_profiles)
@@ -146,9 +160,12 @@ def _run_for_profile(aws_profile: str):


def run_for_state_machine(
state_machine: object, cloudwatch_resource: object, sfn_client: object, profile_name: str, period: Periods
state_machine_arn: str,
cloudwatch_resource: object,
sfn_client: object,
profile_name: str,
period: Periods,
):
state_machine_arn = state_machine.get("stateMachineArn")
state = get_sfn_data(
cloudwatch_resource=cloudwatch_resource,
sfn_client=sfn_client,
@@ -158,56 +175,100 @@ def run_for_state_machine(
arn_parsed = parse_aws_arn(state_machine_arn)
account = arn_parsed.get("account")
region = arn_parsed.get("region")
state_machine_name = state_machine.get("name")
state_machine_name = arn_parsed.get("resource")
state_machine_url = get_statemachine_url(
state_machine_arn=state_machine_arn, region=region
)
state_machine_name_with_url = f"[link={state_machine_url}]{state_machine_name}[/link]"
state_machine_name_with_url = (
f"[link={state_machine_url}]{state_machine_name}[/link]"
)
row = Row(
state_machine=state_machine_name_with_url,
state_machine_name=state_machine_name,
state_machine_name_with_url=state_machine_name_with_url,
profile_name=profile_name,
account=account,
region=region,
state=state

state=state,
)
return row


def run_for_profile(profile_name: str, period: Periods) -> Table:
sfn_client = boto3.Session(
profile_name=profile_name
).client(
"stepfunctions",
config=botocore.client.Config(max_pool_connections=MAX_POOL_CONNECTIONS)
def get_resource_page_for_tags(tags_client: object, tags_filters: list):
tagging_paginator = tags_client.get_paginator('get_resources')
for page_for_tags in tagging_paginator.paginate(
ResourceTypeFilters=['states:stateMachine'],
TagFilters=tags_filters
):
yield page_for_tags



def parse_tags(tags: Optional[List[tuple]]):
tags_ = defaultdict(list)
for key, value in tags:
tags_[key].append(value)
tags_filters = [{"Key": key, "Values": value} for key, value in tags_.items()]
return tags_filters


def get_statemachines(tags_client: object, tags: Optional[List[tuple]]) -> list:
tags_filters = parse_tags(tags=tags)
for page_for_tags in get_resource_page_for_tags(tags_client=tags_client, tags_filters=tags_filters):
statemachines = page_for_tags.get("ResourceTagMappingList")
if statemachines:
yield_ = [statemachine.get("ResourceARN") for statemachine in statemachines]
yield yield_
else:
logger.info(f"No statemachines were found for tags: {tags}")


def run_for_profile(
profile_name: str, period: Periods, tags: Optional[List[tuple]]
) -> list:
config = botocore.client.Config(
retries={"max_attempts": MAX_RETRIES, "mode": "standard"},
max_pool_connections=MAX_POOL_CONNECTIONS,
)
sfn_client = boto3.Session(profile_name=profile_name).client(
"stepfunctions", config=config
)
cloudwatch_resource = boto3.Session(profile_name=profile_name).resource(
"cloudwatch",
config=botocore.client.Config(max_pool_connections=MAX_POOL_CONNECTIONS)
"cloudwatch", config=config
)
state_machines = sfn_client.list_state_machines().get("stateMachines")
if state_machines:
def _run_for_state_machine(state_machine):
return run_for_state_machine(
state_machine=state_machine,
cloudwatch_resource=cloudwatch_resource,
sfn_client=sfn_client,
profile_name=profile_name,
period=period,
)

with ThreadPoolExecutor(
tags_client = boto3.Session(profile_name=profile_name).client(
"resourcegroupstaggingapi", config=config
)
all_statemachine_results = []
for state_machines in get_statemachines(tags_client=tags_client, tags=tags):
if state_machines:
def _run_for_state_machine(state_machine_arn):
return run_for_state_machine(
state_machine_arn=state_machine_arn,
cloudwatch_resource=cloudwatch_resource,
sfn_client=sfn_client,
profile_name=profile_name,
period=period,
)

with ThreadPoolExecutor(
min(len(state_machines), MAX_POOL_CONNECTIONS)
) as thread:
state_machine_generator = thread.map(_run_for_state_machine, state_machines)
return state_machine_generator
else:
logger.info(f"no statemachines found for profile {profile_name}")
return ()
) as thread:
state_machine_results = thread.map(
_run_for_state_machine, state_machines
)
all_statemachine_results.extend(state_machine_results)
else:
logger.info(f"no statemachines found for profile {profile_name}")
all_statemachine_results_ = {row.state_machine_name: row for row in all_statemachine_results}
all_statemachine_results_sorted = list(dict(sorted(all_statemachine_results_.items())).values())
return all_statemachine_results_sorted


def call_metric_endpoint(
metric_name: str, cloudwatch_resource: object, state_machine_arn: str, period_object: Periods
metric_name: str,
cloudwatch_resource: object,
state_machine_arn: str,
period_object: Periods,
):
"""
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudwatch.html#metric
@@ -231,7 +292,10 @@ def call_metric_endpoint(


def get_sfn_data(
cloudwatch_resource: object, sfn_client:object, state_machine_arn: str, period: Periods
cloudwatch_resource: object,
sfn_client: object,
state_machine_arn: str,
period: Periods,
) -> State:
"""
check the docs for more info
@@ -273,6 +337,9 @@ def _call_metric_endpoint(metric_name):
running = get_running_executions_for_state_machine(
sfn_client=sfn_client, state_machine_arn=state_machine_arn
)
# running_ = started - succeeded - failed - aborted - timed_out - throttled
# logger.debug(f"running calculated: {running_}")
# running = running_ if running_ >= 0 else 0

succeeded_perc = (succeeded / started) * 100 if started > 0 else 0

@@ -289,7 +356,7 @@ def _call_metric_endpoint(metric_name):


def get_period_objects(period: str):
PERIODS_MAPPING = {
periods_mapping = {
Time.MINUTE: Periods(NOW.subtract(minutes=1), NOW, "microseconds"),
Time.HOUR: Periods(NOW.subtract(hours=1), NOW, "seconds"),
Time.TODAY: Periods(NOW.start_of("day"), NOW, "seconds"),
@@ -300,10 +367,10 @@ def get_period_objects(period: str):
}

try:
period_object = PERIODS_MAPPING[period]
period_object = periods_mapping[period]
except KeyError as e:
raise NameError(
f"We did not recognize the value {period}. Please choose from {PERIODS_MAPPING.keys()}"
f"We did not recognize the value {period}. Please choose from {periods_mapping.keys()}"
)
return period_object

@@ -339,13 +406,13 @@ def parse_aws_arn(arn):

def get_running_executions_for_state_machine(
sfn_client: object, state_machine_arn: str
):

) -> str:
executions = sfn_client.list_executions(
stateMachineArn=state_machine_arn,
statusFilter='RUNNING'
statusFilter="RUNNING",
maxResults=MAX_RUNNING_RESULTS,
)
no_running = str(len(executions.get("executions")))
if executions.get("nextToken") is not None:
no_running = f"+{no_running}"
no_running = len(executions.get("executions"))
if no_running >= MAX_RUNNING_RESULTS:
no_running = f">={no_running}"
return no_running
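To summarize the data path this PR adds, below is a condensed, standalone sketch of how the `--tags` filter is resolved into state machine ARNs through the Resource Groups Tagging API, using the same retry and connection-pool settings introduced above. The profile name, tag key, and tag value are illustrative, and valid AWS credentials are assumed:

```python
# Condensed sketch of the tag-filtering path added in this PR.
from collections import defaultdict

import boto3
import botocore.client


def parse_tags(tags):
    # Group repeated keys: [("team", "data"), ("team", "ml"), ("env", "prod")] ->
    # [{"Key": "team", "Values": ["data", "ml"]}, {"Key": "env", "Values": ["prod"]}]
    grouped = defaultdict(list)
    for key, value in tags:
        grouped[key].append(value)
    return [{"Key": key, "Values": values} for key, values in grouped.items()]


# Same client settings the PR introduces: standard retry mode and a bounded pool.
config = botocore.client.Config(
    retries={"max_attempts": 10, "mode": "standard"},  # MAX_RETRIES
    max_pool_connections=10,  # MAX_POOL_CONNECTIONS
)
tags_client = boto3.Session(profile_name="default").client(
    "resourcegroupstaggingapi", config=config
)

# Page through every state machine carrying the requested tags.
paginator = tags_client.get_paginator("get_resources")
for page in paginator.paginate(
    ResourceTypeFilters=["states:stateMachine"],
    TagFilters=parse_tags([("team", "data")]),
):
    for mapping in page.get("ResourceTagMappingList", []):
        print(mapping["ResourceARN"])  # each ARN is then handed to run_for_state_machine
```

Grouping values under a shared key mirrors the Tagging API's semantics: repeated keys (`--tags team=data,team=ml`) match state machines carrying either value, while distinct keys must all be present on a state machine for it to be returned.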