From 5890b89c1aa7c554235b3cef156b5a5a2c594bec Mon Sep 17 00:00:00 2001
From: Jeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com>
Date: Wed, 4 Dec 2019 06:10:46 -0800
Subject: v.0.0.2 schema and sync changes (#1)

Change number json schema to anyOf with multipleOf; skip empty rows;
move write_bookmark to end of sync.py
---
 CHANGELOG.md                 |   3 +
 setup.py                     |   2 +-
 tap_google_sheets/schema.py  |  83 +++++++++++++-------
 tap_google_sheets/streams.py |  10 ++-
 tap_google_sheets/sync.py    | 166 +++++++++++++++++++++++++------------------
 5 files changed, 164 insertions(+), 100 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d58f396..e5d6560 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,7 @@
 # Changelog
+## 0.0.2
+  * Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py
+
 ## 0.0.1
   * Initial commit

diff --git a/setup.py b/setup.py
index e3c4f3e..6fe2493 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@ from setuptools import setup, find_packages
 
 setup(name='tap-google-sheets',
-      version='0.0.1',
+      version='0.0.2',
       description='Singer.io tap for extracting data from the Google Sheets v4 API',
       author='jeff.huth@bytecode.io',
       classifiers=['Programming Language :: Python :: 3 :: Only'],

diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index d4fead5..243467b 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -30,8 +30,6 @@ def get_sheet_schema_columns(sheet):
     first_values = row_data[1].get('values', [])
     # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
 
-    sheet_json_schema['type'] = 'object'
-    sheet_json_schema['additionalProperties'] = False
     sheet_json_schema = {
         'type': 'object',
         'additionalProperties': False,
@@ -89,42 +87,66 @@ def get_sheet_schema_columns(sheet):
             # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
             #   column_format = None  # Default
-            #   column_multiple_of = None  # Default
             if column_effective_value_type == 'stringValue':
-                column_type = ['null', 'string']
+                col_properties = {'type': ['null', 'string']}
                 column_gs_type = 'stringValue'
             elif column_effective_value_type == 'boolValue':
-                column_type = ['null', 'boolean', 'string']
+                col_properties = {'type': ['null', 'boolean', 'string']}
                 column_gs_type = 'boolValue'
             elif column_effective_value_type == 'numberValue':
                 if column_number_format_type == 'DATE_TIME':
-                    column_type = ['null', 'string']
-                    column_format = 'date-time'
+                    col_properties = {
+                        'type': ['null', 'string'],
+                        'format': 'date-time'
+                    }
                     column_gs_type = 'numberType.DATE_TIME'
                 elif column_number_format_type == 'DATE':
-                    column_type = ['null', 'string']
-                    column_format = 'date'
+                    col_properties = {
+                        'type': ['null', 'string'],
+                        'format': 'date'
+                    }
                     column_gs_type = 'numberType.DATE'
                 elif column_number_format_type == 'TIME':
-                    column_type = ['null', 'string']
-                    column_format = 'time'
+                    col_properties = {
+                        'type': ['null', 'string'],
+                        'format': 'time'
+                    }
                     column_gs_type = 'numberType.TIME'
                 elif column_number_format_type == 'TEXT':
-                    column_type = ['null', 'string']
+                    col_properties = {'type': ['null', 'string']}
                     column_gs_type = 'stringValue'
                 else:
-                    column_type = ['null', 'number', 'string']
+                    # NOTE: Order within the anyOf matters. The number type w/
+                    # multipleOf must be listed last, otherwise validation errors occur.
+                    col_properties = {
+                        'anyOf': [
+                            {
+                                'type': 'string'
+                            },
+                            {
+                                'type': 'null'
+                            },
+                            {
+                                'type': 'number',
+                                'multipleOf': 1e-15
+                            }
+                        ]
+                    }
                     column_gs_type = 'numberType'
-            elif column_effective_value_type in ('formulaValue', 'errorValue'):
-                raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name, \
-                    column_effective_value_type))
+            # Catch-all to deal with other types and set to string
+            # column_effective_value_type: formulaValue, errorValue, or other
+            else:
+                col_properties = {'type': ['null', 'string']}
+                column_gs_type = 'unsupportedValue'
+                LOGGER.info('Unsupported data type: {}, value: {}'.format(column_name, \
+                    column_effective_value_type))
+                LOGGER.info('Converting to string.')
         else:  # skipped
             column_is_skipped = True
             skipped = skipped + 1
             column_index_str = str(column_index).zfill(2)
             column_name = '__sdc_skip_col_{}'.format(column_index_str)
-            column_type = ['null', 'string']
-            column_format = None
+            col_properties = {'type': ['null', 'string']}
             column_gs_type = 'stringValue'
 
         if skipped >= 2:
@@ -144,10 +166,7 @@ def get_sheet_schema_columns(sheet):
             }
             columns.append(column)
 
-            sheet_json_schema['properties'][column_name] = column
-            sheet_json_schema['properties'][column_name]['type'] = column_type
-            if column_format:
-                sheet_json_schema['properties'][column_name]['format'] = column_format
+            sheet_json_schema['properties'][column_name] = col_properties
 
             prior_header = column_name
             i = i + 1
@@ -155,6 +174,10 @@ def get_sheet_schema_columns(sheet):
     return sheet_json_schema, columns
 
 
+# Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query
+#   endpoint: spreadsheets/{spreadsheet_id}
+#   params: includeGridData = true, ranges = '{sheet_title}'!1:2
+# This endpoint includes detailed metadata about each cell - incl. data type, formatting, etc.
 def get_sheet_metadata(sheet, spreadsheet_id, client):
     sheet_id = sheet.get('properties', {}).get('sheetId')
     sheet_title = sheet.get('properties', {}).get('title')
@@ -170,10 +193,13 @@ def get_sheet_metadata(sheet, spreadsheet_id, client):
         spreadsheet_id), querystring)
 
     sheet_md_results = client.get(path=path, api=api, endpoint=stream_name)
-    sheet_cols = sheet_md_results.get('sheets')[0]
-    sheet_schema, columns = get_sheet_schema_columns(sheet_cols)
+    # sheet_metadata: 1st `sheets` node in results
+    sheet_metadata = sheet_md_results.get('sheets')[0]
 
-    return sheet_schema, columns
+    # Create sheet_json_schema (for discovery/catalog) and columns (for sheet_metadata results)
+    sheet_json_schema, columns = get_sheet_schema_columns(sheet_metadata)
+
+    return sheet_json_schema, columns
 
 
 def get_abs_path(path):
@@ -209,20 +235,23 @@ def get_schemas(client, spreadsheet_id):
     path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
         spreadsheet_id), querystring)
 
+    # GET spreadsheet_metadata, which incl. sheets (basic metadata for each worksheet)
     spreadsheet_md_results = client.get(path=path, params=querystring, api=api, \
         endpoint=stream_name)
 
     sheets = spreadsheet_md_results.get('sheets')
     if sheets:
+        # Loop thru each worksheet in spreadsheet
        for sheet in sheets:
-            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+            # GET sheet_json_schema for each worksheet (from function above)
+            sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
             LOGGER.info('columns = {}'.format(columns))
 
             sheet_title = sheet.get('properties', {}).get('title')
-            schemas[sheet_title] = sheet_schema
+            schemas[sheet_title] = sheet_json_schema
             sheet_mdata = metadata.new()
             sheet_mdata = metadata.get_standard_metadata(
-                schema=sheet_schema,
+                schema=sheet_json_schema,
                 key_properties=['__sdc_row'],
                 valid_replication_keys=None,
                 replication_method='FULL_TABLE'

diff --git a/tap_google_sheets/streams.py b/tap_google_sheets/streams.py
index b8e3eff..ad5529f 100644
--- a/tap_google_sheets/streams.py
+++ b/tap_google_sheets/streams.py
@@ -13,6 +13,8 @@ from collections import OrderedDict
 #   data_key: JSON element containing the results list for the endpoint;
 #       default = root (no data_key)
 
+# file_metadata: Queries Google Drive API to get file information and see if file has been modified
+#   Provides audit info about who and when last changed the file.
 FILE_METADATA = {
     "api": "files",
     "path": "files/{spreadsheet_id}",
     "key_properties": ["id"],
     "replication_method": "FULL_TABLE",
     "params": {
     }
 }
 
+# spreadsheet_metadata: Queries spreadsheet to get basic information on spreadsheet and sheets
 SPREADSHEET_METADATA = {
     "api": "sheets",
     "path": "spreadsheets/{spreadsheet_id}",
     "key_properties": ["spreadsheetId"],
     "replication_method": "FULL_TABLE",
     "params": {
     }
 }
 
+# sheet_metadata: Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet.
+#   This endpoint includes detailed metadata about each cell in the header and first data row
+#   incl. data type, formatting, etc.
 SHEET_METADATA = {
     "api": "sheets",
     "path": "spreadsheets/{spreadsheet_id}",
     "key_properties": ["sheetId"],
     "replication_method": "FULL_TABLE",
     "params": {
     }
 }
 
+# sheets_loaded: Queries a batch of Rows for each Sheet in the Spreadsheet.
+#   Each query uses the `values` endpoint, to get data only, w/out the formatting/type metadata.
 SHEETS_LOADED = {
     "api": "sheets",
     "path": "spreadsheets/{spreadsheet_id}/values/'{sheet_title}'!{range_rows}",
     "data_key": "values",
     "key_properties": ["spreadsheetId", "sheetId", "__sdc_row"],
     "replication_method": "FULL_TABLE",
     "params": {
         "dateTimeRenderOption": "SERIAL_NUMBER",
         "valueRenderOption": "UNFORMATTED_VALUE",
         "majorDimension": "ROWS"
     }
 }
 
-# Ensure streams are ordered logically
+# Ensure streams are processed sequentially, in logical order.
 STREAMS = OrderedDict()
 STREAMS['file_metadata'] = FILE_METADATA
 STREAMS['spreadsheet_metadata'] = SPREADSHEET_METADATA

diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index d7d7184..76b2e59 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -125,11 +125,14 @@ def get_data(stream_name, range_rows=None):
     if not range_rows:
         range_rows = ''
+    # Replace {placeholder} variables in path
     path = endpoint_config.get('path', stream_name).replace(
         '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
             '{range_rows}', range_rows)
     params = endpoint_config.get('params', {})
     api = endpoint_config.get('api', 'sheets')
+    # Add in querystring parameters and replace {placeholder} variables
+    # The querystring is built w/out URL-encoding the parameters; encoding them causes API errors
     querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
         '{sheet_title}', stream_name)
     data = {}
@@ -192,7 +195,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None):
         timezone_str = 'UTC'
     tzn = pytz.timezone(timezone_str)
     sec_per_day = 86400
-    excel_epoch = 25569 # 1970-01-01T00:00:00Z
+    excel_epoch = 25569 # 1970-01-01T00:00:00Z, Lotus 1-2-3 (Excel) serial number for the epoch start date
     epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
     epoch_dttm = datetime(1970, 1, 1)
     excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
@@ -205,85 +208,103 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None):
 # Convert from array of values to JSON with column names as keys
 def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
     sheet_data_tf = []
-    is_last_row = False
     row_num = from_row
     # Create sorted list of columns based on columnIndex
     cols = sorted(columns, key=lambda i: i['columnIndex'])
 
     # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
     for row in sheet_data_rows:
-        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
+        # If empty row, SKIP
         if row == []:
-            is_last_row = True
-            return sheet_data_tf, row_num - 1, is_last_row
-        sheet_data_row_tf = {}
-        # Add spreadsheet_id, sheet_id, and row
-        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
-        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
-        sheet_data_row_tf['__sdc_row'] = row_num
-        col_num = 1
-        for value in row:
-            # Select column metadata based on column index
-            col = cols[col_num - 1]
-            col_skipped = col.get('columnSkipped')
-            if not col_skipped:
-                col_name = col.get('columnName')
-                col_type = col.get('columnType')
-                # Convert dates/times from Lotus Notes Serial Numbers
-                if col_type == 'numberType.DATE_TIME':
-                    if isinstance(value, (int, float)):
-                        col_val = excel_to_dttm_str(value)
-                    else:
-                        col_val = str(value)
-                elif col_type == 'numberType.DATE':
-                    if isinstance(value, (int, float)):
-                        col_val = excel_to_dttm_str(value)[:10]
-                    else:
-                        col_val = str(value)
-                elif col_type == 'numberType.TIME':
-                    if isinstance(value, (int, float)):
-                        try:
-                            total_secs = value * 86400 # seconds in day
-                            col_val = str(timedelta(seconds=total_secs))
-                        except ValueError:
+            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
+        else:
+            sheet_data_row_tf = {}
+            # Add spreadsheet_id, sheet_id, and row
+            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
+            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
+            sheet_data_row_tf['__sdc_row'] = row_num
+            col_num = 1
+            for value in row:
+                # Select column metadata based on column index
+                col = cols[col_num - 1]
+                col_skipped = col.get('columnSkipped')
+                if not col_skipped:
+                    col_name = col.get('columnName')
+                    col_type = col.get('columnType')
+                    # Convert dates/times from Lotus 1-2-3 (Excel) serial numbers
+                    # DATE-TIME
+                    if col_type == 'numberType.DATE_TIME':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)
+                        else:
                             col_val = str(value)
-                    else:
-                        col_val = str(value)
-                elif col_type == 'numberType':
-                    if isinstance(value, int):
-                        col_val = int(value)
-                    else:
-                        try:
-                            col_val = float(value)
-                        except ValueError:
+                    # DATE
+                    elif col_type == 'numberType.DATE':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)[:10]
+                        else:
                             col_val = str(value)
-                elif col_type == 'stringValue':
-                    col_val = str(value)
-                elif col_type == 'boolValue':
-                    if isinstance(value, bool):
-                        col_val = value
-                    elif isinstance(value, str):
-                        if value.lower() in ('true', 't', 'yes', 'y'):
-                            col_val = True
-                        elif value.lower() in ('false', 'f', 'no', 'n'):
-                            col_val = False
+                    # TIME ONLY (NO DATE)
+                    elif col_type == 'numberType.TIME':
+                        if isinstance(value, (int, float)):
+                            try:
+                                total_secs = value * 86400 # seconds in day
+                                # Create string formatted like HH:MM:SS
+                                col_val = str(timedelta(seconds=total_secs))
+                            except ValueError:
+                                col_val = str(value)
+                        else:
+                            col_val = str(value)
+                    # NUMBER (INTEGER AND FLOAT)
+                    elif col_type == 'numberType':
+                        if isinstance(value, int):
+                            col_val = int(value)
+                        elif isinstance(value, float):
+                            # Determine float decimal digits
+                            decimal_digits = str(value)[::-1].find('.')
+                            if decimal_digits > 15:
+                                try:
+                                    # ROUND to multipleOf: 1e-15
+                                    col_val = float(round(value, 15))
+                                except ValueError:
+                                    col_val = str(value)
+                            else: # decimal_digits <= 15, no rounding
+                                try:
+                                    col_val = float(value)
+                                except ValueError:
+                                    col_val = str(value)
                         else:
                             col_val = str(value)
-                    elif isinstance(value, int):
-                        if value in (1, -1):
-                            col_val = True
-                        elif value == 0:
-                            col_val = False
+                    # STRING
+                    elif col_type == 'stringValue':
+                        col_val = str(value)
+                    # BOOLEAN
+                    elif col_type == 'boolValue':
+                        if isinstance(value, bool):
+                            col_val = value
+                        elif isinstance(value, str):
+                            if value.lower() in ('true', 't', 'yes', 'y'):
+                                col_val = True
+                            elif value.lower() in ('false', 'f', 'no', 'n'):
+                                col_val = False
+                            else:
+                                col_val = str(value)
+                        elif isinstance(value, int):
+                            if value in (1, -1):
+                                col_val = True
+                            elif value == 0:
+                                col_val = False
+                            else:
+                                col_val = str(value)
-
-                else:
-                    col_val = value
-                sheet_data_row_tf[col_name] = col_val
-            col_num = col_num + 1
-
-        sheet_data_tf.append(sheet_data_row_tf)
+                    # OTHER: Convert everything else to a string
+                    else:
+                        col_val = str(value)
+                    sheet_data_row_tf[col_name] = col_val
+                col_num = col_num + 1
+            # APPEND non-empty row
+            sheet_data_tf.append(sheet_data_row_tf)
         row_num = row_num + 1
 
-    return sheet_data_tf, row_num, is_last_row
+    return sheet_data_tf, row_num
 
 
 def sync(client, config, catalog, state):
@@ -327,7 +348,7 @@ def sync(client, config, catalog, state):
         return
     # Sync file_metadata if selected
     sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
-    write_bookmark(state, stream_name, strftime(this_datetime))
+    # file_metadata bookmark is updated at the end of sync
 
     # SPREADSHEET_METADATA
     spreadsheet_metadata = {}
@@ -363,7 +384,7 @@ def sync(client, config, catalog, state):
 
             # GET sheet_metadata and columns
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-            LOGGER.info('sheet_schema: {}'.format(sheet_schema))
+            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
             # Transform sheet_metadata
             sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
@@ -414,7 +435,7 @@ def sync(client, config, catalog, state):
                     sheet_data_rows = sheet_data.get('values')
 
                     # Transform batch of rows to JSON with keys for each column
-                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
+                    sheet_data_tf, row_num = transform_sheet_data(
                         spreadsheet_id=spreadsheet_id,
                         sheet_id=sheet_id,
                         from_row=from_row,
@@ -429,7 +450,7 @@ def sync(client, config, catalog, state):
                         stream_name=sheet_title,
                         records=sheet_data_tf,
                         time_extracted=ss_time_extracted)
-                    LOGGER.info('Sheet: {}, ecords processed: {}'.format(
+                    LOGGER.info('Sheet: {}, records processed: {}'.format(
                         sheet_title, record_count))
 
                     # Update paging from/to_row for next batch
@@ -458,7 +479,7 @@ def sync(client, config, catalog, state):
             singer.write_message(activate_version_message)
 
             LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
-                sheet_title, row_num - 1))
+                sheet_title, row_num - 2)) # subtract 2: 1 for the header row, 1 for the final row_num increment
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
@@ -468,4 +489,7 @@ def sync(client, config, catalog, state):
     # Sync sheet_metadata if selected
     sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
 
+    # Update file_metadata bookmark
+    write_bookmark(state, 'file_metadata', strftime(this_datetime))
+
     return
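
Note on the number-column schema change above: a rough sketch (illustration
only, not part of the applied patch; the column names and the round_for_schema
helper are hypothetical, and the tap also emits its __sdc_* bookkeeping
properties alongside the sheet's own columns). For a sheet whose header row is
['order_id', 'price'], where order_id is a TEXT-formatted column and price is
an unformatted number column, get_sheet_schema_columns would now build roughly:

    sheet_json_schema = {
        'type': 'object',
        'additionalProperties': False,
        'properties': {
            'order_id': {'type': ['null', 'string']},
            'price': {
                'anyOf': [
                    {'type': 'string'},
                    {'type': 'null'},
                    # number w/ multipleOf stays last, per the schema.py comment
                    {'type': 'number', 'multipleOf': 1e-15}
                ]
            }
        }
    }

    # The sync side keeps emitted floats consistent with multipleOf: 1e-15 by
    # rounding anything beyond 15 decimal digits, mirroring the float branch of
    # transform_sheet_data (round_for_schema is a hypothetical helper name):
    def round_for_schema(value):
        decimal_digits = str(value)[::-1].find('.')  # digits after the decimal point
        return float(round(value, 15)) if decimal_digits > 15 else float(value)

Per the ordering note in schema.py, some downstream consumers appear to try the
anyOf branches in order, so the string and null branches absorb non-numeric
cells before the number/multipleOf branch is attempted.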
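A worked example (again illustrative; the 43800.5 input is made up) of the
Lotus 1-2-3 / Excel-style serial-number conversion that excel_to_dttm_str
applies to date/time cells, which arrive as day serials because sheets_loaded
requests dateTimeRenderOption=SERIAL_NUMBER:

    import math
    from datetime import datetime, timedelta

    excel_date_sn = 43800.5  # hypothetical cell value: day 43800 plus half a day
    excel_epoch = 25569      # serial number of 1970-01-01T00:00:00Z
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * 86400)  # 86400 sec/day
    excel_dttm = datetime(1970, 1, 1) + timedelta(seconds=epoch_sec)
    print(excel_dttm.isoformat())  # 2019-12-01T12:00:00, rendered in UTC by default

Serial 43800 is 18,231 days past the 25569 epoch mark, which lands on
2019-12-01; the .5 fraction adds twelve hours.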