From 89643ba6fa98db82efd3246805ef801a8bfb5c81 Mon Sep 17 00:00:00 2001 From: Jeff Huth Date: Wed, 13 Nov 2019 17:03:56 -0800 Subject: Initial commit Discovery mode works. Still working on normal sync. --- tap_google_sheets/schema.py | 228 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 tap_google_sheets/schema.py (limited to 'tap_google_sheets/schema.py') diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py new file mode 100644 index 0000000..237ab06 --- /dev/null +++ b/tap_google_sheets/schema.py @@ -0,0 +1,228 @@ +import os +import json +from collections import OrderedDict +import singer +from singer import metadata +from tap_google_sheets.streams import STREAMS + +LOGGER = singer.get_logger() + +# Reference: +# https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata + +# Convert column index to column letter +def colnum_string(n): + string = "" + while n > 0: + n, remainder = divmod(n - 1, 26) + string = chr(65 + remainder) + string + return string + + +# Create sheet_metadata_json with columns from sheet +def get_sheet_schema_columns(sheet, spreadsheet_id, client): + sheet_json_schema = OrderedDict() + data = next(iter(sheet.get('data', [])), {}) + row_data = data.get('rowData',[]) + # spreadsheet is an OrderedDict, with orderd sheets and rows in the repsonse + + headers = row_data[0].get('values', []) + first_values = row_data[1].get('values', []) + # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True))) + + sheet_json_schema['type'] = 'object' + sheet_json_schema['additionalProperties'] = False + sheet_json_schema = { + 'type': 'object', + 'additionalProperties': False, + 'properties': { + '__sdc_spreadsheet_id': { + 'type': ['null', 'string'] + }, + '__sdc_sheet_id': { + 'type': ['null', 'integer'] + }, + '__sdc_row': { + 'type': ['null', 'integer'] + } + } + } + + header_list = [] # used for checking uniqueness + columns = [] + prior_header = None + i = 0 + skipped = 0 + # Read column headers until end or 2 consecutive skipped headers + for header in headers: + # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True))) + column_index = i + 1 + column_letter = colnum_string(column_index) + header_value = header.get('formattedValue') + if header_value: # NOT skipped + column_is_skipped = False + skipped = 0 + column_name = '{}'.format(header_value) + if column_name in header_list: + raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name)) + else: + header_list.append(column_name) + + first_value = first_values[i] + # LOGGER.info('first_value[{}] = {}'.format(i, json.dumps(first_value, indent=2, sort_keys=True))) + + column_effective_value = first_value.get('effectiveValue', {}) + for key in column_effective_value.keys(): + if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'): + column_effective_value_type = key + + column_number_format = first_values[i].get('effectiveFormat', {}).get('numberFormat', {}) + column_number_format_type = column_number_format.get('type') + + # Determine datatype for sheet_json_schema + # + # column_effective_value_type = numberValue, stringValue, boolValue; INVALID: errorType, formulaType + # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue + # + # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, SCIENTIFIC + # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType + # + column_format = None # Default + # column_multiple_of = None # Default + if column_effective_value_type in ('formulaValue', 'errorValue'): + raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name)) + elif column_effective_value_type == 'stringValue': + column_type = ['null', 'string'] + column_gs_type = 'stringValue' + elif column_effective_value_type == 'boolValue': + column_type = ['null', 'boolean', 'string'] + column_gs_type = 'boolValue' + elif column_effective_value_type == 'numberValue': + if column_number_format_type == 'DATE_TIME': + column_type = ['null', 'string'] + column_format = 'date-time' + column_gs_type = 'numberType.DATE_TIME' + elif column_number_format_type == 'DATE': + column_type = ['null', 'string'] + column_format = 'date' + column_gs_type = 'numberType.DATE' + elif column_number_format_type == 'TIME': + column_type = ['null', 'string'] + column_format = 'time' + column_gs_type = 'numberType.TIME' + elif column_number_format_type == 'TEXT': + column_type = ['null', 'string'] + column_gs_type = 'stringValue' + else: + column_type = ['null', 'number', 'string'] + column_gs_type = 'numberType' + + else: # skipped + column_is_skipped = True + skipped = skipped + 1 + column_index_str = str(column_index).zfill(2) + column_name = '__sdc_skip_col_{}'.format(column_index_str) + column_type = ['null', 'string'] + column_format = None + column_gs_type = 'stringValue' + + if skipped >= 2: + # skipped = 2 consecutive skipped headers + # Remove prior_header column_name + sheet_json_schema['properties'].pop(prior_header, None) + column_count = i - 1 + break + + else: + column = {} + column = { + 'columnIndex': column_index, + 'columnLetter': column_letter, + 'columnName': column_name, + 'columnType': column_gs_type, + 'columnSkipped': column_is_skipped + } + columns.append(column) + + sheet_json_schema['properties'][column_name] = column + sheet_json_schema['properties'][column_name]['type'] = column_type + if column_format: + sheet_json_schema['properties'][column_name]['format'] = column_format + + prior_header = column_name + i = i + 1 + + return sheet_json_schema, columns + + +def get_sheet_metadata(sheet, spreadsheet_id, client): + sheet_id = sheet.get('properties', {}).get('sheetId') + sheet_title = sheet.get('properties', {}).get('title') + LOGGER.info('sheet_id = {}, sheet_title = {}'.format(sheet_id, sheet_title)) + + stream_name = 'sheet_metadata' + stream_metadata = STREAMS.get(stream_name) + api = stream_metadata.get('api', 'sheets') + params = stream_metadata.get('params', {}) + querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace('{sheet_title}', sheet_title) + path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring) + + sheet_md_results = client.get(path=path, api=api, endpoint=stream_name) + sheet_cols = sheet_md_results.get('sheets')[0] + sheet_schema, columns = get_sheet_schema_columns(sheet_cols, spreadsheet_id, client) + + return sheet_schema, columns + + +def get_abs_path(path): + return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) + +def get_schemas(client, spreadsheet_id): + schemas = {} + field_metadata = {} + + for stream_name, stream_metadata in STREAMS.items(): + schema_path = get_abs_path('schemas/{}.json'.format(stream_name)) + with open(schema_path) as file: + schema = json.load(file) + schemas[stream_name] = schema + mdata = metadata.new() + + # Documentation: + # https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#singer-python-helper-functions + # Reference: + # https://github.com/singer-io/singer-python/blob/master/singer/metadata.py#L25-L44 + mdata = metadata.get_standard_metadata( + schema=schema, + key_properties=stream_metadata.get('key_properties', None), + valid_replication_keys=stream_metadata.get('replication_keys', None), + replication_method=stream_metadata.get('replication_method', None) + ) + field_metadata[stream_name] = mdata + + if stream_name == 'spreadsheet_metadata': + api = stream_metadata.get('api', 'sheets') + params = stream_metadata.get('params', {}) + querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]) + path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring) + + spreadsheet_md_results = client.get(path=path, params=querystring, api=api, endpoint=stream_name) + + sheets = spreadsheet_md_results.get('sheets') + if sheets: + for sheet in sheets: + sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) + # LOGGER.info('sheet_schema = {}'.format(json.dumps(sheet_schema, indent=2, sort_keys=True))) + + sheet_title = sheet.get('properties', {}).get('title') + schemas[sheet_title] = sheet_schema + sheet_mdata = metadata.new() + sheet_mdata = metadata.get_standard_metadata( + schema=sheet_schema, + key_properties=['__sdc_row'], + valid_replication_keys=None, + replication_method='FULL_TABLE' + ) + field_metadata[sheet_title] = sheet_mdata + + return schemas, field_metadata -- cgit v1.2.3