From 43a24cbab1dbc35b893c35b86e34adc0f2fb84e7 Mon Sep 17 00:00:00 2001
From: Jeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com>
Date: Thu, 9 Jan 2020 07:30:53 -0800
Subject: v.0.0.3 Sync error handling, activate version, documentation (#2)

* v.0.0.2 schema and sync changes

Change number json schema to anyOf with multipleOf; skip empty rows;
move write_bookmark to end of sync.py

* v.0.0.3 Sync activate version and error handling

Update README.md documentation. Improved logging and handling of errors
and warnings. Better null handling in Discovery and Sync. Fix issues
with activate version messages.
---
 CHANGELOG.md                |  3 ++
 README.md                   | 37 +++++++++++-------
 setup.py                    |  4 +-
 tap_google_sheets/schema.py | 51 +++++++++++++++++++-----
 tap_google_sheets/sync.py   | 94 ++++++++++++++++++++++++++++++++++++---------
 5 files changed, 144 insertions(+), 45 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e5d6560..3c47260 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.0.3
+  * Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages.
+
 ## 0.0.2
   * Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py
 
diff --git a/README.md b/README.md
index 8c9cc9d..9470411 100644
--- a/README.md
+++ b/README.md
@@ -11,30 +11,37 @@ This tap:
 - [File Metadata](https://developers.google.com/drive/api/v3/reference/files/get)
 - [Spreadsheet Metadata](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/get)
 - [Spreadsheet Values](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get)
+- Outputs the following metadata streams:
+  - File Metadata: Name, audit/change info from Google Drive
+  - Spreadsheet Metadata: Basic metadata about the Spreadsheet: Title, Locale, URL, etc.
+  - Sheet Metadata: Title, URL, Area (max column and row), and Column Metadata
+  - Column Metadata: Column Header Name, Data type, Format
+  - Sheets Loaded: Sheet title, load date, number of rows
 - For each Sheet:
-  - Outputs the schema for each resource (based on the column header and datatypes of first row of data)
-  - Outputs a record for all columns with column headers, and for each row of data until it reaches an empty row
+  - Outputs the schema for each resource (based on the column header and datatypes of row 2, the first row of data)
+  - Outputs a record for all columns that have column headers, and for each row of data
+  - Emits a Singer ACTIVATE_VERSION message after each sheet is complete. This forces hard deletes on the data downstream if fewer records are sent.
+  - Primary Key for each row in a Sheet is the Row Number: `__sdc_row`
+  - Each Row in a Sheet also includes Foreign Keys to the Spreadsheet Metadata, `__sdc_spreadsheet_id`, and Sheet Metadata, `__sdc_sheet_id`.
 
 ## API Endpoints
 [**file (GET)**](https://developers.google.com/drive/api/v3/reference/files/get)
 - Endpoint: https://www.googleapis.com/drive/v3/files/${spreadsheet_id}?fields=id,name,createdTime,modifiedTime,version
 - Primary keys: id
-- Replication strategy: Full (GET file audit data for spreadsheet_id in config)
+- Replication strategy: Incremental (GET file audit data for spreadsheet_id in config)
 - Process/Transformations: Replicate Data if Modified
 
 [**metadata (GET)**](https://developers.google.com/drive/api/v3/reference/files/get)
 - Endpoint: https://sheets.googleapis.com/v4/spreadsheets/${spreadsheet_id}?includeGridData=true&ranges=1:2
 - This endpoint returns spreadsheet metadata, sheet metadata, and value metadata (data type information)
-- Primary keys: spreadsheetId, title, field_name
+- Primary keys: Spreadsheet Id, Sheet Id, Column Index
 - Foreign keys: None
 - Replication strategy: Full (get and replace file metadata for spreadsheet_id in config)
 - Process/Transformations:
   - Verify Sheets: Check sheets exist (compared to catalog) and check gridProperties (available area)
     - sheetId, title, index, gridProperties (rowCount, columnCount)
-  - Verify Field Headers (1st row): Check field headers exist (compared to catalog), missing headers (columns to skip), column order/position, and column uniqueness
-    - Header's field_name, position: data.rowData[0].values[i].formattedValue
-  - Create/Verify Datatypes (2nd row):
-    - Row 2's datatype, format: data.rowData[1].values[i]
+  - Verify Field Headers (1st row): Check field headers exist (compared to catalog), missing headers (columns to skip), column order/position, and column name uniqueness
+  - Create/Verify Datatypes based on 2nd row value and cell metadata
     - First check:
       - [effectiveValue: key](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue)
       - Valid types: numberValue, stringValue, boolValue
@@ -42,20 +49,22 @@ This tap:
     - Then check:
       - [effectiveFormat.numberFormat.type](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType)
      - Valid types: UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, SCIENTIFIC
-      - If DATE or DATE_TIME, set JSON schema datatype = string and format = date-time
-      - [effectiveFormat.numberFormat.pattern](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormat)
+    - Determine JSON schema column data type based on the value and the above cell metadata settings.
+      - If DATE, DATE_TIME, or TIME, set JSON schema format accordingly
 
 [**values (GET)**](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get)
 - Endpoint: https://sheets.googleapis.com/v4/spreadsheets/${spreadsheet_id}/values/'${sheet_name}'!${row_range}?dateTimeRenderOption=SERIAL_NUMBER&valueRenderOption=UNFORMATTED_VALUE&majorDimension=ROWS
 - This endpoint loops through sheets and row ranges to get the [unformatted values](https://developers.google.com/sheets/api/reference/rest/v4/ValueRenderOption) (effective values only), dates and datetimes as [serial numbers](https://developers.google.com/sheets/api/reference/rest/v4/DateTimeRenderOption)
-- Primary keys: row
+- Primary keys: __sdc_row
 - Replication strategy: Full (GET file audit data for spreadsheet_id in config)
 - Process/Transformations:
   - Loop through sheets (compared to catalog selection)
   - Send metadata for sheet
-  - Loop through ranges of rows until reaching empty row or area max row (from sheet metadata)
-  - Transform values, if necessary (dates, date-times, boolean, integer, numers)
-  - Process/send records
+  - Loop through ALL columns; only columns having a column header are output
+  - Loop through ranges of rows for ALL rows in the sheet's available area, up to the max row (from sheet metadata)
+  - Transform values, if necessary (dates, date-times, times, boolean).
+    - Date/time serial numbers are converted to date, date-time, and time strings. Google Sheets uses the Lotus 1-2-3 [Serial Number](https://developers.google.com/sheets/api/reference/rest/v4/DateTimeRenderOption) format for dates/times; these are converted to normal UTC date-time strings.
+  - Process/send records to target
 
 ## Authentication
 The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=1FojlvtLwS0-BzGS37R0jEXtwSHqSiO1Uw-7RKQQO-C4) Google Doc provides instructions showing how to configure the Google Cloud API credentials to enable Google Drive and Google Sheets APIs, configure Google Cloud to authorize/verify your domain ownership, generate an API key (client_id, client_secret), authenticate and generate a refresh_token, and prepare your tap config.json with the necessary parameters.
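The serial-number handling described above is the crux of the values transform: serial date 0 corresponds to 1899-12-30, the integer part counts days, and the fraction is the time of day. A minimal standalone sketch of that conversion follows; the helper name serial_to_utc_str is hypothetical (the tap's actual converter is excel_to_dttm_str in sync.py below, which additionally supports a timezone argument):

    from datetime import datetime, timedelta

    import pytz

    # Assumed convention: serial date 0 = 1899-12-30 (Lotus 1-2-3 style);
    # days in the integer part, time of day in the fractional part.
    LOTUS_EPOCH = datetime(1899, 12, 30, tzinfo=pytz.utc)

    def serial_to_utc_str(serial_number):
        # e.g. 43839.5 -> '2020-01-09T12:00:00.000000Z'
        dttm = LOTUS_EPOCH + timedelta(days=serial_number)
        return dttm.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
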
diff --git a/setup.py b/setup.py
index 6fe2493..80c2c10 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@ from setuptools import setup, find_packages
 
 setup(name='tap-google-sheets',
-      version='0.0.2',
+      version='0.0.3',
       description='Singer.io tap for extracting data from the Google Sheets v4 API',
       author='jeff.huth@bytecode.io',
       classifiers=['Programming Language :: Python :: 3 :: Only'],
@@ -11,7 +11,7 @@ setup(name='tap-google-sheets',
       install_requires=[
           'backoff==1.8.0',
           'requests==2.22.0',
-          'singer-python==5.8.1'
+          'singer-python==5.9.0'
       ],
       entry_points='''
           [console_scripts]
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index 243467b..e319c03 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -21,6 +21,7 @@ def colnum_string(num):
 
 # Create sheet_metadata_json with columns from sheet
 def get_sheet_schema_columns(sheet):
+    sheet_title = sheet.get('properties', {}).get('title')
     sheet_json_schema = OrderedDict()
     data = next(iter(sheet.get('data', [])), {})
     row_data = data.get('rowData', [])
@@ -62,15 +63,34 @@ def get_sheet_schema_columns(sheet):
             skipped = 0
             column_name = '{}'.format(header_value)
             if column_name in header_list:
-                raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name))
+                raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
+                    sheet_title, column_name, column_letter))
             header_list.append(column_name)
 
-            first_value = first_values[i]
-
+            first_value = None
+            try:
+                first_value = first_values[i]
+            except IndexError as err:
+                raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
+                    sheet_title, column_name, column_letter, err))
+
             column_effective_value = first_value.get('effectiveValue', {})
-            for key in column_effective_value.keys():
-                if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'):
-                    column_effective_value_type = key
+
+            col_val = None
+            if column_effective_value == {}:
+                column_effective_value_type = 'stringValue'
+                LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
+                    sheet_title, column_name, column_letter))
+                LOGGER.info('  Setting column datatype to STRING')
+            else:
+                for key, val in column_effective_value.items():
+                    if key in ('numberValue', 'stringValue', 'boolValue'):
+                        column_effective_value_type = key
+                        col_val = str(val)
+                    elif key in ('errorType', 'formulaType'):
+                        col_val = str(val)
+                        raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+                            sheet_title, column_name, column_letter, key, col_val))
 
             column_number_format = first_values[i].get('effectiveFormat', {}).get(
                 'numberFormat', {})
@@ -87,7 +107,13 @@ def get_sheet_schema_columns(sheet):
             # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
             # column_format = None # Default
-            if column_effective_value_type == 'stringValue':
+            if column_effective_value == {}:
+                col_properties = {'type': ['null', 'string']}
+                column_gs_type = 'stringValue'
+                LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
+                    sheet_title, column_name, column_letter))
+                LOGGER.info('  Setting column datatype to STRING')
+            elif column_effective_value_type == 'stringValue':
                 col_properties = {'type': ['null', 'string']}
                 column_gs_type = 'stringValue'
             elif column_effective_value_type == 'boolValue':
@@ -138,8 +164,8 @@ def get_sheet_schema_columns(sheet):
             else:
                 col_properties = {'type': ['null', 'string']}
                 column_gs_type = 'unsupportedValue'
-                LOGGER.info('Unsupported data type: {}, value: {}'.format(column_name, \
-                    column_effective_value_type))
+                LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+                    sheet_title, column_name, column_letter, column_effective_value_type, col_val))
                 LOGGER.info('Converting to string.')
         else: # skipped
             column_is_skipped = True
@@ -148,11 +174,16 @@ def get_sheet_schema_columns(sheet):
             column_name = '__sdc_skip_col_{}'.format(column_index_str)
             col_properties = {'type': ['null', 'string']}
             column_gs_type = 'stringValue'
+            LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
+                sheet_title, column_name, column_letter))
+            LOGGER.info('  This column will be skipped during data loading.')
 
         if skipped >= 2: # skipped = 2 consecutive skipped headers
             # Remove prior_header column_name
             sheet_json_schema['properties'].pop(prior_header, None)
+            LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
+                sheet_title, column_name, column_letter))
             break
 
         else:
@@ -245,7 +276,7 @@ def get_schemas(client, spreadsheet_id):
     for sheet in sheets:
         # GET sheet_json_schema for each worksheet (from function above)
         sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-        LOGGER.info('columns = {}'.format(columns))
+        # LOGGER.info('columns = {}'.format(columns))
 
         sheet_title = sheet.get('properties', {}).get('title')
         schemas[sheet_title] = sheet_json_schema
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 76b2e59..311281c 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -6,6 +6,7 @@ import pytz
 import singer
 from singer import metrics, metadata, Transformer, utils
 from singer.utils import strptime_to_utc, strftime
+from singer.messages import RecordMessage
 
 from tap_google_sheets.streams import STREAMS
 from tap_google_sheets.schema import get_sheet_metadata
@@ -17,14 +18,26 @@ def write_schema(catalog, stream_name):
     schema = stream.schema.to_dict()
     try:
         singer.write_schema(stream_name, schema, stream.key_properties)
+        LOGGER.info('Writing schema for: {}'.format(stream_name))
     except OSError as err:
         LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
         raise err
 
 
-def write_record(stream_name, record, time_extracted):
+def write_record(stream_name, record, time_extracted, version=None):
     try:
-        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
+        if version:
+            singer.messages.write_message(
+                RecordMessage(
+                    stream=stream_name,
+                    record=record,
+                    version=version,
+                    time_extracted=time_extracted))
+        else:
+            singer.messages.write_record(
+                stream_name=stream_name,
+                record=record,
+                time_extracted=time_extracted)
     except OSError as err:
         LOGGER.info('OS Error writing record for: {}'.format(stream_name))
         LOGGER.info('record: {}'.format(record))
@@ -53,7 +66,8 @@ def write_bookmark(state, stream, value):
 def process_records(catalog,
                     stream_name,
                     records,
-                    time_extracted):
+                    time_extracted,
+                    version=None):
     stream = catalog.get_stream(stream_name)
     schema = stream.schema.to_dict()
     stream_metadata = metadata.to_map(stream.metadata)
@@ -65,7 +79,11 @@ def process_records(catalog,
                 record,
                 schema,
                 stream_metadata)
-            write_record(stream_name, transformed_record, time_extracted=time_extracted)
+            write_record(
+                stream_name=stream_name,
+                record=transformed_record,
+                time_extracted=time_extracted,
+                version=version)
             counter.increment()
         return counter.value
@@ -206,7 +224,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None):
 
 # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
 # Convert from array of values to JSON with column names as keys
-def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
+def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
     sheet_data_tf = []
     row_num = from_row
     # Create sorted list of columns based on columnIndex
@@ -229,21 +247,32 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
             col = cols[col_num - 1]
             col_skipped = col.get('columnSkipped')
             if not col_skipped:
+                # Get column metadata
                 col_name = col.get('columnName')
                 col_type = col.get('columnType')
+                col_letter = col.get('columnLetter')
+
+                # NULL values
+                if value is None or value == '':
+                    col_val = None
+                # Convert dates/times from Lotus 1-2-3 Serial Numbers
                 # DATE-TIME
-                if col_type == 'numberType.DATE_TIME':
+                elif col_type == 'numberType.DATE_TIME':
                     if isinstance(value, (int, float)):
                         col_val = excel_to_dttm_str(value)
                     else:
                         col_val = str(value)
+                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                            sheet_title, col_name, col_letter, row_num, col_type, value))
                 # DATE
                 elif col_type == 'numberType.DATE':
                     if isinstance(value, (int, float)):
                         col_val = excel_to_dttm_str(value)[:10]
                     else:
                         col_val = str(value)
+                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                            sheet_title, col_name, col_letter, row_num, col_type, value))
                 # TIME ONLY (NO DATE)
                 elif col_type == 'numberType.TIME':
                     if isinstance(value, (int, float)):
                         try:
@@ -253,6 +282,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
                             col_val = str(timedelta(seconds=total_secs))
                         except ValueError:
                             col_val = str(value)
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
                     else:
                         col_val = str(value)
                 # NUMBER (INTEGER AND FLOAT)
@@ -268,13 +299,19 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
                                 col_val = float(round(value, 15))
                             except ValueError:
                                 col_val = str(value)
+                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                         else: # decimal_digits <= 15, no rounding
                             try:
                                 col_val = float(value)
                             except ValueError:
                                 col_val = str(value)
+                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                     else:
                         col_val = str(value)
+                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                            sheet_title, col_name, col_letter, row_num, col_type, value))
                 # STRING
                 elif col_type == 'stringValue':
                     col_val = str(value)
@@ -289,6 +326,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
                             col_val = False
                         else:
                             col_val = str(value)
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
                     elif isinstance(value, int):
                         if value in (1, -1):
                             col_val = True
@@ -296,9 +335,13 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
                         elif value == 0:
                             col_val = False
                         else:
                             col_val = str(value)
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
                 # OTHER: Convert everything else to a string
                 else:
                     col_val = str(value)
+                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                        sheet_title, col_name, col_letter, row_num, col_type, value))
                 sheet_data_row_tf[col_name] = col_val
             col_num = col_num + 1
         # APPEND non-empty row
@@ -400,6 +443,20 @@ def sync(client, config, catalog, state):
             LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
             write_schema(catalog, sheet_title)
 
+            # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not subsequent syncs)
+            # and every time after each sheet sync is complete.
+            # This forces hard deletes on the data downstream if fewer records are sent.
+            # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+            last_integer = int(get_bookmark(state, sheet_title, 0))
+            activate_version = int(time.time() * 1000)
+            activate_version_message = singer.ActivateVersionMessage(
+                stream=sheet_title,
+                version=activate_version)
+            if last_integer == 0:
+                # initial load, send activate_version before AND after data sync
+                singer.write_message(activate_version_message)
+                LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
             # Determine max range of columns and rows for "paging" through the data
             sheet_last_col_index = 1
             sheet_last_col_letter = 'A'
@@ -438,6 +495,7 @@ def sync(client, config, catalog, state):
                     sheet_data_tf, row_num = transform_sheet_data(
                         spreadsheet_id=spreadsheet_id,
                         sheet_id=sheet_id,
+                        sheet_title=sheet_title,
                         from_row=from_row,
                         columns=columns,
                         sheet_data_rows=sheet_data_rows)
@@ -449,10 +507,11 @@ def sync(client, config, catalog, state):
                         catalog=catalog,
                         stream_name=sheet_title,
                         records=sheet_data_tf,
-                        time_extracted=ss_time_extracted)
+                        time_extracted=ss_time_extracted,
+                        version=activate_version)
                     LOGGER.info('Sheet: {}, records processed: {}'.format(
                         sheet_title, record_count))
-                    
+
                     # Update paging from/to_row for next batch
                     from_row = to_row + 1
                     if to_row + batch_rows > sheet_max_row:
@@ -460,6 +519,14 @@ def sync(client, config, catalog, state):
                     else:
                         to_row = to_row + batch_rows
 
+            # End of Stream: Send Activate Version and update State
+            singer.write_message(activate_version_message)
+            write_bookmark(state, sheet_title, activate_version)
+            LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+            LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                sheet_title, row_num - 2)) # subtract 1 for header row
+            update_currently_syncing(state, None)
+
             # SHEETS_LOADED
             # Add sheet to sheets_loaded
             sheet_loaded = {}
@@ -470,17 +537,6 @@ def sync(client, config, catalog, state):
             sheet_loaded['lastRowNumber'] = row_num
             sheets_loaded.append(sheet_loaded)
 
-            # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
-            # This forces hard deletes on the data downstream if fewer records are sent.
-            # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
-            activate_version_message = singer.ActivateVersionMessage(
-                stream=sheet_title,
-                version=int(time.time() * 1000))
-            singer.write_message(activate_version_message)
-
-            LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
-                sheet_title, row_num - 2)) # subtract 1 for header row
-
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
     sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
-- 
cgit v1.2.3
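Taken together, the sync.py changes above implement the Singer versioned-stream pattern: on an initial sync (bookmark of 0) an ACTIVATE_VERSION message is emitted before the data, every record carries the version, and another ACTIVATE_VERSION is emitted after the sheet completes so targets can hard-delete rows left over from older versions. A minimal sketch of that message ordering with singer-python, where the stream name, records, and bookmark value are illustrative placeholders rather than the tap's actual variables:

    import time

    import singer
    from singer.messages import RecordMessage

    stream_name = 'sheet1'                      # placeholder stream/sheet name
    records = [{'__sdc_row': 2, 'col_a': 'x'}]  # placeholder records
    last_version = 0                            # placeholder bookmark; 0 = initial sync

    version = int(time.time() * 1000)
    activate = singer.ActivateVersionMessage(stream=stream_name, version=version)

    if last_version == 0:
        # Initial sync: activate up front so the target starts a fresh table version.
        singer.write_message(activate)

    for record in records:
        # Each record carries the version, tying it to this table version.
        singer.write_message(RecordMessage(stream=stream_name, record=record, version=version))

    # Every sync ends with an activate; the target then hard-deletes rows from older versions.
    singer.write_message(activate)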