passed pre-commit checks
melgazar9 committed Oct 19, 2023
1 parent ae48025 commit 6031935
Showing 2 changed files with 56 additions and 15 deletions.
34 changes: 34 additions & 0 deletions plugins/loaders/target-jsonl--andyh1203.lock
@@ -0,0 +1,34 @@
{
  "plugin_type": "loaders",
  "name": "target-jsonl",
  "namespace": "target_jsonl",
  "variant": "andyh1203",
  "label": "JSON Lines (JSONL)",
  "docs": "https://hub.meltano.com/loaders/target-jsonl--andyh1203",
  "repo": "https://github.com/andyh1203/target-jsonl",
  "pip_url": "target-jsonl",
  "description": "JSONL loader",
  "logo_url": "https://hub.meltano.com/assets/logos/loaders/jsonl.png",
  "settings": [
    {
      "name": "destination_path",
      "kind": "string",
      "value": "output",
      "label": "Destination Path",
      "description": "Sets the destination path the JSONL files are written to, relative\nto the project root.\n\nThe directory needs to exist already, it will not be created\nautomatically.\n\nTo write JSONL files to the project root, set an empty string (`\"\"`).\n"
    },
    {
      "name": "do_timestamp_file",
      "kind": "boolean",
      "value": false,
      "label": "Include Timestamp in File Names",
      "description": "Specifies if the files should get timestamped.\n\nBy default, the resulting file will not have a timestamp in the file name (i.e. `exchange_rate.jsonl`).\n\nIf this option gets set to `true`, the resulting file will have a timestamp associated with it (i.e. `exchange_rate-{timestamp}.jsonl`).\n"
    },
    {
      "name": "custom_name",
      "kind": "string",
      "label": "Custom File Name Override",
      "description": "Specifies a custom name for the filename, instead of the stream name.\n\nThe file name will be `{custom_name}-{timestamp}.jsonl`, if `do_timestamp_file` is `true`.\nOtherwise the file name will be `{custom_name}.jsonl`.\n\nIf custom name is not provided, the stream name will be used.\n"
    }
  ]
}
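For readers skimming the lock file: the three settings above control where the loader writes files and how it names them. Below is a minimal sketch of the documented naming behaviour; the helper name and the timestamp format are assumptions made for the example, not target-jsonl's actual implementation.

# Illustrative sketch only: how the documented settings combine into a file name.
from datetime import datetime, timezone
from typing import Optional


def jsonl_file_name(stream_name: str, custom_name: Optional[str], do_timestamp_file: bool) -> str:
    """Apply the naming rules described in the settings above."""
    base = custom_name or stream_name  # custom_name, when set, overrides the stream name
    if do_timestamp_file:
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")  # assumed format
        return f"{base}-{timestamp}.jsonl"
    return f"{base}.jsonl"


print(jsonl_file_name("exchange_rate", None, False))     # exchange_rate.jsonl
print(jsonl_file_name("exchange_rate", None, True))      # exchange_rate-<timestamp>.jsonl
print(jsonl_file_name("exchange_rate", "rates", False))  # rates.jsonl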
37 changes: 22 additions & 15 deletions tap_mongodb/streams.py
@@ -2,9 +2,10 @@

from __future__ import annotations

import math
from datetime import datetime
from typing import Any, Generator, Iterable, Optional
import math

from bson.objectid import ObjectId
from pendulum import DateTime
from pymongo import ASCENDING
@@ -18,19 +19,20 @@
from singer_sdk.helpers._state import increment_state
from singer_sdk.helpers._typing import conform_record_data_types
from singer_sdk.helpers._util import utc_now
from singer_sdk.streams.core import (
REPLICATION_INCREMENTAL,
REPLICATION_LOG_BASED,
Stream,
TypeConformanceLevel
)
from singer_sdk.streams.core import REPLICATION_INCREMENTAL, REPLICATION_LOG_BASED, Stream, TypeConformanceLevel

from tap_mongodb.connector import MongoDBConnector
from tap_mongodb.types import IncrementalId

DEFAULT_START_DATE: str = "1970-01-01"


def recursive_replace_empty_in_dict(dct):
"""
Recursively replace empty values with None in a dictionary.
NaN, inf, and -inf are unable to be parsed by the json library, so these values will be replaced with None.
"""

for key, value in dct.items():
if value in [-math.inf, math.inf, math.nan]:
dct[key] = None
@@ -42,13 +44,15 @@ def recursive_replace_empty_in_dict(dct):
                    value[i] = None
        elif isinstance(value, dict):
            recursive_replace_empty_in_dict(value)
    return
    return dct


def to_object_id(replication_key_value: str) -> ObjectId:
    """Converts an ISO-8601 date string into a BSON ObjectId."""
    incremental_id: IncrementalId = IncrementalId.from_string(replication_key_value)
    return incremental_id.object_id


class MongoDBCollectionStream(Stream):
    """Stream class for mongodb streams."""

@@ -98,7 +102,7 @@ def primary_keys(self) -> Optional[list[str]]:
    def primary_keys(self, new_value: list[str]) -> None:
        """Set primary keys for the stream."""
        self._primary_keys = new_value
        return
        return self

    @property
    def is_sorted(self) -> bool:
@@ -107,7 +111,8 @@ def is_sorted(self) -> bool:
        When the tap is running in incremental mode, it is sorted - the replication key value is an ISO-8601-formatted
        string, and these are alphanumerically sortable.
        When the tap is running in log-based mode, it is not sorted - the replication key value here is a hex string."""
        When the tap is running in log-based mode, it is not sorted - the replication key value here is a hex string.
        """

        return self.replication_method == REPLICATION_INCREMENTAL
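An aside on the sorting claim in the docstring above: zero-padded ISO-8601 timestamps compare the same way lexicographically as they do chronologically, provided the values share a common format and UTC offset, so plain string comparison is enough for incremental bookmarks. A tiny self-contained check, illustrative only:

# String order matches time order for uniformly formatted ISO-8601 values.
iso_earlier = "2023-10-19T08:30:00"
iso_later = "2023-10-19T17:05:00"
assert iso_earlier < iso_later
print(sorted(["2023-10-19T17:05:00", "2023-10-19T08:30:00"]))  # earlier timestamp sorts first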

@@ -130,6 +135,7 @@ def _increment_stream_state(self, latest_record: dict[str, Any], *, context: dic
        state_dict = self.get_context_state(context)

        # Advance state bookmark values if applicable

        if self.replication_method not in {
            REPLICATION_INCREMENTAL,
            REPLICATION_LOG_BASED,
@@ -147,9 +153,11 @@ def _increment_stream_state(self, latest_record: dict[str, Any], *, context: dic
                f"(replication method={self.replication_method})",
            )
        treat_as_sorted = self.is_sorted

        if not treat_as_sorted and self.state_partitioning_keys is not None:
            # Streams with custom state partitioning are not resumable.
            treat_as_sorted = False

        increment_state(
            state_dict,
            replication_key=self.replication_key,
@@ -158,6 +166,8 @@ def _increment_stream_state(self, latest_record: dict[str, Any], *, context: dic
            check_sorted=self.check_sorted,
        )

        return self

    def _generate_record_messages(self, record: dict) -> Generator[singer.RecordMessage, None, None]:
        """Write out a RECORD message.
@@ -179,17 +189,14 @@ def _generate_record_messages(self, record: dict) -> Generator[singer.RecordMess
            record=record,
            schema=self.schema,
            level=self.TYPE_CONFORMANCE_LEVEL,
            logger=self.logger
            logger=self.logger,
        )
        for stream_map in self.stream_maps:
            mapped_record = stream_map.transform(record)
            # Emit record if not filtered
            if mapped_record is not None:
                record_message = singer.RecordMessage(
                    stream=stream_map.stream_alias,
                    record=mapped_record,
                    version=None,
                    time_extracted=extracted_at
                    stream=stream_map.stream_alias, record=mapped_record, version=None, time_extracted=extracted_at
                )

                yield record_message
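For context on the recursive_replace_empty_in_dict changes above (the function now returns the mutated dict rather than None): the motivation stated in its docstring is that NaN and positive/negative infinity do not survive strict JSON serialization. A small stdlib-only sketch of that failure mode and of the effect of substituting None; the isfinite-based one-liner here is an illustration, not the tap's implementation:

import json
import math

record = {"rate": math.inf, "spread": math.nan, "volume": 42}

# With the defaults, json emits non-standard tokens that many parsers reject:
print(json.dumps(record))  # {"rate": Infinity, "spread": NaN, "volume": 42}

# With strict JSON enforced, serialization fails outright:
try:
    json.dumps(record, allow_nan=False)
except ValueError as exc:
    print(f"strict serialization fails: {exc}")

# Replacing the non-finite values with None yields valid JSON:
cleaned = {k: None if isinstance(v, float) and not math.isfinite(v) else v for k, v in record.items()}
print(json.dumps(cleaned, allow_nan=False))  # {"rate": null, "spread": null, "volume": 42}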
