Feature/4 parse datetime in model #8

Merged 14 commits on Feb 20, 2024
736 changes: 378 additions & 358 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 3 additions & 6 deletions pyproject.toml
@@ -4,11 +4,11 @@ version = "0.1.0"
 description = "Use pyspark to read CDM entities."
 authors = ["JulesHuisman <[email protected]>"]
 readme = "README.md"
-packages = [{include = "pyspark_cdm"}]
+packages = [{ include = "pyspark_cdm" }]

 [tool.poetry.dependencies]
 python = "<4.0,>=3.8"
-commondatamodel-objectmodel = "^1.7.3"
+commondatamodel-objectmodel = { git = "https://github.com/quantile-development/CDM.git", branch = "master", subdirectory = "objectModel/Python" }
 nest-asyncio = "^1.5.6"
 tenacity = "^8.2.3"
@@ -24,7 +24,4 @@ requires = ["poetry-core"]
 build-backend = "poetry.core.masonry.api"

 [tool.pytest.ini_options]
-filterwarnings = [
-    "error",
-    "ignore::DeprecationWarning",
-]
+filterwarnings = ["error", "ignore::DeprecationWarning"]
27 changes: 27 additions & 0 deletions pyspark_cdm/catalog.py
@@ -16,6 +16,8 @@
     DecimalType,
 )
+from typing import List

+TIMESTAMP_TYPES = [TimestampType(), DateType()]

 def catalog_factory(entity: "Entity") -> "Catalog":
     if entity.is_model:
@@ -36,6 +37,32 @@ def type_mapping(self) -> dict:
     def schema(self) -> StructType:
         pass

+    @property
+    def timestamp_columns(self) -> List[str]:
+        """
+        Extract the columns that are parsed as timestamp or date data types.
+        """
+        return [
+            field.name
+            for field in self.schema
+            if field.dataType in TIMESTAMP_TYPES
+        ]
+
+    def overwrite_timestamp_types(self, schema: StructType) -> StructType:
+        """
+        Overwrite the timestamp data types in the provided schema with strings.
+        """
+        overwritten_fields = list()
+
+        for field in schema:
+            if field.dataType in TIMESTAMP_TYPES:
+                field.dataType = StringType()
+
+            overwritten_fields.append(field)
+
+        return StructType(overwritten_fields)
+
+
 class ModelCatalog(Catalog):
     @property
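For context, the two new Catalog helpers behave roughly like the following self-contained sketch; the schema here is made up, whereas in the library it comes from a Catalog subclass:

from pyspark.sql.types import (
    DateType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

TIMESTAMP_TYPES = [TimestampType(), DateType()]

# Stand-in for a schema produced by Catalog.schema.
schema = StructType([
    StructField("id", StringType()),
    StructField("createdon", TimestampType()),
    StructField("validfrom", DateType()),
])

# Equivalent of Catalog.timestamp_columns.
print([f.name for f in schema if f.dataType in TIMESTAMP_TYPES])
# ['createdon', 'validfrom']

# Equivalent of Catalog.overwrite_timestamp_types: the CSV is first read
# with these columns as plain strings, so the raw values survive intact
# and can be parsed with a detected format afterwards.
for f in schema:
    if f.dataType in TIMESTAMP_TYPES:
        f.dataType = StringType()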
85 changes: 85 additions & 0 deletions pyspark_cdm/datetime_parser.py
@@ -0,0 +1,85 @@
import re
from typing import Optional

from pyspark.sql import DataFrame
from pyspark.sql.functions import to_timestamp, col

from pyspark_cdm.catalog import Catalog
from .utils import first_non_empty_values

DATE_FORMATS = {
    # yyyy-MM-dd'T'HH:mm:ss.SSSZ doesn't work; passing None as the format
    # lets Spark fall back to its default parser for this pattern.
    None: r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,}Z",
    "M/d/yyyy h:mm:ss a": r"\d{1,2}/\d{1,2}/\d{4} \d{1,2}:\d{2}:\d{2} [AP]M",
    "yyyy-MM-dd'T'HH:mm:ss.SSS": r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}",
    "yyyy-MM-dd'T'HH:mm:ss'Z'": r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z",
    "yyyy-MM-dd'T'HH:mm:ss": r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}",
    "dd-MM-yyyy HH:mm:ss": r"\d{2}-(0[1-9]|1[0-2])-\d{4} \d{2}:\d{2}:\d{2}",
    "dd-MM-yyyy": r"\d{2}-(0[1-9]|1[0-2])-\d{4}",
    "MM-dd-yyyy HH:mm:ss": r"(0[1-9]|1[0-2])-([0-2][0-9]|3[0-1])-\d{4} \d{2}:\d{2}:\d{2}",
    "MM-dd-yyyy": r"(0[1-9]|1[0-2])-([0-2][0-9]|3[0-1])-\d{4}",
}


class DatetimeParser:
    def __init__(self, df: DataFrame, catalog: Catalog) -> None:
        self.df = df
        self.catalog = catalog

    def detect_date_format(self, date_string: Optional[str]) -> Optional[str]:
        """
        Tries to find a regex pattern that matches the provided date_string
        and returns the corresponding pyspark format. Raises an exception
        when no pattern matches.
        """
        # re.match fails on a None value, so a column that contains only
        # nulls simply yields no format.
        if date_string is None:
            return None

        for pyspark_format, regex in DATE_FORMATS.items():
            if re.match(regex, date_string):
                return pyspark_format

        raise ValueError(f"Can't find a matching datetime pattern for {date_string}")

    def try_parsing_datetime_column(
        self,
        df: DataFrame,
        column_name: str,
        datetime_format: Optional[str],
    ) -> DataFrame:
        """
        Convert a single datetime column using the to_timestamp method with
        the detected datetime format (e.g. "M/d/yyyy h:mm:ss a").
        """
        try:
            return df.withColumn(
                column_name,
                to_timestamp(col(column_name), datetime_format),
            )
        except Exception:
            print(f"Failed parsing {column_name} with {datetime_format}")
            return df

    def convert_datetime_columns(self) -> DataFrame:
        """
        Loops over all the timestamp-related columns and converts them from
        strings into datetime columns.
        """
        # Get the first non-empty value of every timestamp column.
        sampled_values = first_non_empty_values(
            self.df,
            self.catalog.timestamp_columns,
        )

        # Detect each column's format and convert it into a datetime column.
        df_parsed = self.df

        for column_name in self.catalog.timestamp_columns:
            pyspark_format = self.detect_date_format(sampled_values[column_name])

            df_parsed = self.try_parsing_datetime_column(
                df_parsed,
                column_name,
                pyspark_format,
            )

        return df_parsed
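As a rough usage sketch (the data, column name, and stub catalog below are made up; in the library the catalog comes from catalog_factory and the dataframe from Entity.get_dataframe):

from pyspark.sql import SparkSession
from pyspark_cdm.datetime_parser import DatetimeParser

spark = SparkSession.builder.getOrCreate()
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# String-typed input, as produced by reading the CSV with the timestamp
# types overwritten to StringType.
df = spark.createDataFrame(
    [("6/12/2023 12:03:02 PM",), ("6/15/2023 12:03:02 PM",)],
    ["modifiedon"],
)

class StubCatalog:
    # Stand-in for Catalog.timestamp_columns.
    timestamp_columns = ["modifiedon"]

parser = DatetimeParser(df=df, catalog=StubCatalog())
parsed = parser.convert_datetime_columns()
parsed.printSchema()  # modifiedon is now a timestamp column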
61 changes: 49 additions & 12 deletions pyspark_cdm/entity.py
@@ -21,8 +21,9 @@
 from cdm.utilities.copy_options import CopyOptions
 from cdm.utilities.resolve_options import ResolveOptions
 from cdm.persistence.modeljson import LocalEntityDeclarationPersistence
-from pyspark.sql.types import StructField, StructType
+from pyspark.sql.types import StructType
 from pyspark.sql import DataFrame
+from pyspark_cdm.datetime_parser import DatetimeParser
 from tenacity import retry, stop_after_attempt, wait_random_exponential


@@ -131,31 +132,67 @@ def file_paths(self) -> Generator[str, None, None]:
         yield remove_root_from_path(path, "/dbfs")

     @property
-    def schema(self) -> StructType:
+    def catalog(self) -> "Catalog":
         """
-        The schema of the entity.
+        The catalog of the entity.

         Returns:
-            str: The schema of the entity.
+            Catalog: The catalog of the entity.
         """
-        catalog = catalog_factory(self)
-        return catalog.schema
+        return catalog_factory(self)

     @retry(
         stop=stop_after_attempt(2),
         wait=wait_random_exponential(multiplier=3, max=60),
         after=log_attempt_number,
     )
     def get_dataframe(
         self,
         spark,
+        infer_timestamp_formats: bool = False,
         alter_schema=lambda schema: schema,
     ) -> DataFrame:
-        return spark.read.csv(
-            list(self.file_paths),
-            header=False,
-            schema=alter_schema(self.schema),
-            inferSchema=False,
-            multiLine=True,
-            escape='"',
-        )
+        """
+        Loads the data using Spark.
+
+        Args:
+            spark: Spark session.
+            infer_timestamp_formats (bool, optional): Whether to infer the
+                timestamp formats using regex. Defaults to False.
+            alter_schema: Callable that can alter the schema before reading.
+
+        Returns:
+            DataFrame: Spark dataframe with the loaded data.
+        """
+        if infer_timestamp_formats:
+            spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
+
+            # Read the timestamp columns as plain strings first, then let
+            # the DatetimeParser detect and apply a format per column.
+            schema_with_replaced_timestamp_types = self.catalog.overwrite_timestamp_types(
+                self.catalog.schema
+            )
+
+            df = spark.read.csv(
+                list(self.file_paths),
+                header=False,
+                schema=alter_schema(schema_with_replaced_timestamp_types),
+                inferSchema=False,
+                multiLine=True,
+                escape='"',
+            )
+
+            datetime_parser = DatetimeParser(df, self.catalog)
+            return datetime_parser.convert_datetime_columns()
+
+        return spark.read.csv(
+            list(self.file_paths),
+            header=False,
+            schema=alter_schema(self.catalog.schema),
+            inferSchema=False,
+            multiLine=True,
+            escape='"',
+        )
23 changes: 22 additions & 1 deletion pyspark_cdm/utils.py
@@ -1,10 +1,12 @@
 import asyncio
-from typing import Optional
+from typing import Optional, List
 from cdm.enums import CdmStatusLevel, CdmDataFormat
 from cdm.objectmodel import (
     CdmCorpusDefinition,
     CdmObject,
 )
+from pyspark.sql import DataFrame, Row
+import pyspark.sql.functions as F


 from pyspark_cdm.exceptions import DocumentLoadingException

@@ -73,3 +75,22 @@ def remove_root_from_path(path: str, root: str) -> str:
         str: Path without the root.
     """
     return f"/{path.lstrip(root)}"
+
+
+def first_non_empty_values(
+    df: DataFrame,
+    selected_columns: List[str],
+) -> Row:
+    """
+    Returns the first non-empty value for each of the given columns. Be
+    aware that when a column contains only nulls, None is returned for
+    that column.
+    """
+    row = df.select(
+        [
+            F.first(column, ignorenulls=True).alias(column)
+            for column in selected_columns
+        ]
+    ).first()
+
+    return row
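A small usage sketch with made-up data:

from pyspark.sql import SparkSession
from pyspark_cdm.utils import first_non_empty_values

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(None, "a"), ("2024-01-30T13:24:06Z", None)],
    ["modifiedon", "label"],
)

row = first_non_empty_values(df, ["modifiedon", "label"])
print(row["modifiedon"])  # '2024-01-30T13:24:06Z'
print(row["label"])       # 'a'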
1 change: 1 addition & 0 deletions tests/conftest.py
@@ -3,6 +3,7 @@
 from tests.fixtures.manifest import manifest
 from tests.fixtures.corpus import manifest_corpus, model_corpus
 from tests.fixtures.entity import entity
+from tests.fixtures.datetime_parser import datetime_parser

 warnings.filterwarnings("ignore", module="cdm")
 warnings.simplefilter("ignore", category=DeprecationWarning)
16 changes: 16 additions & 0 deletions tests/fixtures/datetime_parser.py
@@ -0,0 +1,16 @@
import pytest
from pyspark.sql import SparkSession
from pyspark_cdm.datetime_parser import DatetimeParser
from pyspark_cdm.entity import Entity


@pytest.fixture
def datetime_parser(spark: SparkSession, entity: Entity):
    df = entity.get_dataframe(spark, infer_timestamp_formats=True)

    return DatetimeParser(
        df=df,
        catalog=entity.catalog,
    )
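A sketch of how a test might consume this fixture (the assertion is illustrative and assumes parsing succeeded for the sample model):

from pyspark.sql.types import TimestampType


def test_timestamp_columns_are_parsed(datetime_parser):
    # get_dataframe(..., infer_timestamp_formats=True) already ran the
    # parser, so the timestamp columns should now be TimestampType.
    schema = datetime_parser.df.schema

    for name in datetime_parser.catalog.timestamp_columns:
        assert isinstance(schema[name].dataType, TimestampType)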
3 changes: 0 additions & 3 deletions tests/samples/model/example/1.csv

This file was deleted.

6 changes: 3 additions & 3 deletions tests/samples/model/example/Snapshot/1_1690183552.csv
@@ -1,3 +1,3 @@
-7282fed7-c82c-eb11-a813-000d3a3a75c1,"6/12/2023 12:03:02 PM","6/12/2023 12:03:02 PM",6b82fed7-c82c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,7282fed7-c82c-eb11-a813-000d3a3a75c1,56635,False,"0001-01-01T00:00:00.0000000"
-32b07058-d02c-eb11-a813-000d3a3a75c1,"6/12/2023 12:03:02 PM","6/12/2023 12:03:02 PM",2ab07058-d02c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,32b07058-d02c-eb11-a813-000d3a3a75c1,393639,False,"0001-01-01T00:00:00.0000000"
-fd11d7fa-d42c-eb11-a813-000d3a3a75c1,"6/12/2023 12:03:02 PM","6/12/2023 12:03:02 PM",f511d7fa-d42c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,fd11d7fa-d42c-eb11-a813-000d3a3a75c1,542839,False,"0001-01-01T00:00:00.0000000"
+7282fed7-c82c-eb11-a813-000d3a3a75c1,"6/15/2023 12:03:02 PM","6/12/2023 12:03:02 PM",6b82fed7-c82c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,7282fed7-c82c-eb11-a813-000d3a3a75c1,56635,False,"0001-01-01T00:00:00.0000000","2023-12-01T08:25:39.0000000+00:00","2024-01-30T13:24:06Z","20-12-2023","20-12-2023 15:45:03","2022-09-30T07:34:52.5348484Z","2022-03-22T13:40:11.0000000","20-12-2023","2022-05-05T14:38:48.8547924Z","2022-03-12T15:50:11.0000000"
+32b07058-d02c-eb11-a813-000d3a3a75c1,"6/15/2023 12:03:02 PM","6/12/2023 12:03:02 PM",2ab07058-d02c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,32b07058-d02c-eb11-a813-000d3a3a75c1,393639,False,"0001-01-01T00:00:00.0000000","2023-12-01T08:25:39.0000000+00:00","2024-01-30T13:24:06Z","20-12-2023","20-12-2023 15:45:03","2022-09-30T07:34:52.5348484Z","2022-03-22T13:40:11.0000000","20-12-2023","2022-05-05T14:38:48.8547924Z","2022-03-12T15:50:11.0000000"
+fd11d7fa-d42c-eb11-a813-000d3a3a75c1,"6/15/2023 12:03:02 PM","6/12/2023 12:03:02 PM",f511d7fa-d42c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,fd11d7fa-d42c-eb11-a813-000d3a3a75c1,542839,False,"0001-01-01T00:00:00.0000000","2023-12-01T08:25:39.0000000+00:00","2024-01-30T13:24:06Z","20-12-2023","20-12-2023 15:45:03","2022-09-30T07:34:52.5348484Z","2022-03-22T13:40:11.0000000","20-12-2023","2022-05-05T14:38:48.8547924Z","2022-03-12T15:50:11.0000000"
45 changes: 45 additions & 0 deletions tests/samples/model/model.json
@@ -66,6 +66,51 @@
           "name": "CreatedOn",
           "dataType": "dateTimeOffset",
           "maxLength": -1
         },
+        {
+          "name": "RandomDateTime",
+          "dataType": "dateTime",
+          "maxLength": -1
+        },
+        {
+          "name": "RandomDateTime2",
+          "dataType": "dateTime",
+          "maxLength": -1
+        },
+        {
+          "name": "RandomDateTime3",
+          "dataType": "dateTime",
+          "maxLength": -1
+        },
+        {
+          "name": "RandomDateTime4",
+          "dataType": "dateTime",
+          "maxLength": -1
+        },
+        {
+          "name": "RandomDateTime5",
+          "dataType": "dateTime",
+          "maxLength": -1
+        },
+        {
+          "name": "RandomDateTime6",
+          "dataType": "dateTime",
+          "maxLength": -1
+        },
+        {
+          "name": "RandomDateTime7",
+          "dataType": "dateTime",
+          "maxLength": -1
+        },
+        {
+          "name": "RandomDateTime8",
+          "dataType": "dateTime",
+          "maxLength": -1
+        },
+        {
+          "name": "RandomDateTime9",
+          "dataType": "dateTime",
+          "maxLength": -1
+        }
       ],
       "partitions": [