From 98aea1232bfeca3b564227c25292b411d0884267 Mon Sep 17 00:00:00 2001 From: kgpayne Date: Tue, 1 Dec 2020 15:36:07 +0000 Subject: [PATCH] Added BATCH message. (#17) --- .gitignore | 5 ++++ singer/__init__.py | 2 ++ singer/messages.py | 68 ++++++++++++++++++++++++++++++++++++++++++++ tests/test_singer.py | 14 ++++++++- 4 files changed, 88 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 9aa5b22..03e17bb 100644 --- a/.gitignore +++ b/.gitignore @@ -83,6 +83,11 @@ celerybeat-schedule venv/ ENV/ +# Pipenv +Pipfile +Pipfile.lock +pyproject.toml + # Spyder project settings .spyderproject diff --git a/singer/__init__.py b/singer/__init__.py index e0deeeb..e9d33ab 100644 --- a/singer/__init__.py +++ b/singer/__init__.py @@ -26,6 +26,7 @@ RecordMessage, SchemaMessage, StateMessage, + BatchMessage, format_message, parse_message, write_message, @@ -34,6 +35,7 @@ write_schema, write_state, write_version, + write_batch ) from singer.transform import ( diff --git a/singer/messages.py b/singer/messages.py index 3848801..61cc515 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -167,6 +167,52 @@ def asdict(self): } +class BatchMessage(Message): + """ BATCH message (EXPERIMENTAL). + + The BATCH message has these fields: + + * stream (string) - The name of the stream. + * filepath (string) - The location of a batch file. e.g. '/tmp/users001.jsonl'. + * format (string, optional) - An indication of serialization format. + If none is provided, 'jsonl' will be assumed. e.g. 'csv'. + * compression (string, optional) - An indication of file compression format. e.g. 'gzip'. + * batch_size (int, optional) - Number of records in this batch. e.g. 100000. + + If file_properties are not provided, uncompressed jsonl files are assumed. + + A BATCH record points to a collection of messages (from a single stream) serialized to disk, + and is implemented for performance reasons. Most Taps and Targets should not need to use + BATCH messages at all. + + msg = singer.BatchMessage( + stream='users', + filepath='/tmp/users0001.jsonl' + ) + + """ + + def __init__(self, stream, filepath, file_format=None, compression=None, batch_size=None): + self.stream = stream + self.filepath = filepath + self.format = file_format or 'jsonl' + self.compression = compression + self.batch_size = batch_size + + def asdict(self): + result = { + 'type': 'BATCH', + 'stream': self.stream, + 'filepath': self.filepath, + 'format': self.format + } + if self.compression is not None: + result['compression'] = self.compression + if self.batch_size is not None: + result['batch_size'] = self.batch_size + return result + + def _required_key(msg, k): if k not in msg: raise Exception("Message is missing required key '{}': {}".format(k, msg)) @@ -214,6 +260,14 @@ def parse_message(msg): elif msg_type == 'ACTIVATE_VERSION': return ActivateVersionMessage(stream=_required_key(obj, 'stream'), version=_required_key(obj, 'version')) + + elif msg_type == 'BATCH': + return BatchMessage(stream=_required_key(obj, 'stream'), + filepath=_required_key(obj, 'filepath'), + file_format=_required_key(obj, 'format'), + compression=obj.get('compression'), + batch_size=obj.get('batch_size')) + else: return None @@ -285,3 +339,17 @@ def write_version(stream_name, version): write_version(stream, version) """ write_message(ActivateVersionMessage(stream_name, version)) + +def write_batch( + stream_name, filepath, file_format=None, + compression=None, batch_size=None +): + """Write a batch message. + + stream = 'users' + filepath = '/tmp/users0001.jsonl' + file_format = 'jsonl' + compression = None + batch_size = 100000 + """ + write_message(BatchMessage(stream_name, filepath, file_format, compression, batch_size)) diff --git a/tests/test_singer.py b/tests/test_singer.py index 4fb74de..8f24262 100644 --- a/tests/test_singer.py +++ b/tests/test_singer.py @@ -90,6 +90,15 @@ def test_parse_message_state_missing_value(self): with self.assertRaises(Exception): singer.parse_message('{"type": "STATE"}') + def test_parse_message_batch_good(self): + message = singer.parse_message( + '{"type": "BATCH", "stream": "users", "filepath": "/tmp/users0001.jsonl", "format": "jsonl"}') + self.assertEqual(message, singer.BatchMessage(stream='users', filepath='/tmp/users0001.jsonl')) + + def test_parse_message_batch_missing_value(self): + with self.assertRaises(Exception): + singer.parse_message('{"type": "BATCH"}') + def test_round_trip(self): record_message = singer.RecordMessage( record={'name': 'foo'}, @@ -112,7 +121,6 @@ def test_round_trip(self): singer.parse_message(singer.format_message(state_message))) ## These three tests just confirm that writing doesn't throw - def test_write_record(self): singer.write_record("users", {"name": "mike"}) @@ -125,6 +133,10 @@ def test_write_schema(self): def test_write_state(self): singer.write_state({"foo": 1}) + def test_write_batch(self): + singer.write_batch("users", "/tmp/users0001.jsonl") + + class TestParsingNumbers(unittest.TestCase): def create_record(self, value): raw = '{"type": "RECORD", "stream": "test", "record": {"value": ' + value + '}}'