Ingest QOL tweaks (#1454)
* tweak ingest.run, outward api

* move ingest staging dir to more sensible place

* move ingest outputs to local filesystem

* add ability to override source with local file

* simplify archive_dataset
fvankrieken authored Feb 28, 2025
1 parent 790636e commit 38b10fd
Showing 10 changed files with 146 additions and 100 deletions.
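Taken together, the new outward-facing entry point is dcpy.lifecycle.ingest.run.ingest. A minimal usage sketch under the new API (the dataset id, version, and file path here are hypothetical; parameter names are taken from the diffs below):

from pathlib import Path

from dcpy.lifecycle.ingest.run import ingest

# Hypothetical inputs; skip_archival=True keeps the run local (no s3 writes).
config = ingest(
    "dcp_addresspoints",
    version="24c",
    local_file_path=Path("data/raw_addresspoints.csv"),  # override template source
    skip_archival=True,
)
print(config.version)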
31 changes: 11 additions & 20 deletions dcpy/connectors/edm/recipes.py
@@ -57,12 +57,19 @@ def exists(ds: Dataset) -> bool:
return s3.folder_exists(BUCKET, s3_folder_path(ds))


def _archive_dataset(config: ingest.Config, file_path: Path, s3_path: str) -> None:
def archive_dataset(
config: ingest.Config, file_path: Path, raw: bool = False, latest: bool = False
) -> None:
"""
Given a config and a path to a file, archive it in edm-recipes.
The s3 destination (raw vs. versioned top-level folder, how the dataset
is versioned, etc.) is determined from the config and the `raw` flag.
"""
s3_path = (
s3_raw_folder_path(config.raw_dataset_key)
if raw
else s3_folder_path(config.dataset_key)
)
if s3.folder_exists(BUCKET, s3_path):
raise Exception(
f"Archived dataset at {s3_path} already exists, cannot overwrite"
@@ -81,14 +88,9 @@ def _archive_dataset(config: ingest.Config, file_path: Path, s3_path: str) -> None:
acl=config.archival.acl,
contents_only=True,
)


def archive_raw_dataset(config: ingest.Config, file_path: Path):
"""
Given a config and a path to a 'raw' input dataset, archive it in edm-recipes
Unique identifier of a raw dataset is its name and the timestamp of archival
"""
_archive_dataset(config, file_path, s3_raw_folder_path(config.raw_dataset_key))
if latest:
assert not raw, "Cannot set raw dataset to 'latest'"
set_latest(config.dataset_key, config.archival.acl)


def set_latest(key: DatasetKey, acl):
@@ -100,17 +102,6 @@ def set_latest(key: DatasetKey, acl):
)


def archive_dataset(config: ingest.Config, file_path: Path, *, latest: bool = False):
"""
Given a config and a path to a processed parquet file, archive it in edm-recipes
Unique identifier of a raw dataset is its name and its version
"""
s3_path = s3_folder_path(config.dataset_key)
_archive_dataset(config, file_path, s3_path)
if latest:
set_latest(config.dataset_key, config.archival.acl)


def update_freshness(ds: DatasetKey, timestamp: datetime) -> datetime:
path = f"{DATASET_FOLDER}/{ds.id}/{ds.version}/config.json"
config = get_config(ds.id, ds.version)
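With archive_raw_dataset folded into archive_dataset, both archival paths now go through one function. A sketch of the two call shapes (a config object and local file paths are assumed to exist):

# Raw input file -> edm-recipes raw_datasets folder, keyed by archival timestamp.
recipes.archive_dataset(config, raw_file_path, raw=True)

# Processed parquet -> edm-recipes datasets folder, optionally marked as latest.
recipes.archive_dataset(config, parquet_path, latest=True)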
4 changes: 2 additions & 2 deletions dcpy/lifecycle/_cli.py
@@ -1,6 +1,6 @@
import typer

from dcpy.lifecycle.ingest import run as ingest
from dcpy.lifecycle.ingest import _cli_wrapper_run as run_ingest
from dcpy.lifecycle.package._cli import app as package_app
from dcpy.lifecycle.distribute import _cli as distribute_cli
from dcpy.lifecycle.scripts import _cli as scripts_cli
@@ -11,4 +11,4 @@
app.add_typer(scripts_cli.app, name="scripts")

# while there's only one ingest command, add it directly
app.command(name="ingest")(ingest._cli_wrapper_run)
app.command(name="ingest")(run_ingest)
41 changes: 41 additions & 0 deletions dcpy/lifecycle/ingest/__init__.py
@@ -0,0 +1,41 @@
from pathlib import Path
import typer

from .run import ingest

app = typer.Typer(add_completion=False)


@app.command("ingest")
def _cli_wrapper_run(
dataset_id: str = typer.Argument(),
version: str = typer.Option(
None,
"-v",
"--version",
help="Version of dataset being archived",
),
mode: str = typer.Option(None, "-m", "--mode", help="Preprocessing mode"),
latest: bool = typer.Option(
False, "-l", "--latest", help="Push to latest folder in s3"
),
skip_archival: bool = typer.Option(False, "--skip-archival", "-s"),
csv: bool = typer.Option(
False, "-c", "--csv", help="Output csv locally as well as parquet"
),
local_file_path: Path = typer.Option(
None,
"--local-file-path",
"-p",
help="Use local file path as source, overriding source in template",
),
):
ingest(
dataset_id,
version,
mode=mode,
latest=latest,
skip_archival=skip_archival,
output_csv=csv,
local_file_path=local_file_path,
)
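Assuming the lifecycle Typer app is installed behind a dcpy lifecycle console entry point (the executable name here is hypothetical; the flags are as defined in the wrapper above), the new override is exercised like:

dcpy lifecycle ingest dcp_addresspoints -v 24c -p ./data/raw_addresspoints.csv --skip-archival --csv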
6 changes: 6 additions & 0 deletions dcpy/lifecycle/ingest/configure.py
@@ -146,6 +146,7 @@ def get_config(
*,
mode: str | None = None,
template_dir: Path = TEMPLATE_DIR,
local_file_path: Path | None = None,
) -> Config:
"""Generate config object for dataset and optional version"""
run_details = metadata.get_run_details()
@@ -155,6 +156,11 @@
version = version or get_version(template.ingestion.source, run_details.timestamp)
template = read_template(dataset_id, version=version, template_dir=template_dir)

if local_file_path:
template.ingestion.source = LocalFileSource(
type="local_file", path=local_file_path
)

processing_steps = determine_processing_steps(
template.ingestion.processing_steps,
target_crs=template.ingestion.target_crs,
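A sketch of the new override in isolation: when local_file_path is passed, the template's configured source is replaced with a LocalFileSource before processing steps are resolved. Dataset id, version, and path below are hypothetical, mirroring the new test; the LocalFileSource import location is assumed:

from pathlib import Path

from dcpy.lifecycle.ingest import configure
from dcpy.models.lifecycle.ingest import LocalFileSource  # assumed import location

config = configure.get_config(
    "dcp_addresspoints",
    version="24c",
    local_file_path=Path("dir/fake_file_path"),
)
assert config.ingestion.source == LocalFileSource(
    type="local_file", path=Path("dir/fake_file_path")
)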
89 changes: 37 additions & 52 deletions dcpy/lifecycle/ingest/run.py
@@ -1,79 +1,95 @@
import json
from pathlib import Path
import typer

import shutil
from dcpy.models.lifecycle.ingest import Config
from dcpy.connectors.edm import recipes
from dcpy.lifecycle import BASE_PATH
from . import configure, extract, transform, validate

TMP_DIR = Path("tmp")
INGEST_DIR = BASE_PATH / "ingest"
INGEST_STAGING_DIR = INGEST_DIR / "staging"
INGEST_OUTPUT_DIR = INGEST_DIR / "datasets"
CONFIG_FILENAME = "config.json"


def run(
def ingest(
dataset_id: str,
version: str | None = None,
*,
staging_dir: Path | None = None,
dataset_staging_dir: Path | None = None,
ingest_output_dir: Path = INGEST_OUTPUT_DIR,
mode: str | None = None,
latest: bool = False,
skip_archival: bool = False,
output_csv: bool = False,
template_dir: Path = configure.TEMPLATE_DIR,
local_file_path: Path | None = None,
) -> Config:
config = configure.get_config(
dataset_id, version=version, mode=mode, template_dir=template_dir
dataset_id,
version=version,
mode=mode,
template_dir=template_dir,
local_file_path=local_file_path,
)
transform.validate_processing_steps(config.id, config.ingestion.processing_steps)

if not staging_dir:
staging_dir = (
TMP_DIR / dataset_id / config.archival.archival_timestamp.isoformat()
if not dataset_staging_dir:
dataset_staging_dir = (
INGEST_STAGING_DIR
/ dataset_id
/ config.archival.archival_timestamp.isoformat()
)
staging_dir.mkdir(parents=True)
dataset_staging_dir.mkdir(parents=True)
else:
staging_dir.mkdir(parents=True, exist_ok=True)
dataset_staging_dir.mkdir(parents=True, exist_ok=True)

with open(dataset_staging_dir / CONFIG_FILENAME, "w") as f:
json.dump(config.model_dump(mode="json"), f, indent=4)

# download dataset
extract.download_file_from_source(
config.ingestion.source,
config.archival.raw_filename,
config.version,
staging_dir,
dataset_staging_dir,
)
file_path = staging_dir / config.archival.raw_filename
file_path = dataset_staging_dir / config.archival.raw_filename

if not skip_archival:
# archive to edm-recipes/raw_datasets
recipes.archive_raw_dataset(config, file_path)
recipes.archive_dataset(config, file_path, raw=True)

init_parquet = "init.parquet"
transform.to_parquet(
config.ingestion.file_format,
file_path,
dir=staging_dir,
dir=dataset_staging_dir,
output_filename=init_parquet,
)

transform.process(
config.id,
config.ingestion.processing_steps,
config.columns,
staging_dir / init_parquet,
staging_dir / config.filename,
dataset_staging_dir / init_parquet,
dataset_staging_dir / config.filename,
output_csv=output_csv,
)

with open(staging_dir / "config.json", "w") as f:
json.dump(config.model_dump(mode="json"), f, indent=4)
dataset_output_dir = ingest_output_dir / dataset_id / config.version
dataset_output_dir.mkdir(parents=True, exist_ok=True)
shutil.copy(dataset_staging_dir / CONFIG_FILENAME, dataset_output_dir)
shutil.copy(dataset_staging_dir / config.filename, dataset_output_dir)

action = validate.validate_against_existing_versions(
config.dataset, staging_dir / config.filename
config.dataset, dataset_staging_dir / config.filename
)
if not skip_archival:
match action:
case validate.ArchiveAction.push:
recipes.archive_dataset(
config, staging_dir / config.filename, latest=latest
config, dataset_staging_dir / config.filename, latest=latest
)
case validate.ArchiveAction.update_freshness:
recipes.update_freshness(
@@ -84,34 +100,3 @@ def run(
case _:
pass
return config


app = typer.Typer(add_completion=False)


@app.command("run")
def _cli_wrapper_run(
dataset_id: str = typer.Argument(),
version: str = typer.Option(
None,
"-v",
"--version",
help="Version of dataset being archived",
),
mode: str = typer.Option(None, "-m", "--mode", help="Preprocessing mode"),
latest: bool = typer.Option(
False, "-l", "--latest", help="Push to latest folder in s3"
),
skip_archival: bool = typer.Option(False, "--skip-archival", "-s"),
csv: bool = typer.Option(
False, "-c", "--csv", help="Output csv locally as well as parquet"
),
):
run(
dataset_id,
version,
mode=mode,
latest=latest,
skip_archival=skip_archival,
output_csv=csv,
)
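Net effect on the local filesystem: staging moves out of a bare tmp/ directory and outputs land in a predictable local datasets folder. Assuming BASE_PATH is the lifecycle working directory, the layout is roughly:

<BASE_PATH>/ingest/
    staging/<dataset_id>/<archival_timestamp>/   # raw download, init.parquet, config.json
    datasets/<dataset_id>/<version>/             # final parquet + config.json copied here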
4 changes: 2 additions & 2 deletions dcpy/lifecycle/scripts/ingest_with_library_fallback.py
@@ -1,6 +1,6 @@
import typer

from dcpy.lifecycle.ingest import configure, run as run_ingest
from dcpy.lifecycle.ingest import configure, ingest


def run(
@@ -21,7 +21,7 @@ def run(
),
):
if (configure.TEMPLATE_DIR / f"{dataset_id}.yml").exists():
run_ingest.run(
ingest(
dataset_id,
version,
mode=mode,
29 changes: 20 additions & 9 deletions dcpy/lifecycle/scripts/validate_ingest.py
@@ -12,7 +12,7 @@
from dcpy.models.lifecycle.ingest import DatasetAttributes
from dcpy.data import compare
from dcpy.connectors.edm import recipes
from dcpy.lifecycle.ingest.run import TMP_DIR, run as run_ingest
from dcpy.lifecycle.ingest.run import INGEST_DIR, ingest as run_ingest
from dcpy.lifecycle.builds import metadata as build_metadata

DATABASE = "sandbox"
@@ -108,18 +108,29 @@ def library_archive(
def ingest(
dataset: str,
version: str | None = None,
ingest_parent_dir: Path = TMP_DIR,
) -> None:
ingest_dir = ingest_parent_dir / dataset / "staging"
if ingest_dir.is_dir():
shutil.rmtree(ingest_dir)
run_ingest(dataset, version=version, staging_dir=ingest_dir, skip_archival=True)
## separate from "standard" ingest folders
ingest_dir = INGEST_DIR / "migrate_from_library"
## where ingest will do processing
dataset_staging_dir = ingest_dir / "staging"
## where ingest will dump outputs
ingest_output_dir = ingest_dir / "datasets"
if dataset_staging_dir.is_dir():
shutil.rmtree(dataset_staging_dir)

config = run_ingest(
dataset,
version=version,
dataset_staging_dir=dataset_staging_dir,
ingest_output_dir=ingest_output_dir,
skip_archival=True,
)

ingest_output_path = ingest_dir / f"{dataset}.parquet"
## copy so that it's in the "library" file system for easy import
output_path = ingest_output_dir / dataset / config.version / f"{dataset}.parquet"
ingest_path = LIBRARY_PATH / dataset / "ingest" / f"{dataset}.parquet"

ingest_path.parent.mkdir(exist_ok=True, parents=True)
shutil.copy(ingest_output_path, ingest_path)
shutil.copy(output_path, ingest_path)


def load_recipe(
2 changes: 1 addition & 1 deletion dcpy/test/connectors/edm/test_recipes.py
@@ -89,7 +89,7 @@ class TestArchiveDataset:
def test_archive_raw_dataset(self, create_buckets, create_temp_filesystem: Path):
tmp_file = create_temp_filesystem / self.raw_file_name
tmp_file.touch()
recipes.archive_raw_dataset(self.config, tmp_file)
recipes.archive_dataset(self.config, tmp_file, raw=True)
assert s3.folder_exists(
RECIPES_BUCKET, recipes.s3_raw_folder_path(self.config.raw_dataset_key)
)
12 changes: 12 additions & 0 deletions dcpy/test/lifecycle/ingest/test_configure.py
@@ -155,3 +155,15 @@ def test_invalid_mode(self):
mode="fake_mode",
template_dir=TEMPLATE_DIR,
)

def test_file_path_override(self):
file_path = Path("dir/fake_file_path")
config = configure.get_config(
"dcp_addresspoints",
version="24c",
template_dir=TEMPLATE_DIR,
local_file_path=file_path,
)
assert config.ingestion.source == LocalFileSource(
type="local_file", path=file_path
)