From 99424fee5ba6ff830df39be8f47c3e3d685b444a Mon Sep 17 00:00:00 2001
From: Jeff Huth
Date: Fri, 15 Nov 2019 01:58:55 -0800
Subject: pylint and testing

pylint and testing
---
 README.md                     | 28 +++++++++++++++++--
 tap_google_sheets/client.py   |  8 ++----
 tap_google_sheets/discover.py |  6 ++--
 tap_google_sheets/schema.py   | 56 ++++++++++++++++++++-----------------
 tap_google_sheets/streams.py  |  7 ++---
 tap_google_sheets/sync.py     | 65 ++++++++++++++++++++++++-------------------
 6 files changed, 101 insertions(+), 69 deletions(-)

diff --git a/README.md b/README.md
index 5752b64..8c9cc9d 100644
--- a/README.md
+++ b/README.md
@@ -154,7 +154,7 @@ The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=
     ```
     Pylint test resulted in the following score:
     ```bash
-    TBD
+    Your code has been rated at 9.78/10
     ```
 
     To [check the tap](https://github.com/singer-io/singer-tools#singer-check-tap) and verify working:
@@ -164,7 +164,31 @@ The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=
     ```
     Check tap resulted in the following:
     ```bash
-    TBD
+    The output is valid.
+    It contained 3881 messages for 13 streams.
+
+        13 schema messages
+      3841 record messages
+        27 state messages
+
+    Details by stream:
+    +----------------------+---------+---------+
+    | stream               | records | schemas |
+    +----------------------+---------+---------+
+    | file_metadata        | 1       | 1       |
+    | spreadsheet_metadata | 1       | 1       |
+    | Test-1               | 9       | 1       |
+    | Test 2               | 2       | 1       |
+    | SKU COGS             | 218     | 1       |
+    | Item Master          | 216     | 1       |
+    | Retail Price         | 273     | 1       |
+    | Retail Price NEW     | 284     | 1       |
+    | Forecast Scenarios   | 2681    | 1       |
+    | Promo Type           | 91      | 1       |
+    | Shipping Method      | 47      | 1       |
+    | sheet_metadata       | 9       | 1       |
+    | sheets_loaded        | 9       | 1       |
+    +----------------------+---------+---------+
     ```
 ---

diff --git a/tap_google_sheets/client.py b/tap_google_sheets/client.py
index 0a0ce5a..4f38352 100644
--- a/tap_google_sheets/client.py
+++ b/tap_google_sheets/client.py
@@ -1,8 +1,7 @@
 from datetime import datetime, timedelta
+from collections import OrderedDict
 import backoff
 import requests
-from collections import OrderedDict
-
 import singer
 from singer import metrics
 from singer import utils
@@ -123,8 +122,7 @@ def raise_for_error(response):
             error_code = response.get('error', {}).get('code')
             ex = get_exception_for_error_code(error_code)
             raise ex(message)
-        else:
-            raise GoogleError(error)
+        raise GoogleError(error)
     except (ValueError, TypeError):
         raise GoogleError(error)
 
@@ -196,9 +194,7 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
         factor=3)
     @utils.ratelimit(100, 100)
     def request(self, method, path=None, url=None, api=None, **kwargs):
-
         self.get_access_token()
-
         self.base_url = 'https://sheets.googleapis.com/v4'
         if api == 'files':
             self.base_url = 'https://www.googleapis.com/drive/v3'

diff --git a/tap_google_sheets/discover.py b/tap_google_sheets/discover.py
index 6477a5f..6cf0d09 100644
--- a/tap_google_sheets/discover.py
+++ b/tap_google_sheets/discover.py
@@ -10,11 +10,11 @@ def discover(client, spreadsheet_id):
         schema = Schema.from_dict(schema_dict)
         mdata = field_metadata[stream_name]
         key_properties = None
-        for md in mdata:
-            table_key_properties = md.get('metadata', {}).get('table-key-properties')
+        for mdt in mdata:
+            table_key_properties = mdt.get('metadata', {}).get('table-key-properties')
             if table_key_properties:
                 key_properties = table_key_properties
-
+
         catalog.streams.append(CatalogEntry(
             stream=stream_name,
             tap_stream_id=stream_name,
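The client.py hunks above are mostly cosmetic (import order and an unreachable else branch), but they sit around the tap's retry logic. For context, a rough sketch of that backoff pattern, using a placeholder error class and endpoint rather than the tap's actual GoogleClient definitions:

```python
import backoff
import requests


class Server5xxError(Exception):
    """Placeholder for the client's 5xx error class (illustrative only)."""


# Retry transient failures with exponentially increasing waits (3, 6, 12, ... seconds
# with factor=3), mirroring the max_tries/factor style in the GoogleClient.request decorator.
@backoff.on_exception(backoff.expo,
                      (Server5xxError, requests.exceptions.ConnectionError),
                      max_tries=5,
                      factor=3)
def get_spreadsheet(session, access_token, spreadsheet_id):
    # Hypothetical Sheets v4 call; the real tap builds paths and params from its STREAMS config.
    url = 'https://sheets.googleapis.com/v4/spreadsheets/{}'.format(spreadsheet_id)
    response = session.get(url, headers={'Authorization': 'Bearer {}'.format(access_token)})
    if response.status_code >= 500:
        raise Server5xxError(response.text)
    response.raise_for_status()
    return response.json()
```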
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index 237ab06..d4fead5 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -11,19 +11,19 @@ LOGGER = singer.get_logger()
 # https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata
 
 # Convert column index to column letter
-def colnum_string(n):
+def colnum_string(num):
     string = ""
-    while n > 0:
-        n, remainder = divmod(n - 1, 26)
+    while num > 0:
+        num, remainder = divmod(num - 1, 26)
         string = chr(65 + remainder) + string
     return string
 
 
 # Create sheet_metadata_json with columns from sheet
-def get_sheet_schema_columns(sheet, spreadsheet_id, client):
+def get_sheet_schema_columns(sheet):
     sheet_json_schema = OrderedDict()
     data = next(iter(sheet.get('data', [])), {})
-    row_data = data.get('rowData',[])
+    row_data = data.get('rowData', [])
     # spreadsheet is an OrderedDict, with ordered sheets and rows in the response
 
     headers = row_data[0].get('values', [])
@@ -65,33 +65,32 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
             column_name = '{}'.format(header_value)
             if column_name in header_list:
                 raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name))
-            else:
-                header_list.append(column_name)
+            header_list.append(column_name)
 
             first_value = first_values[i]
-            # LOGGER.info('first_value[{}] = {}'.format(i, json.dumps(first_value, indent=2, sort_keys=True)))
 
             column_effective_value = first_value.get('effectiveValue', {})
             for key in column_effective_value.keys():
                 if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'):
                     column_effective_value_type = key
 
-            column_number_format = first_values[i].get('effectiveFormat', {}).get('numberFormat', {})
+            column_number_format = first_values[i].get('effectiveFormat', {}).get(
+                'numberFormat', {})
             column_number_format_type = column_number_format.get('type')
 
             # Determine datatype for sheet_json_schema
             #
-            # column_effective_value_type = numberValue, stringValue, boolValue; INVALID: errorType, formulaType
-            #   Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
+            # column_effective_value_type = numberValue, stringValue, boolValue;
+            #   INVALID: errorType, formulaType
+            # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
             #
-            # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, SCIENTIFIC
-            #   Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
+            # column_number_format_type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
+            #   TIME, DATE_TIME, SCIENTIFIC
+            # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
             #
             column_format = None # Default
             column_multiple_of = None # Default
-            if column_effective_value_type in ('formulaValue', 'errorValue'):
-                raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name))
-            elif column_effective_value_type == 'stringValue':
+            if column_effective_value_type == 'stringValue':
                 column_type = ['null', 'string']
                 column_gs_type = 'stringValue'
             elif column_effective_value_type == 'boolValue':
@@ -116,7 +115,9 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
                 else:
                     column_type = ['null', 'number', 'string']
                     column_gs_type = 'numberType'
-
+            elif column_effective_value_type in ('formulaValue', 'errorValue'):
+                raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name, \
+                    column_effective_value_type))
             else: # skipped
                 column_is_skipped = True
                 skipped = skipped + 1
@@ -130,7 +131,6 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
                     # skipped = 2 consecutive skipped headers
                     # Remove prior_header column_name
                     sheet_json_schema['properties'].pop(prior_header, None)
-
                     column_count = i - 1
                     break
                 else:
@@ -164,12 +164,14 @@ def get_sheet_metadata(sheet, spreadsheet_id, client):
     stream_metadata = STREAMS.get(stream_name)
     api = stream_metadata.get('api', 'sheets')
     params = stream_metadata.get('params', {})
-    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace('{sheet_title}', sheet_title)
-    path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring)
+    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in \
+        params.items()]).replace('{sheet_title}', sheet_title)
+    path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
+        spreadsheet_id), querystring)
 
     sheet_md_results = client.get(path=path, api=api, endpoint=stream_name)
     sheet_cols = sheet_md_results.get('sheets')[0]
-    sheet_schema, columns = get_sheet_schema_columns(sheet_cols, spreadsheet_id, client)
+    sheet_schema, columns = get_sheet_schema_columns(sheet_cols)
 
     return sheet_schema, columns
 
@@ -199,20 +201,22 @@ def get_schemas(client, spreadsheet_id):
             replication_method=stream_metadata.get('replication_method', None)
         )
         field_metadata[stream_name] = mdata
-
+
         if stream_name == 'spreadsheet_metadata':
             api = stream_metadata.get('api', 'sheets')
             params = stream_metadata.get('params', {})
             querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
-            path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring)
+            path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
+                spreadsheet_id), querystring)
 
-            spreadsheet_md_results = client.get(path=path, params=querystring, api=api, endpoint=stream_name)
+            spreadsheet_md_results = client.get(path=path, params=querystring, api=api, \
+                endpoint=stream_name)
 
             sheets = spreadsheet_md_results.get('sheets')
             if sheets:
                 for sheet in sheets:
                     sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-                    # LOGGER.info('sheet_schema = {}'.format(json.dumps(sheet_schema, indent=2, sort_keys=True)))
+                    LOGGER.info('columns = {}'.format(columns))
 
                     sheet_title = sheet.get('properties', {}).get('title')
                     schemas[sheet_title] = sheet_schema
@@ -224,5 +228,5 @@ def get_schemas(client, spreadsheet_id):
                         replication_method='FULL_TABLE'
                     )
                     field_metadata[sheet_title] = sheet_mdata
-
+
     return schemas, field_metadata
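A quick sanity check of the colnum_string helper renamed above: it maps a 1-based column index to its A1-notation column letter, which the sync code later uses to build row ranges. The snippet restates the function from the diff purely for illustration:

```python
def colnum_string(num):
    # 1 -> 'A', 26 -> 'Z', 27 -> 'AA' (base-26 with no zero digit)
    string = ""
    while num > 0:
        num, remainder = divmod(num - 1, 26)
        string = chr(65 + remainder) + string
    return string


assert colnum_string(1) == 'A'
assert colnum_string(26) == 'Z'
assert colnum_string(27) == 'AA'
assert colnum_string(53) == 'BA'
```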
diff --git a/tap_google_sheets/streams.py b/tap_google_sheets/streams.py
index 231a41d..b8e3eff 100644
--- a/tap_google_sheets/streams.py
+++ b/tap_google_sheets/streams.py
@@ -8,11 +8,10 @@ from collections import OrderedDict
 # key_properties: Primary key fields for identifying an endpoint record.
 # replication_method: INCREMENTAL or FULL_TABLE
 # replication_keys: bookmark_field(s), typically a date-time, used for filtering the results
-#   and setting the state
+#   and setting the state
 # params: Query, sort, and other endpoint specific parameters; default = {}
-# data_key: JSON element containing the results list for the endpoint; default = root (no data_key)
-# bookmark_query_field: From date-time field used for filtering the query
-# bookmark_type: Data type for bookmark, integer or datetime
+# data_key: JSON element containing the results list for the endpoint;
+#   default = root (no data_key)
 
 FILE_METADATA = {
     "api": "files",
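The streams.py comment block above documents the keys an entry in the STREAMS dictionary may carry. As an illustration of the shape only (the values below are invented, not the tap's real FILE_METADATA definition), an endpoint config combines those keys like this:

```python
from collections import OrderedDict

# Hypothetical endpoint config; key names follow the comment block in streams.py,
# values are made up for illustration.
EXAMPLE_ENDPOINT = {
    "api": "sheets",
    "path": "spreadsheets/{spreadsheet_id}",
    "key_properties": ["spreadsheetId"],
    "replication_method": "FULL_TABLE",
    "params": {"includeGridData": "false"},
    "data_key": "sheets"
}

# An OrderedDict keeps endpoint order stable so parent streams sync before children.
EXAMPLE_STREAMS = OrderedDict()
EXAMPLE_STREAMS['example_endpoint'] = EXAMPLE_ENDPOINT
```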
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 79e05f9..d7d7184 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -1,10 +1,9 @@
 import time
 import math
-import singer
 import json
-import pytz
 from datetime import datetime, timedelta
-from collections import OrderedDict
+import pytz
+import singer
 from singer import metrics, metadata, Transformer, utils
 from singer.utils import strptime_to_utc, strftime
 from tap_google_sheets.streams import STREAMS
@@ -71,17 +70,21 @@ def process_records(catalog,
     return counter.value
 
 
-def sync_stream(stream_name, selected_streams, catalog, state, records):
+def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
     # Should sheets_loaded be synced?
     if stream_name in selected_streams:
         LOGGER.info('STARTED Syncing {}'.format(stream_name))
         update_currently_syncing(state, stream_name)
+        selected_fields = get_selected_fields(catalog, stream_name)
+        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
         write_schema(catalog, stream_name)
+        if not time_extracted:
+            time_extracted = utils.now()
         record_count = process_records(
             catalog=catalog,
             stream_name=stream_name,
             records=records,
-            time_extracted=utils.now())
+            time_extracted=time_extracted)
         LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
         update_currently_syncing(state, None)
 
@@ -105,9 +108,9 @@ def get_selected_fields(catalog, stream_name):
     mdata_list = singer.metadata.to_list(mdata)
     selected_fields = []
     for entry in mdata_list:
-        field = None
+        field = None
         try:
-            field = entry['breadcrumb'][1]
+            field = entry['breadcrumb'][1]
             if entry.get('metadata', {}).get('selected', False):
                 selected_fields.append(field)
         except IndexError:
@@ -172,7 +175,7 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
 def transform_sheet_metadata(spreadsheet_id, sheet, columns):
     # Convert to properties to dict
     sheet_metadata = sheet.get('properties')
-    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
+    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
     sheet_id = sheet_metadata_tf.get('sheetId')
     sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
         spreadsheet_id, sheet_id)
@@ -187,13 +190,13 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns):
 def excel_to_dttm_str(excel_date_sn, timezone_str=None):
     if not timezone_str:
         timezone_str = 'UTC'
-    tz = pytz.timezone(timezone_str)
+    tzn = pytz.timezone(timezone_str)
     sec_per_day = 86400
     excel_epoch = 25569 # 1970-01-01T00:00:00Z
     epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
     epoch_dttm = datetime(1970, 1, 1)
     excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
-    utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
+    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
     utc_dttm_str = strftime(utc_dttm)
     return utc_dttm_str
 
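excel_to_dttm_str above converts the spreadsheet's Lotus/Excel-style serial date numbers to UTC timestamps; 25569 is the serial value of 1970-01-01, so the conversion is a day offset multiplied by 86400 seconds. A standalone restatement of the same arithmetic, assuming a UTC spreadsheet timezone:

```python
import math
from datetime import datetime, timedelta

import pytz


def serial_to_utc(excel_date_sn, timezone_str='UTC'):
    # Same arithmetic as excel_to_dttm_str: serial day number -> UTC datetime.
    tzn = pytz.timezone(timezone_str)
    epoch_sec = math.floor((excel_date_sn - 25569) * 86400)  # 25569 = serial for 1970-01-01
    naive = datetime(1970, 1, 1) + timedelta(seconds=epoch_sec)
    return tzn.localize(naive).astimezone(pytz.utc)


# 43784 days lands on 2019-11-15; the .25 fraction adds six hours.
print(serial_to_utc(43784.25))  # 2019-11-15 06:00:00+00:00
```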
@@ -205,7 +208,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
     is_last_row = False
     row_num = from_row
     # Create sorted list of columns based on columnIndex
-    cols = sorted(columns, key = lambda i: i['columnIndex'])
+    cols = sorted(columns, key=lambda i: i['columnIndex'])
     # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
 
     for row in sheet_data_rows:
@@ -228,17 +231,17 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
                 col_type = col.get('columnType')
                 # Convert dates/times from Lotus Notes Serial Numbers
                 if col_type == 'numberType.DATE_TIME':
-                    if isinstance(value, int) or isinstance(value, float):
+                    if isinstance(value, (int, float)):
                         col_val = excel_to_dttm_str(value)
                     else:
                         col_val = str(value)
                 elif col_type == 'numberType.DATE':
-                    if isinstance(value, int) or isinstance(value, float):
+                    if isinstance(value, (int, float)):
                         col_val = excel_to_dttm_str(value)[:10]
                     else:
                         col_val = str(value)
                 elif col_type == 'numberType.TIME':
-                    if isinstance(value, int) or isinstance(value, float):
+                    if isinstance(value, (int, float)):
                         try:
                             total_secs = value * 86400 # seconds in day
                             col_val = str(timedelta(seconds=total_secs))
@@ -267,7 +270,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
                         else:
                             col_val = str(value)
                     elif isinstance(value, int):
-                        if value == 1 or value == -1:
+                        if value in (1, -1):
                             col_val = True
                         elif value == 0:
                             col_val = False
@@ -321,11 +324,10 @@ def sync(client, config, catalog, state):
     LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
     if this_datetime <= last_datetime:
         LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
-        return 0
-    else:
-        # Sync file_metadata if selected
-        sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
-        write_bookmark(state, stream_name, strftime(this_datetime))
+        return
+    # Sync file_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
+    write_bookmark(state, stream_name, strftime(this_datetime))
 
     # SPREADSHEET_METADATA
     spreadsheet_metadata = {}
@@ -334,7 +336,7 @@ def sync(client, config, catalog, state):
 
     # GET spreadsheet_metadata
     LOGGER.info('GET spreadsheet_metadata')
-    spreadsheet_metadata, ss_time_extracted = get_data(
+    spreadsheet_metadata, ss_time_extracted = get_data(
         stream_name=stream_name,
         endpoint_config=spreadsheet_metadata_config,
         client=client,
@@ -345,7 +347,8 @@ def sync(client, config, catalog, state):
     spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
 
     # Sync spreadsheet_metadata if selected
-    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)
+    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
+        ss_time_extracted)
 
     # SHEET_METADATA and SHEET_DATA
     sheets = spreadsheet_metadata.get('sheets')
@@ -360,6 +363,7 @@ def sync(client, config, catalog, state):
 
             # GET sheet_metadata and columns
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+            LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
             # Transform sheet_metadata
             sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
@@ -371,6 +375,8 @@ def sync(client, config, catalog, state):
             if sheet_title in selected_streams:
                 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                 update_currently_syncing(state, sheet_title)
+                selected_fields = get_selected_fields(catalog, sheet_title)
+                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                 write_schema(catalog, sheet_title)
 
                 # Determine max range of columns and rows for "paging" through the data
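The remaining hunks page through each worksheet in fixed row batches, building A1-notation ranges such as A2:K201 from the column letter produced by colnum_string. A minimal sketch of that range construction (batch size and the handling of the final partial batch are illustrative; the tap's loop also stops early when the transformed batch reports an empty last row):

```python
def batch_ranges(last_col_letter, max_row, batch_rows=200, first_data_row=2):
    # Yield A1-notation ranges like 'A2:K201', 'A202:K401', ... up to max_row.
    from_row = first_data_row
    while from_row <= max_row:
        to_row = min(from_row + batch_rows - 1, max_row)
        yield 'A{}:{}{}'.format(from_row, last_col_letter, to_row)
        from_row = to_row + 1


print(list(batch_ranges('K', 450)))
# ['A2:K201', 'A202:K401', 'A402:K450']
```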
@@ -396,7 +402,7 @@ def sync(client, config, catalog, state):
                 # Loop thru batches (each having 200 rows of data)
                 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                     range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
+
                     # GET sheet_data for a worksheet tab
                     sheet_data, time_extracted = get_data(
                         stream_name=sheet_title,
@@ -423,7 +429,9 @@ def sync(client, config, catalog, state):
                         stream_name=sheet_title,
                         records=sheet_data_tf,
                         time_extracted=ss_time_extracted)
-
+                    LOGGER.info('Sheet: {}, records processed: {}'.format(
+                        sheet_title, record_count))
+
                     # Update paging from/to_row for next batch
                     from_row = to_row + 1
                     if to_row + batch_rows > sheet_max_row:
@@ -442,8 +450,8 @@ def sync(client, config, catalog, state):
                 sheets_loaded.append(sheet_loaded)
 
                 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
-                # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
-                # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
+                # This forces hard deletes on the data downstream if fewer records are sent.
+                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                 activate_version_message = singer.ActivateVersionMessage(
                     stream=sheet_title,
                     version=int(time.time() * 1000))
@@ -451,12 +459,13 @@ def sync(client, config, catalog, state):
                 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                     sheet_title, row_num - 1))
-
                 update_currently_syncing(state, None)
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
     sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
-
+
     stream_name = 'sheets_loaded'
     # Sync sheets_loaded if selected
     sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
+
+    return
-- 
cgit v1.2.3
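One pattern worth noting from the final hunks: after each sheet completes, the tap emits a Singer ACTIVATE_VERSION message so downstream targets can hard-delete rows that were not re-sent under the new version. A minimal sketch of that pattern with singer-python; the stream name and version choice here are only illustrative:

```python
import time

import singer


def activate_version(stream_name):
    # A monotonically increasing version; wall-clock milliseconds is the common choice.
    message = singer.ActivateVersionMessage(
        stream=stream_name,
        version=int(time.time() * 1000))
    singer.messages.write_message(message)


activate_version('Sheet1')
```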