Commit 646b36b

feat: add s3 support (#1)
kgpayne authored Jan 5, 2023
1 parent b60f4ea commit 646b36b
Showing 3 changed files with 134 additions and 17 deletions.
81 changes: 77 additions & 4 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -10,7 +10,7 @@ packages = [{include = "target_singer_jsonl"}]
[tool.poetry.dependencies]
python = "^3.10"
jsonschema = "^4.17.3"
smart-open = "^6.3.0"
smart-open = {extras = ["s3"], version = "^6.3.0"}

[tool.poetry.group.dev.dependencies]
tap-carbon-intensity = {git = "https://gitlab.com/meltano/tap-carbon-intensity.git"}
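Note: the `s3` extra pulls in boto3, which smart-open uses to read and write `s3://` URIs directly. A minimal sketch of the behaviour the target now relies on; the bucket, key, and record below are placeholders, and compression is assumed to follow smart-open's default infer-from-extension handling:

# Sketch only: write a gzip-compressed JSONL object straight to S3 via smart-open.
# "my-bucket" and the key are hypothetical; credentials resolve through the usual boto3 chain.
from smart_open import open

uri = "s3://my-bucket/raw/users/users-20230105T000000.singer.gz"
with open(uri, "w", encoding="utf-8") as fh:
    fh.write('{"type": "RECORD", "stream": "users", "record": {"id": 1}}\n')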
68 changes: 56 additions & 12 deletions target_singer_jsonl/__init__.py
@@ -6,13 +6,17 @@
import logging
import sys
from datetime import datetime
+from functools import reduce
from pathlib import Path

from jsonschema.validators import Draft4Validator
from smart_open import open
+from smart_open.smart_open_lib import patch_pathlib
+
+_ = patch_pathlib()  # replace `Path.open` with `smart_open.open`

logging.basicConfig(stream=sys.stderr, level=logging.INFO)

logger = logging.getLogger(__name__)

example_config = {
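Note: the `patch_pathlib()` call added above monkey-patches `pathlib.Path.open` to route through `smart_open.open`, so the local destination keeps its `Path`-based code while getting transparent gzip compression for `.gz` filenames. A rough illustration of the effect, assuming smart-open's default compression-by-extension behaviour (the path below is hypothetical):

# Sketch only: after patch_pathlib(), Path.open dispatches to smart_open.open,
# so writing to a *.gz path is transparently gzip-compressed.
from pathlib import Path

from smart_open.smart_open_lib import patch_pathlib

_ = patch_pathlib()  # replace `Path.open` with `smart_open.open`

path = Path("output/users/users-20230105T000000.singer.gz")  # hypothetical local path
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as fh:  # handled by smart_open under the hood
    fh.write('{"type": "RECORD", "stream": "users", "record": {"id": 1}}\n')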
@@ -26,6 +30,14 @@
now = datetime.now().strftime("%Y%m%dT%H%M%S%z")


+def join_slash(a, b):
+    return a.rstrip("/") + "/" + b.lstrip("/")
+
+
+def urljoin(*args):
+    return reduce(join_slash, args) if args else ""
+
+
def emit_state(state):
if state is not None:
line = json.dumps(state)
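Note: the `join_slash`/`urljoin` helpers added above assemble S3 URIs by trimming and re-inserting a single slash between segments, so the `s3://` scheme and intermediate prefixes survive even when a segment starts with "/". An illustration using the two helpers defined just above (bucket and key are illustrative):

# Illustrative only: each boundary collapses to exactly one slash.
urljoin("s3://my-bucket/raw/", "/users/users-20230105T000000.singer.gz")
# -> 's3://my-bucket/raw/users/users-20230105T000000.singer.gz'

urljoin()
# -> '' (no arguments)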
@@ -34,28 +46,60 @@ def emit_state(state):
sys.stdout.flush()


-def get_file_path(config, stream):
-    destination = config.get("destination", "local")
+def get_file_path(stream, destination, config):
+    filename = f"{stream}/{stream}-{now}.singer.gz"
    if destination == "local":
-        return Path(config["local"]["folder"]).joinpath(
-            f"{stream}/{stream}-{now}.jsonl.gz"
-        )
-    raise KeyError(f"Destination {destination} not supported.")
+        return Path(config["folder"]).joinpath(filename)
+    elif destination == "s3":
+        bucket = config["bucket"]
+        prefix = config["prefix"]
+        return urljoin(f"s3://{bucket}/{prefix}/", filename)
+    else:
+        raise KeyError(f"Destination {destination} not supported.")


-def write_lines(config, stream, lines):
+def write_lines_local(destination, config, stream, lines):
    if stream not in stream_files:
-        stream_files[stream] = get_file_path(config, stream)
+        stream_files[stream] = get_file_path(
+            stream=stream, destination=destination, config=config
+        )
+    stream_files[stream].parent.mkdir(parents=True, exist_ok=True)

+    with stream_files[stream].open("w", encoding="utf-8") as outfile:
+        logging.info(f"Writing to file: {stream_files[stream]}")
+        for line in lines:
+            outfile.write(line)

-    destination = config.get("destination", "local")
-    if destination == "local":
-        stream_files[stream].parent.mkdir(parents=True, exist_ok=True)

-        with stream_files[stream].open("w") as outfile:
+def write_lines_s3(destination, config, stream, lines):
+    if stream not in stream_files:
+        stream_files[stream] = get_file_path(
+            stream=stream, destination=destination, config=config
+        )
+    with open(stream_files[stream], "w", encoding="utf-8") as outfile:
logging.info(f"Writing to file: {stream_files[stream]}")
for line in lines:
outfile.write(line)


+def write_lines(config, stream, lines):
+    destination = config.get("destination", "local")
+    if destination == "local":
+        return write_lines_local(
+            destination=destination,
+            config=config[destination],
+            stream=stream,
+            lines=lines,
+        )
+    elif destination == "s3":
+        return write_lines_s3(
+            destination=destination,
+            config=config[destination],
+            stream=stream,
+            lines=lines,
+        )


def persist_lines(config, lines):
state = None
schemas = {}
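Note: with this change, `write_lines` reads a top-level `destination` key and hands the matching sub-config to `write_lines_local` or `write_lines_s3`. The file's `example_config` is not shown in this diff, but a config of roughly this shape would exercise the new S3 branch (bucket, prefix, and folder values are placeholders):

# Hypothetical target config for the new S3 branch; values are placeholders.
config = {
    "destination": "s3",  # selects write_lines_s3 and passes config["s3"] through
    "s3": {
        "bucket": "my-data-lake",
        "prefix": "raw/singer",
    },
    "local": {
        "folder": "output",  # used only when destination == "local"
    },
}

With a config like this, get_file_path yields keys of the form s3://my-data-lake/raw/singer/<stream>/<stream>-<timestamp>.singer.gz.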
