Commit

Scan workflow for duplicates, emit warnings.

alliepiper committed Apr 23, 2024
1 parent 539b276 commit 8b5ae80
Showing 2 changed files with 174 additions and 91 deletions.
184 changes: 135 additions & 49 deletions ci/compute-matrix.py
@@ -65,7 +65,6 @@
import yaml

matrix_yaml = None
dirty_projects = []


def write_output(key, value):
@@ -107,7 +106,7 @@ def is_windows(matrix_job):
return matrix_job['os'].startswith('windows')


def validate_matrix_job(matrix_job):
def validate_required_tags(matrix_job):
for tag in matrix_yaml['required_tags']:
if tag not in matrix_job:
raise Exception(f"Missing required tag '{tag}' in matrix job {matrix_job}")
@@ -122,7 +121,7 @@ def validate_matrix_job(matrix_job):
raise Exception(f"Unknown tag '{tag}' in matrix job {matrix_job}")


def fill_defaults_matrix_job(matrix_job):
def set_default_tags(matrix_job):
generic_defaults = set(matrix_yaml['defaulted_tags'])
generic_defaults -= set(['os']) # handled specially.

@@ -142,24 +141,6 @@ def set_derived_tags(matrix_job):
matrix_job['device_compiler'] = {'name': 'nvcc', 'version': matrix_job['ctk'], 'exe': 'nvcc'}


def explode_matrix_job(matrix_job):
new_jobs = []
for tag in matrix_job:
if tag != "job_types" and isinstance(matrix_job[tag], list):
for value in matrix_job[tag]:
new_job = copy.deepcopy(matrix_job)
new_job[tag] = value
exploded = explode_matrix_job(new_job)
if exploded:
new_jobs.extend(exploded)
else:
new_jobs.append(new_job)
# Only explode the first explodable tag. Recursion handles the others.
break

return new_jobs if len(new_jobs) > 0 else None


def generate_dispatch_group_name(matrix_job):
project_name = get_formatted_projected_name(matrix_job['project'])
ctk = matrix_job['ctk']
@@ -315,34 +296,125 @@ def merge_dispatch_groups(accum_dispatch_groups, new_dispatch_groups):


def matrix_job_to_dispatch_group(matrix_job):
validate_matrix_job(matrix_job)
fill_defaults_matrix_job(matrix_job)
return {generate_dispatch_group_name(matrix_job): generate_dispatch_group_jobs(matrix_job)}


# If the job explodes, recurse into the results:
exploded_jobs = explode_matrix_job(matrix_job)
if exploded_jobs is not None:
all_dispatch_groups = {}
for job in exploded_jobs:
dispatch_group = matrix_job_to_dispatch_group(job)
merge_dispatch_groups(all_dispatch_groups, dispatch_group)
return all_dispatch_groups
def explode_tags(matrix_job):
explode_tag = None
for tag in matrix_job:
if tag != "job_types" and isinstance(matrix_job[tag], list):
explode_tag = tag
break

set_derived_tags(matrix_job)
if not explode_tag:
return [matrix_job]

# Filter jobs that don't need to rerun:
if matrix_job['project'] not in dirty_projects:
return {}
result = []
for value in matrix_job[explode_tag]:
new_job = copy.deepcopy(matrix_job)
new_job[explode_tag] = value
result.extend(explode_tags(new_job))

# We have a fully specified job, start processing.
dispatch_group_name = generate_dispatch_group_name(matrix_job)
dispatch_group_jobs = generate_dispatch_group_jobs(matrix_job)
return result

return {dispatch_group_name: dispatch_group_jobs}
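(For illustration: a minimal sketch of how explode_tags fans a job with list-valued tags out into fully scalar jobs. The job dict below is invented; only 'job_types' lists are exempt from explosion.)

# Hypothetical input; tag names and values are made up for this example:
job = {'project': 'cub', 'ctk': ['12.0', '12.4'], 'std': [17, 20]}
for j in explode_tags(job):
    print(j)
# The recursion splits 'ctk' first, then 'std', yielding one job per combination:
#   {'project': 'cub', 'ctk': '12.0', 'std': 17}
#   {'project': 'cub', 'ctk': '12.0', 'std': 20}
#   {'project': 'cub', 'ctk': '12.4', 'std': 17}
#   {'project': 'cub', 'ctk': '12.4', 'std': 20}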

def preprocess_matrix_jobs(matrix_jobs):
result = []
for matrix_job in matrix_jobs:
validate_required_tags(matrix_job)
set_default_tags(matrix_job)
for job in explode_tags(matrix_job):
set_derived_tags(job)
result.append(job)
return result


def filter_projects(matrix_jobs, projects):
return [job for job in matrix_jobs if job['project'] in projects]


def finalize_workflow_dispatch_groups(workflow_dispatch_groups_orig):
workflow_dispatch_groups = copy.deepcopy(workflow_dispatch_groups_orig)

# Merge consumers for any two_stage arrays that have the same producer(s). Print a warning.
for group_name, group_json in workflow_dispatch_groups.items():
if not 'two_stage' in group_json:
continue
two_stage_json = group_json['two_stage']
merged_producers = []
merged_consumers = []
for two_stage in two_stage_json:
producers = two_stage['producers']
consumers = two_stage['consumers']
if producers in merged_producers:
producer_index = merged_producers.index(producers)
matching_consumers = merged_consumers[producer_index]

producer_names = ", ".join([producer['name'] for producer in producers])
print(f"::warning file=ci/matrix.yaml::Duplicate producer '{producer_names}' in '{group_name}'",
file=sys.stderr)
consumer_names = ", ".join([consumer['name'] for consumer in matching_consumers])
print(f"::warning file=ci/matrix.yaml::Original consumers: {consumer_names}", file=sys.stderr)
consumer_names = ", ".join([consumer['name'] for consumer in consumers])
print(f"::warning file=ci/matrix.yaml::Duplicate consumers: {consumer_names}", file=sys.stderr)
# Merge if unique:
for consumer in consumers:
if consumer not in matching_consumers:
matching_consumers.append(consumer)
consumer_names = ", ".join([consumer['name'] for consumer in matching_consumers])
print(f"::warning file=ci/matrix.yaml::Merged consumers: {consumer_names}", file=sys.stderr)
else:
merged_producers.append(producers)
merged_consumers.append(consumers)
# Update with the merged lists:
two_stage_json = []
for producers, consumers in zip(merged_producers, merged_consumers):
two_stage_json.append({'producers': producers, 'consumers': consumers})
group_json['two_stage'] = two_stage_json
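(A minimal sketch of the merge this pass performs, using invented job names: two two_stage entries that share the same producer list collapse into one entry whose consumers are the deduplicated union.)

two_stage_before = [
    {'producers': [{'name': 'build_cub'}], 'consumers': [{'name': 'test_cub'}]},
    {'producers': [{'name': 'build_cub'}], 'consumers': [{'name': 'test_cub'},
                                                         {'name': 'bench_cub'}]},
]
# After the loop above, group_json['two_stage'] would hold a single entry,
# and ::warning annotations are emitted for the duplicated producer:
#   [{'producers': [{'name': 'build_cub'}],
#     'consumers': [{'name': 'test_cub'}, {'name': 'bench_cub'}]}]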

# Check for any duplicate job names in standalone arrays. Warn and remove duplicates.
for group_name, group_json in workflow_dispatch_groups.items():
standalone_jobs = group_json['standalone'] if 'standalone' in group_json else []
unique_standalone_jobs = []
for job_json in standalone_jobs:
if job_json in unique_standalone_jobs:
print(f"::warning file=ci/matrix.yaml::Removing duplicate standalone job '{job_json['name']}' in '{group_name}'",
file=sys.stderr)
else:
unique_standalone_jobs.append(job_json)

# If any producer/consumer jobs exist in standalone arrays, warn and remove the standalones.
two_stage_jobs = group_json['two_stage'] if 'two_stage' in group_json else []
for two_stage_job in two_stage_jobs:
for producer in two_stage_job['producers']:
if producer in unique_standalone_jobs:
print(f"::warning file=ci/matrix.yaml::Removing standalone job '{producer['name']}' " +
f"as it appears as a producer in '{group_name}'",
file=sys.stderr)
unique_standalone_jobs.remove(producer)
for consumer in two_stage_job['consumers']:
if consumer in unique_standalone_jobs:
print(f"::warning file=ci/matrix.yaml::Removing standalone job '{consumer['name']}' " +
f"as it appears as a consumer in '{group_name}'",
file=sys.stderr)
unique_standalone_jobs.remove(consumer)
group_json['standalone'] = unique_standalone_jobs

# If any producer or consumer job appears more than once, warn and leave as-is.
all_two_stage_jobs = []
duplicate_jobs = {}
for two_stage_job in two_stage_jobs:
for job in two_stage_job['producers'] + two_stage_job['consumers']:
if job in all_two_stage_jobs:
duplicate_jobs[job['name']] = duplicate_jobs.get(job['name'], 1) + 1
else:
all_two_stage_jobs.append(job)
for job_name, count in duplicate_jobs.items():
print(f"::warning file=ci/matrix.yaml::" +
f"Job '{job_name}' appears {count} times in '{group_name}'.",
f"Cannot remove duplicate while resolving dependencies. This job WILL execute {count} times.",
file=sys.stderr)

# Remove all named values that contain an empty list of jobs:
for group_name, group_json in workflow_dispatch_groups.items():
if not group_json['standalone'] and not group_json['two_stage']:
@@ -368,6 +440,7 @@ def natural_sort_key(key):
group_json['two_stage'], key=lambda x: natural_sort_key(x['producers'][0]['name']))
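(The body of natural_sort_key is collapsed in this view. A common implementation of the idea, assumed here rather than taken from the file, splits digit runs out of the string so embedded numbers compare numerically.)

import re

def natural_sort_key(key):
    # 'sm90' sorts before 'sm100' because digit runs compare as ints, not text.
    return [int(part) if part.isdigit() else part.lower()
            for part in re.split(r'(\d+)', key)]

sorted(['gpu10', 'gpu2'], key=natural_sort_key)  # ['gpu2', 'gpu10']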

# Count the total number of jobs:
print(f"::begin-group::Job list", file=sys.stderr)
total_jobs = 0
for group_name, group_json in workflow_dispatch_groups.items():
if 'standalone' in group_json:
@@ -383,6 +456,7 @@ def natural_sort_key(key):
total_jobs += 1
print(f"{total_jobs} - {group_name}: {job_json['name']}", file=sys.stderr)

print(f"::end-group::", file=sys.stderr)
print(f"Total jobs: {total_jobs}", file=sys.stderr)

# Check to see if any .two_stage.producers arrays have more than 1 job, which is not supported. See ci-dispatch-two-stage.yml for details.
@@ -401,7 +475,6 @@ def natural_sort_key(key):


def main():
global dirty_projects
global matrix_yaml

parser = argparse.ArgumentParser(description='Compute matrix for workflow')
@@ -410,7 +483,6 @@ def main():
parser.add_argument('--dirty-projects', nargs='*', dest='dirty_projects',
help='Project(s) to rerun', default=[])
args = parser.parse_args()
dirty_projects = args.dirty_projects

# Check if the matrix file exists
if not os.path.isfile(args.matrix_file):
@@ -421,8 +493,8 @@ def main():
matrix_yaml = yaml.safe_load(f)

# Check if the workflow is valid
if args.workflow not in matrix_yaml:
print(f"Error: Workflow '{args.workflow}' does not exist in the matrix YAML.")
if args.workflow not in matrix_yaml['workflows']:
print(f"Error: Workflow 'workflows.{args.workflow}' does not exist in the matrix YAML.")
sys.exit(1)
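(The lookup now goes through a top-level 'workflows' key. A hypothetical minimal shape for the parsed matrix YAML, restricted to fields referenced in this script:)

# Hypothetical shape only; the real ci/matrix.yaml defines more tags and workflows:
matrix_yaml = {
    'required_tags': ['project'],
    'defaulted_tags': ['os'],
    'workflows': {
        'pull_request': [
            {'project': 'cub', 'ctk': '12.4', 'job_types': ['build', 'test']},
        ],
    },
}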

# Print usage if no arguments are provided
@@ -433,24 +505,38 @@ def main():
# Print the arguments to stderr:
print("Arguments:", file=sys.stderr)
print(args, file=sys.stderr)
print("Matrix YAML:", file=sys.stderr)
print(matrix_yaml, file=sys.stderr)

matrix_json = matrix_yaml[args.workflow]
# print("::group::Matrix YAML", file=sys.stderr)
# print("Matrix YAML:", file=sys.stderr)
# print(matrix_yaml, file=sys.stderr)
# print("::end-group::", file=sys.stderr)

matrix_jobs = preprocess_matrix_jobs(matrix_yaml['workflows'][args.workflow])

# print("::group::Matrix Jobs", file=sys.stderr)
# print("Matrix Jobs:", file=sys.stderr)
# for matrix_job in matrix_jobs:
# print(json.dumps(matrix_job, indent=None, separators=(',', ':')), file=sys.stderr)
# print("::end-group::", file=sys.stderr)

if args.dirty_projects:
matrix_jobs = filter_projects(matrix_jobs, args.dirty_projects)

workflow_dispatch_groups = {}
for matrix_job in matrix_json:
for matrix_job in matrix_jobs:
merge_dispatch_groups(workflow_dispatch_groups, matrix_job_to_dispatch_group(matrix_job))

final_workflow = finalize_workflow_dispatch_groups(workflow_dispatch_groups)

# Pretty print the workflow json to stderr:
print("::group::Final Workflow", file=sys.stderr)
print(json.dumps(final_workflow, indent=2), file=sys.stderr)
print("::end-group::", file=sys.stderr)

# Print a single-line, compact version of the workflow json to stdout:
write_output("WORKFLOW", json.dumps(final_workflow))
write_output("WORKFLOW", json.dumps(final_workflow, indent=None, separators=(',', ':')))
# Print the list of key (dispatch group) names to stdout in a single line as a json list:
write_output("WORKFLOW_KEYS", json.dumps(list(final_workflow.keys())))
write_output("WORKFLOW_KEYS", json.dumps(list(final_workflow.keys()), indent=None, separators=(',', ':')))


if __name__ == '__main__':
