Skip to content

Commit

Permalink
feat: date params
Browse files Browse the repository at this point in the history
  • Loading branch information
alastairtree committed Feb 28, 2025
1 parent 4ea2e47 commit 73509e8
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 78 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ From a linux host or WSL (i.e. not in a dev container) you can use the container
./pack.sh
./build-docker.sh

source dev.env

docker run -it --rm \
--network mag-lab-data-platform \
-e PREFECT_API_URL=http://prefect:4200/api \
-e IMAP_IMAGE_TAG=local-dev \
-e IMAP_VOLUMES=/mnt/imap-data/dev:/data \
--env-file defaults.env \
--env-file dev.env \
--entrypoint /bin/bash \
ghcr.io/imperialcollegelondon/imap-pipeline-core:local-dev \
-c "python -c 'import prefect_server.workflow; prefect_server.workflow.deploy_flows()'"
Expand Down
8 changes: 8 additions & 0 deletions defaults.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
PREFECT_API_URL=http://prefect:4200/api
IMAP_IMAGE_TAG=local-dev
IMAP_VOLUMES=/mnt/imap-data/dev:/data
SQLALCHEMY_URL=postgresql+psycopg://postgres:[email protected]:5432/imap
WEBPODA_AUTH_CODE=
SDC_AUTH_CODE=
PREFECT_LOGGING_LEVEL=DEBUG
PREFECT_LOGGING_EXTRA_LOGGERS=imap_mag
10 changes: 10 additions & 0 deletions deploy/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ rm -rf /data/output
START_DATE='2025-05-02'
END_DATE='2025-05-03'

# if the first positional argument ($1) is set, use it as the start date
if [ -n "$1" ]; then
START_DATE=$1
fi
if [ -n "$2" ]; then
END_DATE=$2
fi

echo "Running pipeline for $START_DATE to $END_DATE"

imap-mag fetch-binary --config config-hk-download.yaml --apid 1063 --start-date $START_DATE --end-date $END_DATE

imap-mag process --config config-hk-process.yaml power.pkts
Expand Down
4 changes: 4 additions & 0 deletions src/imap_mag/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ def fetch_binary(
)
)

logging.info(
f"Downloaded files from WebPODA: {', '.join([str(file) for file in downloaded_binaries.keys()])}"
)

output_manager = appUtils.getOutputManager(configFile.destination)

for file, metadata_provider in downloaded_binaries.items():
Expand Down
129 changes: 54 additions & 75 deletions src/prefect_server/workflow.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
import asyncio
import os
import sys
from datetime import datetime
from datetime import date, datetime

import prefect
import prefect.blocks
import prefect.deployments
from prefect import deploy, flow, get_client, serve
from prefect import deploy, flow, serve
from prefect.client.schemas.objects import (
ConcurrencyLimitConfig,
ConcurrencyLimitStrategy,
Expand All @@ -20,34 +16,19 @@ class CONSTANTS:


@flow(log_prints=True)
def run_imap_pipeline():
print("Starting IMAP pipeline")
def run_imap_pipeline(start_date: date, end_date: date):
print(f"Starting IMAP pipeline for {start_date} to {end_date}")

ShellOperation(
commands=[
"./entrypoint.sh",
f"./entrypoint.sh {start_date.strftime('%Y-%m-%d')} {end_date.strftime('%Y-%m-%d')}"
],
env={"today": datetime.today().strftime("%Y%m%d")},
).run()

print("Finished IMAP pipeline")


async def setupOtherServerConfig():
# Set a concurrency limit of 10 on the 'autoflow_kernels' tag
async with get_client() as client:
# Check if the limit already exists

try:
existing_limit = await client.read_global_concurrency_limit_by_name(
"not-a-name"
)
except prefect.exceptions.ObjectNotFound:
existing_limit = None

print(f"config: {existing_limit}")


def get_cron_from_env(env_var_name: str, default: str | None = None) -> str | None:
cron = os.getenv(env_var_name, default)

Expand All @@ -60,71 +41,69 @@ def get_cron_from_env(env_var_name: str, default: str | None = None) -> str | No


def deploy_flows(local_debug: bool = False):
asyncio.get_event_loop().run_until_complete(setupOtherServerConfig())

imap_flow_name = "imappipeline"

# Docker image and tag, e.g. so-pipeline-core:latest. May include registry, e.g. ghcr.io/imperialcollegelondon/so-pipeline-core:latest
docker_image = os.getenv(
"IMAP_IMAGE",
"ghcr.io/imperialcollegelondon/imap-pipeline-core",
)
docker_tag = os.getenv(
"IMAP_IMAGE_TAG",
"main",
)
# Comma separated docker volumes, e.g. /mnt/imap-data/dev:/data
docker_volumes = os.getenv("IMAP_VOLUMES", "").split(",")
# Comma separated docker networks, e.g. mag-lab-data-platform,some-other-network
docker_networks = os.getenv(
"DOCKER_NETWORK",
"mag-lab-data-platform",
).split(",")

# remove empty strings
docker_volumes = [x for x in docker_volumes if x]
docker_networks = [x for x in docker_networks if x]

shared_job_env_variables = dict(
WEBPODA_AUTH_CODE=os.getenv("WEBPODA_AUTH_CODE"),
SDC_AUTH_CODE=os.getenv("SDC_AUTH_CODE"),
SQLALCHEMY_URL=os.getenv("SQLALCHEMY_URL"),
PREFECT_LOGGING_EXTRA_LOGGERS="imap_mag,imap_db,mag_toolkit",
)
if local_debug:
# just run the prefect server locally and deploy all the flows to it without params and schedules

serve(
run_imap_pipeline.to_deployment(
name=imap_flow_name,
),
)
shared_job_variables = dict(env=shared_job_env_variables)
print("Deploying IMAP Pipeline to Prefect with local server")
else:
# do a full prefect deployment with containers, work-pools, schedules etc

# Docker image and tag, e.g. so-pipeline-core:latest. May include registry, e.g. ghcr.io/imperialcollegelondon/so-pipeline-core:latest
docker_image = os.getenv(
"IMAP_IMAGE",
"ghcr.io/imperialcollegelondon/imap-pipeline-core",
)
docker_tag = os.getenv(
"IMAP_IMAGE_TAG",
"main",
)
# Comma separated docker volumes, e.g. /mnt/imap-data/dev:/data
docker_volumes = os.getenv("IMAP_VOLUMES", "").split(",")
# Comma separated docker networks, e.g. mag-lab-data-platform,some-other-network
docker_networks = os.getenv(
"DOCKER_NETWORK",
"mag-lab-data-platform",
).split(",")

# remove empty strings
docker_volumes = [x for x in docker_volumes if x]
docker_networks = [x for x in docker_networks if x]

shared_job_env_variables = dict(
WEBPODA_AUTH_CODE=os.getenv("WEBPODA_AUTH_CODE"),
SDC_AUTH_CODE=os.getenv("SDC_AUTH_CODE"),
SQLALCHEMY_URL=os.getenv("SQLALCHEMY_URL"),
PREFECT_LOGGING_EXTRA_LOGGERS="imap_mag,imap_db,mag_toolkit",
)
shared_job_variables = dict(
env=shared_job_env_variables,
image_pull_policy="IfNotPresent",
networks=docker_networks,
volumes=docker_volumes,
)

print(
f"Deploying IMAP Pipeline to Prefect with docker {docker_image}:{docker_tag}\n Networks: {docker_networks}\n Volumes: {docker_volumes}"
)

imap_pipeline_deployable = run_imap_pipeline.to_deployment(
name=imap_flow_name,
cron=get_cron_from_env("IMAP_CRON_HEALTHCHECK"),
job_variables=shared_job_variables,
concurrency_limit=ConcurrencyLimitConfig(
limit=1, collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW
),
tags=[CONSTANTS.DEPLOYMENT_TAG],
)
imap_flow_name = "imappipeline"
imap_pipeline_deployable = run_imap_pipeline.to_deployment(
name=imap_flow_name,
cron=get_cron_from_env("IMAP_CRON_HEALTHCHECK"),
job_variables=shared_job_variables,
concurrency_limit=ConcurrencyLimitConfig(
limit=1, collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW
),
tags=[CONSTANTS.DEPLOYMENT_TAG],
)

deployables = (imap_pipeline_deployable,)

deployables = (imap_pipeline_deployable,)
if local_debug:
for deployable in deployables:
deployable.work_queue_name = None
deployable.job_variables = None

serve(
*deployables,
)
else:
deploy_ids = deploy(
*deployables,
work_pool_name=CONSTANTS.DEFAULT_WORKPOOL,
Expand Down

0 comments on commit 73509e8

Please sign in to comment.