From da690bda91ea6a14964a2378e5dbb5d4de91a7e2 Mon Sep 17 00:00:00 2001
From: Jeff Huth
Date: Fri, 15 Nov 2019 00:49:39 -0800
Subject: client.py rate limit, sync.py changes

client.py rate limit, fix json schemas, sync.py many changes
---
 tap_google_sheets/client.py                  |   8 +-
 tap_google_sheets/schemas/file_metadata.json |   2 +-
 tap_google_sheets/schemas/sheets_loaded.json |   2 +-
 tap_google_sheets/sync.py                    | 354 ++++++++++++++++++++-------
 4 files changed, 275 insertions(+), 91 deletions(-)

diff --git a/tap_google_sheets/client.py b/tap_google_sheets/client.py
index 12f0811..0a0ce5a 100644
--- a/tap_google_sheets/client.py
+++ b/tap_google_sheets/client.py
@@ -151,6 +151,7 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
     def __exit__(self, exception_type, exception_value, traceback):
         self.__session.close()
 
+
     @backoff.on_exception(backoff.expo,
                           Server5xxError,
                           max_tries=5,
@@ -187,13 +188,13 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
 
         LOGGER.info('Authorized, token expires = {}'.format(self.__expires))
 
+    # Rate Limit: https://developers.google.com/sheets/api/limits
+    #   100 requests per 100 seconds per User
     @backoff.on_exception(backoff.expo,
                           (Server5xxError, ConnectionError, Server429Error),
                           max_tries=7,
                           factor=3)
-    # Rate Limit:
-    #  https://developers.google.com/webmaster-tools/search-console-api-original/v3/limits
-    @utils.ratelimit(1200, 60)
+    @utils.ratelimit(100, 100)
     def request(self, method, path=None, url=None, api=None, **kwargs):
         self.get_access_token()
 
@@ -211,6 +212,7 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
             del kwargs['endpoint']
         else:
             endpoint = None
+        LOGGER.info('{} URL = {}'.format(endpoint, url))
 
         if 'headers' not in kwargs:
             kwargs['headers'] = {}
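
The decorator stack added above retries transient failures with exponential backoff and throttles requests to the Sheets API quota of 100 requests per 100 seconds per user. As an illustration only, here is a minimal sketch of the same pattern on a hypothetical standalone function (get_values and its url parameter are invented; the sketch assumes the backoff library and singer.utils.ratelimit):

import backoff
import requests
from singer import utils


class Server5xxError(Exception):
    pass


# Retry up to 7 times with exponential backoff on transient errors, and never
# issue more than 100 calls inside any rolling 100-second window.
@backoff.on_exception(backoff.expo,
                      (Server5xxError, ConnectionError),
                      max_tries=7,
                      factor=3)
@utils.ratelimit(100, 100)
def get_values(url):  # hypothetical helper, not part of the tap
    response = requests.get(url)
    if response.status_code >= 500:
        raise Server5xxError()
    return response.json()
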
diff --git a/tap_google_sheets/schemas/file_metadata.json b/tap_google_sheets/schemas/file_metadata.json
index 25c19c4..03fefc6 100644
--- a/tap_google_sheets/schemas/file_metadata.json
+++ b/tap_google_sheets/schemas/file_metadata.json
@@ -30,7 +30,7 @@
       "additionalProperties": false,
       "properties": {
         "kind": {
-          "type": ["null", "integer"]
+          "type": ["null", "string"]
         },
         "displayName": {
           "type": ["null", "string"]
diff --git a/tap_google_sheets/schemas/sheets_loaded.json b/tap_google_sheets/schemas/sheets_loaded.json
index 12f967a..f7a323d 100644
--- a/tap_google_sheets/schemas/sheets_loaded.json
+++ b/tap_google_sheets/schemas/sheets_loaded.json
@@ -8,7 +8,7 @@
     "sheetId": {
       "type": ["null", "integer"]
     },
-    "sheetTitle": {
+    "title": {
       "type": ["null", "string"]
     },
     "loadDate": {
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 5b57e77..79e05f9 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -2,6 +2,8 @@ import time
 import math
 import singer
 import json
+import pytz
+from datetime import datetime, timedelta
 from collections import OrderedDict
 from singer import metrics, metadata, Transformer, utils
 from singer.utils import strptime_to_utc, strftime
@@ -48,66 +50,40 @@ def write_bookmark(state, stream, value):
     singer.write_state(state)
 
 
-# def transform_datetime(this_dttm):
-def transform_datetime(this_dttm):
-    with Transformer() as transformer:
-        new_dttm = transformer._transform_datetime(this_dttm)
-    return new_dttm
-
-
-def process_records(catalog, #pylint: disable=too-many-branches
+# Transform/validate batch of records w/ schema and sent to target
+def process_records(catalog,
                     stream_name,
                     records,
-                    time_extracted,
-                    bookmark_field=None,
-                    bookmark_type=None,
-                    max_bookmark_value=None,
-                    last_datetime=None,
-                    last_integer=None,
-                    parent=None,
-                    parent_id=None):
+                    time_extracted):
     stream = catalog.get_stream(stream_name)
     schema = stream.schema.to_dict()
     stream_metadata = metadata.to_map(stream.metadata)
-
     with metrics.record_counter(stream_name) as counter:
         for record in records:
-            # If child object, add parent_id to record
-            if parent_id and parent:
-                record[parent + '_id'] = parent_id
-
             # Transform record for Singer.io
             with Transformer() as transformer:
                 transformed_record = transformer.transform(
                     record,
                     schema,
                     stream_metadata)
-
-            # Reset max_bookmark_value to new value if higher
-            if transformed_record.get(bookmark_field):
-                if max_bookmark_value is None or \
-                    transformed_record[bookmark_field] > transform_datetime(max_bookmark_value):
-                    max_bookmark_value = transformed_record[bookmark_field]
-
-            if bookmark_field and (bookmark_field in transformed_record):
-                if bookmark_type == 'integer':
-                    # Keep only records whose bookmark is after the last_integer
-                    if transformed_record[bookmark_field] >= last_integer:
-                        write_record(stream_name, transformed_record, \
-                            time_extracted=time_extracted)
-                        counter.increment()
-                elif bookmark_type == 'datetime':
-                    last_dttm = transform_datetime(last_datetime)
-                    bookmark_dttm = transform_datetime(transformed_record[bookmark_field])
-                    # Keep only records whose bookmark is after the last_datetime
-                    if bookmark_dttm >= last_dttm:
-                        write_record(stream_name, transformed_record, \
-                            time_extracted=time_extracted)
-                        counter.increment()
-            else:
-                write_record(stream_name, transformed_record, time_extracted=time_extracted)
-                counter.increment()
-
-        return max_bookmark_value, counter.value
+                write_record(stream_name, transformed_record, time_extracted=time_extracted)
+                counter.increment()
+        return counter.value
+
+
+def sync_stream(stream_name, selected_streams, catalog, state, records):
+    # Should this stream be synced?
+    if stream_name in selected_streams:
+        LOGGER.info('STARTED Syncing {}'.format(stream_name))
+        update_currently_syncing(state, stream_name)
+        write_schema(catalog, stream_name)
+        record_count = process_records(
+            catalog=catalog,
+            stream_name=stream_name,
+            records=records,
+            time_extracted=utils.now())
+        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
+        update_currently_syncing(state, None)
 
 
 # Currently syncing sets the stream currently being delivered in the state.
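
With the bookmark logic stripped out, process_records and the new sync_stream helper reduce to schema validation plus standard Singer SCHEMA and RECORD messages. A rough, self-contained sketch of that message flow, using a made-up stream name, schema, and record:

import singer
from singer import Transformer, utils

schema = {
    'type': 'object',
    'properties': {
        'id': {'type': ['null', 'string']},
        'modifiedTime': {'type': ['null', 'string'], 'format': 'date-time'},
    },
}
record = {'id': 'example-file-id', 'modifiedTime': '2019-11-15T08:49:39.000Z'}

singer.write_schema('file_metadata', schema, key_properties=['id'])
with Transformer() as transformer:
    # Coerces values to the schema types; no stream metadata is supplied here.
    transformed = transformer.transform(record, schema, metadata=None)
singer.write_record('file_metadata', transformed, time_extracted=utils.now())
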
@@ -154,14 +130,16 @@ def get_data(stream_name,
     querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
         '{sheet_title}', stream_name)
     data = {}
+    time_extracted = utils.now()
     data = client.get(
         path=path,
         api=api,
         params=querystring,
         endpoint=stream_name)
-    return data
+    return data, time_extracted
 
 
+# Transform file_metadata: remove nodes from lastModifyingUser, format as array
 def transform_file_metadata(file_metadata):
     # Convert to dict
     file_metadata_tf = json.loads(json.dumps(file_metadata))
@@ -176,10 +154,11 @@ def transform_file_metadata(file_metadata):
     return file_metadata_arr
 
 
+# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
 def transform_spreadsheet_metadata(spreadsheet_metadata):
     # Convert to dict
     spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
-    # Remove keys
+    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
     if spreadsheet_metadata_tf.get('properties'):
         spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
         spreadsheet_metadata_tf.pop('sheets', None)
@@ -189,6 +168,7 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
     return spreadsheet_metadata_arr
 
 
+# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
 def transform_sheet_metadata(spreadsheet_id, sheet, columns):
     # Convert properties to dict
     sheet_metadata = sheet.get('properties')
@@ -202,6 +182,107 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns):
     return sheet_metadata_tf
 
 
+# Convert Excel Date Serial Number (excel_date_sn) to datetime string
+#  timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
+def excel_to_dttm_str(excel_date_sn, timezone_str=None):
+    if not timezone_str:
+        timezone_str = 'UTC'
+    tz = pytz.timezone(timezone_str)
+    sec_per_day = 86400
+    excel_epoch = 25569  # 1970-01-01T00:00:00Z
+    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
+    epoch_dttm = datetime(1970, 1, 1)
+    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
+    utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
+    utc_dttm_str = strftime(utc_dttm)
+    return utc_dttm_str
+
+
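
To make the serial-number arithmetic above concrete: Sheets serials count days since 1899-12-30, so serial 25569 is 1970-01-01 and each 1/86400 of a day is one second. The helper below (serial_to_utc is a hypothetical name, and it skips the pytz localization because the tap assumes all serials are already UTC) mirrors that math:

import math
from datetime import datetime, timedelta, timezone


def serial_to_utc(excel_date_sn):
    # Same arithmetic as excel_to_dttm_str above: shift to the Unix epoch
    # (serial 25569) and convert the remaining fraction of a day to seconds.
    epoch_sec = math.floor((excel_date_sn - 25569) * 86400)
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=epoch_sec)


print(serial_to_utc(43784))    # 2019-11-15 00:00:00+00:00 (18215 days after the epoch)
print(serial_to_utc(43784.5))  # 2019-11-15 12:00:00+00:00 (half a day = 43200 seconds)
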
+# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
+#  Convert from array of values to JSON with column names as keys
+def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
+    sheet_data_tf = []
+    is_last_row = False
+    row_num = from_row
+    # Create sorted list of columns based on columnIndex
+    cols = sorted(columns, key = lambda i: i['columnIndex'])
+
+    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
+    for row in sheet_data_rows:
+        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
+        if row == []:
+            is_last_row = True
+            return sheet_data_tf, row_num - 1, is_last_row
+        sheet_data_row_tf = {}
+        # Add spreadsheet_id, sheet_id, and row
+        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
+        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
+        sheet_data_row_tf['__sdc_row'] = row_num
+        col_num = 1
+        for value in row:
+            # Select column metadata based on column index
+            col = cols[col_num - 1]
+            col_skipped = col.get('columnSkipped')
+            if not col_skipped:
+                col_name = col.get('columnName')
+                col_type = col.get('columnType')
+                # Convert dates/times from Lotus Notes Serial Numbers
+                if col_type == 'numberType.DATE_TIME':
+                    if isinstance(value, int) or isinstance(value, float):
+                        col_val = excel_to_dttm_str(value)
+                    else:
+                        col_val = str(value)
+                elif col_type == 'numberType.DATE':
+                    if isinstance(value, int) or isinstance(value, float):
+                        col_val = excel_to_dttm_str(value)[:10]
+                    else:
+                        col_val = str(value)
+                elif col_type == 'numberType.TIME':
+                    if isinstance(value, int) or isinstance(value, float):
+                        try:
+                            total_secs = value * 86400  # seconds in day
+                            col_val = str(timedelta(seconds=total_secs))
+                        except ValueError:
+                            col_val = str(value)
+                    else:
+                        col_val = str(value)
+                elif col_type == 'numberType':
+                    if isinstance(value, int):
+                        col_val = int(value)
+                    else:
+                        try:
+                            col_val = float(value)
+                        except ValueError:
+                            col_val = str(value)
+                elif col_type == 'stringValue':
+                    col_val = str(value)
+                elif col_type == 'boolValue':
+                    if isinstance(value, bool):
+                        col_val = value
+                    elif isinstance(value, str):
+                        if value.lower() in ('true', 't', 'yes', 'y'):
+                            col_val = True
+                        elif value.lower() in ('false', 'f', 'no', 'n'):
+                            col_val = False
+                        else:
+                            col_val = str(value)
+                    elif isinstance(value, int):
+                        if value == 1 or value == -1:
+                            col_val = True
+                        elif value == 0:
+                            col_val = False
+                        else:
+                            col_val = str(value)
+
+                else:
+                    col_val = value
+                sheet_data_row_tf[col_name] = col_val
+            col_num = col_num + 1
+        sheet_data_tf.append(sheet_data_row_tf)
+        row_num = row_num + 1
+    return sheet_data_tf, row_num, is_last_row
+
+
 def sync(client, config, catalog, state):
     start_date = config.get('start_date')
     spreadsheet_id = config.get('spreadsheet_id')
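
To illustrate what the transform above produces: each raw sheet row lines up positionally with the discovered column metadata and becomes one JSON record, with the injected __sdc_* fields. The column names, IDs, and values below are invented for illustration; the expected dict follows the type rules in transform_sheet_data (DATE serials truncate to YYYY-MM-DD, the string 'TRUE' becomes a boolean):

columns = [
    {'columnIndex': 1, 'columnLetter': 'A', 'columnName': 'item', 'columnType': 'stringValue'},
    {'columnIndex': 2, 'columnLetter': 'B', 'columnName': 'ship_date', 'columnType': 'numberType.DATE'},
    {'columnIndex': 3, 'columnLetter': 'C', 'columnName': 'active', 'columnType': 'boolValue'},
]
row = ['Widget', 43784, 'TRUE']   # one row of sheet values, as returned by the API

# Expected shape of the transformed record for row 2 of a hypothetical sheet:
expected = {
    '__sdc_spreadsheet_id': 'example-spreadsheet-id',
    '__sdc_sheet_id': 123456789,
    '__sdc_row': 2,
    'item': 'Widget',              # stringValue -> str
    'ship_date': '2019-11-15',     # DATE serial 43784 -> excel_to_dttm_str(...)[:10]
    'active': True,                # boolValue 'TRUE' -> True
}
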
@@ -218,63 +299,164 @@ def sync(client, config, catalog, state):
     if not selected_streams:
         return
 
-    # Get file_metadata
+    # FILE_METADATA
     file_metadata = {}
-    file_metadata_config = STREAMS.get('file_metadata')
-    file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id)
+    stream_name = 'file_metadata'
+    file_metadata_config = STREAMS.get(stream_name)
+
+    # GET file_metadata
+    LOGGER.info('GET file_metadata')
+    file_metadata, time_extracted = get_data(stream_name=stream_name,
+                                             endpoint_config=file_metadata_config,
+                                             client=client,
+                                             spreadsheet_id=spreadsheet_id)
+    # Transform file_metadata
+    LOGGER.info('Transform file_metadata')
     file_metadata_tf = transform_file_metadata(file_metadata)
     # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
-    last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date))
+
+    # Check if file has changed, if not break (return to __init__)
+    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
     this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
     LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
     if this_datetime <= last_datetime:
         LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
         return 0
-
-    # Get spreadsheet_metadata
+    else:
+        # Sync file_metadata if selected
+        sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
+        write_bookmark(state, stream_name, strftime(this_datetime))
+
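
The file_metadata block above also acts as the gate for the whole sync: the Drive file's modifiedTime is compared to the stored bookmark and, when the file has not changed, every stream is skipped. A small sketch of that check and of the state shape write_bookmark maintains (the timestamps and state contents are illustrative):

from singer.utils import strptime_to_utc

state = {
    'currently_syncing': 'file_metadata',
    'bookmarks': {'file_metadata': '2019-11-14T00:00:00.000000Z'},  # last successful run
}
modified_time = '2019-11-15T08:49:39.000Z'  # modifiedTime from the Drive file metadata

if strptime_to_utc(modified_time) <= strptime_to_utc(state['bookmarks']['file_metadata']):
    print('File unchanged since the last run; nothing to sync')
else:
    print('File changed; sync streams and advance the bookmark to', modified_time)
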
+    # SPREADSHEET_METADATA
     spreadsheet_metadata = {}
-    spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata')
-    spreadsheet_metadata = get_data('spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id)
+    stream_name = 'spreadsheet_metadata'
+    spreadsheet_metadata_config = STREAMS.get(stream_name)
+
+    # GET spreadsheet_metadata
+    LOGGER.info('GET spreadsheet_metadata')
+    spreadsheet_metadata, ss_time_extracted = get_data(
+        stream_name=stream_name,
+        endpoint_config=spreadsheet_metadata_config,
+        client=client,
+        spreadsheet_id=spreadsheet_id)
+
+    # Transform spreadsheet_metadata
+    LOGGER.info('Transform spreadsheet_metadata')
     spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
-    # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))
 
-    # Get sheet_metadata
+    # Sync spreadsheet_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)
+
+    # SHEET_METADATA and SHEET_DATA
     sheets = spreadsheet_metadata.get('sheets')
     sheet_metadata = []
     sheets_loaded = []
     sheets_loaded_config = STREAMS['sheets_loaded']
     if sheets:
+        # Loop thru sheets (worksheet tabs) in spreadsheet
         for sheet in sheets:
             sheet_title = sheet.get('properties', {}).get('title')
+            sheet_id = sheet.get('properties', {}).get('sheetId')
+
+            # GET sheet_metadata and columns
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+
+            # Transform sheet_metadata
             sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
             # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
             sheet_metadata.append(sheet_metadata_tf)
 
-            # Determine range of rows and columns for "paging" through batch rows of data
-            sheet_last_col_index = 1
-            sheet_last_col_letter = 'A'
-            for col in columns:
-                col_index = col.get('columnIndex')
-                col_letter = col.get('columnLetter')
-                if col_index > sheet_last_col_index:
-                    sheet_last_col_index = col_index
-                    sheet_last_col_letter = col_letter
-            sheet_max_row = sheet.get('gridProperties', {}).get('rowCount')
-            is_empty_row = False
-            batch_rows = 200
-            from_row = 2
-            if sheet_max_row < batch_rows:
-                to_row = sheet_max_row
-            else:
-                to_row = batch_rows
-
-            while not is_empty_row and to_row <= sheet_max_row:
-                range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row)
-
-                sheet_data = get_data(
-                    stream_name=sheet_title,
-                    endpoint_config=sheets_loaded_config,
-                    client=client,
-                    spreadsheet_id=spreadsheet_id,
-                    range_rows=range_rows)
+            # SHEET_DATA
+            # Should this worksheet tab be synced?
+            if sheet_title in selected_streams:
+                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                update_currently_syncing(state, sheet_title)
+                write_schema(catalog, sheet_title)
+
+                # Determine max range of columns and rows for "paging" through the data
+                sheet_last_col_index = 1
+                sheet_last_col_letter = 'A'
+                for col in columns:
+                    col_index = col.get('columnIndex')
+                    col_letter = col.get('columnLetter')
+                    if col_index > sheet_last_col_index:
+                        sheet_last_col_index = col_index
+                        sheet_last_col_letter = col_letter
+                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+                # Initialize paging for 1st batch
+                is_last_row = False
+                batch_rows = 200
+                from_row = 2
+                if sheet_max_row < batch_rows:
+                    to_row = sheet_max_row
+                else:
+                    to_row = batch_rows
+
+                # Loop thru batches (each having 200 rows of data)
+                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+                    # GET sheet_data for a worksheet tab
+                    sheet_data, time_extracted = get_data(
+                        stream_name=sheet_title,
+                        endpoint_config=sheets_loaded_config,
+                        client=client,
+                        spreadsheet_id=spreadsheet_id,
+                        range_rows=range_rows)
+                    # Data is returned as a list of arrays, an array of values for each row
+                    sheet_data_rows = sheet_data.get('values')
+
+                    # Transform batch of rows to JSON with keys for each column
+                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
+                        spreadsheet_id=spreadsheet_id,
+                        sheet_id=sheet_id,
+                        from_row=from_row,
+                        columns=columns,
+                        sheet_data_rows=sheet_data_rows)
+                    if row_num < to_row:
+                        is_last_row = True
+
+                    # Process records, send batch of records to target
+                    record_count = process_records(
+                        catalog=catalog,
+                        stream_name=sheet_title,
+                        records=sheet_data_tf,
+                        time_extracted=ss_time_extracted)
+
+                    # Update paging from/to_row for next batch
+                    from_row = to_row + 1
+                    if to_row + batch_rows > sheet_max_row:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = to_row + batch_rows
+
+                # SHEETS_LOADED
+                # Add sheet to sheets_loaded
+                sheet_loaded = {}
+                sheet_loaded['spreadsheetId'] = spreadsheet_id
+                sheet_loaded['sheetId'] = sheet_id
+                sheet_loaded['title'] = sheet_title
+                sheet_loaded['loadDate'] = strftime(utils.now())
+                sheet_loaded['lastRowNumber'] = row_num
+                sheets_loaded.append(sheet_loaded)
+
+                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
+                # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
+                # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
+                activate_version_message = singer.ActivateVersionMessage(
+                    stream=sheet_title,
+                    version=int(time.time() * 1000))
+                singer.write_message(activate_version_message)
+
+                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                    sheet_title, row_num - 1))
+                update_currently_syncing(state, None)
+
+    stream_name = 'sheet_metadata'
+    # Sync sheet_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
+
+    stream_name = 'sheets_loaded'
+    # Sync sheets_loaded if selected
+    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
--
cgit v1.2.3
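
As a footnote on the batching in the sheet loop above: rows are requested in A1-notation ranges of 200 data rows at a time (row 1 is the header) until the sheet's rowCount, or an empty row, is reached. A standalone sketch of how those ranges advance, using a hypothetical 10-column, 450-row sheet (batch_ranges is an illustrative helper, not part of the tap):

def batch_ranges(last_col_letter, max_row, batch_rows=200, from_row=2):
    # Mirror the paging in the sync loop: start at row 2 and step in 200-row batches.
    to_row = max_row if max_row < batch_rows else batch_rows
    while from_row < max_row and to_row <= max_row:
        yield 'A{}:{}{}'.format(from_row, last_col_letter, to_row)
        from_row = to_row + 1
        to_row = max_row if to_row + batch_rows > max_row else to_row + batch_rows


print(list(batch_ranges('J', 450)))
# ['A2:J200', 'A201:J400', 'A401:J450']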