Merge pull request #62 from bcdev/forman-57-ml_dataset_as_datatree
refactor towards datatree
forman authored Feb 21, 2025
2 parents 8c0f848 + 04f1873 commit 34827af
Showing 6 changed files with 92 additions and 35 deletions.
4 changes: 0 additions & 4 deletions tests/plugins/xcube/processors/test_mldataset.py
@@ -16,10 +16,6 @@
 from xrlint.plugins.xcube.util import LevelInfo, LevelsMeta
 from xrlint.result import Message
 
-# TODO: This tests requires zarr >=2, <3, because the test used fsspec's
-# memory filesystem, which is not async but zarr wants all filesystems
-# to be async now.
-
 
 class MultiLevelDatasetProcessorTest(TestCase):
     levels_name = "xrlint-test"
36 changes: 36 additions & 0 deletions tests/plugins/xcube/test_util.py
@@ -0,0 +1,36 @@
+# Copyright © 2025 Brockmann Consult GmbH.
+# This software is distributed under the terms and conditions of the
+# MIT license (https://mit-license.org/).
+
+from unittest import TestCase
+
+from xrlint.plugins.xcube.util import is_absolute_path
+from xrlint.plugins.xcube.util import resolve_path
+
+
+class UtilTest(TestCase):
+    def test_is_absolute_path(self):
+        self.assertTrue(is_absolute_path("/home/forman"))
+        self.assertTrue(is_absolute_path("//bcserver2/fs1"))
+        self.assertTrue(is_absolute_path("file://home/forman"))
+        self.assertTrue(is_absolute_path("s3://xcube-data"))
+        self.assertTrue(is_absolute_path(r"C:\Users\Norman"))
+        self.assertTrue(is_absolute_path(r"C:/Users/Norman"))
+        self.assertTrue(is_absolute_path(r"C:/Users/Norman"))
+        self.assertTrue(is_absolute_path(r"\\bcserver2\fs1"))
+
+        self.assertFalse(is_absolute_path(r"data"))
+        self.assertFalse(is_absolute_path(r"./data"))
+        self.assertFalse(is_absolute_path(r"../data"))
+
+    def test_resolve_path(self):
+        self.assertEqual(
+            "/home/forman/data", resolve_path("data", root_path="/home/forman")
+        )
+        self.assertEqual(
+            "/home/forman/data", resolve_path("./data", root_path="/home/forman")
+        )
+        self.assertEqual(
+            "/home/data", resolve_path("../data", root_path="/home/forman")
+        )
+        self.assertEqual("s3://opensr/test.zarr", resolve_path("s3://opensr/test.zarr"))
12 changes: 5 additions & 7 deletions xrlint/_linter/validate.py
@@ -62,13 +62,11 @@ def _open_and_validate_dataset(
         except (OSError, ValueError, TypeError) as e:
             return [new_fatal_message(str(e))]
         access_latency = time.time() - t0
-        return processor_op.postprocess(
-            [
-                _validate_dataset(config_obj, ds, path, i, access_latency)
-                for i, (ds, path) in enumerate(ds_path_list)
-            ],
-            file_path,
-        )
+        messages = [
+            _validate_dataset(config_obj, ds, path, i, access_latency)
+            for i, (ds, path) in enumerate(ds_path_list)
+        ]
+        return processor_op.postprocess(messages, file_path)
     else:
         try:
             dataset, access_latency = _open_dataset(
48 changes: 30 additions & 18 deletions xrlint/plugins/xcube/processors/mldataset.py
@@ -12,11 +12,16 @@
 
 from xrlint.plugins.xcube.constants import ML_FILE_PATTERN, ML_META_FILENAME
 from xrlint.plugins.xcube.plugin import plugin
-from xrlint.plugins.xcube.util import LevelsMeta, attach_dataset_level_infos, norm_path
+from xrlint.plugins.xcube.util import (
+    LevelsMeta,
+    attach_dataset_level_infos,
+    resolve_path,
+)
 from xrlint.processor import ProcessorOp
 from xrlint.result import Message
 
 level_pattern = re.compile(r"^(\d+)(?:\.zarr)?$")
+link_pattern = re.compile(r"^(\d+)(?:\.link)?$")
 
 
 @plugin.define_processor("multi-level-dataset")
@@ -25,7 +30,7 @@ class MultiLevelDatasetProcessor(ProcessorOp):
 
     def preprocess(
         self, file_path: str, opener_options: dict[str, Any]
-    ) -> list[tuple[xr.Dataset, str]]:
+    ) -> list[tuple[xr.Dataset | xr.DataTree, str]]:
         fs, fs_path = get_filesystem(file_path, opener_options)
 
         file_names = [
@@ -40,18 +45,17 @@ def preprocess(
         with fs.open(f"{fs_path}/{ML_META_FILENAME}") as stream:
             meta = LevelsMeta.from_value(json.load(stream))
 
-        # check for optional ".0.link" that locates level 0 somewhere else
-        level_0_path = None
-        if "0.link" in file_names:
-            level_0_path = fs.read_text(f"{fs_path}/0.link")
+        # check for optional ".zgroup"
+        # if ".zgroup" in file_names:
+        #     with fs.open(f"{fs_path}/.zgroup") as stream:
+        #         group_props = json.load(stream)
 
-        level_names, num_levels = parse_levels(file_names, level_0_path)
+        level_paths, num_levels = parse_levels(fs, file_path, file_names)
 
         engine = opener_options.pop("engine", "zarr")
 
         level_datasets: list[xr.Dataset | None] = []
-        for level, level_name in level_names.items():
-            level_path = norm_path(f"{file_path}/{level_name}")
+        for level, level_path in level_paths.items():
             level_dataset = xr.open_dataset(level_path, engine=engine, **opener_options)
             level_datasets.append((level_dataset, level_path))
 
@@ -80,22 +84,30 @@ def get_filesystem(file_path: str, opener_options: dict[str, Any]):
 
 
 def parse_levels(
-    file_names: list[str], level_0_path: str | None
+    fs: fsspec.AbstractFileSystem, dataset_path: str, file_names: list[str]
 ) -> tuple[dict[int, str], int]:
-    level_names: dict[int, str] = {0: level_0_path} if level_0_path else {}
-    num_levels = 0
+    level_paths: dict[int, str] = {}
     for file_name in file_names:
+        # check for optional "<level>.link" that locates a level somewhere else
+        m = link_pattern.match(file_name)
+        if m is not None:
+            level = int(m.group(1))
+            link_path = fs.read_text(f"{dataset_path}/{file_name}")
+            level_paths[level] = resolve_path(link_path, root_path=dataset_path)
+        # check for regular "<level>.zarr"
         m = level_pattern.match(file_name)
        if m is not None:
             level = int(m.group(1))
-            level_names[level] = file_name
-            num_levels = max(num_levels, level + 1)
-    if not level_names:
+            level_paths[level] = f"{dataset_path}/{file_name}"
+
+    if not level_paths:
         raise ValueError("empty multi-level dataset")
-    num_levels = max(level_names.keys()) + 1
+
+    num_levels = max(level_paths.keys()) + 1
     for level in range(num_levels):
-        if level not in level_names:
+        if level not in level_paths:
             raise ValueError(
                 f"missing dataset for level {level} in multi-level dataset"
             )
-    return level_names, num_levels
+
+    return level_paths, num_levels
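
For illustration only (not part of this commit): the sketch below exercises the new parse_levels() against a made-up multi-level dataset layout held in fsspec's in-memory filesystem, with a "0.link" file relocating level 0 outside the levels directory. All paths and file names are invented.

# Sketch, assuming parse_levels() is importable from the module changed above.
import fsspec

from xrlint.plugins.xcube.processors.mldataset import parse_levels

fs = fsspec.filesystem("memory")
# regular level directories 1 and 2, plus a level-0 link file whose text
# points one directory up
fs.makedirs("/cube.levels/1.zarr", exist_ok=True)
fs.makedirs("/cube.levels/2.zarr", exist_ok=True)
fs.pipe("/cube.levels/0.link", b"../base.zarr")

level_paths, num_levels = parse_levels(
    fs, "memory://cube.levels", ["0.link", "1.zarr", "2.zarr"]
)
# level_paths == {
#     0: "memory://base.zarr",              # link resolved via resolve_path()
#     1: "memory://cube.levels/1.zarr",
#     2: "memory://cube.levels/2.zarr",
# }
# num_levels == 3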
23 changes: 19 additions & 4 deletions xrlint/plugins/xcube/util.py
@@ -95,11 +95,26 @@ def get_spatial_size(
     return None
 
 
-def norm_path(level_path: str) -> str:
-    parts = level_path.replace("\\", "/").split("/")
-    level_path = "/".join(
+def resolve_path(path: str, root_path: str | None = None) -> str:
+    abs_level_path = path
+    if root_path is not None and not is_absolute_path(path):
+        abs_level_path = f"{root_path}/{path}"
+    parts = abs_level_path.rstrip("/").replace("\\", "/").split("/")
+    return "/".join(
         p
         for i, p in enumerate(parts)
         if p not in (".", "..") and (i == len(parts) - 1 or parts[i + 1] != "..")
     )
-    return level_path
+
+
+def is_absolute_path(path: str) -> bool:
+    return (
+        # Unix abs path
+        path.startswith("/")
+        # URL
+        or "://" in path
+        # Windows abs paths
+        or path.startswith("\\\\")
+        or path.find(":\\", 1) == 1
+        or path.find(":/", 1) == 1
+    )
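
A quick usage note (illustrative paths, not taken from the commit): resolve_path() joins a relative path onto root_path and collapses "." and ".." segments, while absolute paths and URLs pass through unchanged, so a level link may point outside its levels directory.

from xrlint.plugins.xcube.util import is_absolute_path, resolve_path

# relative link resolved against a made-up dataset root
assert resolve_path("../base.zarr", root_path="s3://bucket/cube.levels") == (
    "s3://bucket/base.zarr"
)
# absolute inputs are returned as-is
assert resolve_path("s3://opensr/test.zarr") == "s3://opensr/test.zarr"
assert is_absolute_path(r"C:\Users\Norman")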
4 changes: 2 additions & 2 deletions xrlint/processor.py
@@ -18,7 +18,7 @@ class ProcessorOp(ABC):
     @abstractmethod
     def preprocess(
         self, file_path: str, opener_options: dict[str, Any]
-    ) -> list[tuple[xr.Dataset, str]]:
+    ) -> list[tuple[xr.Dataset | xr.DataTree, str]]:
         """Pre-process a dataset given by its `file_path` and `opener_options`.
         In this method you use the `file_path` to read zero, one, or more
         datasets to lint.
@@ -28,7 +28,7 @@ def preprocess(
             opener_options: The configuration's `opener_options`.
 
         Returns:
-            A list of (dataset, file_path) pairs
+            A list of (dataset or datatree, file_path) pairs
         """
 
     @abstractmethod
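
A minimal sketch (not from this commit) of a custom processor satisfying the widened ProcessorOp interface; the signature of postprocess() is not visible in this diff and is assumed here from the call in xrlint/_linter/validate.py, which passes one message list per (dataset, path) pair.

from typing import Any

import xarray as xr

from xrlint.processor import ProcessorOp
from xrlint.result import Message


class SingleDatasetProcessor(ProcessorOp):
    """Open the file as a single dataset and pass messages through."""

    def preprocess(
        self, file_path: str, opener_options: dict[str, Any]
    ) -> list[tuple[xr.Dataset | xr.DataTree, str]]:
        dataset = xr.open_dataset(file_path, **opener_options)
        return [(dataset, file_path)]

    def postprocess(
        self, messages: list[list[Message]], file_path: str
    ) -> list[Message]:
        # flatten the per-dataset message lists into one result list
        return [m for dataset_messages in messages for m in dataset_messages]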
