Feature/4 parse datetime in model (#8)
* Added additional datetime columns to the test CSVs

* Updated sample data and added first iteration of datetime parsing detection

* Added print statement to detect date format

* Added additional code to detect the date format for date strings that are None.

* Added additional regex date format

* Added additional docstring.

* Added additional regex date formats.

* Add new columns to the model.json file

* Added some failing pytests for the datetime parser.

* Added docstring to get_frame and changed an argument

* Added a sentence to the get_dataframe method docstring

* Added missing parameter alter_schema to entity get_dataframe

* Added forked CDM repo.
paulorijnberg authored Feb 20, 2024
1 parent 6bff211 commit b4805fa
Showing 15 changed files with 755 additions and 388 deletions.
736 changes: 378 additions & 358 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 3 additions & 6 deletions pyproject.toml
@@ -4,11 +4,11 @@ version = "0.1.0"
description = "Use pyspark to read CDM entities."
authors = ["JulesHuisman <[email protected]>"]
readme = "README.md"
packages = [{include = "pyspark_cdm"}]
packages = [{ include = "pyspark_cdm" }]

[tool.poetry.dependencies]
python = "<4.0,>=3.8"
commondatamodel-objectmodel = "^1.7.3"
commondatamodel-objectmodel = { git = "https://github.com/quantile-development/CDM.git", branch = "master", subdirectory = "objectModel/Python" }
nest-asyncio = "^1.5.6"
tenacity = "^8.2.3"

@@ -24,7 +24,4 @@ requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.pytest.ini_options]
filterwarnings = [
"error",
"ignore::DeprecationWarning",
]
filterwarnings = ["error", "ignore::DeprecationWarning"]
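Since commondatamodel-objectmodel now comes from a fork, a quick import smoke test can confirm the package still resolves after `poetry install` — a minimal sketch, assuming the fork keeps the upstream `cdm` package layout:

# Minimal smoke test; assumes the forked repo keeps the upstream package name.
from cdm.objectmodel import CdmCorpusDefinition

corpus = CdmCorpusDefinition()
print(type(corpus).__name__)  # -> CdmCorpusDefinition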
27 changes: 27 additions & 0 deletions pyspark_cdm/catalog.py
@@ -16,6 +16,7 @@
DecimalType,
)
from typing import List  # needed so the List[str] annotation below works on Python 3.8

TIMESTAMP_TYPES = [TimestampType(), DateType()]

def catalog_factory(entity: "Entity") -> "Catalog":
if entity.is_model:
@@ -36,6 +37,32 @@ def type_mapping(self) -> dict:
def schema(self) -> StructType:
pass

@property
def timestamp_columns(self) -> List[str]:
"""
Extract the columns that are parsed as timestamp or datetime data types.
"""
        return [
            field.name
            for field in self.schema
            if field.dataType in TIMESTAMP_TYPES
        ]

def overwrite_timestamp_types(self, schema: StructType) -> StructType:
"""
Overwrite the timestamp data types in the provided schema with string types.
"""
        overwritten_date_types = []

        for field in schema:
            if field.dataType in TIMESTAMP_TYPES:
                field.dataType = StringType()

            overwritten_date_types.append(field)

        return StructType(overwritten_date_types)


class ModelCatalog(Catalog):
@property
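A minimal sketch of how timestamp_columns and overwrite_timestamp_types behave, assuming a catalog whose schema is the StructType below (the field names are illustrative):

from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DateType

schema = StructType(
    [
        StructField("id", StringType()),
        StructField("CreatedOn", TimestampType()),
        StructField("SinkCreatedOn", DateType()),
    ]
)

# With this schema, timestamp_columns evaluates to ["CreatedOn", "SinkCreatedOn"],
# and overwrite_timestamp_types(schema) returns the same fields with StringType(),
# so the CSV reader first loads these columns as raw strings.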
85 changes: 85 additions & 0 deletions pyspark_cdm/datetime_parser.py
@@ -0,0 +1,85 @@
import re
from pyspark.sql import DataFrame
from pyspark.sql.functions import to_timestamp, col
from pyspark_cdm.catalog import Catalog
from typing import Optional
from .utils import first_non_empty_values

DATE_FORMATS = {
None: r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{1,}Z", # yyyy-MM-dd'T'HH:mm:ss.SSSZ doesn't work, pyspark format should be empty
"M/d/yyyy h:mm:ss a": r"\d{1,2}/\d{1,2}/\d{4} \d{1,2}:\d{2}:\d{2} [AP]M",
"yyyy-MM-dd'T'HH:mm:ss.SSS": r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}",
"yyyy-MM-dd'T'HH:mm:ss'Z'": r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z",
"yyyy-MM-dd'T'HH:mm:ss": r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}",
"dd-MM-yyyy HH:mm:ss": r"\d{2}-(0[1-9]|1[0-2])-\d{4} \d{2}:\d{2}:\d{2}",
"dd-MM-yyyy": r"\d{2}-(0[1-9]|1[0-2])-\d{4}",
"MM-dd-yyyy HH:mm:ss": r"(0[1-9]|1[0-2])-([0-2][0-9]|3[0-1])-\d{4} \d{2}:\d{2}:\d{2}",
"MM-dd-yyyy": r"(0[1-9]|1[0-2])-([0-2][0-9]|3[0-1])-\d{4}",
}

class DatetimeParser:
def __init__(self, df: DataFrame, catalog: Catalog) -> None:
self.df = df
self.catalog = catalog

def detect_date_format(self, date_string: str) -> Optional[str]:
"""
        Tries to find a regex pattern that matches the provided date_string and
        returns the corresponding pyspark format. Raises an exception when no
        pattern matches.
        """
        # The regex match fails when the provided date_string is a None value,
        # so we return None right away in that case.
        if date_string is None:
            return None

        for pyspark_format, regex in DATE_FORMATS.items():
            if re.match(regex, date_string):
                return pyspark_format

        raise Exception(f"Can't find a matching datetime pattern for {date_string}")

def try_parsing_datetime_column(
self,
df: DataFrame,
column_name: str,
datetime_format: str,
) -> DataFrame:
"""
Convert the a single datetime column using the to_timestamp method
with the required datetime format (e.g. "M/d/yyyy hh:mm:ss a").
"""
try:
df = df.withColumn(column_name, to_timestamp(col(column_name), datetime_format))
return df

except:
print(f'Failed parsing {column_name} with {datetime_format}')
return df

def convert_datetime_columns(
self,
) -> DataFrame:
"""
Loops over all the timestamp related columns and transforms these from strings
into datetime objects.
"""
        # Get the first non-empty value for each timestamp column
sampled_values = first_non_empty_values(
self.df,
self.catalog.timestamp_columns,
)

        # Loop over all the timestamp columns and convert each into a datetime column
df_parsed = self.df

for column_name in self.catalog.timestamp_columns:
pyspark_format = self.detect_date_format(sampled_values[column_name])

df_parsed = self.try_parsing_datetime_column(
df_parsed,
column_name,
pyspark_format
)

return df_parsed
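A quick sanity check of the regex-based detection against sample values from the test CSVs; this sketch assumes the DATE_FORMATS dict defined above:

import re

from pyspark_cdm.datetime_parser import DATE_FORMATS

samples = {
    "6/12/2023 12:03:02 PM": "M/d/yyyy h:mm:ss a",
    "2024-01-30T13:24:06Z": "yyyy-MM-dd'T'HH:mm:ss'Z'",
    "20-12-2023": "dd-MM-yyyy",
}

for value, expected in samples.items():
    # The first pattern that matches wins, so the dict order matters.
    detected = next(fmt for fmt, regex in DATE_FORMATS.items() if re.match(regex, value))
    assert detected == expected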
61 changes: 49 additions & 12 deletions pyspark_cdm/entity.py
@@ -21,8 +21,9 @@
from cdm.utilities.copy_options import CopyOptions
from cdm.utilities.resolve_options import ResolveOptions
from cdm.persistence.modeljson import LocalEntityDeclarationPersistence
from pyspark.sql.types import StructField, StructType
from pyspark.sql.types import StructType
from pyspark.sql import DataFrame
from pyspark_cdm.datetime_parser import DatetimeParser
from tenacity import retry, stop_after_attempt, wait_random_exponential


@@ -131,31 +132,67 @@ def file_paths(self) -> Generator[str, None, None]:
yield remove_root_from_path(path, "/dbfs")

@property
def schema(self) -> StructType:
    def catalog(self) -> "Catalog":
"""
        The catalog of the entity.
        Returns:
            Catalog: The catalog of the entity.
"""
catalog = catalog_factory(self)
return catalog.schema
return catalog


@retry(
stop=stop_after_attempt(2),
wait=wait_random_exponential(multiplier=3, max=60),
after=log_attempt_number,
)
def get_dataframe(
        self,
spark,
infer_timestamp_formats: bool = False,
alter_schema=lambda schema: schema,
) -> DataFrame:
return spark.read.csv(
list(self.file_paths),
header=False,
schema=alter_schema(self.schema),
inferSchema=False,
multiLine=True,
escape='"',
)
"""
        Loads the data using Spark.
        Args:
            spark: Spark session.
            infer_timestamp_formats (bool, optional): Whether the timestamp formats
                should be inferred using regex. Defaults to False.
            alter_schema: Callable that can modify the schema before reading.
                Defaults to the identity function.
        Returns:
            DataFrame: Spark dataframe with the loaded data.
"""


if infer_timestamp_formats:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
schema_with_replaced_timestamp_types = self.catalog.overwrite_timestamp_types(self.catalog.schema)

df = spark.read.csv(
list(self.file_paths),
header=False,
schema=alter_schema(schema_with_replaced_timestamp_types),
inferSchema=False,
multiLine=True,
escape='"',
)

datetime_parser = DatetimeParser(df, self.catalog)
parsed_df = datetime_parser.convert_datetime_columns()

return parsed_df

else:

return spark.read.csv(
list(self.file_paths),
header=False,
schema=alter_schema(self.catalog.schema),
inferSchema=False,
multiLine=True,
escape='"',
)
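A hedged usage sketch of the new flag; `spark` and `entity` stand in for objects constructed elsewhere (e.g. in the test fixtures):

# Hypothetical usage; `spark` and `entity` are assumed to exist already.
df = entity.get_dataframe(spark, infer_timestamp_formats=True)

# The CSV is first read with timestamp columns downgraded to strings, after
# which DatetimeParser converts them back using the detected per-column formats.
df.printSchema()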
23 changes: 22 additions & 1 deletion pyspark_cdm/utils.py
@@ -1,10 +1,12 @@
import asyncio
from typing import Optional
from typing import Optional, List
from cdm.enums import CdmStatusLevel, CdmDataFormat
from cdm.objectmodel import (
CdmCorpusDefinition,
CdmObject,
)
from pyspark.sql import DataFrame, Row
import pyspark.sql.functions as F


from pyspark_cdm.exceptions import DocumentLoadingException
@@ -73,3 +75,22 @@ def remove_root_from_path(path: str, root: str) -> str:
str: Path without the root.
"""
return f"/{path.lstrip(root)}"

def first_non_empty_values(
df: DataFrame,
selected_columns: List[str]
) -> Row:
"""
    Returns the first non-empty value for each given column. Be aware that
    if a column contains only nulls, None is returned for that
    column.
"""
row = df.select(
[
F.first(column, ignorenulls=True).alias(column)
for column
in selected_columns
]
).first()

return row
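A quick local check of first_non_empty_values — a minimal sketch assuming a local Spark session and illustrative column names (note that F.first without an ordering is only deterministic on a single partition):

from pyspark.sql import SparkSession

from pyspark_cdm.utils import first_non_empty_values

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.createDataFrame(
    [(None, "6/12/2023 12:03:02 PM"), ("a", None), ("b", "6/15/2023 12:03:02 PM")],
    ["name", "created"],
)

row = first_non_empty_values(df, ["name", "created"])
# row["name"] == "a" and row["created"] == "6/12/2023 12:03:02 PM"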
1 change: 1 addition & 0 deletions tests/conftest.py
@@ -3,6 +3,7 @@
from tests.fixtures.manifest import manifest
from tests.fixtures.corpus import manifest_corpus, model_corpus
from tests.fixtures.entity import entity
from tests.fixtures.datetime_parser import datetime_parser

warnings.filterwarnings("ignore", module="cdm")
warnings.simplefilter("ignore", category=DeprecationWarning)
16 changes: 16 additions & 0 deletions tests/fixtures/datetime_parser.py
@@ -0,0 +1,16 @@
import pytest
from pyspark.sql import SparkSession
from pyspark_cdm.datetime_parser import DatetimeParser
from pyspark_cdm.entity import Entity

@pytest.fixture
def datetime_parser(spark: SparkSession, entity: Entity):
    df = entity.get_dataframe(spark, infer_timestamp_formats=True)
catalog = entity.catalog

datetime_parser = DatetimeParser(
df=df,
catalog=catalog,
)

return datetime_parser
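A hypothetical test built on top of this fixture (the test name and assertion are illustrative, not part of this commit):

def test_timestamp_columns_become_timestamps(datetime_parser):
    parsed = datetime_parser.convert_datetime_columns()

    # Every column the catalog flags as timestamp-like should now be a real
    # Spark timestamp rather than a raw string.
    for column in datetime_parser.catalog.timestamp_columns:
        assert dict(parsed.dtypes)[column] == "timestamp"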
3 changes: 0 additions & 3 deletions tests/samples/model/example/1.csv

This file was deleted.

6 changes: 3 additions & 3 deletions tests/samples/model/example/Snapshot/1_1690183552.csv
@@ -1,3 +1,3 @@
7282fed7-c82c-eb11-a813-000d3a3a75c1,"6/12/2023 12:03:02 PM","6/12/2023 12:03:02 PM",6b82fed7-c82c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,7282fed7-c82c-eb11-a813-000d3a3a75c1,56635,False,"0001-01-01T00:00:00.0000000"
32b07058-d02c-eb11-a813-000d3a3a75c1,"6/12/2023 12:03:02 PM","6/12/2023 12:03:02 PM",2ab07058-d02c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,32b07058-d02c-eb11-a813-000d3a3a75c1,393639,False,"0001-01-01T00:00:00.0000000"
fd11d7fa-d42c-eb11-a813-000d3a3a75c1,"6/12/2023 12:03:02 PM","6/12/2023 12:03:02 PM",f511d7fa-d42c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,fd11d7fa-d42c-eb11-a813-000d3a3a75c1,542839,False,"0001-01-01T00:00:00.0000000"
7282fed7-c82c-eb11-a813-000d3a3a75c1,"6/15/2023 12:03:02 PM","6/12/2023 12:03:02 PM",6b82fed7-c82c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,7282fed7-c82c-eb11-a813-000d3a3a75c1,56635,False,"0001-01-01T00:00:00.0000000","2023-12-01T08:25:39.0000000+00:00","2024-01-30T13:24:06Z","20-12-2023","20-12-2023 15:45:03","2022-09-30T07:34:52.5348484Z","2022-03-22T13:40:11.0000000","20-12-2023","2022-05-05T14:38:48.8547924Z","2022-03-12T15:50:11.0000000"
32b07058-d02c-eb11-a813-000d3a3a75c1,"6/15/2023 12:03:02 PM","6/12/2023 12:03:02 PM",2ab07058-d02c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,32b07058-d02c-eb11-a813-000d3a3a75c1,393639,False,"0001-01-01T00:00:00.0000000","2023-12-01T08:25:39.0000000+00:00","2024-01-30T13:24:06Z","20-12-2023","20-12-2023 15:45:03","2022-09-30T07:34:52.5348484Z","2022-03-22T13:40:11.0000000","20-12-2023","2022-05-05T14:38:48.8547924Z","2022-03-12T15:50:11.0000000"
fd11d7fa-d42c-eb11-a813-000d3a3a75c1,"6/15/2023 12:03:02 PM","6/12/2023 12:03:02 PM",f511d7fa-d42c-eb11-a813-000d3a3a75c1,7a7bfed7-c82c-eb11-a813-000d3a3a75c1,fd11d7fa-d42c-eb11-a813-000d3a3a75c1,542839,False,"0001-01-01T00:00:00.0000000","2023-12-01T08:25:39.0000000+00:00","2024-01-30T13:24:06Z","20-12-2023","20-12-2023 15:45:03","2022-09-30T07:34:52.5348484Z","2022-03-22T13:40:11.0000000","20-12-2023","2022-05-05T14:38:48.8547924Z","2022-03-12T15:50:11.0000000"
45 changes: 45 additions & 0 deletions tests/samples/model/model.json
@@ -66,6 +66,51 @@
"name": "CreatedOn",
"dataType": "dateTimeOffset",
"maxLength": -1
},
{
"name": "RandomDateTime",
"dataType": "dateTime",
"maxLength": -1
},
{
"name": "RandomDateTime2",
"dataType": "dateTime",
"maxLength": -1
},
{
"name": "RandomDateTime3",
"dataType": "dateTime",
"maxLength": -1
},
{
"name": "RandomDateTime4",
"dataType": "dateTime",
"maxLength": -1
},
{
"name": "RandomDateTime5",
"dataType": "dateTime",
"maxLength": -1
},
{
"name": "RandomDateTime6",
"dataType": "dateTime",
"maxLength": -1
},
{
"name": "RandomDateTime7",
"dataType": "dateTime",
"maxLength": -1
},
{
"name": "RandomDateTime8",
"dataType": "dateTime",
"maxLength": -1
},
{
"name": "RandomDateTime9",
"dataType": "dateTime",
"maxLength": -1
}
],
"partitions": [