deprecate file_metadata stream (#96)
* updated changelog link
* remove stream
* updated test
* removed references to file metadata
* updated bookmark test docs
somethingmorerelevant authored Sep 5, 2024
1 parent b07f8ee commit aa6bac1
Showing 12 changed files with 16 additions and 163 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

## 3.1.0
* Remove deprecated stream file_metadata [#96](https://github.com/singer-io/tap-google-sheets/pull/96)

## 3.0.0
* Remove support for date datatype [#95](https://github.com/singer-io/tap-google-sheets/pull/95)

7 changes: 1 addition & 6 deletions README.md
@@ -68,7 +68,6 @@ This tap:

## Authentication
The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=1FojlvtLwS0-BzGS37R0jEXtwSHqSiO1Uw-7RKQQO-C4) Google Doc provides instructions showing how to configure the Google Cloud API credentials to enable Google Drive and Google Sheets APIs, configure Google Cloud to authorize/verify your domain ownership, generate an API key (client_id, client_secret), authenticate and generate a refresh_token, and prepare your tap config.json with the necessary parameters.
- Enable Googe Drive APIs and Authorization Scope: https://www.googleapis.com/auth/drive.metadata.readonly
- Enable Google Sheets API and Authorization Scope: https://www.googleapis.com/auth/spreadsheets.readonly
- Tap config.json parameters:
- client_id: identifies your application
@@ -122,10 +121,7 @@ The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=
```json
{
"currently_syncing": "file_metadata",
"bookmarks": {
"file_metadata": "2019-09-27T22:34:39.000000Z"
}
"currently_syncing": "sheet_metadata",
}
```
@@ -185,7 +181,6 @@ The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=
+----------------------+---------+---------+
| stream | records | schemas |
+----------------------+---------+---------+
| file_metadata | 1 | 1 |
| spreadsheet_metadata | 1 | 1 |
| Test-1 | 9 | 1 |
| Test 2 | 2 | 1 |
2 changes: 1 addition & 1 deletion setup.py
@@ -3,7 +3,7 @@
from setuptools import setup, find_packages

setup(name='tap-google-sheets',
version='3.0.0',
version='3.1.0',
description='Singer.io tap for extracting data from the Google Sheets v4 API',
author='[email protected]',
classifiers=['Programming Language :: Python :: 3 :: Only'],
5 changes: 1 addition & 4 deletions state.json.example
@@ -1,6 +1,3 @@
{
"currently_syncing": "file_metadata",
"bookmarks": {
"file_metadata": "2019-09-27T22:34:39.000000Z"
}
"currently_syncing": "sheet_metadata"
}
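
The updated state.json.example now carries only `currently_syncing` and drops the `file_metadata` bookmark. Below is a minimal, illustrative Python sketch (not part of the tap) showing how a state file shaped like the example above can be read and checked; the file path and the assertion are assumptions for demonstration.

```python
# Illustrative only: read a state file shaped like the example above and
# confirm the deprecated file_metadata bookmark is gone. The path and the
# checks are assumptions for demonstration, not tap code.
import json

with open("state.json") as handle:
    state = json.load(handle)

currently_syncing = state.get("currently_syncing")   # e.g. "sheet_metadata"
bookmarks = state.get("bookmarks", {})                # per-stream bookmarks, if present

assert "file_metadata" not in bookmarks, "file_metadata is no longer bookmarked"
print(currently_syncing, bookmarks)
```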
45 changes: 0 additions & 45 deletions tap_google_sheets/streams.py
@@ -243,49 +243,6 @@ def sync_stream(self, records, catalog, time_extracted=None):
LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(self.stream_name, record_count))
update_currently_syncing(self.state, None)

class FileMetadata(GoogleSheets):
stream_name = "file_metadata"
api = "files"
path = "files/{spreadsheet_id}"
key_properties = ["id"]
replication_method = "INCREMENTAL"
replication_keys = ["modifiedTime"]
params = {
"fields": "id,name,createdTime,modifiedTime,version,teamDriveId,driveId,lastModifyingUser",
"supportsAllDrives": True
}

def sync(self, catalog, state, selected_streams):
"""
sync file's metadata
"""
self.state = state
# variable to check if file is changed or not

# get date to start sync from, ie. start date or bookmark date
start_date = strptime_to_utc(get_bookmark(state, self.stream_name, self.config_start_date))

LOGGER.info("GET file_metadata")
file_metadata, time_extracted = self.get_data(stream_name=self.stream_name)
LOGGER.info("Transform file_metadata")

file_modified_time = strptime_to_utc(file_metadata.get("modifiedTime"))
LOGGER.info("last_datetime = {}, file_modified_time = {}".format(start_date, file_modified_time))
if file_modified_time <= start_date:
# if file is not changed, update the variable
LOGGER.info("file_modified_time <= last_datetime, FILE NOT CHANGED. EXITING.")
# return and stop syncing the next streams, as the file is not changed
return False, file_modified_time

# only perform sync if file metadata stream is selected and file is changed
if self.stream_name in selected_streams:
# transform file metadata records
file_metadata_transformed = internal_transform.transform_file_metadata(file_metadata)
# do sync
self.sync_stream(file_metadata_transformed, catalog, time_extracted)

return True, file_modified_time

class SpreadSheetMetadata(GoogleSheets):
stream_name = "spreadsheet_metadata"
api = "sheets"
@@ -641,11 +598,9 @@ def sync(self, catalog, state, sheets_loaded_records):


# create OrderedDict, as the order matters for syncing the streams
# "file_metadata" -> do not sync other streams, if file is not changed
# "spreadsheet_metadata" -> get sheets in the spreadsheet and load sheet's records
# and prepare records for "sheet_metadata" and "sheets_loaded" streams
STREAMS = OrderedDict()
STREAMS['file_metadata'] = FileMetadata
STREAMS['spreadsheet_metadata'] = SpreadSheetMetadata
STREAMS['sheet_metadata'] = SheetMetadata
STREAMS['sheets_loaded'] = SheetsLoaded
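
The ordered registry above replaces the previous one that began with `file_metadata`. The sketch below is a simplified illustration, not the tap's actual code, of how iterating such an OrderedDict preserves the required sync order; the stub classes and the `run` helper are hypothetical stand-ins.

```python
# Simplified sketch of an ordered stream registry; the stub classes and the
# run() helper are hypothetical stand-ins for the tap's real implementations.
from collections import OrderedDict

class SpreadSheetMetadata:
    pass

class SheetMetadata:
    pass

class SheetsLoaded:
    pass

STREAMS = OrderedDict()
STREAMS["spreadsheet_metadata"] = SpreadSheetMetadata   # runs first: loads sheet records
STREAMS["sheet_metadata"] = SheetMetadata               # consumes records prepared above
STREAMS["sheets_loaded"] = SheetsLoaded

def run(selected_streams):
    # Iteration order matches insertion order, so dependent streams run last.
    for stream_name, stream_cls in STREAMS.items():
        if stream_name in selected_streams:
            print(f"syncing {stream_name} via {stream_cls.__name__}")

run({"spreadsheet_metadata", "sheets_loaded"})
```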
11 changes: 0 additions & 11 deletions tap_google_sheets/sync.py
@@ -6,7 +6,6 @@
def sync(client, config, catalog, state):
"""
Sync the streams, loop over STREAMS
"file_metadata" -> get the file's metadata and if the spreadsheet file is updated then continue the sync else stop the sync
"spreadsheet_metadata" -> get the spreadsheet's metadata
- sync the spreadsheet_metadata stream if selected
- get the sheets in the spreadsheet and loop over the sheets and sync the sheet's records if selected
@@ -60,14 +59,4 @@ def sync(client, config, catalog, state):
else:
stream_obj.sync(catalog, state, sheets_loaded_records)

# sync file metadata
elif stream_name == "file_metadata":
file_changed, file_modified_time = stream_obj.sync(catalog, state, selected_streams)
if not file_changed:
break

LOGGER.info("FINISHED Syncing: %s", stream_name)

# write "file_metadata" bookmark, as we have successfully synced all the sheet's records
# it will force to re-sync of there is any interrupt between the sync
write_bookmark(state, 'file_metadata', strftime(file_modified_time))
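
For context on what the deleted block did: the old `file_metadata` stream compared the spreadsheet's Drive `modifiedTime` against the stored bookmark and aborted the whole sync when nothing had changed. The condensed sketch below mirrors that check with simplified, hypothetical names and fabricated sample values; it is not the tap's exact code.

```python
# Condensed sketch of the removed gating logic; the helper names are simplified
# and the sample timestamps are fabricated for illustration.
from datetime import datetime, timezone

def parse_utc(value):
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)

def file_changed(file_metadata, last_bookmark):
    file_modified_time = parse_utc(file_metadata["modifiedTime"])
    return file_modified_time > parse_utc(last_bookmark), file_modified_time

changed, modified = file_changed(
    {"modifiedTime": "2019-09-28T10:00:00.000000Z"},
    "2019-09-27T22:34:39.000000Z",
)
print(changed)  # True -> the old loop continued; False made it break out early
```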
14 changes: 0 additions & 14 deletions tap_google_sheets/transform.py
@@ -33,20 +33,6 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
return spreadsheet_metadata_arr

# Tranform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
# Convert to dict
file_metadata_tf = json.loads(json.dumps(file_metadata))
# Remove keys
if file_metadata_tf.get('lastModifyingUser'):
file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
file_metadata_tf['lastModifyingUser'].pop('me', None)
file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
# Add record to an array of 1
file_metadata_arr = []
file_metadata_arr.append(file_metadata_tf)
return file_metadata_arr

# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(string_value, excel_date_sn, timezone_str=None):
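
The surviving `excel_to_dttm_str` helper converts an Excel date serial number to a datetime string. A minimal, standalone sketch of that conversion follows, assuming the common 1899-12-30 epoch (which absorbs Excel's fictitious 1900-02-29); the tap's own implementation may handle timezones and edge cases differently.

```python
# Standalone sketch of Excel serial-number-to-UTC-datetime conversion; assumes
# the conventional 1899-12-30 epoch and is not the tap's exact implementation.
from datetime import datetime, timedelta, timezone

EXCEL_EPOCH = datetime(1899, 12, 30, tzinfo=timezone.utc)

def excel_serial_to_utc_string(serial_number):
    moment = EXCEL_EPOCH + timedelta(days=serial_number)
    return moment.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

print(excel_serial_to_utc_string(43735.5))  # -> 2019-09-27T12:00:00.000000Z
```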
10 changes: 1 addition & 9 deletions tests/base.py
@@ -72,11 +72,6 @@ def expected_metadata(self):
# self.REPLICATION_KEYS: {"modified_at"}
}
return {
"file_metadata": {
self.PRIMARY_KEYS: {"id", },
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {"modifiedTime"}
},
"sheet_metadata": {
self.PRIMARY_KEYS: {"sheetId"}, # "spreadsheetId"}, # BUG? | This is not in the real tap, "spreadsheetId"},
self.REPLICATION_METHOD: self.FULL_TABLE,
@@ -295,9 +290,6 @@ def perform_and_verify_table_and_field_selection(self,
# Verify only automatic fields are selected
expected_automatic_fields = self.expected_automatic_fields().get(cat['stream_name'])
selected_fields = self.get_selected_fields_from_metadata(catalog_entry['metadata'])
# BUG TDL-14241 | Replication keys are not automatic
if cat['stream_name'] == "file_metadata":
expected_automatic_fields.remove('modifiedTime')
self.assertEqual(expected_automatic_fields, selected_fields)

@staticmethod
@@ -373,7 +365,7 @@ def timedelta_formatted(self, dtime, days=0):
##########################################################################

def is_sheet(self, stream):
non_sheets_streams = {'sheet_metadata', 'file_metadata', 'sheets_loaded', 'spreadsheet_metadata'}
non_sheets_streams = {'sheet_metadata', 'sheets_loaded', 'spreadsheet_metadata'}
return stream in self.expected_streams().difference(non_sheets_streams)

def undiscoverable_sheets(self):
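
The updated `is_sheet` helper now treats every discovered stream that is not one of the three fixed metadata streams as a worksheet stream. A self-contained sketch of that classification, with illustrative stream names rather than ones from a real connection, is below.

```python
# Self-contained sketch of the is_sheet() idea; the discovered stream names are
# illustrative, not taken from a real Google Sheets connection.
NON_SHEET_STREAMS = {"sheet_metadata", "sheets_loaded", "spreadsheet_metadata"}

def is_sheet(stream, discovered_streams):
    return stream in set(discovered_streams) - NON_SHEET_STREAMS

discovered = {"spreadsheet_metadata", "sheet_metadata", "sheets_loaded", "Test-1", "Test 2"}
print(sorted(s for s in discovered if is_sheet(s, discovered)))  # ['Test 2', 'Test-1']
```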
15 changes: 1 addition & 14 deletions tests/test_google_sheets_all_fields.py
@@ -79,17 +79,4 @@ def test_run(self):
# verify all fields for a stream were replicated
self.assertGreater(len(expected_all_keys), len(expected_automatic_keys))
self.assertTrue(expected_automatic_keys.issubset(expected_all_keys), msg=f'{expected_automatic_keys-expected_all_keys} is not in "expected_all_keys"')
if stream == "file_metadata":

# As per google documentation https://developers.google.com/drive/api/v3/reference/files `teamDriveId` is deprecated. There is mentioned that use `driveId` instead.
# `driveId` is populated from items in the team shared drives. But stitch integration does not support shared team drive. So replicating driveid is not possible.
# So, these two fields will not be synced.
expected_all_keys.remove('teamDriveId')
expected_all_keys.remove('driveId')
# Earlier field `emailAddress` was defined as `emailAddress`(typo mismatch) in file_metadata.json.
# So, this particular field did not collected. Because API response contain `emailAddress` field.
# Now, typo has been corrected and verifying that `emailAddress` field collected.
lastModifyingUser_fields = set(messages['messages'][0].get('data', {}).get('lastModifyingUser', {}).keys()) # Get `lastModifyingUser` from file_metadata records
# Verify that `emailAddress` field under `lastModifyingUser` collected.
self.assertTrue({'emailAddress'}.issubset(lastModifyingUser_fields), msg="emailAddress does not found in lastModifyingUser")
self.assertSetEqual(expected_all_keys, actual_all_keys)
self.assertSetEqual(expected_all_keys, actual_all_keys)
4 changes: 0 additions & 4 deletions tests/test_google_sheets_automatic_fields.py
@@ -58,9 +58,5 @@ def test_run(self):
# Verify that you get some records for each stream
self.assertGreater(record_count_by_stream.get(stream, -1), 0)

# Verify that only the automatic fields are sent to the target
# BUG TDL-14241 | Replication keys are not automatic
if stream == "file_metadata":
expected_keys.remove('modifiedTime')
for actual_keys in record_messages_keys:
self.assertSetEqual(expected_keys, actual_keys)
61 changes: 8 additions & 53 deletions tests/test_google_sheets_bookmarks.py
@@ -7,8 +7,8 @@


class BookmarksTest(GoogleSheetsBaseTest):
"""Ensure all sheets streams will replicate based off of the most recent bookmarked state for 'file_metadata'"""
"""Ensure all sheets streams will replicate in full table mode and create appropriate bookmarks"""

conn_id = ""
expected_test_streams = ""
record_count_by_stream_1 = ""
@@ -20,20 +20,12 @@ def name():
def test_run(self):
"""
Run check mode, perform table and field selection, and run a sync.
Replication can be triggered by pushing back state to prior 'file_metadata' state.
Run a second sync after not updating state to verify no streams are being synced
Run a 3rd sync and ensure full table streams are triggered by the simulated bookmark value.
- Verify initial sync message actions include activate versions and the upserts
- Verify no streams are synced when 'file_metadata' bookmark does not change
- Verify that the third sync with the updated simulated bookmark has the same synced streams as the first sync
- Verify that streams will sync based off of 'file_metadata' even when it is not selected
- check that bookmarks include activate versions for all streams
"""
skipped_streams = {stream
for stream in self.expected_streams()
if stream.startswith('sadsheet')}.union({
'file_metadata' # testing case without file_metadata selected, but still providing bookmark
})
if stream.startswith('sadsheet')}
self.expected_test_streams = self.expected_streams() - skipped_streams

# Grab connection, and run discovery and initial sync
@@ -43,7 +35,7 @@

# Grab state to be updated later
state = menagerie.get_state(self.conn_id)

# BUG full table streams are saving bookmarks unnecessarily https://jira.talendforge.org/browse/TDL-14343

# BUG there are no activate version messages in the sheet_metadata, spreadsheet_metadata
@@ -55,40 +47,7 @@
self.assertEqual('activate_version', sync1_message_actions[0])
self.assertEqual('activate_version', sync1_message_actions[-1])
self.assertSetEqual({'upsert'}, set(sync1_message_actions[1:-1]))

# run a sync again, this time we shouldn't get any records back
sync_job_name = runner.run_sync_mode(self, self.conn_id)
exit_status = menagerie.get_exit_status(self.conn_id, sync_job_name)
menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)
record_count_by_stream_2 = runner.examine_target_output_file(
self, self.conn_id, self.expected_streams(), self.expected_primary_keys())

# verify we do not sync any unexpected streams
self.assertSetEqual(set(), set(record_count_by_stream_2.keys()))

# verify no records were synced for our expected streams
for stream in self.expected_test_streams:
with self.subTest(stream=stream):
self.assertEqual(0, record_count_by_stream_2.get(stream, 0))

# roll back the state of the file_metadata stream to ensure that we sync sheets
# based off of this state
file_metadata_stream = 'file_metadata'
file_metadata_bookmark = state['bookmarks'][file_metadata_stream]
bookmark_datetime = datetime.datetime.strptime(file_metadata_bookmark, self.BOOKMARK_COMPARISON_FORMAT)
target_datetime = bookmark_datetime + datetime.timedelta(days=-1)
target_bookmark = datetime.datetime.strftime(target_datetime, self.BOOKMARK_COMPARISON_FORMAT)

new_state = copy.deepcopy(state)
new_state['bookmarks'][file_metadata_stream] = target_bookmark

menagerie.set_state(self.conn_id, new_state)

record_count_by_stream_3 = self.run_and_verify_sync(self.conn_id)
synced_records_3 = runner.get_records_from_target_output()

# verify we sync sheets based off the state of file_metadata
self.assertDictEqual(self.record_count_by_stream_1, record_count_by_stream_3)
self.assertIn(stream, state["bookmarks"].keys())

def starter(self):
"""
@@ -102,7 +61,7 @@ def starter(self):
### Instantiate connection
##########################################################################
self.conn_id = connections.ensure_connection(self)

##########################################################################
### Discovery without the backoff
##########################################################################
@@ -118,7 +77,7 @@
self.assertSetEqual(self.expected_streams(), found_catalog_names, msg="discovered schemas do not match")
LOGGER.info("discovered schemas are OK")


# table and field selection
test_catalogs = [catalog for catalog in found_catalogs
if catalog.get('stream_name') in self.expected_test_streams]
@@ -143,7 +102,3 @@
msg="failed to replicate any data: {}".format(self.record_count_by_stream_1)
)
LOGGER.info("total replicated row count: %s", sum(self.record_count_by_stream_1.values()))




2 changes: 0 additions & 2 deletions tests/test_google_sheets_discovery.py
@@ -123,8 +123,6 @@ def test_run(self):
# verify that primary keys and replication keys
# are given the inclusion of automatic in metadata.
# BUG TDL-14241 | Replication keys are not automatic
if stream == 'file_metadata':
expected_automatic_fields.remove('modifiedTime')
self.assertSetEqual(expected_automatic_fields, actual_automatic_fields)

# verify missing values where __sdc_row = 2
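
The discovery test verifies that primary-key and replication-key fields carry `inclusion: automatic` in the catalog metadata, without the old `file_metadata` exception. The sketch below walks a fabricated catalog-metadata list to collect those fields; it follows general Singer metadata conventions rather than this repo's test helpers.

```python
# Fabricated Singer-style catalog metadata used to illustrate how automatic
# fields can be collected; not output from a real discovery run.
catalog_metadata = [
    {"breadcrumb": [], "metadata": {"table-key-properties": ["sheetId"]}},
    {"breadcrumb": ["properties", "sheetId"], "metadata": {"inclusion": "automatic"}},
    {"breadcrumb": ["properties", "title"], "metadata": {"inclusion": "available"}},
]

automatic_fields = {
    entry["breadcrumb"][-1]
    for entry in catalog_metadata
    if entry["breadcrumb"] and entry["metadata"].get("inclusion") == "automatic"
}
print(automatic_fields)  # {'sheetId'}
```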
