diff --git a/meltano.yml b/meltano.yml
index 9c61fdd..525469d 100644
--- a/meltano.yml
+++ b/meltano.yml
@@ -46,6 +46,8 @@ plugins:
       kind: string
     - name: s3.paths
       kind: array
+    - name: add_record_metadata
+      kind: boolean
   - name: tap-carbon-intensity
     variant: meltano
     pip_url: git+https://gitlab.com/meltano/tap-carbon-intensity.git
@@ -66,6 +68,8 @@ plugins:
       kind: string
     - name: s3.prefix
       kind: string
+    - name: add_record_metadata
+      kind: boolean
 jobs:
 - name: job-simple-test
   tasks:
diff --git a/target_singer_jsonl/__init__.py b/target_singer_jsonl/__init__.py
index 2a6df31..21638b1 100644
--- a/target_singer_jsonl/__init__.py
+++ b/target_singer_jsonl/__init__.py
@@ -5,6 +5,7 @@
 import json
 import logging
 import sys
+import time
 from datetime import datetime
 from functools import reduce
 from pathlib import Path
@@ -23,11 +24,13 @@
     "destination": "local",
     "s3": {"bucket": "my-s3-bucket", "prefix": "put/files/in/here/"},
     "local": {"folder": ".secrets/output/"},
+    "add_record_metadata": False,
 }
 
 stream_files = {}
 stream_lines = {}
-now = datetime.now().strftime("%Y%m%dT%H%M%S%z")
+file_timestamp = datetime.now().strftime("%Y%m%dT%H%M%S%z")
+target_start_timestamp = datetime.now().isoformat()
 
 
 def join_slash(a, b):
@@ -47,7 +50,7 @@ def emit_state(state):
 
 
 def get_file_path(stream, destination, config):
-    filename = f"{stream}/{stream}-{now}.singer.gz"
+    filename = f"{stream}/{stream}-{file_timestamp}.singer.gz"
     if destination == "local":
         return Path(config["folder"]).joinpath(filename)
     elif destination == "s3":
@@ -68,7 +71,7 @@ def write_lines_local(destination, config, stream, lines):
     with stream_files[stream].open("w", encoding="utf-8") as outfile:
         logging.info(f"Writing to file: {stream_files[stream]}")
         for line in lines:
-            outfile.write(line)
+            outfile.write(line + "\n")
 
 
 def write_lines_s3(destination, config, stream, lines):
@@ -79,7 +82,7 @@
     with open(stream_files[stream], "w", encoding="utf-8") as outfile:
         logging.info(f"Writing to file: {stream_files[stream]}")
         for line in lines:
-            outfile.write(line)
+            outfile.write(line + "\n")
 
 
 def write_lines(config, stream, lines):
@@ -104,60 +107,93 @@ def persist_lines(config, lines):
     state = None
     schemas = {}
     key_properties = {}
-    headers = {}
     validators = {}
+    add_record_metadata = config.get("add_record_metadata", True)
 
     # Loop over lines from stdin
     for line in lines:
         try:
-            o = json.loads(line)
+            message = json.loads(line)
         except json.decoder.JSONDecodeError:
             logger.error(f"Unable to parse:\n{line}")
             raise
 
-        if "type" not in o:
+        if "type" not in message:
             raise Exception(f"Line is missing required key 'type': {line}")
-        t = o["type"]
+        t = message["type"]
 
         if t != "STATE":
-            if "stream" not in o:
+            if "stream" not in message:
                 raise Exception(f"Line is missing required key 'stream': {line}")
-            stream = o["stream"]
+            stream = message["stream"]
             if stream not in stream_lines:
                 stream_lines[stream] = []
 
-            # persisting STATE messages is problematic when splitting records into separate
-            # files, therefore we omit them and allow tap-singer-jsonl to create new
-            # state messages from observed records
-            stream_lines[stream].append(line)
-
         if t == "RECORD":
-
             if stream not in schemas:
                 raise Exception(
                     f"A record for stream {stream} was encountered before a corresponding schema"
                 )
+            record = message["record"]
             # Get schema for this record's stream
             schema = schemas[stream]
-
             # Validate record
-            validators[stream].validate(o["record"])
-
+            validators[stream].validate(record)
+            # Process record
+            if add_record_metadata:
+                now = datetime.now().isoformat()
+                record.update(
+                    {
+                        "_sdc_extracted_at": message.get(
+                            "time_extracted", target_start_timestamp
+                        ),
+                        "_sdc_received_at": now,
+                        "_sdc_batched_at": now,
+                        "_sdc_deleted_at": record.get("_sdc_deleted_at"),
+                        "_sdc_sequence": int(round(time.time() * 1000)),
+                        "_sdc_table_version": message.get("version"),
+                    }
+                )
+            # Queue message for write
             state = None
-        elif t == "STATE":
-            logger.debug(f'Setting state to {o["value"]}')
-            state = o["value"]
+            stream_lines[stream].append(json.dumps(message))
+
         elif t == "SCHEMA":
-            schemas[stream] = o["schema"]
-            validators[stream] = Draft4Validator(o["schema"])
-            if "key_properties" not in o:
+            schemas[stream] = message["schema"]
+            validators[stream] = Draft4Validator(message["schema"])
+            if "key_properties" not in message:
                 raise Exception("key_properties field is required")
-            key_properties[stream] = o["key_properties"]
+            key_properties[stream] = message["key_properties"]
+            # Add metadata properties
+            if add_record_metadata:
+                properties_dict = schemas[stream]["properties"]
+                for col in {
+                    "_sdc_extracted_at",
+                    "_sdc_received_at",
+                    "_sdc_batched_at",
+                    "_sdc_deleted_at",
+                }:
+                    properties_dict[col] = {
+                        "type": ["null", "string"],
+                        "format": "date-time",
+                    }
+                for col in {"_sdc_sequence", "_sdc_table_version"}:
+                    properties_dict[col] = {"type": ["null", "integer"]}
+            # Queue message for write
+            stream_lines[stream].append(json.dumps(message))
+
+        elif t == "STATE":
+            # persisting STATE messages is problematic when splitting records into separate
+            # files, therefore we omit them and allow tap-singer-jsonl to create new
+            # state messages from observed records on read
+            logger.debug(f'Setting state to {message["value"]}')
+            state = message["value"]
+
         else:
-            raise Exception(f"Unknown message type {t} in message {o}")
+            raise Exception(f"Unknown message type {t} in message {message}")
 
     for stream, messages in stream_lines.items():
         write_lines(config, stream, messages)