From 43a24cbab1dbc35b893c35b86e34adc0f2fb84e7 Mon Sep 17 00:00:00 2001 From: Jeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com> Date: Thu, 9 Jan 2020 07:30:53 -0800 Subject: v.0.0.3 Sync error handling, activate version, documentation (#2) * v.0.0.2 schema and sync changes Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py * v.0.0.3 Sync activate version and error handling Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages. --- tap_google_sheets/schema.py | 51 +++++++++++++++++++----- tap_google_sheets/sync.py | 94 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 116 insertions(+), 29 deletions(-) (limited to 'tap_google_sheets') diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py index 243467b..e319c03 100644 --- a/tap_google_sheets/schema.py +++ b/tap_google_sheets/schema.py @@ -21,6 +21,7 @@ def colnum_string(num): # Create sheet_metadata_json with columns from sheet def get_sheet_schema_columns(sheet): + sheet_title = sheet.get('properties', {}).get('title') sheet_json_schema = OrderedDict() data = next(iter(sheet.get('data', [])), {}) row_data = data.get('rowData', []) @@ -62,15 +63,34 @@ def get_sheet_schema_columns(sheet): skipped = 0 column_name = '{}'.format(header_value) if column_name in header_list: - raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name)) + raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format( + sheet_title, column_name, column_letter)) header_list.append(column_name) - first_value = first_values[i] - + first_value = None + try: + first_value = first_values[i] + except IndexError as err: + raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format( + sheet_title, column_name, column_letter, err)) + column_effective_value = first_value.get('effectiveValue', {}) - for key in column_effective_value.keys(): - if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'): - column_effective_value_type = key + + col_val = None + if column_effective_value == {}: + column_effective_value_type = 'stringValue' + LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format( + sheet_title, column_name, column_letter)) + LOGGER.info(' Setting column datatype to STRING') + else: + for key, val in column_effective_value.items(): + if key in ('numberValue', 'stringValue', 'boolValue'): + column_effective_value_type = key + col_val = str(val) + elif key in ('errorType', 'formulaType'): + col_val = str(val) + raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format( + sheet_title, column_name, column_letter, key, col_val)) column_number_format = first_values[i].get('effectiveFormat', {}).get( 'numberFormat', {}) @@ -87,7 +107,13 @@ def get_sheet_schema_columns(sheet): # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType # column_format = None # Default - if column_effective_value_type == 'stringValue': + if column_effective_value == {}: + col_properties = {'type': ['null', 'string']} + column_gs_type = 'stringValue' + LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format( + sheet_title, column_name, column_letter)) + LOGGER.info(' Setting column datatype to STRING') + elif column_effective_value_type == 'stringValue': col_properties = {'type': ['null', 'string']} column_gs_type = 'stringValue' elif column_effective_value_type == 'boolValue': @@ -138,8 +164,8 @@ def get_sheet_schema_columns(sheet): else: col_properties = {'type': ['null', 'string']} column_gs_type = 'unsupportedValue' - LOGGER.info('Unsupported data type: {}, value: {}'.format(column_name, \ - column_effective_value_type)) + LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format( + sheet_title, column_name, column_letter, column_effective_value_type, col_val)) LOGGER.info('Converting to string.') else: # skipped column_is_skipped = True @@ -148,11 +174,16 @@ def get_sheet_schema_columns(sheet): column_name = '__sdc_skip_col_{}'.format(column_index_str) col_properties = {'type': ['null', 'string']} column_gs_type = 'stringValue' + LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format( + sheet_title, column_name, column_letter)) + LOGGER.info(' This column will be skipped during data loading.') if skipped >= 2: # skipped = 2 consecutive skipped headers # Remove prior_header column_name sheet_json_schema['properties'].pop(prior_header, None) + LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format( + sheet_title, column_name, column_letter)) break else: @@ -245,7 +276,7 @@ def get_schemas(client, spreadsheet_id): for sheet in sheets: # GET sheet_json_schema for each worksheet (from function above) sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) - LOGGER.info('columns = {}'.format(columns)) + # LOGGER.info('columns = {}'.format(columns)) sheet_title = sheet.get('properties', {}).get('title') schemas[sheet_title] = sheet_json_schema diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py index 76b2e59..311281c 100644 --- a/tap_google_sheets/sync.py +++ b/tap_google_sheets/sync.py @@ -6,6 +6,7 @@ import pytz import singer from singer import metrics, metadata, Transformer, utils from singer.utils import strptime_to_utc, strftime +from singer.messages import RecordMessage from tap_google_sheets.streams import STREAMS from tap_google_sheets.schema import get_sheet_metadata @@ -17,14 +18,26 @@ def write_schema(catalog, stream_name): schema = stream.schema.to_dict() try: singer.write_schema(stream_name, schema, stream.key_properties) + LOGGER.info('Writing schema for: {}'.format(stream_name)) except OSError as err: LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) raise err -def write_record(stream_name, record, time_extracted): +def write_record(stream_name, record, time_extracted, version=None): try: - singer.messages.write_record(stream_name, record, time_extracted=time_extracted) + if version: + singer.messages.write_message( + RecordMessage( + stream=stream_name, + record=record, + version=version, + time_extracted=time_extracted)) + else: + singer.messages.write_record( + stream_name=stream_name, + record=record, + time_extracted=time_extracted) except OSError as err: LOGGER.info('OS Error writing record for: {}'.format(stream_name)) LOGGER.info('record: {}'.format(record)) @@ -53,7 +66,8 @@ def write_bookmark(state, stream, value): def process_records(catalog, stream_name, records, - time_extracted): + time_extracted, + version=None): stream = catalog.get_stream(stream_name) schema = stream.schema.to_dict() stream_metadata = metadata.to_map(stream.metadata) @@ -65,7 +79,11 @@ def process_records(catalog, record, schema, stream_metadata) - write_record(stream_name, transformed_record, time_extracted=time_extracted) + write_record( + stream_name=stream_name, + record=transformed_record, + time_extracted=time_extracted, + version=version) counter.increment() return counter.value @@ -206,7 +224,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None): # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times # Convert from array of values to JSON with column names as keys -def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): +def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows): sheet_data_tf = [] row_num = from_row # Create sorted list of columns based on columnIndex @@ -229,21 +247,32 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data col = cols[col_num - 1] col_skipped = col.get('columnSkipped') if not col_skipped: + # Get column metadata col_name = col.get('columnName') col_type = col.get('columnType') + col_letter = col.get('columnLetter') + + # NULL values + if value is None or value == '': + col_val = None + # Convert dates/times from Lotus Notes Serial Numbers # DATE-TIME - if col_type == 'numberType.DATE_TIME': + elif col_type == 'numberType.DATE_TIME': if isinstance(value, (int, float)): col_val = excel_to_dttm_str(value) else: col_val = str(value) + LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( + sheet_title, col_name, col_letter, row_num, col_type, value)) # DATE elif col_type == 'numberType.DATE': if isinstance(value, (int, float)): col_val = excel_to_dttm_str(value)[:10] else: col_val = str(value) + LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( + sheet_title, col_name, col_letter, row_num, col_type, value)) # TIME ONLY (NO DATE) elif col_type == 'numberType.TIME': if isinstance(value, (int, float)): @@ -253,6 +282,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data col_val = str(timedelta(seconds=total_secs)) except ValueError: col_val = str(value) + LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( + sheet_title, col_name, col_letter, row_num, col_type, value)) else: col_val = str(value) # NUMBER (INTEGER AND FLOAT) @@ -268,13 +299,19 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data col_val = float(round(value, 15)) except ValueError: col_val = str(value) + LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( + sheet_title, col_name, col_letter, row_num, col_type, value)) else: # decimal_digits <= 15, no rounding try: col_val = float(value) except ValueError: col_val = str(value) + LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( + sheet_title, col_name, col_letter, row_num, col_type, value)) else: col_val = str(value) + LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( + sheet_title, col_name, col_letter, row_num, col_type, value)) # STRING elif col_type == 'stringValue': col_val = str(value) @@ -289,6 +326,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data col_val = False else: col_val = str(value) + LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( + sheet_title, col_name, col_letter, row, col_type, value)) elif isinstance(value, int): if value in (1, -1): col_val = True @@ -296,9 +335,13 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data col_val = False else: col_val = str(value) + LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( + sheet_title, col_name, col_letter, row, col_type, value)) # OTHER: Convert everything else to a string else: col_val = str(value) + LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( + sheet_title, col_name, col_letter, row, col_type, value)) sheet_data_row_tf[col_name] = col_val col_num = col_num + 1 # APPEND non-empty row @@ -400,6 +443,20 @@ def sync(client, config, catalog, state): LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) write_schema(catalog, sheet_title) + # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs) + # everytime after each sheet sync is complete. + # This forces hard deletes on the data downstream if fewer records are sent. + # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 + last_integer = int(get_bookmark(state, sheet_title, 0)) + activate_version = int(time.time() * 1000) + activate_version_message = singer.ActivateVersionMessage( + stream=sheet_title, + version=activate_version) + if last_integer == 0: + # initial load, send activate_version before AND after data sync + singer.write_message(activate_version_message) + LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) + # Determine max range of columns and rows for "paging" through the data sheet_last_col_index = 1 sheet_last_col_letter = 'A' @@ -438,6 +495,7 @@ def sync(client, config, catalog, state): sheet_data_tf, row_num = transform_sheet_data( spreadsheet_id=spreadsheet_id, sheet_id=sheet_id, + sheet_title=sheet_title, from_row=from_row, columns=columns, sheet_data_rows=sheet_data_rows) @@ -449,10 +507,11 @@ def sync(client, config, catalog, state): catalog=catalog, stream_name=sheet_title, records=sheet_data_tf, - time_extracted=ss_time_extracted) + time_extracted=ss_time_extracted, + version=activate_version) LOGGER.info('Sheet: {}, records processed: {}'.format( sheet_title, record_count)) - + # Update paging from/to_row for next batch from_row = to_row + 1 if to_row + batch_rows > sheet_max_row: @@ -460,6 +519,14 @@ def sync(client, config, catalog, state): else: to_row = to_row + batch_rows + # End of Stream: Send Activate Version and update State + singer.write_message(activate_version_message) + write_bookmark(state, sheet_title, activate_version) + LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) + LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( + sheet_title, row_num - 2)) # subtract 1 for header row + update_currently_syncing(state, None) + # SHEETS_LOADED # Add sheet to sheets_loaded sheet_loaded = {} @@ -470,17 +537,6 @@ def sync(client, config, catalog, state): sheet_loaded['lastRowNumber'] = row_num sheets_loaded.append(sheet_loaded) - # Emit a Singer ACTIVATE_VERSION message after each sheet is complete. - # This forces hard deletes on the data downstream if fewer records are sent. - # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 - activate_version_message = singer.ActivateVersionMessage( - stream=sheet_title, - version=int(time.time() * 1000)) - singer.write_message(activate_version_message) - - LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( - sheet_title, row_num - 2)) # subtract 1 for header row - stream_name = 'sheet_metadata' # Sync sheet_metadata if selected sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) -- cgit v1.2.3