Skip to content

Commit

Permalink
POST REVIEW - clarify 'ingest' vs 'dataset' dirs
Browse files Browse the repository at this point in the history
  • Loading branch information
fvankrieken committed Feb 27, 2025
1 parent 8ba04c6 commit 31ade6d
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 31 deletions.
35 changes: 18 additions & 17 deletions dcpy/lifecycle/ingest/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ def ingest(
dataset_id: str,
version: str | None = None,
*,
staging_dir: Path | None = None,
dataset_staging_dir: Path | None = None,
ingest_output_dir: Path = INGEST_OUTPUT_DIR,
mode: str | None = None,
latest: bool = False,
skip_archival: bool = False,
Expand All @@ -33,27 +34,27 @@ def ingest(
)
transform.validate_processing_steps(config.id, config.ingestion.processing_steps)

if not staging_dir:
staging_dir = (
if not dataset_staging_dir:
dataset_staging_dir = (
INGEST_STAGING_DIR
/ dataset_id
/ config.archival.archival_timestamp.isoformat()
)
staging_dir.mkdir(parents=True)
dataset_staging_dir.mkdir(parents=True)
else:
staging_dir.mkdir(parents=True, exist_ok=True)
dataset_staging_dir.mkdir(parents=True, exist_ok=True)

with open(staging_dir / CONFIG_FILENAME, "w") as f:
with open(dataset_staging_dir / CONFIG_FILENAME, "w") as f:
json.dump(config.model_dump(mode="json"), f, indent=4)

# download dataset
extract.download_file_from_source(
config.ingestion.source,
config.archival.raw_filename,
config.version,
staging_dir,
dataset_staging_dir,
)
file_path = staging_dir / config.archival.raw_filename
file_path = dataset_staging_dir / config.archival.raw_filename

if not skip_archival:
# archive to edm-recipes/raw_datasets
Expand All @@ -63,32 +64,32 @@ def ingest(
transform.to_parquet(
config.ingestion.file_format,
file_path,
dir=staging_dir,
dir=dataset_staging_dir,
output_filename=init_parquet,
)

transform.process(
config.id,
config.ingestion.processing_steps,
config.columns,
staging_dir / init_parquet,
staging_dir / config.filename,
dataset_staging_dir / init_parquet,
dataset_staging_dir / config.filename,
output_csv=output_csv,
)

output_dir = INGEST_OUTPUT_DIR / dataset_id / config.version
output_dir.mkdir(parents=True, exist_ok=True)
shutil.copy(staging_dir / CONFIG_FILENAME, output_dir)
shutil.copy(staging_dir / config.filename, output_dir)
dataset_output_dir = ingest_output_dir / dataset_id / config.version
dataset_output_dir.mkdir(parents=True, exist_ok=True)
shutil.copy(dataset_staging_dir / CONFIG_FILENAME, dataset_output_dir)
shutil.copy(dataset_staging_dir / config.filename, dataset_output_dir)

action = validate.validate_against_existing_versions(
config.dataset, staging_dir / config.filename
config.dataset, dataset_staging_dir / config.filename
)
if not skip_archival:
match action:
case validate.ArchiveAction.push:
recipes.archive_dataset(
config, staging_dir / config.filename, latest=latest
config, dataset_staging_dir / config.filename, latest=latest
)
case validate.ArchiveAction.update_freshness:
recipes.update_freshness(
Expand Down
27 changes: 19 additions & 8 deletions dcpy/lifecycle/scripts/validate_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dcpy.models.lifecycle.ingest import DatasetAttributes
from dcpy.data import compare
from dcpy.connectors.edm import recipes
from dcpy.lifecycle.ingest.run import INGEST_STAGING_DIR, ingest as run_ingest
from dcpy.lifecycle.ingest.run import INGEST_DIR, ingest as run_ingest
from dcpy.lifecycle.builds import metadata as build_metadata

DATABASE = "sandbox"
Expand Down Expand Up @@ -108,18 +108,29 @@ def library_archive(
def ingest(
    dataset: str,
    version: str | None = None,
) -> None:
    """Run ingest for `dataset` in a dedicated folder and copy the parquet into the library tree.

    Everything happens under ``INGEST_DIR / "migrate_from_library"`` so the
    run stays separate from the "standard" ingest folders. Archival is
    skipped (``skip_archival=True``).

    Args:
        dataset: Dataset id passed to ingest; also used to build the
            ``{dataset}.parquet`` filename and the destination folder under
            ``LIBRARY_PATH``.
        version: Optional version forwarded to ingest unchanged.
    """
    ## separate from "standard" ingest folders
    ingest_dir = INGEST_DIR / "migrate_from_library"
    ## where ingest will do processing
    dataset_staging_dir = ingest_dir / "staging"
    ## where ingest will dump outputs
    ingest_output_dir = ingest_dir / "datasets"

    # Start from a clean staging folder so files from a previous run are not reused.
    if dataset_staging_dir.is_dir():
        shutil.rmtree(dataset_staging_dir)

    run_ingest(
        dataset,
        version=version,
        dataset_staging_dir=dataset_staging_dir,
        # The keyword must match the parameter name declared by
        # dcpy.lifecycle.ingest.run.ingest (`ingest_output_dir`); passing
        # `output_dir` raises TypeError for an unexpected keyword argument.
        ingest_output_dir=ingest_output_dir,
        skip_archival=True,
    )

    ingest_path = LIBRARY_PATH / dataset / "ingest" / f"{dataset}.parquet"

    ingest_path.parent.mkdir(exist_ok=True, parents=True)
    # NOTE(review): run.ingest appears to write its outputs under
    # `ingest_output_dir / dataset_id / config.version`, not directly under
    # `ingest_output_dir` -- confirm this source path exists after a run.
    shutil.copy(ingest_output_dir / f"{dataset}.parquet", ingest_path)


def load_recipe(
Expand Down
12 changes: 6 additions & 6 deletions dcpy/test/lifecycle/ingest/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def run_basic(mock_request_get, create_buckets, tmp_path):
return run_ingest(
dataset_id=DATASET,
version=FAKE_VERSION,
staging_dir=tmp_path,
dataset_staging_dir=tmp_path,
template_dir=TEMPLATE_DIR,
)

Expand Down Expand Up @@ -57,7 +57,7 @@ def test_skip_archival(mock_request_get, create_buckets, tmp_path):
run_ingest(
dataset_id=DATASET,
version=FAKE_VERSION,
staging_dir=tmp_path,
dataset_staging_dir=tmp_path,
skip_archival=True,
template_dir=TEMPLATE_DIR,
)
Expand All @@ -69,15 +69,15 @@ def test_run_update_freshness(mock_request_get, create_buckets, tmp_path):
run_ingest(
dataset_id=DATASET,
version=FAKE_VERSION,
staging_dir=tmp_path,
dataset_staging_dir=tmp_path,
template_dir=TEMPLATE_DIR,
)
config = recipes.get_config(DATASET, FAKE_VERSION)
assert config.archival.check_timestamps == []
run_ingest(
dataset_id=DATASET,
version=FAKE_VERSION,
staging_dir=tmp_path,
dataset_staging_dir=tmp_path,
latest=True,
template_dir=TEMPLATE_DIR,
)
Expand All @@ -98,7 +98,7 @@ def test_run_update_freshness_fails_if_data_diff(
run_ingest(
dataset_id=DATASET,
version=FAKE_VERSION,
staging_dir=tmp_path,
dataset_staging_dir=tmp_path,
template_dir=TEMPLATE_DIR,
)

Expand All @@ -112,6 +112,6 @@ def test_run_update_freshness_fails_if_data_diff(
run_ingest(
dataset_id=DATASET,
version=FAKE_VERSION,
staging_dir=tmp_path,
dataset_staging_dir=tmp_path,
template_dir=TEMPLATE_DIR,
)

0 comments on commit 31ade6d

Please sign in to comment.