import os
import json
import re
import urllib.parse
from collections import OrderedDict

import singer
from singer import metadata
from tap_google_sheets.streams import STREAMS

LOGGER = singer.get_logger()

# Reference:
# https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata

# effectiveValue keys that map directly onto a JSON schema type.
_SUPPORTED_VALUE_TYPES = ('numberValue', 'stringValue', 'boolValue')

# Number formats that are exposed as formatted strings:
# numberFormat type -> JSON schema 'format'.
_DATE_TIME_FORMATS = {
    'DATE_TIME': 'date-time',
    'DATE': 'date',
    'TIME': 'time',
}


def colnum_string(num):
    """Convert a 1-based column index to its column letter(s).

    Examples: 1 -> 'A', 26 -> 'Z', 27 -> 'AA'. Returns an empty string when
    ``num`` is not positive.
    """
    string = ""
    while num > 0:
        num, remainder = divmod(num - 1, 26)
        string = chr(65 + remainder) + string
    return string


def _get_column_type(first_value, sheet_title, column_name, column_letter):
    """Infer the schema for one column from its 2nd-row cell.

    Args:
        first_value: cell dict from the 2nd row ({} when the cell is missing).
        sheet_title, column_name, column_letter: used only for log/error text.

    Returns:
        Tuple ``(col_properties, column_gs_type)``: the JSON schema fragment
        for the column and the type label stored in the column metadata.

    Raises:
        Exception: when the effective value carries an 'errorType' or
            'formulaType' key.
    """
    # Determine datatype for sheet_json_schema
    #
    # effective value type = numberValue, stringValue, boolValue
    #   https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
    #
    # number format type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
    #   TIME, DATE_TIME, SCIENTIFIC
    #   https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
    column_effective_value = first_value.get('effectiveValue', {})

    if not column_effective_value:
        # Blank 2nd-row cell: nothing to infer from, default to string.
        LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
            sheet_title, column_name, column_letter))
        LOGGER.info(' Setting column datatype to STRING')
        LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
            sheet_title, column_name, column_letter))
        LOGGER.info(' Setting column datatype to STRING')
        return {'type': ['null', 'string']}, 'stringValue'

    # Reset per column so a value from a previous column can never leak in.
    column_effective_value_type = None
    col_val = None
    for key, val in column_effective_value.items():
        if key in _SUPPORTED_VALUE_TYPES:
            column_effective_value_type = key
            col_val = str(val)
        elif key in ('errorType', 'formulaType'):
            col_val = str(val)
            raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
                sheet_title, column_name, column_letter, key, col_val))
        elif column_effective_value_type is None:
            # Any other key (e.g. errorValue, formulaValue) is remembered so
            # the catch-all branch below converts the column to string.
            column_effective_value_type = key
            col_val = str(val)

    if column_effective_value_type == 'stringValue':
        return {'type': ['null', 'string']}, 'stringValue'

    if column_effective_value_type == 'boolValue':
        return {'type': ['null', 'boolean', 'string']}, 'boolValue'

    if column_effective_value_type == 'numberValue':
        # Read the format from the resolved cell (not by index into the row),
        # so a short 2nd row cannot raise IndexError here.
        column_number_format = first_value.get('effectiveFormat', {}).get(
            'numberFormat', {})
        column_number_format_type = column_number_format.get('type')

        if column_number_format_type == 'TEXT':
            return {'type': ['null', 'string']}, 'stringValue'

        if column_number_format_type in _DATE_TIME_FORMATS:
            col_properties = {
                'type': ['null', 'string'],
                'format': _DATE_TIME_FORMATS[column_number_format_type]
            }
            column_gs_type = 'numberType.{}'.format(column_number_format_type)
        else:
            col_properties = {'type': 'number', 'multipleOf': 1e-15}
            column_gs_type = 'numberType'

        # Interesting - order in the anyOf makes a difference.
        # The typed entry is listed first and the string fallback last.
        return {
            'anyOf': [
                col_properties,
                {'type': ['null', 'string']}
            ]
        }, column_gs_type

    # Catch-all to deal with other types and set to string
    LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
        sheet_title, column_name, column_letter, column_effective_value_type, col_val))
    LOGGER.info('Converting to string.')
    return {'type': ['null', 'string']}, 'unsupportedValue'


def get_sheet_schema_columns(sheet):
    """Build the JSON schema and column metadata for one worksheet.

    The header row (row 1) supplies the column names; the first data row
    (row 2) supplies the value used to infer each column's datatype.

    Args:
        sheet: a sheet node holding 'properties' and grid 'data' with
            'rowData' for rows 1 and 2.

    Returns:
        Tuple ``(sheet_json_schema, columns)``, or ``(None, None)`` when the
        sheet has no rows or only a header row.

    Raises:
        Exception: on a duplicate header, or on an 'errorType'/'formulaType'
            value in row 2.
    """
    sheet_title = sheet.get('properties', {}).get('title')
    data = next(iter(sheet.get('data', [])), {})
    row_data = data.get('rowData', [])
    if not row_data:
        # Empty sheet, SKIP
        LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
        return None, None
    if len(row_data) < 2:
        # Header row only: there is no 2nd row to infer datatypes from.
        LOGGER.info('SKIPPING Sheet with header row only (no 2nd row): {}'.format(sheet_title))
        return None, None

    headers = row_data[0].get('values', [])
    first_values = row_data[1].get('values', [])

    sheet_json_schema = {
        'type': 'object',
        'additionalProperties': False,
        'properties': {
            '__sdc_spreadsheet_id': {
                'type': ['null', 'string']
            },
            '__sdc_sheet_id': {
                'type': ['null', 'integer']
            },
            '__sdc_row': {
                'type': ['null', 'integer']
            }
        }
    }

    seen_headers = set()  # used for checking uniqueness
    columns = []
    prior_header = None
    skipped = 0
    # Read column headers until end or 2 consecutive skipped headers
    for i, header in enumerate(headers):
        column_index = i + 1
        column_letter = colnum_string(column_index)
        header_value = header.get('formattedValue')

        if header_value:  # NOT skipped
            column_is_skipped = False
            skipped = 0
            column_name = '{}'.format(header_value)
            if column_name in seen_headers:
                raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
                    sheet_title, column_name, column_letter))
            seen_headers.add(column_name)

            try:
                first_value = first_values[i]
            except IndexError as err:
                # The 2nd row is shorter than the header row; treat as blank.
                LOGGER.info('NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
                    sheet_title, column_name, column_letter, err))
                first_value = {}

            col_properties, column_gs_type = _get_column_type(
                first_value, sheet_title, column_name, column_letter)
        else:  # skipped
            column_is_skipped = True
            skipped = skipped + 1
            column_index_str = str(column_index).zfill(2)
            column_name = '__sdc_skip_col_{}'.format(column_index_str)
            col_properties = {'type': ['null', 'string']}
            column_gs_type = 'stringValue'
            LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
                sheet_title, column_name, column_letter))
            LOGGER.info(' This column will be skipped during data loading.')

        if skipped >= 2:
            # 2 consecutive skipped headers: drop the prior skipped column
            # from the schema and stop scanning. Its entry stays in `columns`,
            # flagged with columnSkipped = True.
            sheet_json_schema['properties'].pop(prior_header, None)
            LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
                sheet_title, column_name, column_letter))
            break

        columns.append({
            'columnIndex': column_index,
            'columnLetter': column_letter,
            'columnName': column_name,
            'columnType': column_gs_type,
            'columnSkipped': column_is_skipped
        })
        sheet_json_schema['properties'][column_name] = col_properties
        prior_header = column_name

    return sheet_json_schema, columns


# Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query
#   endpoint: spreadsheets/{spreadsheet_id}
#   params: includeGridData = true, ranges = '{sheet_title}'!1:2
# This endpoint includes detailed metadata about each cell - incl. data type, formatting, etc.
def get_sheet_metadata(sheet, spreadsheet_id, client):
    """Fetch rows 1 and 2 of a worksheet and derive its schema and columns.

    Args:
        sheet: worksheet node; 'properties' supplies 'sheetId' and 'title'.
        spreadsheet_id: substituted for '{spreadsheet_id}' in the stream path.
        client: object exposing ``get(path=..., api=..., endpoint=...)``.

    Returns:
        Tuple ``(sheet_json_schema, columns)``; ``(None, None)`` when the
        response has no sheets or the sheet cannot be parsed.
    """
    sheet_id = sheet.get('properties', {}).get('sheetId')
    sheet_title = sheet.get('properties', {}).get('title')
    LOGGER.info('sheet_id = {}, sheet_title = {}'.format(sheet_id, sheet_title))

    stream_name = 'sheet_metadata'
    stream_metadata = STREAMS.get(stream_name)
    api = stream_metadata.get('api', 'sheets')
    params = stream_metadata.get('params', {})
    # URL-encoded title goes into the query string; the regex-escaped title is
    # passed to the client as the endpoint label.
    sheet_title_encoded = urllib.parse.quote_plus(sheet_title)
    sheet_title_escaped = re.escape(sheet_title)
    querystring = '&'.join(
        ['%s=%s' % (key, value) for (key, value) in params.items()]
    ).replace('{sheet_title}', sheet_title_encoded)
    path = '{}?{}'.format(
        stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id),
        querystring)

    sheet_md_results = client.get(path=path, api=api, endpoint=sheet_title_escaped)

    # sheet_metadata: 1st `sheets` node in results. A missing/empty node is
    # treated like any other malformed sheet instead of aborting discovery.
    sheets = (sheet_md_results or {}).get('sheets')
    if not sheets:
        LOGGER.warning('SKIPPING Malformed sheet: {}'.format(sheet_title))
        return None, None
    sheet_metadata = sheets[0]

    # Create sheet_json_schema (for discovery/catalog) and columns (for sheet_metadata results)
    try:
        sheet_json_schema, columns = get_sheet_schema_columns(sheet_metadata)
    except Exception as err:  # deliberate best-effort: skip the sheet, keep discovering
        LOGGER.warning('{}'.format(err))
        LOGGER.warning('SKIPPING Malformed sheet: {}'.format(sheet_title))
        sheet_json_schema, columns = None, None

    return sheet_json_schema, columns


def get_abs_path(path):
    """Return ``path`` joined onto the directory that contains this module."""
    return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)


def get_schemas(client, spreadsheet_id):
    """Load the static stream schemas and add one schema per worksheet.

    Each stream in STREAMS gets its schema from ``schemas/<stream>.json``
    and standard metadata. While handling the 'spreadsheet_metadata' stream,
    the spreadsheet is queried and every worksheet that yields a schema and
    columns is added under its sheet title.

    Args:
        client: object exposing ``get(...)`` for the spreadsheet API.
        spreadsheet_id: substituted for '{spreadsheet_id}' in stream paths.

    Returns:
        Tuple ``(schemas, field_metadata)``, both dicts keyed by stream name
        or sheet title.
    """
    schemas = {}
    field_metadata = {}

    for stream_name, stream_metadata in STREAMS.items():
        schema_path = get_abs_path('schemas/{}.json'.format(stream_name))
        # Explicit encoding so loading does not depend on the platform default.
        with open(schema_path, encoding='utf-8') as schema_file:
            schema = json.load(schema_file)
        schemas[stream_name] = schema

        # Documentation:
        # https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#singer-python-helper-functions
        # Reference:
        # https://github.com/singer-io/singer-python/blob/master/singer/metadata.py#L25-L44
        field_metadata[stream_name] = metadata.get_standard_metadata(
            schema=schema,
            key_properties=stream_metadata.get('key_properties', None),
            valid_replication_keys=stream_metadata.get('replication_keys', None),
            replication_method=stream_metadata.get('replication_method', None)
        )

        if stream_name != 'spreadsheet_metadata':
            continue

        api = stream_metadata.get('api', 'sheets')
        params = stream_metadata.get('params', {})
        querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
        path = '{}?{}'.format(
            stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id),
            querystring)

        # GET spreadsheet_metadata, which incl. sheets (basic metadata for each worksheet)
        # NOTE(review): the query string is already part of `path` and is also
        # passed as `params`; confirm against the client whether both are needed.
        spreadsheet_md_results = client.get(
            path=path, params=querystring, api=api, endpoint=stream_name)

        # Loop thru each worksheet in spreadsheet (no-op when there are none)
        for sheet in spreadsheet_md_results.get('sheets') or []:
            # GET sheet_json_schema for each worksheet
            sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)

            # SKIP empty sheets (where sheet_json_schema and columns are None)
            if not (sheet_json_schema and columns):
                continue

            sheet_title = sheet.get('properties', {}).get('title')
            schemas[sheet_title] = sheet_json_schema
            field_metadata[sheet_title] = metadata.get_standard_metadata(
                schema=sheet_json_schema,
                key_properties=['__sdc_row'],
                valid_replication_keys=None,
                replication_method='FULL_TABLE'
            )

    return schemas, field_metadata