Commit 7bd12ad
DAS-1374 - Add NetCDF-4 dimension parsing classes, ready for aggregation.
owenlittlejohns committed Jan 27, 2022
1 parent bc8ba05 commit 7bd12ad
Showing 10 changed files with 534 additions and 38 deletions.
1 change: 1 addition & 0 deletions .flake8
@@ -1,2 +1,3 @@
[flake8]
ignore = W503
max-line-length = 120
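For context, W503 is pycodestyle's "line break occurred before binary operator" warning, which conflicts with the operator-first continuation style that current PEP 8 guidance prefers. A minimal, hypothetical snippet that W503 would otherwise flag:

    total = (first_value
             + second_value)  # leading '+' on the continuation line triggers W503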
2 changes: 1 addition & 1 deletion .github/workflows/github-actions.yml
@@ -32,7 +32,7 @@ jobs:
- name: Run lint
run: |
flake8 --ignore=E731 harmony_netcdf_to_zarr
flake8 --ignore=W503 harmony_netcdf_to_zarr
- name: Run test
run: |
2 changes: 1 addition & 1 deletion Makefile
@@ -7,7 +7,7 @@ test:
bin/test

lint:
flake8 --ignore=E731 harmony_netcdf_to_zarr
flake8 --ignore=W503 harmony_netcdf_to_zarr

build-image:
LOCAL_SVCLIB_DIR=${LOCAL_SVCLIB_DIR} bin/build-image
2 changes: 1 addition & 1 deletion bin/build-image
@@ -22,7 +22,7 @@ fi

# If we're running Docker in Docker (DIND) then the docker daemon is on the host
if [ -n "$DIND" ]; then
docker -H $DOCKER_DAEMON_ADDR build --build-arg service_lib_dir="$SERVICE_LIB_DIR" -t ${image}:${ag} .
docker -H $DOCKER_DAEMON_ADDR build --build-arg service_lib_dir="$SERVICE_LIB_DIR" -t ${image}:${tag} .
else
docker build --build-arg service_lib_dir="$SERVICE_LIB_DIR" --network host -t ${image}:${tag} .
fi
4 changes: 3 additions & 1 deletion harmony_netcdf_to_zarr/convert.py
@@ -91,7 +91,9 @@ def scale_attribute(src, attr, scale_factor, add_offset):
list | number
the scaled data; either a list of floats or a float scalar
"""
scale_fn = lambda x: float(x * scale_factor + add_offset)
def scale_fn(x):
return float(x * scale_factor + add_offset)

unscaled = getattr(src, attr)
if isinstance(unscaled, collections.Sequence) or isinstance(unscaled, np.ndarray):
return [scale_fn(u) for u in unscaled]
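The lambda assigned to scale_fn is replaced with a nested def, which is also why the lint configuration elsewhere in this commit drops the E731 ("do not assign a lambda expression") ignore in favour of W503. A rough sketch of the helper's behaviour (hypothetical attribute values, assuming scale_attribute applies scale_fn element-wise to sequence attributes and directly to scalars):

    def scale_fn(x, scale_factor=2.0, add_offset=0.0):
        # Same linear transform as the nested helper above.
        return float(x * scale_factor + add_offset)

    print([scale_fn(v) for v in [0, 8]])  # sequence attribute -> [0.0, 16.0]
    print(scale_fn(4))                    # scalar attribute   -> 8.0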
195 changes: 195 additions & 0 deletions harmony_netcdf_to_zarr/mosaic_utilities.py
@@ -0,0 +1,195 @@
""" A module including functions to take many input NetCDF-4 datasets and
create aggregated dimensions, allowing for a single Zarr output.
"""
from datetime import timedelta
from typing import List, Optional, Union

from dateutil.parser import parse as parse_datetime

from netCDF4 import Dataset, Group, Variable
import numpy as np


NetCDF4Attribute = Union[bytes, str, np.integer, np.floating, np.ndarray]

seconds_delta = timedelta(seconds=1)
minutes_delta = timedelta(minutes=1)
hours_delta = timedelta(hours=1)
days_delta = timedelta(days=1)

time_delta_to_unit_map = {seconds_delta: 'seconds',
minutes_delta: 'minutes',
hours_delta: 'hours',
days_delta: 'days'}

time_unit_to_delta_map = {'seconds': seconds_delta,
'second': seconds_delta,
'secs': seconds_delta,
'sec': seconds_delta,
's': seconds_delta,
'minutes': minutes_delta,
'minute': minutes_delta,
'mins': minutes_delta,
'min': minutes_delta,
'hours': hours_delta,
'hour': hours_delta,
'hrs': hours_delta,
'hr': hours_delta,
'h': hours_delta,
'days': days_delta,
'day': days_delta,
'd': days_delta}


class DimensionsMapping:
""" A class containing the information for all dimensions contained in each
of the input NetCDF-4 granules. This class also will produce single,
aggregated arrays and metadata (if required) for the output Zarr
object.
"""
def __init__(self, input_paths: List[str]):
self.input_paths = input_paths
self.input_dimensions = {}
self.output_dimensions = {} # DAS-1375
self._map_input_dimensions()

def _map_input_dimensions(self):
""" Iterate through all input files and extract their dimension
information.
"""
for input_path in self.input_paths:
with Dataset(input_path, 'r') as input_dataset:
self._parse_group(input_dataset, input_dataset)

def _parse_group(self, group: Union[Dataset, Group], dataset: Dataset):
""" Iterate through group variables extracting each. Then recursively
call this function to parse any subgroups.
"""
for variable in group.variables.values():
self._parse_variable_dimensions(variable, dataset)

for nested_group in group.groups.values():
self._parse_group(nested_group, dataset)

def _parse_variable_dimensions(self, variable: Variable, dataset: Dataset):
""" Extract the dimensions associated with a given variable.
This function will only save those dimensions that have associated
variables also within the input NetCDF-4 file.
"""
for dimension_name in variable.dimensions:
dimension_path = resolve_reference_path(variable, dimension_name)
input_path = dataset.filepath()

if is_variable_in_dataset(dimension_path, dataset):
dim_data = self.input_dimensions.setdefault(dimension_path, {})

if input_path not in dim_data:
dim_data[input_path] = DimensionInformation(dataset,
dimension_path)


class DimensionInformation:
""" A class containing information on a dimension, including the path of
the dimension variable within a NetCDF-4 or Zarr Dataset, the values of
the 1-dimensional dimension array and any associated temporal epoch
and unit. For TRT-121, it is initially assumed that non-temporal
dimensions can be aggregated without requiring offsets similar to
those stored in the temporal dimension's `units` metadata attribute.
This class can be used to contain both the dimension information from
individual input NetCDF-4 files, as well as the aggregated output
dimension.
"""
def __init__(self, dataset: Dataset, dimension_path: str):
self.dimension_path = dimension_path
self.values = dataset[dimension_path][:]
self.units = get_nc_attribute(dataset[dimension_path], 'units')
self.epoch = None
self.time_unit = None
self._get_epoch_and_unit()

def _get_epoch_and_unit(self):
""" Check the `units` attribute in the dimension variable metadata. If
present, compare the format to the CF-Convention format for a
temporal dimension (e.g., "seconds since 2000-01-02T03:04:05"), and
extract the type of unit.
For documentation on temporal dimensions see:
https://cfconventions.org/cf-conventions/cf-conventions.html#time-coordinate
"""
if self.units is not None and ' since ' in self.units:
time_unit_string, epoch_string = self.units.split(' since ')
self.epoch = parse_datetime(epoch_string)
self.time_unit = time_unit_to_delta_map.get(time_unit_string)


def get_nc_attribute(
variable: Variable, attribute_name: str,
default_value: Optional[NetCDF4Attribute] = None
) -> Optional[NetCDF4Attribute]:
""" A helper function that attempts to retrieve the value of a metadata
attribute. If that attribute is missing the resulting `AttributeError`
is handled and an optional default value is applied.
"""
try:
nc_attribute = variable.getncattr(attribute_name)
except AttributeError:
nc_attribute = default_value

return nc_attribute


def resolve_reference_path(variable: Variable, reference: str) -> str:
""" Extract the full path of a dimension reference based upon the path of
the variable containing the reference.
If the reference has a leading slash, it is assumed to already be
qualified. If the group containing the variable making the reference
has a variable matching that name, then assume the reference is to that
matching variable in the group.
"""
if reference.startswith('/'):
output_reference = reference
else:
group = variable.group()
if reference in group.variables and isinstance(group, Group):
output_reference = '/'.join([group.path, reference])
else:
output_reference = f'/{reference}'

return output_reference


def is_variable_in_dataset(variable_full_path: str, dataset: Dataset) -> bool:
""" Check if a variable is present in the full dataset, based on its
full path. Attempting to address a nested variable from a `Dataset`
using its full path will result in an `IndexError` otherwise, even if
trying to perform `if variable_name in dataset`.
If a group in the path is missing, the check will also return `False`.
"""
variable_parts = variable_full_path.lstrip('/').split('/')
current_group = dataset

while len(variable_parts) > 1:
nested_group = variable_parts.pop(0)
if nested_group in current_group.groups:
current_group = current_group.groups[nested_group]
else:
variable_parts = []

return (
len(variable_parts) == 1
and variable_parts[0] in current_group.variables
)
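A minimal usage sketch for the new module (the granule names, dimension path and units value below are hypothetical; only the DimensionsMapping and DimensionInformation attributes shown are taken from the code above):

    from harmony_netcdf_to_zarr.mosaic_utilities import DimensionsMapping

    # Parse every dimension variable found in the listed NetCDF-4 granules.
    mapping = DimensionsMapping(['granule_one.nc4', 'granule_two.nc4'])

    # input_dimensions is keyed by dimension path, then by input file path.
    time_info = mapping.input_dimensions['/time']['granule_one.nc4']

    print(time_info.values)     # raw values of the 1-D dimension array
    print(time_info.units)      # e.g. 'seconds since 2021-01-01T00:00:00'
    print(time_info.epoch)      # datetime.datetime(2021, 1, 1, 0, 0)
    print(time_info.time_unit)  # datetime.timedelta(seconds=1)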
1 change: 1 addition & 0 deletions requirements/core.txt
@@ -2,5 +2,6 @@ boto3 ~= 1.14
git+https://github.com/nasa/harmony-service-lib-py@main#egg=harmony-service-lib
netCDF4 ~= 1.5
numpy ~= 1.18.0
python-dateutil ~= 2.8.2
s3fs ~= 0.4.0
zarr ~= 2.4.0
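The new python-dateutil dependency supports the epoch parsing in mosaic_utilities.py; for example:

    from dateutil.parser import parse as parse_datetime

    print(parse_datetime('2000-01-02T03:04:05'))  # 2000-01-02 03:04:05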
16 changes: 7 additions & 9 deletions tests/test_adapter.py
@@ -54,7 +54,7 @@ class object for multiprocessing.popen_fork.Popen
os.close(child_w)
os.close(child_r)
class_object.finalizer = mp_util.Finalize(class_object, mp_util.close_fds,
(parent_r, parent_w,))
(parent_r, parent_w,))
class_object.sentinel = parent_r


@@ -68,7 +68,7 @@ def setUp(self):

@patch.dict(os.environ, MOCK_ENV)
@patch.object(NetCDFToZarrAdapter, '_callback_post')
@patch.object(mp_Popen, '_launch', new = mock_mp_fork_popen_launch)
@patch.object(mp_Popen, '_launch', new=mock_mp_fork_popen_launch)
@mock_s3
def test_end_to_end_file_conversion(self, _callback_post):
"""
@@ -148,7 +148,7 @@ def test_end_to_end_file_conversion(self, _callback_post):
self.assertEqual(out['data/vertical/north'][0, 2, 0], 0)
self.assertEqual(out['data/vertical/south'][0, 2, 0], 16)
self.assertEqual(out['data/vertical/south'][0, 0, 2], 0)
self.assertEqual(out['data/horizontal/east'][0, 2, 2], 16) # scale_factor = 2
self.assertEqual(out['data/horizontal/east'][0, 2, 2], 16) # scale_factor = 2
self.assertEqual(out['data/horizontal/east'][0, 0, 0], 0)
self.assertEqual(out['data/horizontal/west'][0, 0, 0], 16)
self.assertEqual(out['data/horizontal/west'][0, 2, 2], 0)
@@ -164,16 +164,15 @@ def test_end_to_end_file_conversion(self, _callback_post):
self.assertFalse(hasattr(out['data/horizontal/east'], 'missing_value'))

# 2D Nested Float Arrays
self.assertEqual(out['location/lat'][0, 1], 5.5)
self.assertEqual(out['location/lat'][0, 1], 5.5)
self.assertEqual(out['location/lon'][0, 1], -5.5)

# 1D Root-Level Float Array sharing its name with a dimension
self.assertEqual(out['time'][0], 166536)

self.assertEqual(out['time'][0], 166536)

@patch.dict(os.environ, MOCK_ENV)
@patch.object(NetCDFToZarrAdapter, '_callback_post')
@patch.object(mp_Popen, '_launch', new = mock_mp_fork_popen_launch)
@patch.object(mp_Popen, '_launch', new=mock_mp_fork_popen_launch)
@mock_s3
def test_end_to_end_large_file_conversion(self, _callback_post):
"""
@@ -212,8 +211,7 @@ def test_end_to_end_large_file_conversion(self, _callback_post):
self.assertEqual(str(out.tree()), contents)

# -- Data Assertions --
self.assertEqual(out['data/var'].chunks, (10000,) )

self.assertEqual(out['data/var'].chunks, (10000,))

@patch.object(argparse.ArgumentParser, 'error', return_value=None)
def test_does_not_accept_non_harmony_clis(self, argparse_error):
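The whitespace fixes above touch the patch.object(..., new=...) decorators. As a generic reminder of the pattern (a standalone sketch, not this test suite's fixtures): supplying new= substitutes the attribute directly for the duration of the test and does not inject an additional mock argument into the test method.

    from unittest import TestCase
    from unittest.mock import patch


    class Launcher:
        def launch(self):
            return 'real launch'


    def fake_launch(self):
        # Stands in for the real method while the patch is active.
        return 'fake launch'


    class TestLauncher(TestCase):

        @patch.object(Launcher, 'launch', new=fake_launch)
        def test_launch_is_replaced(self):
            # No extra mock argument is passed because new= was supplied.
            self.assertEqual(Launcher().launch(), 'fake launch')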
47 changes: 22 additions & 25 deletions tests/test_convert.py
@@ -1,14 +1,11 @@
"""
Tests the Harmony convert module
"""
""" Tests the Harmony convert module """
from unittest import TestCase
from pytest import raises

import unittest
import pytest
from harmony_netcdf_to_zarr.convert import compute_chunksize

from harmony_netcdf_to_zarr import convert


class TestConvert(unittest.TestCase):
class TestConvert(TestCase):
"""
Tests the Harmony adapter
"""
Expand All @@ -20,50 +17,50 @@ def test_compute_chunksize_small(self):
Test of compute_chunksize method for a small input shape
"""
chunksize_expected = (100, 100, 100)
chunksize_result = convert.compute_chunksize(shape=(100, 100,100), datatype='f8')
assert chunksize_expected == chunksize_result
chunksize_result = compute_chunksize(shape=(100, 100, 100), datatype='f8')
self.assertTupleEqual(chunksize_expected, chunksize_result)

def test_compute_chunksize_medium(self):
"""
Test of compute_chunksize method for a medium input shape
"""
chunksize_expected = (100, 140, 140)
chunksize_result = convert.compute_chunksize(shape=(100, 1000,1000), datatype='f8')
assert chunksize_expected == chunksize_result
chunksize_result = compute_chunksize(shape=(100, 1000, 1000), datatype='f8')
self.assertTupleEqual(chunksize_expected, chunksize_result)

def test_compute_chunksize_large(self):
"""
Test of compute_chunksize method for a large input shape
"""
chunksize_expected = (125, 125, 125)
chunksize_result = convert.compute_chunksize(shape=(1000, 1000,1000), datatype='f8')
assert chunksize_expected == chunksize_result
chunksize_result = compute_chunksize(shape=(1000, 1000, 1000), datatype='f8')
self.assertTupleEqual(chunksize_expected, chunksize_result)

def test_compute_chunksize_with_compression_args(self):
"""
Test of compute_chunksize method with non-default compression args
"""
chunksize_expected = (100, 680, 680)
chunksize_result = convert.compute_chunksize(shape=(100, 1000,1000),
datatype='i4',
compression_ratio = 6.8,
compressed_chunksize_byte = '26.8 Mi')
assert chunksize_expected == chunksize_result
chunksize_result = compute_chunksize(shape=(100, 1000, 1000),
datatype='i4',
compression_ratio=6.8,
compressed_chunksize_byte='26.8 Mi')
self.assertTupleEqual(chunksize_expected, chunksize_result)

def test_compute_chunksize_wrong_arguments(self):
"""
Test of compute_chunksize method for a large input shape
"""
with pytest.raises(ValueError) as execinfo:
chunksize_result = convert.compute_chunksize(shape=(100, 1000,1000),
datatype='i4',
compression_ratio = 6.8,
compressed_chunksize_byte = '26.8 MB')
with raises(ValueError) as execinfo:
compute_chunksize(shape=(100, 1000, 1000),
datatype='i4',
compression_ratio=6.8,
compressed_chunksize_byte='26.8 MB')
err_message_expected = """Chunksize needs to be either an integer or string.
If it's a string, assuming it follows NIST standard for binary prefix
(https://physics.nist.gov/cuu/Units/binary.html)
except that only Ki, Mi, and Gi are allowed."""
assert str(execinfo.value) == err_message_expected
self.assertEqual(str(execinfo.value), err_message_expected)

def tearDown(self):
pass
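A back-of-the-envelope check of the compression-argument expectation above, assuming the target uncompressed chunk size is roughly compressed_chunksize_byte multiplied by compression_ratio (an assumption about compute_chunksize's internals, not taken from this diff):

    compressed_budget = 26.8 * 1024 ** 2           # '26.8 Mi' in bytes
    uncompressed_budget = compressed_budget * 6.8  # ~191 MB before compression
    chunk_bytes = 100 * 680 * 680 * 4              # (100, 680, 680) of 'i4', ~185 MB
    print(chunk_bytes <= uncompressed_budget)      # True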