Feature/issue 104 add history attributes #113

Merged (13 commits) on Mar 19, 2024
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
@@ -15,19 +15,19 @@ repos:

- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: 'v0.3.0'
rev: 'v0.3.3'
hooks:
- id: ruff
args: [ "--fix" ]

# https://github.com/python/black#version-control-integration
- repo: https://github.com/psf/black
rev: 24.2.0
rev: 24.3.0
hooks:
- id: black-jupyter

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.8.0
rev: v1.9.0
hooks:
- id: mypy

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [Issue #10](https://github.com/danielfromearth/stitchee/issues/10): Add code necessary to communicate with Harmony
- [Issue #49](https://github.com/danielfromearth/stitchee/issues/49): More CLI arguments for finer control of concatenation method
- [Pull #99](https://github.com/danielfromearth/stitchee/pull/99): Add Docker build steps to GitHub Actions workflow
- [Pull #113](https://github.com/danielfromearth/stitchee/pull/113): Add history attributes
- [Pull #115](https://github.com/danielfromearth/stitchee/pull/115): Add readme badges
- [Pull #116](https://github.com/danielfromearth/stitchee/pull/116): Add tutorial Jupyter notebook
- [Pull #128](https://github.com/danielfromearth/stitchee/pull/128): Create toy netCDFs in testing suite
186 changes: 173 additions & 13 deletions concatenator/attribute_handling.py
@@ -4,12 +4,23 @@
Functions for converting "coordinates" in netCDF variable attributes
between paths that reference a group hierarchy and flattened paths.
"""

import json
import re
from datetime import datetime, timezone

import importlib_metadata
import netCDF4
import xarray as xr

from concatenator import COORD_DELIM, GROUP_DELIM

# Values needed for history_json attribute
HISTORY_JSON_SCHEMA = "https://harmony.earthdata.nasa.gov/schemas/history/0.1.0/history-v0.1.0.json"
PROGRAM = "stitchee"
PROGRAM_REF = "https://cmr.earthdata.nasa.gov:443/search/concepts/S1262025641-LARC_CLOUD"
VERSION = importlib_metadata.distribution("stitchee").version


def regroup_coordinate_attribute(attribute_string: str) -> str:
"""
@@ -29,29 +40,28 @@ def regroup_coordinate_attribute(attribute_string: str) -> str:
"""
# Use the separator that's in the attribute string only if all separators in the string are the same.
# Otherwise, we will use our own default separator.
whitespaces = re.findall(r'\s+', attribute_string)
whitespaces = re.findall(r"\s+", attribute_string)
if len(set(whitespaces)) <= 1:
new_sep = whitespaces[0]
else:
new_sep = COORD_DELIM

return new_sep.join(
'/'.join(c.split(GROUP_DELIM))[1:]
for c
in attribute_string.split() # split on any whitespace
"/".join(c.split(GROUP_DELIM))[1:]
for c in attribute_string.split() # split on any whitespace
)


def flatten_coordinate_attribute_paths(dataset: netCDF4.Dataset,
var: netCDF4.Variable,
variable_name: str) -> None:
def flatten_coordinate_attribute_paths(
dataset: netCDF4.Dataset, var: netCDF4.Variable, variable_name: str
) -> None:
"""Flatten the paths of variables referenced in the coordinates attribute."""
if 'coordinates' in var.ncattrs():
coord_att = var.getncattr('coordinates')
if "coordinates" in var.ncattrs():
coord_att = var.getncattr("coordinates")

new_coord_att = _flatten_coordinate_attribute(coord_att)

dataset.variables[variable_name].setncattr('coordinates', new_coord_att)
dataset.variables[variable_name].setncattr("coordinates", new_coord_att)


def _flatten_coordinate_attribute(attribute_string: str) -> str:
@@ -73,7 +83,7 @@ def _flatten_coordinate_attribute(attribute_string: str) -> str:
"""
# Use the separator that's in the attribute string only if all separators in the string are the same.
# Otherwise, we will use our own default separator.
whitespaces = re.findall(r'\s+', attribute_string)
whitespaces = re.findall(r"\s+", attribute_string)
if len(set(whitespaces)) <= 1:
new_sep = whitespaces[0]
else:
@@ -82,6 +92,156 @@ def _flatten_coordinate_attribute(attribute_string: str) -> str:
# A new string is constructed.
return new_sep.join(
f'{GROUP_DELIM}{c.replace("/", GROUP_DELIM)}'
for c
in attribute_string.split() # split on any whitespace
for c in attribute_string.split() # split on any whitespace
)


def create_new_attributes(input_dataset: xr.Dataset, request_parameters: dict) -> dict:
"""Set the global attributes of the merged output file.

These begin as the global attributes of the input granule, but are updated to also include
the provenance data via an updated `history` CF attribute (or `History`
if that is already present), and a `history_json` attribute that is
compliant with the schema defined at the URL specified by
`HISTORY_JSON_SCHEMA`.

`projection` is not included in the output parameters, as this is not
an original message parameter. It is a derived `pyproj.Proj` instance
that is defined by the input `crs` parameter.

`x_extent` and `y_extent` are not serializable, and are instead
represented by `x_min`, `x_max`, `y_min`, and `y_max`.

Parameters
----------
input_dataset : Dataset
request_parameters : dict
"""
# Get attributes from input file
output_attributes = input_dataset.attrs

# Reconstruct parameters' dictionary with only keys that correspond to non-null values.
valid_request_parameters = {
parameter_name: parameter_value
for parameter_name, parameter_value in request_parameters.items()
if parameter_value is not None
}

# Remove unnecessary and unserializable request parameters
for surplus_key in ["projection", "x_extent", "y_extent"]:
valid_request_parameters.pop(surplus_key, None)

# Retrieve `granule_url` and replace the `input_file` attribute.
# This ensures `history_json` refers to the archived granule location, rather
# than a temporary file in the Docker container.
valid_request_parameters["input_file"] = valid_request_parameters.pop("granule_url", None)

# Preferentially use `history`, unless `History` is already present in the
# input file.
cf_att_name = "History" if hasattr(input_dataset, "History") else "history"
input_history = getattr(input_dataset, cf_att_name, None)

# Create new history_json attribute
new_history_json_record = create_history_record(str(input_history), valid_request_parameters)

# Extract existing `history_json` from input granule
if hasattr(input_dataset, "history_json"):
old_history_json = json.loads(output_attributes["history_json"])
if isinstance(old_history_json, list):
output_history_json = old_history_json
else:
# Single `history_record` element.
output_history_json = [old_history_json]
else:
output_history_json = []

# Append `history_record` to the existing `history_json` array:
output_history_json.append(new_history_json_record)
output_attributes["history_json"] = json.dumps(output_history_json)

# Create history attribute
history_parameters = {
parameter_name: parameter_value
for parameter_name, parameter_value in new_history_json_record["parameters"].items()
if parameter_name != "input_file"
}

new_history_line = " ".join(
[
new_history_json_record["date_time"],
new_history_json_record["program"],
new_history_json_record["version"],
json.dumps(history_parameters),
]
)

output_history = "\n".join(filter(None, [input_history, new_history_line]))
output_attributes[cf_att_name] = output_history

return output_attributes


def create_history_record(input_history: str, request_parameters: dict) -> dict:
"""Create a serializable dictionary for the `history_json` global
attribute in the merged output NetCDF-4 file.

"""
history_record = {
"$schema": HISTORY_JSON_SCHEMA,
"date_time": datetime.utcnow().replace(tzinfo=timezone.utc).isoformat(),
"program": PROGRAM,
"version": VERSION,
"parameters": request_parameters,
"derived_from": request_parameters["input_file"],
"program_ref": PROGRAM_REF,
}

if isinstance(input_history, str):
history_record["cf_history"] = input_history.split("\n")
elif isinstance(input_history, list):
history_record["cf_history"] = input_history

return history_record


def retrieve_history(dataset: netCDF4.Dataset) -> dict:
"""
Retrieve the history_json attribute from a NetCDF dataset, if it exists.

Parameters
----------
dataset: NetCDF Dataset representing a single granule

Returns
-------
The deserialized history_json content, or an empty dict if the attribute is not present
"""
if "history_json" not in dataset.ncattrs():
return {}
history_json = dataset.getncattr("history_json")
return json.loads(history_json)


def construct_history(input_files: list, granule_urls: list) -> dict:
"""
Construct history JSON entry for this concatenation operation
https://wiki.earthdata.nasa.gov/display/TRT/In-File+Provenance+Metadata+-+TRT-42

Parameters
----------
input_files: List of input files
granule_urls: List of granule URLs corresponding to the input files

Returns
-------
History JSON constructed for this concat operation
"""
history_json = {
"$schema": HISTORY_JSON_SCHEMA,
"date_time": datetime.now(tz=timezone.utc).isoformat(),
"program": PROGRAM,
"version": VERSION,
"parameters": f"input_files={input_files}",
"derived_from": granule_urls,
"program_ref": PROGRAM_REF,
}
return history_json
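Note: the record produced by `construct_history` is a plain dictionary that is later serialized with `json.dumps`. The following is a minimal sketch of that shape, using hypothetical file names and URLs and a placeholder version string in place of the `importlib_metadata` lookup:

```python
import json
from datetime import datetime, timezone

# Hypothetical inputs; in practice these are the downloaded granule paths and
# their archived URLs from the Harmony request.
input_files = ["granule_1.nc", "granule_2.nc"]
granule_urls = ["https://example.com/granule_1.nc", "https://example.com/granule_2.nc"]

history_record = {
    "$schema": "https://harmony.earthdata.nasa.gov/schemas/history/0.1.0/history-v0.1.0.json",
    "date_time": datetime.now(tz=timezone.utc).isoformat(),
    "program": "stitchee",
    "version": "x.y.z",  # placeholder; the module reads the installed distribution's version
    "parameters": f"input_files={input_files}",
    "derived_from": granule_urls,
    "program_ref": "https://cmr.earthdata.nasa.gov:443/search/concepts/S1262025641-LARC_CLOUD",
}

# The history_json global attribute stores a JSON-encoded list of such records.
print(json.dumps([history_record], indent=2, default=str))
```

Each concatenation run appends one such record, so the attribute accumulates into an ordered provenance trail.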
7 changes: 5 additions & 2 deletions concatenator/group_handling.py
@@ -156,7 +156,7 @@ def flatten_grouped_dataset(


def regroup_flattened_dataset(
dataset: xr.Dataset, output_file: str
dataset: xr.Dataset, output_file: str, history_to_append: str | None
) -> None: # pylint: disable=too-many-branches
"""
Given a list of xarray datasets, combine those datasets into a
@@ -173,7 +173,10 @@
"""
with nc.Dataset(output_file, mode="w", format="NETCDF4") as base_dataset:
# Copy global attributes
base_dataset.setncatts(dataset.attrs)
output_attributes = dataset.attrs
if history_to_append is not None:
output_attributes["history_json"] = history_to_append
base_dataset.setncatts(output_attributes)

# Create Groups
group_lst = []
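By the time `history_to_append` reaches `regroup_flattened_dataset` it is already a JSON string, so persisting it is a single global-attribute write. A minimal sketch of that step, with a hypothetical output path and stand-in attribute values:

```python
import json

import netCDF4 as nc

# Illustrative stand-ins; the real values come from the concatenated xarray dataset.
dataset_attrs = {"title": "example granule collection"}
history_to_append = json.dumps([{"program": "stitchee", "version": "x.y.z"}])

with nc.Dataset("stitched_output.nc", mode="w", format="NETCDF4") as base_dataset:
    output_attributes = dataset_attrs
    if history_to_append is not None:
        output_attributes["history_json"] = history_to_append
    base_dataset.setncatts(output_attributes)  # copy global attributes onto the output file
```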
3 changes: 1 addition & 2 deletions concatenator/harmony/download_worker.py
@@ -35,8 +35,7 @@ def multi_core_download(

Returns
-------
list
list of downloaded files as pathlib.Path objects
list of downloaded files as pathlib.Path objects
"""

if process_count is None:
12 changes: 12 additions & 0 deletions concatenator/harmony/service_adapter.py
@@ -1,15 +1,18 @@
import json
from pathlib import Path
from shutil import copyfile
from tempfile import TemporaryDirectory
from urllib.parse import urlsplit
from uuid import uuid4

import netCDF4 as nc
import pystac
from harmony.adapter import BaseHarmonyAdapter
from harmony.util import bbox_to_geometry, stage
from pystac import Item
from pystac.item import Asset

from concatenator.attribute_handling import construct_history, retrieve_history
from concatenator.harmony.download_worker import multi_core_download
from concatenator.harmony.util import (
_get_netcdf_urls,
@@ -98,10 +101,18 @@ def process_catalog(self, catalog: pystac.Catalog) -> pystac.Catalog:
)
self.logger.info("Finished granule downloads.")

history_json: list[dict] = []
for file_count, file in enumerate(input_files):
file_size = sizeof_fmt(file.stat().st_size)
self.logger.info(f"File {file_count} is size <{file_size}>. Path={file}")

with nc.Dataset(file, "r") as dataset:
history_json.extend(retrieve_history(dataset))

history_json.append(construct_history(input_files, netcdf_urls))

new_history_json = json.dumps(history_json, default=str)

self.logger.info("Running Stitchee..")
output_path = str(Path(temp_dir).joinpath(filename).resolve())

@@ -112,6 +123,7 @@ def process_catalog(self, catalog: pystac.Catalog) -> pystac.Catalog:
write_tmp_flat_concatenated=False,
keep_tmp_files=False,
concat_dim="mirror_step", # This is currently set only for TEMPO
history_to_append=new_history_json,
logger=self.logger,
)
self.logger.info("Stitchee completed.")
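To spot-check the accumulated provenance after a run, the merged `history_json` attribute can be read back from the staged output. A small sketch, assuming a hypothetical output file named `stitched_output.nc`:

```python
import json

import netCDF4 as nc

with nc.Dataset("stitched_output.nc", "r") as dataset:
    if "history_json" in dataset.ncattrs():
        # One entry per prior processing step, oldest first.
        for record in json.loads(dataset.getncattr("history_json")):
            print(record.get("date_time"), record.get("program"), record.get("version"))
```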
15 changes: 15 additions & 0 deletions concatenator/run_stitchee.py
@@ -1,4 +1,6 @@
"""A simple CLI wrapper around the main concatenation process."""

import json
import logging
import os
import shutil
@@ -7,6 +9,9 @@
from argparse import ArgumentParser
from pathlib import Path

import netCDF4 as nc

from concatenator.attribute_handling import construct_history, retrieve_history
from concatenator.file_ops import add_label_to_path
from concatenator.stitchee import stitchee

@@ -225,6 +230,15 @@ def run_stitchee(args: list) -> None:
) = parse_args(args)
num_inputs = len(input_files)

history_json: list[dict] = []
for file_count, file in enumerate(input_files):
with nc.Dataset(file, "r") as dataset:
history_json.extend(retrieve_history(dataset))

history_json.append(construct_history(input_files, input_files))

new_history_json = json.dumps(history_json, default=str)

logging.info("Executing stitchee concatenation on %d files...", num_inputs)
stitchee(
input_files,
@@ -234,6 +248,7 @@ def run_stitchee(args: list) -> None:
concat_method=concat_method,
concat_dim=concat_dim,
concat_kwargs=concat_kwargs,
history_to_append=new_history_json,
)
logging.info("STITCHEE complete. Result in %s", output_path)
