diff --git a/dcpy/connectors/edm/recipes.py b/dcpy/connectors/edm/recipes.py
index 9f55cf80a..5cd6fa55a 100644
--- a/dcpy/connectors/edm/recipes.py
+++ b/dcpy/connectors/edm/recipes.py
@@ -57,12 +57,19 @@ def exists(ds: Dataset) -> bool:
     return s3.folder_exists(BUCKET, s3_folder_path(ds))
 
 
-def _archive_dataset(config: ingest.Config, file_path: Path, s3_path: str) -> None:
+def archive_dataset(
+    config: ingest.Config, file_path: Path, raw: bool = False, latest: bool = False
+) -> None:
     """
     Given a config and a path to a file and an s3_path, archive it in edm-recipe
     It is assumed that s3_path has taken care of figuring out which top-level folder,
     how the dataset is being versioned, etc.
     """
+    s3_path = (
+        s3_raw_folder_path(config.raw_dataset_key)
+        if raw
+        else s3_folder_path(config.dataset_key)
+    )
     if s3.folder_exists(BUCKET, s3_path):
         raise Exception(
             f"Archived dataset at {s3_path} already exists, cannot overwrite"
@@ -81,14 +88,9 @@ def _archive_dataset(config: ingest.Config, file_path: Path, s3_path: str) -> No
         acl=config.archival.acl,
         contents_only=True,
     )
-
-
-def archive_raw_dataset(config: ingest.Config, file_path: Path):
-    """
-    Given a config and a path to a 'raw' input dataset, archive it in edm-recipes
-    Unique identifier of a raw dataset is its name and the timestamp of archival
-    """
-    _archive_dataset(config, file_path, s3_raw_folder_path(config.raw_dataset_key))
+    if latest:
+        assert not raw, "Cannot set raw dataset to 'latest'"
+        set_latest(config.dataset_key, config.archival.acl)
 
 
 def set_latest(key: DatasetKey, acl):
@@ -100,17 +102,6 @@ def set_latest(key: DatasetKey, acl):
     )
 
 
-def archive_dataset(config: ingest.Config, file_path: Path, *, latest: bool = False):
-    """
-    Given a config and a path to a processed parquet file, archive it in edm-recipes
-    Unique identifier of a raw dataset is its name and its version
-    """
-    s3_path = s3_folder_path(config.dataset_key)
-    _archive_dataset(config, file_path, s3_path)
-    if latest:
-        set_latest(config.dataset_key, config.archival.acl)
-
-
 def update_freshness(ds: DatasetKey, timestamp: datetime) -> datetime:
     path = f"{DATASET_FOLDER}/{ds.id}/{ds.version}/config.json"
     config = get_config(ds.id, ds.version)
diff --git a/dcpy/lifecycle/_cli.py b/dcpy/lifecycle/_cli.py
index d4aff99be..5d3d8fc72 100644
--- a/dcpy/lifecycle/_cli.py
+++ b/dcpy/lifecycle/_cli.py
@@ -1,6 +1,6 @@
 import typer
 
-from dcpy.lifecycle.ingest import run as ingest
+from dcpy.lifecycle.ingest import _cli_wrapper_run as run_ingest
 from dcpy.lifecycle.package._cli import app as package_app
 from dcpy.lifecycle.distribute import _cli as distribute_cli
 from dcpy.lifecycle.scripts import _cli as scripts_cli
@@ -11,4 +11,4 @@ app.add_typer(scripts_cli.app, name="scripts")
 
 
 # while there's only one ingest command, add it directly
-app.command(name="ingest")(ingest._cli_wrapper_run)
+app.command(name="ingest")(run_ingest)
diff --git a/dcpy/lifecycle/ingest/__init__.py b/dcpy/lifecycle/ingest/__init__.py
new file mode 100644
index 000000000..abc67e0f3
--- /dev/null
+++ b/dcpy/lifecycle/ingest/__init__.py
@@ -0,0 +1,41 @@
+from pathlib import Path
+import typer
+
+from .run import ingest
+
+app = typer.Typer(add_completion=False)
+
+
+@app.command("ingest")
+def _cli_wrapper_run(
+    dataset_id: str = typer.Argument(),
+    version: str = typer.Option(
+        None,
+        "-v",
+        "--version",
+        help="Version of dataset being archived",
+    ),
+    mode: str = typer.Option(None, "-m", "--mode", help="Preprocessing mode"),
+    latest: bool = typer.Option(
+        False, "-l", "--latest", help="Push to latest folder in s3"
+    ),
+    skip_archival: bool = typer.Option(False, "--skip-archival", "-s"),
+    csv: bool = typer.Option(
+        False, "-c", "--csv", help="Output csv locally as well as parquet"
+    ),
+    local_file_path: Path = typer.Option(
+        None,
+        "--local-file-path",
+        "-p",
+        help="Use local file path as source, overriding source in template",
+    ),
+):
+    ingest(
+        dataset_id,
+        version,
+        mode=mode,
+        latest=latest,
+        skip_archival=skip_archival,
+        output_csv=csv,
+        local_file_path=local_file_path,
+    )
diff --git a/dcpy/lifecycle/ingest/configure.py b/dcpy/lifecycle/ingest/configure.py
index 15ad13e0a..3e05d50e0 100644
--- a/dcpy/lifecycle/ingest/configure.py
+++ b/dcpy/lifecycle/ingest/configure.py
@@ -146,6 +146,7 @@ def get_config(
     *,
     mode: str | None = None,
     template_dir: Path = TEMPLATE_DIR,
+    local_file_path: Path | None = None,
 ) -> Config:
     """Generate config object for dataset and optional version"""
     run_details = metadata.get_run_details()
@@ -155,6 +156,11 @@ def get_config(
     version = version or get_version(template.ingestion.source, run_details.timestamp)
     template = read_template(dataset_id, version=version, template_dir=template_dir)
 
+    if local_file_path:
+        template.ingestion.source = LocalFileSource(
+            type="local_file", path=local_file_path
+        )
+
     processing_steps = determine_processing_steps(
         template.ingestion.processing_steps,
         target_crs=template.ingestion.target_crs,
diff --git a/dcpy/lifecycle/ingest/run.py b/dcpy/lifecycle/ingest/run.py
index 05ce27636..7510c322d 100644
--- a/dcpy/lifecycle/ingest/run.py
+++ b/dcpy/lifecycle/ingest/run.py
@@ -1,56 +1,70 @@
 import json
 from pathlib import Path
-import typer
-
+import shutil
 
 from dcpy.models.lifecycle.ingest import Config
 from dcpy.connectors.edm import recipes
+from dcpy.lifecycle import BASE_PATH
 from . import configure, extract, transform, validate
 
-TMP_DIR = Path("tmp")
+INGEST_DIR = BASE_PATH / "ingest"
+INGEST_STAGING_DIR = INGEST_DIR / "staging"
+INGEST_OUTPUT_DIR = INGEST_DIR / "datasets"
+CONFIG_FILENAME = "config.json"
 
 
-def run(
+def ingest(
     dataset_id: str,
     version: str | None = None,
     *,
-    staging_dir: Path | None = None,
+    dataset_staging_dir: Path | None = None,
+    ingest_output_dir: Path = INGEST_OUTPUT_DIR,
     mode: str | None = None,
     latest: bool = False,
     skip_archival: bool = False,
     output_csv: bool = False,
     template_dir: Path = configure.TEMPLATE_DIR,
+    local_file_path: Path | None = None,
 ) -> Config:
     config = configure.get_config(
-        dataset_id, version=version, mode=mode, template_dir=template_dir
+        dataset_id,
+        version=version,
+        mode=mode,
+        template_dir=template_dir,
+        local_file_path=local_file_path,
     )
     transform.validate_processing_steps(config.id, config.ingestion.processing_steps)
 
-    if not staging_dir:
-        staging_dir = (
-            TMP_DIR / dataset_id / config.archival.archival_timestamp.isoformat()
+    if not dataset_staging_dir:
+        dataset_staging_dir = (
+            INGEST_STAGING_DIR
+            / dataset_id
+            / config.archival.archival_timestamp.isoformat()
         )
-        staging_dir.mkdir(parents=True)
+        dataset_staging_dir.mkdir(parents=True)
     else:
-        staging_dir.mkdir(parents=True, exist_ok=True)
+        dataset_staging_dir.mkdir(parents=True, exist_ok=True)
+
+    with open(dataset_staging_dir / CONFIG_FILENAME, "w") as f:
+        json.dump(config.model_dump(mode="json"), f, indent=4)
 
     # download dataset
     extract.download_file_from_source(
         config.ingestion.source,
         config.archival.raw_filename,
         config.version,
-        staging_dir,
+        dataset_staging_dir,
     )
-    file_path = staging_dir / config.archival.raw_filename
+    file_path = dataset_staging_dir / config.archival.raw_filename
 
     if not skip_archival:
         # archive to edm-recipes/raw_datasets
-        recipes.archive_raw_dataset(config, file_path)
+        recipes.archive_dataset(config, file_path, raw=True)
 
     init_parquet = "init.parquet"
     transform.to_parquet(
         config.ingestion.file_format,
         file_path,
-        dir=staging_dir,
+        dir=dataset_staging_dir,
         output_filename=init_parquet,
     )
@@ -58,22 +72,24 @@ def run(
         config.id,
         config.ingestion.processing_steps,
         config.columns,
-        staging_dir / init_parquet,
-        staging_dir / config.filename,
+        dataset_staging_dir / init_parquet,
+        dataset_staging_dir / config.filename,
         output_csv=output_csv,
     )
 
-    with open(staging_dir / "config.json", "w") as f:
-        json.dump(config.model_dump(mode="json"), f, indent=4)
+    dataset_output_dir = ingest_output_dir / dataset_id / config.version
+    dataset_output_dir.mkdir(parents=True, exist_ok=True)
+    shutil.copy(dataset_staging_dir / CONFIG_FILENAME, dataset_output_dir)
+    shutil.copy(dataset_staging_dir / config.filename, dataset_output_dir)
 
     action = validate.validate_against_existing_versions(
-        config.dataset, staging_dir / config.filename
+        config.dataset, dataset_staging_dir / config.filename
     )
     if not skip_archival:
         match action:
             case validate.ArchiveAction.push:
                 recipes.archive_dataset(
-                    config, staging_dir / config.filename, latest=latest
+                    config, dataset_staging_dir / config.filename, latest=latest
                 )
             case validate.ArchiveAction.update_freshness:
                 recipes.update_freshness(
                     config.dataset_key,
@@ -84,34 +100,3 @@ def run(
             case _:
                 pass
     return config
-
-
-app = typer.Typer(add_completion=False)
-
-
-@app.command("run")
-def _cli_wrapper_run(
-    dataset_id: str = typer.Argument(),
-    version: str = typer.Option(
-        None,
-        "-v",
-        "--version",
-        help="Version of dataset being archived",
-    ),
-    mode: str = typer.Option(None, "-m", "--mode", help="Preprocessing mode"),
-    latest: bool = typer.Option(
-        False, "-l", "--latest", help="Push to latest folder in s3"
-    ),
-    skip_archival: bool = typer.Option(False, "--skip-archival", "-s"),
-    csv: bool = typer.Option(
-        False, "-c", "--csv", help="Output csv locally as well as parquet"
-    ),
-):
-    run(
-        dataset_id,
-        version,
-        mode=mode,
-        latest=latest,
-        skip_archival=skip_archival,
-        output_csv=csv,
-    )
diff --git a/dcpy/lifecycle/scripts/ingest_with_library_fallback.py b/dcpy/lifecycle/scripts/ingest_with_library_fallback.py
index 3b38068b4..da79c68dc 100644
--- a/dcpy/lifecycle/scripts/ingest_with_library_fallback.py
+++ b/dcpy/lifecycle/scripts/ingest_with_library_fallback.py
@@ -1,6 +1,6 @@
 import typer
 
-from dcpy.lifecycle.ingest import configure, run as run_ingest
+from dcpy.lifecycle.ingest import configure, ingest
 
 
 def run(
@@ -21,7 +21,7 @@ def run(
     ),
 ):
     if (configure.TEMPLATE_DIR / f"{dataset_id}.yml").exists():
-        run_ingest.run(
+        ingest(
             dataset_id,
             version,
             mode=mode,
diff --git a/dcpy/lifecycle/scripts/validate_ingest.py b/dcpy/lifecycle/scripts/validate_ingest.py
index a17fee511..e530e1779 100644
--- a/dcpy/lifecycle/scripts/validate_ingest.py
+++ b/dcpy/lifecycle/scripts/validate_ingest.py
@@ -12,7 +12,7 @@
 from dcpy.models.lifecycle.ingest import DatasetAttributes
 from dcpy.data import compare
 from dcpy.connectors.edm import recipes
-from dcpy.lifecycle.ingest.run import TMP_DIR, run as run_ingest
+from dcpy.lifecycle.ingest.run import INGEST_DIR, ingest as run_ingest
 from dcpy.lifecycle.builds import metadata as build_metadata
 
 DATABASE = "sandbox"
@@ -108,18 +108,29 @@ def library_archive(
 def ingest(
     dataset: str,
     version: str | None = None,
-    ingest_parent_dir: Path = TMP_DIR,
 ) -> None:
-    ingest_dir = ingest_parent_dir / dataset / "staging"
-    if ingest_dir.is_dir():
-        shutil.rmtree(ingest_dir)
-    run_ingest(dataset, version=version, staging_dir=ingest_dir, skip_archival=True)
+    ## separate from "standard" ingest folders
+    ingest_dir = INGEST_DIR / "migrate_from_library"
+    ## where ingest will do processing
+    dataset_staging_dir = ingest_dir / "staging"
+    ## where ingest will dump outputs
+    ingest_output_dir = ingest_dir / "datasets"
+    if dataset_staging_dir.is_dir():
+        shutil.rmtree(dataset_staging_dir)
+
+    config = run_ingest(
+        dataset,
+        version=version,
+        dataset_staging_dir=dataset_staging_dir,
+        ingest_output_dir=ingest_output_dir,
+        skip_archival=True,
+    )
 
-    ingest_output_path = ingest_dir / f"{dataset}.parquet"
+    ## copy so that it's in the "library" file system for easy import
+    output_path = ingest_output_dir / dataset / config.version / f"{dataset}.parquet"
     ingest_path = LIBRARY_PATH / dataset / "ingest" / f"{dataset}.parquet"
-    ingest_path.parent.mkdir(exist_ok=True, parents=True)
-    shutil.copy(ingest_output_path, ingest_path)
+    shutil.copy(output_path, ingest_path)
 
 
 def load_recipe(
diff --git a/dcpy/test/connectors/edm/test_recipes.py b/dcpy/test/connectors/edm/test_recipes.py
index db25a5b60..fbb82ddc0 100644
--- a/dcpy/test/connectors/edm/test_recipes.py
+++ b/dcpy/test/connectors/edm/test_recipes.py
@@ -89,7 +89,7 @@ class TestArchiveDataset:
     def test_archive_raw_dataset(self, create_buckets, create_temp_filesystem: Path):
         tmp_file = create_temp_filesystem / self.raw_file_name
         tmp_file.touch()
-        recipes.archive_raw_dataset(self.config, tmp_file)
+        recipes.archive_dataset(self.config, tmp_file, raw=True)
         assert s3.folder_exists(
             RECIPES_BUCKET, recipes.s3_raw_folder_path(self.config.raw_dataset_key)
         )
diff --git a/dcpy/test/lifecycle/ingest/test_configure.py b/dcpy/test/lifecycle/ingest/test_configure.py
index c730313cb..fb141c0e7 100644
--- a/dcpy/test/lifecycle/ingest/test_configure.py
+++ b/dcpy/test/lifecycle/ingest/test_configure.py
@@ -155,3 +155,15 @@ def test_invalid_mode(self):
                 mode="fake_mode",
                 template_dir=TEMPLATE_DIR,
             )
+
+    def test_file_path_override(self):
+        file_path = Path("dir/fake_file_path")
+        config = configure.get_config(
+            "dcp_addresspoints",
+            version="24c",
+            template_dir=TEMPLATE_DIR,
+            local_file_path=file_path,
+        )
+        assert config.ingestion.source == LocalFileSource(
+            type="local_file", path=file_path
+        )
diff --git a/dcpy/test/lifecycle/ingest/test_run.py b/dcpy/test/lifecycle/ingest/test_run.py
index d5013f5ab..132990f3f 100644
--- a/dcpy/test/lifecycle/ingest/test_run.py
+++ b/dcpy/test/lifecycle/ingest/test_run.py
@@ -6,7 +6,7 @@
 from dcpy.configuration import RECIPES_BUCKET
 from dcpy.utils import s3
 from dcpy.connectors.edm import recipes
-from dcpy.lifecycle.ingest.run import run as run_ingest, TMP_DIR
+from dcpy.lifecycle.ingest.run import ingest as run_ingest, INGEST_STAGING_DIR
 from dcpy.test.conftest import mock_request_get
 from .shared import FAKE_VERSION, TEMPLATE_DIR
 
@@ -18,11 +18,11 @@
 
 @pytest.fixture
 @mock.patch("requests.get", side_effect=mock_request_get)
-def run_basic(mock_request_get, create_buckets, create_temp_filesystem):
+def run_basic(mock_request_get, create_buckets, tmp_path):
     return run_ingest(
         dataset_id=DATASET,
         version=FAKE_VERSION,
-        staging_dir=create_temp_filesystem,
+        dataset_staging_dir=tmp_path,
         template_dir=TEMPLATE_DIR,
     )
 
@@ -45,19 +45,19 @@ def test_run_output_exists(run_basic):
 
 
 @mock.patch("requests.get", side_effect=mock_request_get)
-def test_run_default_folder(mock_request_get, create_buckets, create_temp_filesystem):
+def test_run_default_folder(mock_request_get, create_buckets):
     run_ingest(dataset_id=DATASET, version=FAKE_VERSION, template_dir=TEMPLATE_DIR)
     assert s3.object_exists(RECIPES_BUCKET, S3_PATH)
-    assert (TMP_DIR / DATASET).exists()
-    shutil.rmtree(TMP_DIR)
+    assert (INGEST_STAGING_DIR / DATASET).exists()
+    shutil.rmtree(INGEST_STAGING_DIR)
 
 
 @mock.patch("requests.get", side_effect=mock_request_get)
-def test_skip_archival(mock_request_get, create_buckets, create_temp_filesystem):
+def test_skip_archival(mock_request_get, create_buckets, tmp_path):
     run_ingest(
         dataset_id=DATASET,
         version=FAKE_VERSION,
-        staging_dir=create_temp_filesystem,
+        dataset_staging_dir=tmp_path,
         skip_archival=True,
         template_dir=TEMPLATE_DIR,
     )
@@ -65,11 +65,11 @@ def test_skip_archival(mock_request_get, create_buckets, create_temp_filesystem)
 
 
 @mock.patch("requests.get", side_effect=mock_request_get)
-def test_run_update_freshness(mock_request_get, create_buckets, create_temp_filesystem):
+def test_run_update_freshness(mock_request_get, create_buckets, tmp_path):
     run_ingest(
         dataset_id=DATASET,
         version=FAKE_VERSION,
-        staging_dir=create_temp_filesystem,
+        dataset_staging_dir=tmp_path,
         template_dir=TEMPLATE_DIR,
     )
     config = recipes.get_config(DATASET, FAKE_VERSION)
@@ -77,7 +77,7 @@ def test_run_update_freshness(mock_request_get, create_buckets, create_temp_file
     run_ingest(
         dataset_id=DATASET,
         version=FAKE_VERSION,
-        staging_dir=create_temp_filesystem,
+        dataset_staging_dir=tmp_path,
         latest=True,
         template_dir=TEMPLATE_DIR,
     )
@@ -92,13 +92,13 @@ def test_run_update_freshness(mock_request_get, create_buckets, create_temp_file
 
 @mock.patch("requests.get", side_effect=mock_request_get)
 def test_run_update_freshness_fails_if_data_diff(
-    mock_request_get, create_buckets, create_temp_filesystem
+    mock_request_get, create_buckets, tmp_path
 ):
     """Mainly an integration test to make sure code runs without error"""
     run_ingest(
         dataset_id=DATASET,
         version=FAKE_VERSION,
-        staging_dir=create_temp_filesystem,
+        dataset_staging_dir=tmp_path,
         template_dir=TEMPLATE_DIR,
     )
 
@@ -112,6 +112,6 @@ def test_run_update_freshness_fails_if_data_diff(
     run_ingest(
         dataset_id=DATASET,
         version=FAKE_VERSION,
-        staging_dir=create_temp_filesystem,
+        dataset_staging_dir=tmp_path,
         template_dir=TEMPLATE_DIR,
     )