aboutsummaryrefslogblamecommitdiffhomepage
path: root/tap_google_sheets/schema.py
blob: 237ab06f9cea44deaa3225a75d286e8852615652 (plain) (tree)



































































































































































































































                                                                                                                             
import json
import os
from collections import OrderedDict
from urllib.parse import quote

import singer
from singer import metadata

from tap_google_sheets.streams import STREAMS

LOGGER = singer.get_logger()

# Reference:
# https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata

# Convert column index to column letter
def colnum_string(n):
    """Return the spreadsheet column letters for a 1-based column index.

    Examples: 1 -> 'A', 26 -> 'Z', 27 -> 'AA'. An index of zero or less
    yields an empty string.
    """
    letters = []
    remaining = n
    while remaining > 0:
        # Bijective base-26: shift by one so that 26 maps to 'Z', not 'A@'.
        remaining, offset = divmod(remaining - 1, 26)
        letters.append(chr(ord('A') + offset))
    # Letters were produced least-significant first.
    return ''.join(reversed(letters))


# Create sheet_json_schema and the column metadata list from a sheet's first two rows
def _infer_column_type(first_value, column_name):
    """Infer a column's schema type from its cell in the first data row.

    Args:
        first_value: Cell dict for this column taken from the second row of
            the sheet; an empty dict when the cell is missing or blank.
        column_name: Header name of the column, used in error messages.

    Returns:
        A ``(column_type, column_format, column_gs_type)`` tuple, where
        ``column_type`` is the JSON schema type list, ``column_format`` is
        the JSON schema format (or None) and ``column_gs_type`` is the
        Google Sheets value type label stored in the column metadata.

    Raises:
        Exception: If the cell's effective value is a formula or an error.
    """
    # effectiveValue keys: numberValue, stringValue, boolValue;
    #   INVALID: formulaValue, errorValue
    #   Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
    # 'formulaType' / 'errorType' are also rejected because earlier code
    # looked for those spellings.
    valid_keys = ('numberValue', 'stringValue', 'boolValue')
    invalid_keys = ('formulaValue', 'errorValue', 'formulaType', 'errorType')

    effective_value = first_value.get('effectiveValue', {})
    # Default to string so a blank cell never inherits another column's type.
    effective_value_type = 'stringValue'
    for key in effective_value:
        if key in valid_keys or key in invalid_keys:
            effective_value_type = key

    if effective_value_type in invalid_keys:
        raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(
            column_name, effective_value.get(effective_value_type)))

    if effective_value_type == 'boolValue':
        return ['null', 'boolean', 'string'], None, 'boolValue'

    if effective_value_type == 'numberValue':
        # numberFormat.type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY,
        #   DATE, TIME, DATE_TIME, SCIENTIFIC
        #   Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
        number_format = first_value.get('effectiveFormat', {}).get('numberFormat', {})
        number_format_type = number_format.get('type')
        if number_format_type == 'DATE_TIME':
            return ['null', 'string'], 'date-time', 'numberType.DATE_TIME'
        if number_format_type == 'DATE':
            return ['null', 'string'], 'date', 'numberType.DATE'
        if number_format_type == 'TIME':
            return ['null', 'string'], 'time', 'numberType.TIME'
        if number_format_type == 'TEXT':
            return ['null', 'string'], None, 'stringValue'
        return ['null', 'number', 'string'], None, 'numberType'

    # stringValue, or no usable effective value at all.
    return ['null', 'string'], None, 'stringValue'


def get_sheet_schema_columns(sheet, spreadsheet_id, client):
    """Build the JSON schema and column metadata for one sheet.

    The first row of ``sheet['data'][0]['rowData']`` is read as the header
    row and the second row as sample values used to infer each column's
    type. Headers are read until the end of the row or until two
    consecutive headers are empty.

    Args:
        sheet: Sheet dict containing a ``data`` list whose first element
            holds ``rowData``.
        spreadsheet_id: Not used by this function; kept for interface
            compatibility.
        client: Not used by this function; kept for interface compatibility.

    Returns:
        A ``(sheet_json_schema, columns)`` tuple. For a sheet with no rows
        the schema contains only the ``__sdc_*`` properties and ``columns``
        is empty.

    Raises:
        Exception: On a duplicate header name, or when a column's first
            value is a formula or an error.
    """
    data = next(iter(sheet.get('data', [])), {})
    row_data = data.get('rowData', [])
    # The response keeps sheets and rows in order: row 0 holds the headers
    # and row 1 the first values. Either row may be absent.
    headers = row_data[0].get('values', []) if row_data else []
    first_values = row_data[1].get('values', []) if len(row_data) > 1 else []

    sheet_json_schema = {
        'type': 'object',
        'additionalProperties': False,
        'properties': {
            '__sdc_spreadsheet_id': {
                'type': ['null', 'string']
            },
            '__sdc_sheet_id': {
                'type': ['null', 'integer']
            },
            '__sdc_row': {
                'type': ['null', 'integer']
            }
        }
    }

    seen_headers = set()  # used for checking uniqueness
    columns = []
    prior_header = None
    skipped = 0
    # Read column headers until end or 2 consecutive skipped headers
    for i, header in enumerate(headers):
        column_index = i + 1
        column_letter = colnum_string(column_index)
        header_value = header.get('formattedValue')
        if header_value:  # NOT skipped
            column_is_skipped = False
            skipped = 0
            column_name = '{}'.format(header_value)
            if column_name in seen_headers:
                raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name))
            seen_headers.add(column_name)

            # The second row may be shorter than the header row.
            first_value = first_values[i] if i < len(first_values) else {}
            column_type, column_format, column_gs_type = _infer_column_type(
                first_value, column_name)
        else:  # skipped
            column_is_skipped = True
            skipped = skipped + 1
            column_name = '__sdc_skip_col_{}'.format(str(column_index).zfill(2))
            column_type = ['null', 'string']
            column_format = None
            column_gs_type = 'stringValue'

        if skipped >= 2:
            # Two consecutive skipped headers mark the end of the data:
            # drop the previous placeholder column from the schema and stop.
            # NOTE(review): the placeholder stays in `columns`; confirm that
            # callers expect that.
            sheet_json_schema['properties'].pop(prior_header, None)
            break

        column = {
            'columnIndex': column_index,
            'columnLetter': column_letter,
            'columnName': column_name,
            'columnType': column_gs_type,
            'columnSkipped': column_is_skipped
        }
        columns.append(column)

        # NOTE: the schema property and the `columns` entry are the same
        # dict object, so 'type' / 'format' appear in both. Kept as-is for
        # backward compatibility.
        sheet_json_schema['properties'][column_name] = column
        column['type'] = column_type
        if column_format:
            column['format'] = column_format

        prior_header = column_name

    return sheet_json_schema, columns


def get_sheet_metadata(sheet, spreadsheet_id, client):
    """Fetch a sheet's metadata rows and derive its schema and columns.

    Builds the request from the ``sheet_metadata`` entry in ``STREAMS``,
    substituting the spreadsheet id into the path and the sheet title into
    the query string, then passes the first sheet of the response to
    ``get_sheet_schema_columns``.

    Args:
        sheet: Sheet dict with a ``properties`` dict holding ``sheetId``
            and ``title``.
        spreadsheet_id: Id substituted for ``{spreadsheet_id}`` in the path.
        client: Object whose ``get(path=..., api=..., endpoint=...)`` returns
            the response dict.

    Returns:
        The ``(sheet_schema, columns)`` tuple from
        ``get_sheet_schema_columns``.
    """
    sheet_properties = sheet.get('properties', {})
    sheet_id = sheet_properties.get('sheetId')
    sheet_title = sheet_properties.get('title')
    LOGGER.info('sheet_id = {}, sheet_title = {}'.format(sheet_id, sheet_title))

    stream_name = 'sheet_metadata'
    stream_metadata = STREAMS.get(stream_name)
    api = stream_metadata.get('api', 'sheets')
    params = stream_metadata.get('params', {})

    # Percent-encode the title before placing it in the query string:
    # characters such as '&', '#', '+', '=' or spaces would otherwise change
    # the meaning of the request.
    sheet_title_encoded = quote(sheet_title, safe='')
    querystring = '&'.join(
        '%s=%s' % (key, value) for (key, value) in params.items()
    ).replace('{sheet_title}', sheet_title_encoded)
    base_path = stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id)
    path = '{}?{}'.format(base_path, querystring)

    sheet_md_results = client.get(path=path, api=api, endpoint=stream_name)
    sheet_cols = sheet_md_results.get('sheets')[0]
    sheet_schema, columns = get_sheet_schema_columns(sheet_cols, spreadsheet_id, client)

    return sheet_schema, columns


def get_abs_path(path):
    """Join ``path`` onto the directory that contains this module."""
    module_dir = os.path.dirname(os.path.realpath(__file__))
    return os.path.join(module_dir, path)

def get_schemas(client, spreadsheet_id):
    """Load static stream schemas and add one schema per sheet.

    For every stream in ``STREAMS`` the JSON schema is read from
    ``schemas/<stream_name>.json`` next to this module and standard metadata
    is generated for it. For the ``spreadsheet_metadata`` stream the
    spreadsheet is also fetched, and a schema plus metadata entry is added
    for each of its sheets, keyed by sheet title.

    Args:
        client: Object whose ``get(...)`` returns the spreadsheet metadata
            response dict.
        spreadsheet_id: Id substituted for ``{spreadsheet_id}`` in the path.

    Returns:
        A ``(schemas, field_metadata)`` tuple of dicts keyed by stream name
        or sheet title.
    """
    schemas = {}
    field_metadata = {}

    for stream_name, stream_metadata in STREAMS.items():
        schema_path = get_abs_path('schemas/{}.json'.format(stream_name))
        # JSON is UTF-8; do not depend on the locale's default encoding.
        with open(schema_path, encoding='utf-8') as schema_file:
            schema = json.load(schema_file)
        schemas[stream_name] = schema

        # Documentation:
        # https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#singer-python-helper-functions
        # Reference:
        # https://github.com/singer-io/singer-python/blob/master/singer/metadata.py#L25-L44
        field_metadata[stream_name] = metadata.get_standard_metadata(
            schema=schema,
            key_properties=stream_metadata.get('key_properties', None),
            valid_replication_keys=stream_metadata.get('replication_keys', None),
            replication_method=stream_metadata.get('replication_method', None)
        )

        if stream_name != 'spreadsheet_metadata':
            continue

        api = stream_metadata.get('api', 'sheets')
        params = stream_metadata.get('params', {})
        querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
        path = '{}?{}'.format(
            stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id),
            querystring)

        spreadsheet_md_results = client.get(
            path=path, params=querystring, api=api, endpoint=stream_name)

        # Each sheet becomes its own stream, keyed by the sheet title.
        for sheet in spreadsheet_md_results.get('sheets') or []:
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)

            sheet_title = sheet.get('properties', {}).get('title')
            schemas[sheet_title] = sheet_schema
            field_metadata[sheet_title] = metadata.get_standard_metadata(
                schema=sheet_schema,
                key_properties=['__sdc_row'],
                valid_replication_keys=None,
                replication_method='FULL_TABLE'
            )

    return schemas, field_metadata