alterations to properly reuse model-generated classes
jpl-btlunsfo committed Aug 18, 2024
1 parent 23291ef commit a6fdf86
Showing 1 changed file with 39 additions and 29 deletions.
68 changes: 39 additions & 29 deletions app/main.py
@@ -28,6 +28,7 @@
JobList,
LandingPage,
Link,
Ogcapppkg,
Process,
ProcessList,
ProcessSummary,
@@ -269,16 +270,16 @@ def deploy_process(
settings: Annotated[config.Settings, Depends(get_settings)],
redis_locking_client: Annotated[RedisLock, Depends(get_redis_locking_client)],
db: Session = Depends(get_db),
process: Process = Body(...),
app: Ogcapppkg = Body(...),
):
"""
Deploy a new process.
**Note:** This is not an officially supported endpoint in the OGC Processes specification.
"""
check_process_integrity(db, process.id, new_process=True)
check_process_integrity(db, app.processDescription.id, new_process=True)

with redis_locking_client.lock("deploy_process_" + process.id): # as lock:
with redis_locking_client.lock("deploy_process_" + app.processDescription.id): # as lock:
pass

# Acquire lock
@@ -293,19 +294,28 @@ def deploy_process(
# Update process in DB w/ deployment_status field "deployed"
# Release lock

# Verify that the process_id corresponds with a DAG ID by filename in the DAG catalog
dag_filename = process.id + ".py"
dag_catalog_filepath = os.path.join(settings.DAG_CATALOG_DIRECTORY, dag_filename)
if not os.path.isfile(dag_catalog_filepath):
if process.executionunit.type == "application/cwl":
cwl_arbitrary_dag.write_dag(
dag_catalog_filepath,
process.id,
process.executionunit.href,
dict(),
process.processdescription,
)
else:
dag_filename = app.processDescription.id + ".py"

if app.executionUnit.type == "application/cwl":
cwl_arbitrary_dag.write_dag(
os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename),
app.processDescription.id,
app.executionUnit.href,
dict(),
app.processDescription.description,
)
elif app.executionUnit.mediaType == "application/cwl+json":
cwl_arbitrary_dag.write_dag(
os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename),
app.processDescription.id,
app.executionUnit.value,
dict(),
app.processDescription.description,
)
else:
# Verify that the process_id corresponds with a DAG ID by filename in the DAG catalog
dag_catalog_filepath = os.path.join(settings.DAG_CATALOG_DIRECTORY, dag_filename)
if not os.path.isfile(dag_catalog_filepath):
# If the file doesn't exist and the executionunit wasn't provided,
# list other files in the same directory
existing_files = os.listdir(settings.DAG_CATALOG_DIRECTORY)
@@ -314,18 +324,18 @@ def deploy_process(
# Raise an exception with details about what files are actually there
raise HTTPException(
status_code=fastapi_status.HTTP_409_CONFLICT,
detail=f"The process ID '{process.id}' does not have a matching DAG file named '{dag_filename}' in the DAG catalog.\nThe DAG catalog includes the following files:\n{existing_files_str}",
detail=f"The process ID '{app.processDescription.id}' does not have a matching DAG file named '{dag_filename}' in the DAG catalog.\nThe DAG catalog includes the following files:\n{existing_files_str}",
)

if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)):
# Log warning that file already exists in the deployed dags directory
pass
if os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)):
# Log warning that file already exists in the deployed dags directory
pass

# Copy DAG from the DAG catalog PVC to deployed PVC
shutil.copy2(
dag_catalog_filepath,
settings.DEPLOYED_DAGS_DIRECTORY,
)
# Copy DAG from the DAG catalog PVC to deployed PVC
shutil.copy2(
dag_catalog_filepath,
settings.DEPLOYED_DAGS_DIRECTORY,
)

if not os.path.isfile(os.path.join(settings.DEPLOYED_DAGS_DIRECTORY, dag_filename)):
raise HTTPException(
@@ -340,22 +350,22 @@ def deploy_process(
timeout = 20
start_time = time.time()
while time.time() - start_time < timeout:
response = requests.get(f"{settings.EMS_API_URL}/dags/{process.id}", auth=ems_api_auth)
response = requests.get(f"{settings.EMS_API_URL}/dags/{app.processDescription.id}", auth=ems_api_auth)
data = response.json()
if response.status_code == 404:
pass
elif data["is_paused"]:
pause_dag(settings.EMS_API_URL, process.id, ems_api_auth, pause=False)
pause_dag(settings.EMS_API_URL, app.processDescription.id, ems_api_auth, pause=False)
elif data["is_active"]:
break
time.sleep(0.5)
else:
raise HTTPException(
status_code=fastapi_status.HTTP_504_GATEWAY_TIMEOUT,
detail=f"Timeout waiting for DAG '{process.id}' to be available in Airflow.",
detail=f"Timeout waiting for DAG '{app.processDescription.id}' to be available in Airflow.",
)

return crud.create_process(db, process)
return crud.create_process(db, app.processDescription)


@app.delete(

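The deploy endpoint now accepts an OGC application package (Ogcapppkg) rather than a bare Process: a processDescription plus an executionUnit that is either referenced by href/type or supplied inline via value/mediaType. A minimal client-side sketch of such a request, assuming a hypothetical base URL and deployment route and example identifiers that are not part of this commit:

import requests

# Only processDescription.id / .description and the executionUnit fields are
# read by the handler in this diff; the other values here are illustrative.
payload = {
    "processDescription": {
        "id": "echo-process",                      # hypothetical process id
        "version": "1.0.0",
        "description": "Example CWL-backed process",
    },
    "executionUnit": {
        "type": "application/cwl",
        "href": "https://example.com/workflows/echo-process.cwl",  # hypothetical URL
    },
}

response = requests.post("http://localhost:8000/processes", json=payload)  # assumed route
response.raise_for_status()
print(response.json())

For an inline CWL document, the executionUnit would instead carry mediaType "application/cwl+json" with the workflow under value, matching the elif branch added in this diff.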
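The inline comments in deploy_process outline a lock-guarded deployment_status workflow that this commit does not yet implement. A self-contained sketch of that flow, using in-memory stand-ins for both the Redis lock and the process table; every name below is hypothetical except the "deploy_process_" + id lock key taken from the diff:

import threading
from contextlib import contextmanager

_status = {}              # stand-in for a deployment_status column in the DB
_lock = threading.Lock()  # stand-in for the RedisLock client

@contextmanager
def fake_redis_lock(key):
    # Real code would delegate to redis_locking_client.lock(key).
    with _lock:
        yield

def deploy_with_status(process_id, write_dag):
    # Acquire lock keyed by process id, mirroring "deploy_process_" + id above.
    with fake_redis_lock("deploy_process_" + process_id):
        # Check the existing record; bail out if a deployment is already in flight.
        if _status.get(process_id) == "deploying":
            raise RuntimeError(f"process '{process_id}' is already being deployed")
        _status[process_id] = "deploying"
        try:
            write_dag()                       # generate or copy the DAG file
            _status[process_id] = "deployed"  # update the record on success
        except Exception:
            _status[process_id] = "failed"
            raise
    # Lock is released when the "with" block exits.

deploy_with_status("echo-process", lambda: None)  # example invocation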