Skip to content

Commit

Permalink
Fix key error when time_extracted is not present in the message (jm…
Browse files Browse the repository at this point in the history
…riego#97)

* time_extracted is optional

Some taps don't add it (which is not the best practice but what can you
do)

If time_extracted is not provided default to the current time.

https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#record-message

* Update target_bigquery/stream_utils.py

Co-authored-by: jmriego <[email protected]>

* Apply suggestions from code review

Co-authored-by: jmriego <[email protected]>

Co-authored-by: jmriego <[email protected]>
  • Loading branch information
sparrovv and jmriego authored Mar 23, 2022
1 parent 0d2dc03 commit 57f7c88
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
2 changes: 1 addition & 1 deletion target_bigquery/stream_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def parse_datetime(dt):
return None

extended_record = record_message['record']
extended_record['_sdc_extracted_at'] = parse_datetime(record_message['time_extracted'])
extended_record['_sdc_extracted_at'] = parse_datetime(record_message.get('time_extracted', datetime.now()))
extended_record['_sdc_batched_at'] = datetime.now()
extended_record['_sdc_deleted_at'] = parse_datetime(record_message.get('record', {}).get('_sdc_deleted_at'))

Expand Down
37 changes: 37 additions & 0 deletions tests/unit/resources/test_stream_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import unittest
from datetime import datetime

from target_bigquery import stream_utils

class TestStreamUtils(unittest.TestCase):
"""
Unit Tests
"""

def test_add_metadata_values_to_record(self):
"""Test adding metadata"""

dt = "2017-11-20T16:45:33.000Z"
record = { "type": "RECORD", "stream": "foo", "time_extracted": dt, "record": {"id": "2"} }
result = stream_utils.add_metadata_values_to_record(record)

self.assertEqual(result.get("id"), "2")
self.assertEqual(result.get("_sdc_extracted_at"), datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S.%fZ'))

extra_attrs = ['_sdc_batched_at', '_sdc_deleted_at']
for attr in extra_attrs:
self.assertTrue(attr in result)

def test_add_metadata_values_to_record_when_no_time_extracted(self):
"""Test adding metadata when there's no time extracted in the record message """

record = { "type": "RECORD", "stream": "foo", "record": {"id": "2"} }

dt = datetime.now()
result = stream_utils.add_metadata_values_to_record(record)
self.assertEqual(result.get("id"), "2")
self.assertGreaterEqual(result.get("_sdc_extracted_at"), dt)

extra_attrs = ['_sdc_extracted_at', '_sdc_batched_at', '_sdc_deleted_at']
for attr in extra_attrs:
self.assertTrue(attr in result)

0 comments on commit 57f7c88

Please sign in to comment.