Skip to content

Commit

Permalink
Repeated records (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon authored Dec 20, 2021
1 parent 4b63f3b commit d1be9c6
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 30 deletions.
32 changes: 25 additions & 7 deletions target_bigquery/db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,20 @@ def bigquery_type(property_type, property_format):
return 'integer'
elif 'boolean' in property_type:
return 'boolean'
elif 'object' in property_type:
return 'record'
else:
return 'string'


def handle_record_type(safe_name, schema_property, mode="NULLABLE"):
    """Build the BigQuery schema field for a JSON-schema object.

    Every entry under the schema's ``properties`` is converted with
    ``column_type``. If that yields at least one nested field, a ``RECORD``
    field carrying those sub-fields is returned. An object without any
    declared properties is mapped to a plain ``string`` column instead.

    :param safe_name: column name handed to ``SchemaField`` unchanged
    :param schema_property: JSON-schema fragment describing the object
    :param mode: BigQuery field mode; defaults to ``"NULLABLE"``
    :return: the ``SchemaField`` describing the column
    """
    nested_properties = schema_property.get('properties', {})

    subfields = []
    for nested_name, nested_schema in nested_properties.items():
        subfields.append(column_type(nested_name, nested_schema))

    # No declared properties means there is no structure to model as a RECORD.
    if not subfields:
        return SchemaField(safe_name, 'string', mode)

    return SchemaField(safe_name, 'RECORD', mode, fields=subfields)


def column_type(name, schema_property):
safe_name = safe_column_name(name, quotes=False)
property_type = schema_property['type']
Expand All @@ -82,15 +92,12 @@ def column_type(name, schema_property):
except KeyError:
return SchemaField(safe_name, 'string', 'NULLABLE')
else:
result_type = items_type
if items_type == "record":
return handle_record_type(safe_name, items_schema, "REPEATED")
return SchemaField(safe_name, items_type, 'REPEATED')

elif 'object' in property_type:
fields = [column_type(col, t) for col, t in schema_property.get('properties', {}).items()]
if fields:
return SchemaField(safe_name, 'RECORD', 'NULLABLE', fields=fields)
else:
return SchemaField(safe_name, 'string', 'NULLABLE')
return handle_record_type(safe_name, schema_property)

else:
result_type = bigquery_type(property_type, property_format)
Expand Down Expand Up @@ -160,6 +167,11 @@ def safe_column_name(name, quotes=False):
return '{}'.format(name).lower()


def is_unstructured_object(props):
    """Check if property is an object and it has no properties.

    A schema fragment is "unstructured" when its ``type`` includes
    ``object`` and its ``properties`` entry is missing or empty.

    Fragments that carry no ``type`` key at all (for example an empty
    ``{}`` or a ``$ref``-only schema) are reported as not unstructured
    instead of raising ``KeyError``.

    :param props: JSON-schema fragment (a dict)
    :return: ``True`` for an object without declared properties,
        ``False`` otherwise
    """
    # ``type`` may be absent (or None); treat that as "no declared types".
    declared_type = props.get('type') or ()
    return 'object' in declared_type and not props.get('properties')


def flatten_key(k, parent_key, sep):
full_key = parent_key + [k]
inflected_key = full_key.copy()
Expand Down Expand Up @@ -419,13 +431,19 @@ def records_to_avro(self, records):
result = {}
for name, props in self.flatten_schema.items():
if name in flatten:
if 'object' in props['type'] and not 'properties' in props:
if is_unstructured_object(props):
result[name] = json.dumps(flatten[name])
# dump to string if array without items or recursive
elif ('array' in props['type'] and
(not 'items' in props
or '$ref' in props['items'])):
result[name] = json.dumps(flatten[name])
# dump array elements to strings
elif (
'array' in props['type'] and
is_unstructured_object(props.get('items', {}))
):
result[name] = [json.dumps(value) for value in flatten[name]]
elif 'number' in props['type']:
if flatten[name] is None:
result[name] = None
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/resources/messages-with-multi-schemas.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_two": {"initial_full_table_complete": true}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_two", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_two", "version": 3}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1, "c_date": "2019-02-01 15:12:45", "_sdc_deleted_at": "2019-02-12T01:10:10.000000Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-10 02:00:00"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1, "c_date": "2019-02-01T15:12:45Z", "_sdc_deleted_at": "2019-02-12T01:10:10.000000Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-10T02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_wo": {"initial_full_table_complete": true}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_three", "version": 3}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_repeated_records"}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_repeated_records", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_array_integers": {"type": ["null", "array"], "items": {"type": "string"}}, "c_array_objects": {"type": ["null", "array"], "items": {"type": "object", "properties": {"nested": {"type": "integer"}}}}, "c_array_objects_no_props": {"type": ["null", "array"], "items": {"type": "object"}}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_repeated_records", "version": 2}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_repeated_records", "record": {"c_pk": 2, "c_array_integers": ["1", "2", "3"], "c_array_objects": [{"nested": 1}, {"nested": 2}], "c_array_objects_no_props": [{"nested": 1}, {"nested": 2}]}, "version": 2, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_repeated_records", "bookmarks": {"tap_mysql_test-test_table_repeated_records": {"initial_full_table_complete": true}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_repeated_records", "version": 2}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"tap_mysql_test-test_table_repeated_records": {"initial_full_table_complete": true}}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_repeated_records"}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_repeated_records", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_array_integers": {"type": ["null", "array"], "items": {"type": "integer"}}, "c_array_objects": {"type": ["null", "array"], "items": {"type": "object", "properties": {"nested": {"type": "integer"}}}}, "c_array_objects_no_props": {"type": ["null", "array"], "items": {"type": "object"}}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_repeated_records", "version": 1}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_repeated_records", "record": {"c_pk": 1, "c_array_integers": [1, 2, 3], "c_array_objects": [{"nested": 1}, {"nested": 2}], "c_array_objects_no_props": [{"nested": 1}, {"nested": 2}]}, "version": 1, "time_extracted": "2019-01-31T15:51:50.215998Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_repeated_records", "bookmarks": {"tap_mysql_test-test_table_repeated_records": {"initial_full_table_complete": true}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_repeated_records", "version": 1}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"tap_mysql_test-test_table_repeated_records": {"initial_full_table_complete": true}}}}
4 changes: 2 additions & 2 deletions tests/integration/resources/messages-with-three-streams.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_two": {"initial_full_table_complete": true}}}}
{"type": "SCHEMA", "stream": "tap_mysql_test-test_table_two", "schema": {"properties": {"c_pk": {"inclusion": "automatic", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_varchar": {"inclusion": "available", "maxLength": 16, "type": ["null", "string"]}, "c_int": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "c_date": {"format": "date-time", "inclusion": "available", "type": ["null", "string"]}}, "type": "object"}, "key_properties": ["c_pk"]}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_two", "version": 3}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1, "c_date": "2019-02-01 15:12:45", "_sdc_deleted_at": "2019-02-12T01:10:10.000000Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-10 02:00:00"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 1, "c_varchar": "1", "c_int": 1, "c_date": "2019-02-01T15:12:45Z", "_sdc_deleted_at": "2019-02-12T01:10:10.000000Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "RECORD", "stream": "tap_mysql_test-test_table_two", "record": {"c_pk": 2, "c_varchar": "2", "c_int": 2, "c_date": "2019-02-10T02:00:00Z"}, "version": 3, "time_extracted": "2019-01-31T15:51:48.861962Z"}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_wo": {"initial_full_table_complete": true}}}}
{"type": "ACTIVATE_VERSION", "stream": "tap_mysql_test-test_table_three", "version": 3}
{"type": "STATE", "value": {"currently_syncing": "tap_mysql_test-test_table_two", "bookmarks": {"tap_mysql_test-test_table_one": {"initial_full_table_complete": true}, "tap_mysql_test-test_table_three": {"initial_full_table_complete": true}}}}
Expand Down
61 changes: 58 additions & 3 deletions tests/integration/test_target_bigquery.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import datetime
from datetime import timezone
import json
import unittest
import mock
import os
import unittest
import unittest.mock as mock
from datetime import timezone
from decimal import Decimal, getcontext

import target_bigquery
Expand All @@ -25,6 +25,7 @@ def query(bigquery, query):
result = bigquery.query(query)
return [dict(row.items()) for row in result]


class TestIntegration(unittest.TestCase):
"""
Integration Tests
Expand Down Expand Up @@ -476,6 +477,60 @@ def test_nested_schema_flattening(self):
'c_nested_object__nested_prop_3__multi_nested_prop_2': 'multi_value_2',
}])

def test_repeated_records(self):
    """Arrays of JSON objects are loaded as repeated columns."""
    original_lines = test_utils.get_test_tap_lines('messages-with-repeated-records.json')
    modified_lines = test_utils.get_test_tap_lines('messages-with-repeated-records-modified.json')

    # Load with default settings: the original stream first, then the
    # modified one.
    self.persist_lines(original_lines)
    self.persist_lines(modified_lines)

    # Read back every loaded row, ordered by primary key
    client = DbSync(self.config)
    schema_name = self.config.get('default_target_schema', '')
    sql = "SELECT * FROM {}.test_table_repeated_records ORDER BY c_pk".format(
        schema_name,
    )
    loaded_rows = query(client, sql)

    # Structured objects should come back as dictionaries,
    # unstructured objects should come back as JSON strings.
    structured_objects = [
        {'nested': 1},
        {'nested': 2},
    ]
    unstructured_objects = [
        '{"nested": 1}',
        '{"nested": 2}',
    ]

    expected_rows = [
        {
            'c_pk': 1,
            'c_array_integers': [1, 2, 3],
            'c_array_integers__st': [],
            'c_array_objects': structured_objects,
            'c_array_objects_no_props': unstructured_objects,
        },
        {
            'c_pk': 2,
            'c_array_integers': [],
            'c_array_integers__st': ["1", "2", "3"],
            'c_array_objects': structured_objects,
            'c_array_objects_no_props': unstructured_objects,
        },
    ]

    self.assertEqual(loaded_rows, expected_rows)

def test_column_name_change(self):
"""Tests correct renaming of bigquery columns after source change"""
tap_lines_before_column_name_change = test_utils.get_test_tap_lines('messages-with-three-streams.json')
Expand Down
61 changes: 45 additions & 16 deletions tests/unit/test_db_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ def test_config_validation(self):

def test_column_type_mapping(self):
"""Test JSON type to BigQuery column type mappings"""
mapper = lambda x: db_sync.column_type('dummy', x).field_type
def mapper(schema_property):
field = db_sync.column_type('dummy', schema_property)
return field.field_type, field.mode

# Incoming JSON schema types
json_str = {"type": ["string"]}
Expand All @@ -63,23 +65,50 @@ def test_column_type_mapping(self):
json_obj = {"type": ["object"]}
json_arr = {"type": ["array"]}
jsonb = {"type": ["null", "object"]}
jsonb_props = {
"type": ["null", "object"],
"properties": {
"prop1": json_int,
"prop2": json_str
}
}
jsonb_arr_str = {
"type": ["array"],
"items": {"type": ["string"]}
}
jsonb_arr_unstructured = {
"type": ["array"],
"items": {"type": ["object"], "properties": {}}
}
jsonb_arr_records = {
"type": ["array"],
"items": {
"type": ["object"],
"properties": {
"prop1": json_int,
"prop2": json_str
}
}
}

# Mapping from JSON schema types to BigQuery column types
self.assertEqual(mapper(json_str), 'string')
self.assertEqual(mapper(json_str_or_null), 'string')
self.assertEqual(mapper(json_dt), 'timestamp')
self.assertEqual(mapper(json_dt_or_null), 'timestamp')
self.assertEqual(mapper(json_t), 'time')
self.assertEqual(mapper(json_t_or_null), 'time')
self.assertEqual(mapper(json_num), 'numeric')
self.assertEqual(mapper(json_int), 'integer')
self.assertEqual(mapper(json_int_or_str), 'string')
self.assertEqual(mapper(json_bool), 'boolean')
self.assertEqual(mapper(json_obj), 'string')
self.assertEqual(mapper(json_arr), 'string')
self.assertEqual(mapper(jsonb), 'string')
# TODO: add tests for array with items
# TODO: add tests for record with properties
self.assertEqual(mapper(json_str), ('string', 'NULLABLE'))
self.assertEqual(mapper(json_str_or_null), ('string', 'NULLABLE'))
self.assertEqual(mapper(json_dt), ('timestamp', 'NULLABLE'))
self.assertEqual(mapper(json_dt_or_null), ('timestamp', 'NULLABLE'))
self.assertEqual(mapper(json_t), ('time', 'NULLABLE'))
self.assertEqual(mapper(json_t_or_null), ('time', 'NULLABLE'))
self.assertEqual(mapper(json_num), ('numeric', 'NULLABLE'))
self.assertEqual(mapper(json_int), ('integer', 'NULLABLE'))
self.assertEqual(mapper(json_int_or_str), ('string', 'NULLABLE'))
self.assertEqual(mapper(json_bool), ('boolean', 'NULLABLE'))
self.assertEqual(mapper(json_obj), ('string', 'NULLABLE'))
self.assertEqual(mapper(json_arr), ('string', 'NULLABLE'))
self.assertEqual(mapper(jsonb), ('string', 'NULLABLE'))
self.assertEqual(mapper(jsonb_props), ('RECORD', 'NULLABLE'))
self.assertEqual(mapper(jsonb_arr_str), ('string', 'REPEATED'))
self.assertEqual(mapper(jsonb_arr_unstructured), ('string', 'REPEATED'))
self.assertEqual(mapper(jsonb_arr_records), ('RECORD', 'REPEATED'))

def test_stream_name_to_dict(self):
"""Test identifying catalog, schema and table names from fully qualified stream and table names"""
Expand Down

0 comments on commit d1be9c6

Please sign in to comment.