From 57f7c88d0e8f0f0db464e45f2a177521d69de3ff Mon Sep 17 00:00:00 2001
From: Michal Wrobel
Date: Wed, 23 Mar 2022 14:45:49 +0100
Subject: [PATCH] Fix key error when `time_extracted` is not present in the message (#97)

* time_extracted is optional

Some taps don't add it (which is not the best practice but what can you do)
If time_extracted is not provided default to the current time.

https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#record-message

* Update target_bigquery/stream_utils.py

Co-authored-by: jmriego

* Apply suggestions from code review

Co-authored-by: jmriego

Co-authored-by: jmriego
---
 target_bigquery/stream_utils.py           |  2 +-
 tests/unit/resources/test_stream_utils.py | 37 +++++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)
 create mode 100644 tests/unit/resources/test_stream_utils.py

diff --git a/target_bigquery/stream_utils.py b/target_bigquery/stream_utils.py
index 14b7874..d310aef 100644
--- a/target_bigquery/stream_utils.py
+++ b/target_bigquery/stream_utils.py
@@ -105,7 +105,7 @@ def parse_datetime(dt):
             return None
 
     extended_record = record_message['record']
-    extended_record['_sdc_extracted_at'] = parse_datetime(record_message['time_extracted'])
+    extended_record['_sdc_extracted_at'] = parse_datetime(record_message.get('time_extracted', datetime.now()))
     extended_record['_sdc_batched_at'] = datetime.now()
     extended_record['_sdc_deleted_at'] = parse_datetime(record_message.get('record', {}).get('_sdc_deleted_at'))
 
diff --git a/tests/unit/resources/test_stream_utils.py b/tests/unit/resources/test_stream_utils.py
new file mode 100644
index 0000000..599ff39
--- /dev/null
+++ b/tests/unit/resources/test_stream_utils.py
@@ -0,0 +1,37 @@
+import unittest
+from datetime import datetime
+
+from target_bigquery import stream_utils
+
+class TestStreamUtils(unittest.TestCase):
+    """
+    Unit Tests
+    """
+
+    def test_add_metadata_values_to_record(self):
+        """Test adding metadata"""
+
+        dt = "2017-11-20T16:45:33.000Z"
+        record = { "type": "RECORD", "stream": "foo", "time_extracted": dt, "record": {"id": "2"} }
+        result = stream_utils.add_metadata_values_to_record(record)
+
+        self.assertEqual(result.get("id"), "2")
+        self.assertEqual(result.get("_sdc_extracted_at"), datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S.%fZ'))
+
+        extra_attrs = ['_sdc_batched_at', '_sdc_deleted_at']
+        for attr in extra_attrs:
+            self.assertTrue(attr in result)
+
+    def test_add_metadata_values_to_record_when_no_time_extracted(self):
+        """Test adding metadata when there's no time extracted in the record message """
+
+        record = { "type": "RECORD", "stream": "foo", "record": {"id": "2"} }
+
+        dt = datetime.now()
+        result = stream_utils.add_metadata_values_to_record(record)
+        self.assertEqual(result.get("id"), "2")
+        self.assertGreaterEqual(result.get("_sdc_extracted_at"), dt)
+
+        extra_attrs = ['_sdc_extracted_at', '_sdc_batched_at', '_sdc_deleted_at']
+        for attr in extra_attrs:
+            self.assertTrue(attr in result)