Skip to content

Commit

Permalink
Merge pull request #9 from anelendata/v0.3.0
Browse files Browse the repository at this point in the history
v0.3.1
  • Loading branch information
daigotanaka authored Sep 2, 2020
2 parents 72f4153 + 76c97e5 commit 3ae8c80
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 11 deletions.
8 changes: 8 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
## History

### 0.3.1

- Fix the issue of schema row not wrapped with object
- Automatically convert date type to datetime type since date is not
supported by schema.
- Better error handling when --start_datetime is missing.
- --catalog must be always set for sync (no more auto generate in sync)

### 0.3.0

- Handle multiple streams
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env python
from setuptools import setup

VERSION = "0.3.0"
VERSION = "0.3.1"

with open("README.md", "r") as fh:
long_description = fh.read()
Expand Down
15 changes: 9 additions & 6 deletions tap_bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,18 +164,21 @@ def main():

singer_utils.check_config(CONFIG, REQUIRED_CONFIG_KEYS)

if not CONFIG.get("start_datetime"):
LOGGER.critical("start_datetime not specified")
return

# If discover flag was passed, run discovery mode and dump output to stdout
if args.discover:
catalog = discover(CONFIG)
print(json.dumps(catalog, indent=2))
# Otherwise run in sync mode
else:
if args.catalog:
catalog = args.catalog
else:
catalog = Catalog.from_dict(discover(CONFIG))

elif args.catalog:
catalog = args.catalog
sync(CONFIG, args.state, catalog)
else:
LOGGER.critical("Catalog file not specified")
return


CONFIG = {}
Expand Down
15 changes: 11 additions & 4 deletions tap_bigquery/sync_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def do_discover(config, stream, output_schema_file=None,
key_properties = []

catalog = {"selected": True,
"type": "SCHEMA",
"type": "object",
"stream": stream["name"],
"key_properties": key_properties,
"properties": schema["properties"]
Expand All @@ -141,6 +141,9 @@ def do_discover(config, stream, output_schema_file=None,


def do_sync(config, state, stream):
singer.set_currently_syncing(state, stream.tap_stream_id)
singer.write_state(state)

client = bigquery.Client()
metadata = stream.metadata[0]["metadata"]
tap_stream_id = stream.tap_stream_id
Expand All @@ -160,7 +163,7 @@ def do_sync(config, state, stream):
end_datetime = dateutil.parser.parse(
config.get("end_datetime")).strftime("%Y-%m-%d %H:%M:%S.%f")

singer.write_schema(tap_stream_id, stream.schema.to_dict()["properties"],
singer.write_schema(tap_stream_id, stream.schema.to_dict(),
stream.key_properties)

keys = {"table": metadata["table"],
Expand Down Expand Up @@ -197,8 +200,12 @@ def do_sync(config, state, stream):
if prop.format == "date-time":
if type(row[key]) == str:
r = dateutil.parser.parse(row[key])
elif (type(row[key]) == datetime.datetime or
type(row[key]) == datetime.date):
elif type(row[key]) == datetime.date:
r = datetime.datetime(
year=row[key].year,
month=row[key].month,
day=row[key].day)
elif type(row[key]) == datetime.datetime:
r = row[key]
else:
raise ValueError(
Expand Down

0 comments on commit 3ae8c80

Please sign in to comment.