Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Development in partial pipeline demo #156

Closed
wants to merge 15 commits into from
Closed
2 changes: 0 additions & 2 deletions oteapi_dlite/strategies/parse_excel.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:

names, units = zip(*[split_column_name(column) for column in columns])
rec = dict2recarray(columns, names=names)

if config.metadata:
if config.storage_path is not None:
for storage_path in config.storage_path.split("|"):
Expand All @@ -131,7 +130,6 @@ def get(self, session: "Optional[Dict[str, Any]]" = None) -> SessionUpdate:
inst = meta(dimensions=[len(rec)], id=config.id)
for name in names:
inst[name] = rec[name]

# Insert inst into collection
coll = get_collection(session)
coll.add(config.label, inst)
Expand Down
145 changes: 145 additions & 0 deletions oteapi_dlite/strategies/parse_mpr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""Strategy that parses an mpr file"""
from typing import Any, Dict, Optional

# pylint: disable=unused-argument,C0301,R0914
import dlite
import pandas as pd
import requests # type: ignore
from galvani import BioLogic as BL
from oteapi.datacache import DataCache
from oteapi.models import (
AttrDict,
DataCacheConfig,
ResourceConfig,
SessionUpdate,
)
from pydantic import Field, HttpUrl
from pydantic.dataclasses import dataclass

from oteapi_dlite.utils import dict2recarray, get_collection, update_collection


class MPRConfig(AttrDict):
    """MPR parse-specific Configuration Data Model."""

    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description=(
            "Configurations for the data cache for storing the downloaded file "
            "content."
        ),
    )
    id: Optional[str] = Field(None, description="Optional id on new instance.")
    metadata: Optional[HttpUrl] = Field(
        None,
        description=(
            # Fixed copy-paste from the excel parser ("excel file").
            "URI of DLite metadata to return. If not provided, the metadata "
            "will be inferred from the mpr file."
        ),
    )

    label: Optional[str] = Field(
        "mpr-data",
        description="Optional label for new instance in collection.",
    )

    mpr_config: AttrDict = Field(
        AttrDict(),
        description=(
            # Original description was truncated ("Co ."); this is what the
            # strategy's get() actually does with the mapping.
            "Mapping from DLite instance property names to column names in "
            "the parsed mpr data."
        ),
    )
    storage_path: Optional[str] = Field(
        None,
        description="Path to metadata storage",
    )


class MPRParseConfig(ResourceConfig):
    """Resource config for the MPR parse strategy."""

    # Pinned media type that registers this strategy; `const=True` (pydantic
    # v1) rejects any other value.  The description is reused from the base
    # ResourceConfig field.
    mediaType: str = Field(
        "application/parse-mpr",
        const=True,
        description=ResourceConfig.__fields__[
            "mediaType"
        ].field_info.description,
    )

    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description=(
            "Configurations for the data cache for storing the downloaded file "
            "content."
        ),
    )

    # Strategy-specific options; see MPRConfig for the individual fields.
    configuration: MPRConfig = Field(
        MPRConfig(), description="MPR parse strategy-specific configuration."
    )


class SessionUpdateMPRParse(SessionUpdate):
    """Class for returning values from MPR Parse."""

    # Parsed EIS data as a plain dict (DataFrame.to_dict() of the columns
    # selected via the mpr_config mapping).
    eis_data: dict = Field(..., description="Content of the EISDlite document.")


@dataclass
class MPRDataParseStrategy:
    """Parse strategy for MPR.

    **Registers strategies**:

    - `("mediaType", "application/parse-mpr")`

    """

    parse_config: MPRParseConfig

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdate:
        """Initialize."""
        return SessionUpdate()

    def get(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdateMPRParse:
        """Download an mpr file, parse it with galvani and optionally store
        the extracted columns as a DLite instance in the session collection.

        Returns:
            Session update holding the parsed EIS data as a dict.
        """
        coll = get_collection(session)
        config = self.parse_config
        configuration = config.configuration
        # Mapping from DLite property names to mpr column names.
        relations = configuration.mpr_config

        req = requests.get(
            config.downloadUrl,
            allow_redirects=True,
            timeout=(3, 27),  # timeout: (connect, read) in seconds
        )
        # Fail early on HTTP errors instead of caching an error page.
        req.raise_for_status()

        # Honour the user-supplied cache configuration (was ignored before).
        cache = DataCache(configuration.datacache_config)
        key = cache.add(req.content)
        # galvani needs a real file on disk, so round-trip through the cache.
        with cache.getfile(key, suffix=".mpr") as filename:
            mpr_file = BL.MPRfile(str(filename))

        # Extract only the requested columns from the parsed mpr data.
        data = {
            name: mpr_file.data[column] for name, column in relations.items()
        }
        # The dead "eis_data = None / if eis_data is None" branch from the
        # original is removed: the else-arm (concatenation with previous EIS
        # data) was unreachable.
        eis_data = pd.DataFrame(data)

        if configuration.metadata:
            if configuration.storage_path is not None:
                # '|' separates multiple metadata storage search paths.
                for storage_path in configuration.storage_path.split("|"):
                    dlite.storage_path.append(storage_path)
            meta = dlite.get_instance(configuration.metadata)

            # Instance creation must stay inside this branch: `meta` is only
            # bound when metadata is configured.
            rec = dict2recarray(data)
            inst = meta(dims=[len(rec)], id=configuration.id)
            for name in relations:
                inst[name] = data[name]
            # Insert inst into collection
            coll.add(configuration.label, inst)
        update_collection(coll)
        return SessionUpdateMPRParse(eis_data=eis_data.to_dict())
153 changes: 153 additions & 0 deletions oteapi_dlite/strategies/parse_txt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
"""Strategy that parses text and return all associated download links."""
from typing import Any, Dict, Optional

# pylint: disable=unused-argument,C0206
import dlite
import requests # type: ignore
from oteapi.models import (
AttrDict,
DataCacheConfig,
ResourceConfig,
SessionUpdate,
)
from pydantic import Field, HttpUrl
from pydantic.dataclasses import dataclass

from oteapi_dlite.utils import get_collection, update_collection


class TXTConfig(AttrDict):
    """TXT parse-specific Configuration Data Model."""

    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description=(
            "Configurations for the data cache for storing the downloaded file "
            "content."
        ),
    )
    id: Optional[str] = Field(None, description="Optional id on new instance.")
    metadata: Optional[HttpUrl] = Field(
        None,
        description=(
            # Fixed copy-paste from the excel parser ("excel file").
            "URI of DLite metadata to return. If not provided, the metadata "
            "will be inferred from the txt file."
        ),
    )

    label: Optional[str] = Field(
        "txt-data",
        description="Optional label for new instance in collection.",
    )

    # Annotated Optional[str]: the original declared `str` with a None
    # default, which pydantic v1 silently coerced to Optional[str].
    splitBy: Optional[str] = Field(
        None,
        description=(
            "Separator between keyword and value on each line of the "
            "downloaded text file, e.g. '='."
        ),
    )
    storage_path: Optional[str] = Field(
        None,
        description="Path to metadata storage",
    )


class TXTParseConfig(ResourceConfig):
    """Resource config for the TXT parse strategy."""

    # Pinned media type that registers this strategy; `const=True` (pydantic
    # v1) rejects any other value.  The description is reused from the base
    # ResourceConfig field.
    mediaType: str = Field(
        "application/parse-txt",
        const=True,
        description=ResourceConfig.__fields__[
            "mediaType"
        ].field_info.description,
    )

    datacache_config: Optional[DataCacheConfig] = Field(
        None,
        description=(
            "Configurations for the data cache for storing the downloaded file "
            "content."
        ),
    )

    # Strategy-specific options; see TXTConfig for the individual fields.
    configuration: TXTConfig = Field(
        TXTConfig(), description="TXT parse strategy-specific configuration."
    )


class SessionUpdateTXTParse(SessionUpdate):
    """Class for returning values from TXT Parse."""

    # Keyword/value pairs parsed from the downloaded text file
    # (see parse_metadata()).
    image_metadata: dict = Field(..., description="Image Metadata.")


@dataclass
class TXTDataParseStrategy:
    """Parse strategy for TXT.

    **Registers strategies**:

    - `("mediaType", "application/parse-txt")`

    """

    parse_config: TXTParseConfig

    def initialize(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdate:
        """Initialize."""
        return SessionUpdate()

    def get(
        self, session: "Optional[Dict[str, Any]]" = None
    ) -> SessionUpdateTXTParse:
        """Download a txt file, parse its keyword-value pairs and optionally
        store them as a DLite instance in the session collection.

        Returns:
            Session update holding the parsed metadata dict.
        """
        coll = get_collection(session)
        config = self.parse_config
        configuration = config.configuration

        req = requests.get(
            config.downloadUrl,
            allow_redirects=True,
            timeout=(3, 27),  # timeout: (connect, read) in seconds
        )
        # Fail early on HTTP errors instead of parsing an error page.
        req.raise_for_status()

        image_metadata = parse_metadata(req, configuration.splitBy)

        if configuration.metadata:
            if configuration.storage_path is not None:
                # '|' separates multiple metadata storage search paths.
                for storage_path in configuration.storage_path.split("|"):
                    dlite.storage_path.append(storage_path)
            meta = dlite.get_instance(configuration.metadata)

            # Instance creation must stay inside this branch: `meta` is only
            # bound when metadata is configured.
            inst = meta(dims=[len(image_metadata)], id=configuration.id)
            for name in image_metadata:
                inst[name] = image_metadata[name]
            # Insert inst into collection
            coll.add(configuration.label, inst)
        update_collection(coll)
        return SessionUpdateTXTParse(image_metadata=image_metadata)


def parse_metadata(response, splitby):
    """Parse keyword-value pairs from a downloaded text file.

    Args:
        response: A requests-like response object whose ``content``
            attribute holds the raw bytes of the text file.
        splitby: Separator between keyword and value on each line.
            Defaults to ``"="`` when None.

    Returns:
        Dict mapping each keyword to its (string) value.  Lines without
        the separator and lines with an empty value are skipped.
    """
    metadata = {}

    # The original guard tested for a literal "=", matching its implied
    # default separator; keep that default when no separator is given.
    if splitby is None:
        splitby = "="

    # Decode the content to text and split into lines
    lines = response.content.decode().splitlines()

    for line in lines:
        # Ignore lines that do not contain a keyword-value separator.
        # BUG FIX: the original tested `"=" not in line` regardless of
        # `splitby`, crashing on unpacking for any other separator.
        if splitby not in line:
            continue

        # Split the line into keyword and value
        keyword, value = line.strip().split(splitby, 1)

        # Ignore empty values
        if not value:
            continue

        # Add the keyword-value pair to the dictionary
        metadata[keyword] = value

    return metadata
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
DLite-Python>=0.3.3,<1.0
galvani==0.2.1
numpy>=1.21,<2
oteapi-core>=0.1.2,<0.6.0
pandas==2.1.0
Pillow>=9.0.1,<11
# psycopg2-binary!=2.9.6
pydantic>=1.10,<2 # Indirect requirement
Expand Down
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
[options.entry_points]
oteapi.parse =
oteapi_dlite.application/vnd.dlite-parse = oteapi_dlite.strategies.parse:DLiteParseStrategy
oteapi_dlite.application/parse-mpr = oteapi_dlite.strategies.parse_mpr:MPRDataParseStrategy
oteapi_dlite.application/parse-txt = oteapi_dlite.strategies.parse_txt:TXTDataParseStrategy
#oteapi_dlite.application/json = oteapi_dlite.strategies.parse:DLiteParseStrategy
#oteapi_dlite.application/yaml = oteapi_dlite.strategies.parse:DLiteParseStrategy
#oteapi_dlite.application/x-hdf5 = oteapi_dlite.strategies.parse:DLiteParseStrategy
Expand Down