From 99424fee5ba6ff830df39be8f47c3e3d685b444a Mon Sep 17 00:00:00 2001 From: Jeff Huth Date: Fri, 15 Nov 2019 01:58:55 -0800 Subject: pylint and testing pylint and testing --- tap_google_sheets/sync.py | 65 +++++++++++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 28 deletions(-) (limited to 'tap_google_sheets/sync.py') diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py index 79e05f9..d7d7184 100644 --- a/tap_google_sheets/sync.py +++ b/tap_google_sheets/sync.py @@ -1,10 +1,9 @@ import time import math -import singer import json -import pytz from datetime import datetime, timedelta -from collections import OrderedDict +import pytz +import singer from singer import metrics, metadata, Transformer, utils from singer.utils import strptime_to_utc, strftime from tap_google_sheets.streams import STREAMS @@ -71,17 +70,21 @@ def process_records(catalog, return counter.value -def sync_stream(stream_name, selected_streams, catalog, state, records): +def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None): # Should sheets_loaded be synced? if stream_name in selected_streams: LOGGER.info('STARTED Syncing {}'.format(stream_name)) update_currently_syncing(state, stream_name) + selected_fields = get_selected_fields(catalog, stream_name) + LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields)) write_schema(catalog, stream_name) + if not time_extracted: + time_extracted = utils.now() record_count = process_records( catalog=catalog, stream_name=stream_name, records=records, - time_extracted=utils.now()) + time_extracted=time_extracted) LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count)) update_currently_syncing(state, None) @@ -105,9 +108,9 @@ def get_selected_fields(catalog, stream_name): mdata_list = singer.metadata.to_list(mdata) selected_fields = [] for entry in mdata_list: - field = None + field = None try: - field = entry['breadcrumb'][1] + field = entry['breadcrumb'][1] if entry.get('metadata', {}).get('selected', False): selected_fields.append(field) except IndexError: @@ -172,7 +175,7 @@ def transform_spreadsheet_metadata(spreadsheet_metadata): def transform_sheet_metadata(spreadsheet_id, sheet, columns): # Convert to properties to dict sheet_metadata = sheet.get('properties') - sheet_metadata_tf = json.loads(json.dumps(sheet_metadata)) + sheet_metadata_tf = json.loads(json.dumps(sheet_metadata)) sheet_id = sheet_metadata_tf.get('sheetId') sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format( spreadsheet_id, sheet_id) @@ -187,13 +190,13 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns): def excel_to_dttm_str(excel_date_sn, timezone_str=None): if not timezone_str: timezone_str = 'UTC' - tz = pytz.timezone(timezone_str) + tzn = pytz.timezone(timezone_str) sec_per_day = 86400 excel_epoch = 25569 # 1970-01-01T00:00:00Z epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day) epoch_dttm = datetime(1970, 1, 1) excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec) - utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc) + utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc) utc_dttm_str = strftime(utc_dttm) return utc_dttm_str @@ -205,7 +208,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data is_last_row = False row_num = from_row # Create sorted list of columns based on columnIndex - cols = sorted(columns, key = lambda i: i['columnIndex']) + cols = sorted(columns, key=lambda i: i['columnIndex']) # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows)) for row in sheet_data_rows: @@ -228,17 +231,17 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data col_type = col.get('columnType') # Convert dates/times from Lotus Notes Serial Numbers if col_type == 'numberType.DATE_TIME': - if isinstance(value, int) or isinstance(value, float): + if isinstance(value, (int, float)): col_val = excel_to_dttm_str(value) else: col_val = str(value) elif col_type == 'numberType.DATE': - if isinstance(value, int) or isinstance(value, float): + if isinstance(value, (int, float)): col_val = excel_to_dttm_str(value)[:10] else: col_val = str(value) elif col_type == 'numberType.TIME': - if isinstance(value, int) or isinstance(value, float): + if isinstance(value, (int, float)): try: total_secs = value * 86400 # seconds in day col_val = str(timedelta(seconds=total_secs)) @@ -267,7 +270,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data else: col_val = str(value) elif isinstance(value, int): - if value == 1 or value == -1: + if value in (1, -1): col_val = True elif value == 0: col_val = False @@ -321,11 +324,10 @@ def sync(client, config, catalog, state): LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime)) if this_datetime <= last_datetime: LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.') - return 0 - else: - # Sync file_metadata if selected - sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf) - write_bookmark(state, stream_name, strftime(this_datetime)) + return + # Sync file_metadata if selected + sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted) + write_bookmark(state, stream_name, strftime(this_datetime)) # SPREADSHEET_METADATA spreadsheet_metadata = {} @@ -334,7 +336,7 @@ def sync(client, config, catalog, state): # GET spreadsheet_metadata LOGGER.info('GET spreadsheet_meatadata') - spreadsheet_metadata, ss_time_extracted = get_data( + spreadsheet_metadata, ss_time_extracted = get_data( stream_name=stream_name, endpoint_config=spreadsheet_metadata_config, client=client, @@ -345,7 +347,8 @@ def sync(client, config, catalog, state): spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata) # Sync spreadsheet_metadata if selected - sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf) + sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \ + ss_time_extracted) # SHEET_METADATA and SHEET_DATA sheets = spreadsheet_metadata.get('sheets') @@ -360,6 +363,7 @@ def sync(client, config, catalog, state): # GET sheet_metadata and columns sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) + LOGGER.info('sheet_schema: {}'.format(sheet_schema)) # Transform sheet_metadata sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) @@ -371,6 +375,8 @@ def sync(client, config, catalog, state): if sheet_title in selected_streams: LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title)) update_currently_syncing(state, sheet_title) + selected_fields = get_selected_fields(catalog, sheet_title) + LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) write_schema(catalog, sheet_title) # Determine max range of columns and rows for "paging" through the data @@ -396,7 +402,7 @@ def sync(client, config, catalog, state): # Loop thru batches (each having 200 rows of data) while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row: range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row) - + # GET sheet_data for a worksheet tab sheet_data, time_extracted = get_data( stream_name=sheet_title, @@ -423,7 +429,9 @@ def sync(client, config, catalog, state): stream_name=sheet_title, records=sheet_data_tf, time_extracted=ss_time_extracted) - + LOGGER.info('Sheet: {}, ecords processed: {}'.format( + sheet_title, record_count)) + # Update paging from/to_row for next batch from_row = to_row + 1 if to_row + batch_rows > sheet_max_row: @@ -442,8 +450,8 @@ def sync(client, config, catalog, state): sheets_loaded.append(sheet_loaded) # Emit a Singer ACTIVATE_VERSION message after each sheet is complete. - # This forces hard deletes on the data downstream if fewer records are sent for a sheet. - # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167 + # This forces hard deletes on the data downstream if fewer records are sent. + # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 activate_version_message = singer.ActivateVersionMessage( stream=sheet_title, version=int(time.time() * 1000)) @@ -451,12 +459,13 @@ def sync(client, config, catalog, state): LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( sheet_title, row_num - 1)) - update_currently_syncing(state, None) stream_name = 'sheet_metadata' # Sync sheet_metadata if selected sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) - + stream_name = 'sheets_loaded' # Sync sheet_metadata if selected sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded) + + return -- cgit v1.2.3