diff --git a/docs/design/interface/functions.qmd b/docs/design/interface/functions.qmd
index 2d340a13..51ffaaf3 100644
--- a/docs/design/interface/functions.qmd
+++ b/docs/design/interface/functions.qmd
@@ -101,7 +101,7 @@ more details.
 
 ## Data resource functions
 
-### {{< var done >}}`create_resource_structure(path)`
+### {{< var done >}} `create_resource_structure(path)`
 
 See the help documentation with `help(create_resource_structure)` for more
 details.
@@ -127,13 +127,21 @@ flowchart
   function --> out
 ```
 
-### {{< var wip >}} `write_resource_parquet(raw_files, path)`
+### {{< var wip >}} `build_resource_parquet(raw_files_path, resource_properties)`
 
-This function takes the files provided by `raw_files` and merges them
-into a `data.parquet` file provided by `path`. Use
-`path_resource_data()` to provide the correct path location for `path`
-and `path_resource_raw_files()` for the `raw_files` argument. Outputs
-the path object of the created file.
+See the help documentation with `help(build_resource_parquet)` for more
+details.
+
+```{mermaid}
+flowchart
+  in_raw_files_path[/raw_files_path/]
+  in_properties[/resource_properties/]
+  function("build_resource_parquet()")
+  out[("./resources/{id}/data.parquet")]
+  in_raw_files_path --> function
+  in_properties --> function
+  function --> out
+```
 
 ### {{< var wip >}} `edit_resource_properties(path, properties)`
 
diff --git a/docs/design/interface/pseudocode/build_resource_parquet.py b/docs/design/interface/pseudocode/build_resource_parquet.py
new file mode 100644
index 00000000..ac30d5d4
--- /dev/null
+++ b/docs/design/interface/pseudocode/build_resource_parquet.py
@@ -0,0 +1,73 @@
+# ruff: noqa
+def build_resource_parquet(
+    raw_files_path: list[Path], resource_properties: ResourceProperties
+) -> Path:
+    """Merge all raw resource file(s) and write them into a Parquet file.
+
+    This function takes the file(s) provided by `raw_files_path` and merges
+    them into a `data.parquet` file. The Parquet file will be stored at the
+    path found in `ResourceProperties.path`. While Sprout generally assumes
+    that the files stored in the `resources/raw/` folder have already been
+    verified and validated, this function does some quick verification checks
+    of the data after reading it into Python from the raw file(s) by comparing
+    it against the current properties given by `resource_properties`. All data
+    in the `resources/raw/` folder will be merged into one single data object
+    and then written back to the Parquet file. The Parquet file will be
+    overwritten.
+
+    If there are any duplicate observation units in the data, only the most
+    recent observation unit will be kept. This way, if errors or mistakes in
+    older raw files have been corrected in later files, the older raw files
+    can still be kept, but they won't impact the data that will actually be
+    used.
+
+    Examples:
+
+        ``` python
+        import seedcase_sprout.core as sp
+
+        sp.build_resource_parquet(
+            raw_files_path=sp.path_resource_raw_files(1, 1),
+            resource_properties=resource_properties,
+        )
+        ```
+
+    Args:
+        raw_files_path: A list of paths for all the raw files, most commonly
+            stored in the `.csv.gz` format. Use `path_resource_raw_files()` to
+            help provide the correct paths to the raw files.
+        resource_properties: The `ResourceProperties` object that contains the
+            properties of the resource you want to create the Parquet file for.
+
+    Returns:
+        The path of the created Parquet file.
+ """ + # Not sure if this is the correct way to verify multiple files. + [check_is_file(path) for path in raw_files_path] + check_resource_properties(resource_properties) + + data = read_raw_files(raw_files_path) + data = drop_duplicate_obs_units(data) + + # This function could be several functions or the one full function. + check_data(data, resource_properties) + + return write_parquet(data, resource_properties["path"]) + + +def write_parquet(data: DataFrame, path: Path) -> Path: + return path + + +def read_raw_files(paths: list[Path]) -> DataFrame: + # Can read gzip files. + data_list = [polars.read_csv(path) for path in paths] + # Merge them all together. + data = polars.concat(data_list) + return data + + +def drop_duplicate_obs_units(data: DataFrame) -> DataFrame: + # Drop duplicates based on the observation unit, keeping only the most + # recent one. This allows older raw files to contain potentially wrong + # data that was corrected in the most recent file. + return data.drop_duplicates()