Skip to content

Commit

Permalink
Added BATCH message. (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
kgpayne authored Dec 1, 2020
1 parent 50a10c1 commit 98aea12
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ celerybeat-schedule
venv/
ENV/

# Pipenv
Pipfile
Pipfile.lock
pyproject.toml

# Spyder project settings
.spyderproject

Expand Down
2 changes: 2 additions & 0 deletions singer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
RecordMessage,
SchemaMessage,
StateMessage,
BatchMessage,
format_message,
parse_message,
write_message,
Expand All @@ -34,6 +35,7 @@
write_schema,
write_state,
write_version,
write_batch
)

from singer.transform import (
Expand Down
68 changes: 68 additions & 0 deletions singer/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,52 @@ def asdict(self):
}


class BatchMessage(Message):
    """BATCH message (EXPERIMENTAL).

    A BATCH message points to a collection of messages (from a single
    stream) that has been serialized to disk. It is implemented for
    performance reasons; most Taps and Targets should not need to use
    BATCH messages at all.

    The BATCH message has these fields:

      * stream (string) - The name of the stream.
      * filepath (string) - The location of a batch file,
        e.g. '/tmp/users001.jsonl'.
      * format (string, optional) - An indication of serialization format,
        e.g. 'csv'. If none is provided, 'jsonl' is used.
      * compression (string, optional) - An indication of file compression
        format, e.g. 'gzip'. Left out of the serialized message when None.
      * batch_size (int, optional) - Number of records in this batch,
        e.g. 100000. Left out of the serialized message when None.

    If neither format nor compression is provided, uncompressed jsonl files
    are assumed.

    Note that the constructor argument is named ``file_format`` while the
    attribute and the serialized key are named ``format``.

        msg = singer.BatchMessage(
            stream='users',
            filepath='/tmp/users0001.jsonl'
        )
    """

    def __init__(self, stream, filepath, file_format=None, compression=None, batch_size=None):
        self.stream = stream
        self.filepath = filepath
        # Any falsy file_format (None, '') falls back to the default format.
        self.format = file_format if file_format else 'jsonl'
        self.compression = compression
        self.batch_size = batch_size

    def asdict(self):
        """Return the message as a plain dict, omitting unset optional fields."""
        payload = {
            'type': 'BATCH',
            'stream': self.stream,
            'filepath': self.filepath,
            'format': self.format
        }
        # Optional fields are appended in a fixed order and only when set.
        optional_fields = (
            ('compression', self.compression),
            ('batch_size', self.batch_size),
        )
        for key, value in optional_fields:
            if value is not None:
                payload[key] = value
        return payload


def _required_key(msg, k):
if k not in msg:
raise Exception("Message is missing required key '{}': {}".format(k, msg))
Expand Down Expand Up @@ -214,6 +260,14 @@ def parse_message(msg):
elif msg_type == 'ACTIVATE_VERSION':
return ActivateVersionMessage(stream=_required_key(obj, 'stream'),
version=_required_key(obj, 'version'))

elif msg_type == 'BATCH':
return BatchMessage(stream=_required_key(obj, 'stream'),
filepath=_required_key(obj, 'filepath'),
file_format=_required_key(obj, 'format'),
compression=obj.get('compression'),
batch_size=obj.get('batch_size'))

else:
return None

Expand Down Expand Up @@ -285,3 +339,17 @@ def write_version(stream_name, version):
write_version(stream, version)
"""
write_message(ActivateVersionMessage(stream_name, version))

def write_batch(
        stream_name, filepath, file_format=None,
        compression=None, batch_size=None
):
    """Write a batch message.

    Builds a BatchMessage from the given arguments and passes it to
    write_message.

    stream = 'users'
    filepath = '/tmp/users0001.jsonl'
    file_format = 'jsonl'
    compression = None
    batch_size = 100000
    write_batch(stream, filepath, file_format, compression, batch_size)
    """
    batch_message = BatchMessage(
        stream=stream_name,
        filepath=filepath,
        file_format=file_format,
        compression=compression,
        batch_size=batch_size,
    )
    write_message(batch_message)
14 changes: 13 additions & 1 deletion tests/test_singer.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ def test_parse_message_state_missing_value(self):
with self.assertRaises(Exception):
singer.parse_message('{"type": "STATE"}')

def test_parse_message_batch_good(self):
    """A well-formed BATCH line parses into an equal BatchMessage."""
    raw_line = (
        '{"type": "BATCH", "stream": "users", '
        '"filepath": "/tmp/users0001.jsonl", "format": "jsonl"}'
    )
    parsed = singer.parse_message(raw_line)
    # No file_format is passed here, so the expected message uses the default.
    expected = singer.BatchMessage(stream='users', filepath='/tmp/users0001.jsonl')
    self.assertEqual(parsed, expected)

def test_parse_message_batch_missing_value(self):
    """A BATCH line carrying only its type must fail to parse."""
    incomplete_line = '{"type": "BATCH"}'
    with self.assertRaises(Exception):
        singer.parse_message(incomplete_line)

def test_round_trip(self):
record_message = singer.RecordMessage(
record={'name': 'foo'},
Expand All @@ -112,7 +121,6 @@ def test_round_trip(self):
singer.parse_message(singer.format_message(state_message)))

## These tests just confirm that writing doesn't throw

def test_write_record(self):
    """Smoke test: writing a record should not raise."""
    stream_name = "users"
    record = {"name": "mike"}
    singer.write_record(stream_name, record)

Expand All @@ -125,6 +133,10 @@ def test_write_schema(self):
def test_write_state(self):
    """Smoke test: writing a state value should not raise."""
    state = {"foo": 1}
    singer.write_state(state)

def test_write_batch(self):
    """Smoke test: writing a batch message should not raise."""
    stream_name = "users"
    batch_file = "/tmp/users0001.jsonl"
    singer.write_batch(stream_name, batch_file)


class TestParsingNumbers(unittest.TestCase):
def create_record(self, value):
raw = '{"type": "RECORD", "stream": "test", "record": {"value": ' + value + '}}'
Expand Down

0 comments on commit 98aea12

Please sign in to comment.