schema.py

#
import os
import json
import re
import urllib.parse
from collections import OrderedDict
import singer
from singer import metadata
from tap_google_sheets.streams import STREAMS

LOGGER = singer.get_logger()
#

Convert column index to column letter

def colnum_string(num):
#
    string = ""
    while num > 0:
        num, remainder = divmod(num - 1, 26)
        string = chr(65 + remainder) + string
    return string
#
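
A quick sanity check of the conversion (this is bijective base-26, the numbering spreadsheets use for column letters). The function is reproduced here so the snippet is self-contained:

```python
def colnum_string(num):
    # convert a 1-based column index to its spreadsheet column letter
    string = ""
    while num > 0:
        num, remainder = divmod(num - 1, 26)
        string = chr(65 + remainder) + string
    return string

print(colnum_string(1))    # A
print(colnum_string(26))   # Z
print(colnum_string(27))   # AA
print(colnum_string(702))  # ZZ
```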

#

The goal of this function is to get the JSON schema of the sheet you pass in. Our return values here are sheet_json_schema and columns, an OrderedDict and a list respectively.

#

This function is massive and we will discuss it in the following parts:

#
  • Part 1
  • Part 2
    • Part 2A
    • Part 2B
      • Part 3
      • Part 4
#

Part 1 is just setting up constants and variables. We can skim through this part.

#

Part 2 is split into two parts because it’s a loop over the columns, and there are two ways to handle a column.

#

We’ll consider 2A to be the “skip this column” case.

#

We’ll consider 2B to be the “not skipped” case, in which we determine a field’s type (Part 3) and then use that type to decide the JSON Schema (Part 4).

#

Create sheet_metadata_json with columns from sheet

def get_sheet_schema_columns(sheet):
#

The input to this function is shaped like

{
  "data" : [
    {
      "rowData": [
        {"values": <thing 1>},
        {"values": <thing 2>}
      ]
    }
  ]
}

Return Values

  • columns

    • A column that goes into columns is a dictionary with keys "columnIndex", "columnLetter", "columnName", "columnType", and "columnSkipped".
  • sheet_json_schema

    • A col_properties that goes into sheet_json_schema['properties'][column_name] is the JSON schema of column_name.

    sheet_title = sheet.get('properties', {}).get('title')
    sheet_json_schema = OrderedDict()
    data = next(iter(sheet.get('data', [])), {})
    row_data = data.get('rowData', [])
    if row_data == []:
        LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
        return None, None
#

So this function starts by unpacking it into two lists, headers and first_values, which are “thing 1” and “thing 2” respectively.

    headers = row_data[0].get('values', [])
    first_values = row_data[1].get('values', [])
#
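
With a toy payload in that shape (the header and cell values here are made up), the unpacking looks like:

```python
sheet = {
    "data": [
        {
            "rowData": [
                {"values": [{"formattedValue": "name"}, {"formattedValue": "age"}]},  # headers
                {"values": [{"formattedValue": "Ada"}, {"formattedValue": "36"}]},    # first data row
            ]
        }
    ]
}

# same access pattern as the tap
data = next(iter(sheet.get('data', [])), {})
row_data = data.get('rowData', [])
headers = row_data[0].get('values', [])
first_values = row_data[1].get('values', [])

print([h['formattedValue'] for h in headers])       # ['name', 'age']
print([v['formattedValue'] for v in first_values])  # ['Ada', '36']
```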

All of the objects in headers and first_values have the following shape:

#
{
    "userEnteredValue": {"stringValue": "time1"},
    "effectiveValue": {"stringValue": "time1"},
    "formattedValue": "time1",
    "userEnteredFormat": {...},
    "effectiveFormat": {}
}
#

The base Sheet schema

    sheet_json_schema = {
        'type': 'object',
        'additionalProperties': False,
        'properties': {
            '__sdc_spreadsheet_id': {
                'type': ['null', 'string']
            },
            '__sdc_sheet_id': {
                'type': ['null', 'integer']
            },
            '__sdc_row': {
                'type': ['null', 'integer']
            }
        }
    }

    header_list = [] # used for checking uniqueness
    columns = []
    prior_header = None
    i = 0
    skipped = 0
#

We loop over the columns in the headers list and accumulate an object in each return variable.

    for header in headers:
        column_index = i + 1
        column_letter = colnum_string(column_index)
        header_value = header.get('formattedValue')
        if header_value: # NOT skipped
#

Assuming the column we are looking at does not get skipped, we have to figure out the schema.

            column_is_skipped = False
#

First we reset the counter for consecutive skipped columns.

            skipped = 0
#

Then we let the name of this column be the value of formattedValue from the header object we are looking at. This seems to be the value rendered in Google Sheets in the cell.

            column_name = '{}'.format(header_value)
#

We assert that this column name is unique or else we raise a “Duplicate Header Error”.

            if column_name in header_list:
                raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
                    sheet_title, column_name, column_letter))
            header_list.append(column_name)
#

We attempt to grab the value in the second row of the sheet (the first row of data) associated with this column. Remember this row we are looking at is stored in first_values. Note again that headers and first_values have the same shape.

            first_value = None
            try:
                first_value = first_values[i]
            except IndexError as err:
                LOGGER.info('NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
                    sheet_title, column_name, column_letter, err))
                first_value = {}
                first_values.append(first_value)
                pass
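
When row 2 has fewer cells than row 1, first_values is shorter than headers, and the except branch pads it with an empty dict so the index stays valid on later lookups. A minimal illustration of that padding behavior:

```python
headers = ['a', 'b', 'c']                                  # three header cells (simplified)
first_values = [{'effectiveValue': {'stringValue': 'x'}}]  # only one data cell

for i in range(len(headers)):
    try:
        first_value = first_values[i]
    except IndexError:
        first_value = {}
        first_values.append(first_value)  # pad so first_values[i] exists from now on

print(len(first_values))  # 3
```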

            column_effective_value = first_value.get('effectiveValue', {})

            col_val = None
            if column_effective_value == {}:
                column_effective_value_type = 'stringValue'
                LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
                    sheet_title, column_name, column_letter))
                LOGGER.info('   Setting column datatype to STRING')
            else:
#

The tap calls the value of "effectiveValue" the column_effective_value. This dictionary can be empty or it can have a key1 that looks like "numberValue", "stringValue", or "boolValue". If the dictionary is empty, we force key1 to be "stringValue".

                for key, val in column_effective_value.items():
                    if key in ('numberValue', 'stringValue', 'boolValue'):
                        column_effective_value_type = key
                        col_val = str(val)
#

Sometimes key1 also looks like "errorType" or "formulaType", but in these cases we raise a “Data Type Error” immediately.

                    elif key in ('errorType', 'formulaType'):
                        col_val = str(val)
                        raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
                            sheet_title, column_name, column_letter, key, col_val))
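
For intuition, the type detection above reduces to scanning the effectiveValue dict for a recognized key. A condensed sketch (detect_type is a hypothetical helper, not in the tap; cell values are made up):

```python
def detect_type(column_effective_value):
    # empty dict is forced to string; otherwise pick a recognized value key
    if not column_effective_value:
        return 'stringValue'
    detected = None
    for key in column_effective_value:
        if key in ('numberValue', 'stringValue', 'boolValue'):
            detected = key
        elif key in ('errorType', 'formulaType'):
            raise Exception('DATA TYPE ERROR 2ND ROW VALUE: {}'.format(key))
    return detected

print(detect_type({'numberValue': 3.5}))  # numberValue
print(detect_type({}))                    # stringValue
```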

            column_number_format = first_values[i].get('effectiveFormat', {}).get(
                'numberFormat', {})
#

column_number_format_type is one of UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, or SCIENTIFIC.

  • https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType

            column_number_format_type = column_number_format.get('type')
#

the giant if-elif-else block: All it does is set a variable col_properties and column_gs_type based on the values of column_effective_value_type and column_number_format_type.
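
Conceptually, the whole block is a lookup from (column_effective_value_type, column_number_format_type) to a JSON Schema. A condensed sketch of that mapping (schema_for is a hypothetical helper, not the tap's actual code):

```python
def schema_for(effective_type, number_format_type=None):
    # condensed version of the if-elif-else block
    string_schema = {'type': ['null', 'string']}
    if effective_type == 'stringValue':
        return string_schema
    if effective_type == 'boolValue':
        return {'type': ['null', 'boolean', 'string']}
    if effective_type == 'numberValue':
        formats = {'DATE_TIME': 'date-time', 'DATE': 'date', 'TIME': 'time'}
        if number_format_type in formats:
            return {'type': ['null', 'string'], 'format': formats[number_format_type]}
        if number_format_type == 'TEXT':
            return string_schema
        return {'type': 'number', 'multipleOf': 1e-15}
    return string_schema  # empty or unsupported -> string

print(schema_for('numberValue', 'DATE'))  # {'type': ['null', 'string'], 'format': 'date'}
```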

            column_format = None
            if column_effective_value == {}:
                col_properties = {'type': ['null', 'string']}
                column_gs_type = 'stringValue'
                LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
                        sheet_title, column_name, column_letter))
                LOGGER.info('   Setting column datatype to STRING')
#

column_effective_value_type is one of numberValue, stringValue, or boolValue; errorType and formulaType are invalid.

  • https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue

            elif column_effective_value_type == 'stringValue':
                col_properties = {'type': ['null', 'string']}
                column_gs_type = 'stringValue'
            elif column_effective_value_type == 'boolValue':
                col_properties = {'type': ['null', 'boolean', 'string']}
                column_gs_type = 'boolValue'
            elif column_effective_value_type == 'numberValue':
                if column_number_format_type == 'DATE_TIME':
                    col_properties = {
                        'type': ['null', 'string'],
                        'format': 'date-time'
                    }
                    column_gs_type = 'numberType.DATE_TIME'
                elif column_number_format_type == 'DATE':
                    col_properties = {
                        'type': ['null', 'string'],
                        'format': 'date'
                    }
                    column_gs_type = 'numberType.DATE'
                elif column_number_format_type == 'TIME':
                    col_properties = {
                        'type': ['null', 'string'],
                        'format': 'time'
                    }
                    column_gs_type = 'numberType.TIME'
                elif column_number_format_type == 'TEXT':
                    col_properties = {'type': ['null', 'string']}
                    column_gs_type = 'stringValue'
                else:
                    col_properties = {'type': 'number', 'multipleOf': 1e-15}
                    column_gs_type = 'numberType'
            else:
                col_properties = {'type': ['null', 'string']}
                column_gs_type = 'unsupportedValue'
                LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
                        sheet_title, column_name, column_letter, column_effective_value_type, col_val))
                LOGGER.info('Converting to string.')
        else: # skipped
#

We note that we are skipping this column. It still gets added to the schema, though, as a string field. The only other notable thing about skipped columns is that we generate a field name for them that looks like "__sdc_skip_col_XY", where XY is the zero-padded, 1-based column index ("01", "02", and so on).
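
The generated name is just the zero-padded column index:

```python
# zero-pad the 1-based column index to two digits
for column_index in (1, 7, 26):
    print('__sdc_skip_col_{}'.format(str(column_index).zfill(2)))
# __sdc_skip_col_01
# __sdc_skip_col_07
# __sdc_skip_col_26
```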

            column_is_skipped = True
            skipped = skipped + 1
            column_index_str = str(column_index).zfill(2)
            column_name = '__sdc_skip_col_{}'.format(column_index_str)
            col_properties = {'type': ['null', 'string']}
            column_gs_type = 'stringValue'
            LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
                sheet_title, column_name, column_letter))
            LOGGER.info('  This column will be skipped during data loading.')

        if skipped >= 2:
            sheet_json_schema['properties'].pop(prior_header, None)
            LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
                sheet_title, column_name, column_letter))
            break

        else:
            column = {}
            column = {
                'columnIndex': column_index,
                'columnLetter': column_letter,
                'columnName': column_name,
                'columnType': column_gs_type,
                'columnSkipped': column_is_skipped
            }
            columns.append(column)

            if column_gs_type in {'numberType.DATE_TIME', 'numberType.DATE', 'numberType.TIME', 'numberType'}:
                col_properties = {
                    'anyOf': [
                        col_properties,
                        {'type': ['null', 'string']}
                    ]
                }

            sheet_json_schema['properties'][column_name] = col_properties

        prior_header = column_name
        i = i + 1

    return sheet_json_schema, columns
#

The point of this function seems to be (1) to make a request to get a sheet and (2) to return the schema generated for this sheet by schema.py:get_sheet_schema_columns.

get_sheet_metadata() sets up a lot of variables to ultimately make a request to

https://sheets.googleapis.com/v4/spreadsheets/my-spreadsheet-id?includeGridData=true&ranges='my-sheet-title'!1:2
#

Let’s dissect the query params here a bit.

#

includeGridData is false by default, and setting it to true lets us get “grid data”. If you compare the same request with that value flipped, you’ll notice that includeGridData=false gives you a relatively small response with no cell data in it; it seems like just a bunch of metadata.

#

ranges controls the rows returned.
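
Putting those together, the URL construction can be sketched like this (the spreadsheet id and sheet title are made up; this mirrors the tap's quote_plus-and-replace approach):

```python
import urllib.parse

base_url = 'https://sheets.googleapis.com/v4'
spreadsheet_id = 'my-spreadsheet-id'  # hypothetical id
sheet_title = 'My Sheet'              # hypothetical title; spaces get URL-encoded

params = {'includeGridData': 'true', 'ranges': "'{sheet_title}'!1:2"}
sheet_title_encoded = urllib.parse.quote_plus(sheet_title)
querystring = '&'.join(
    '{}={}'.format(key, value) for (key, value) in params.items()
).replace('{sheet_title}', sheet_title_encoded)

url = '{}/spreadsheets/{}?{}'.format(base_url, spreadsheet_id, querystring)
print(url)
# https://sheets.googleapis.com/v4/spreadsheets/my-spreadsheet-id?includeGridData=true&ranges='My+Sheet'!1:2
```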

def get_sheet_metadata(sheet, spreadsheet_id, client):
#

Get the header row and first data row (rows 1 & 2) from a Sheet on the Spreadsheet with the sheet_metadata query:

  • endpoint: spreadsheets/{spreadsheet_id}
  • params: includeGridData = true, ranges = ‘{sheet_title}’!1:2

This endpoint includes detailed metadata about each cell, including data type, formatting, etc.

    sheet_id = sheet.get('properties', {}).get('sheetId')
    sheet_title = sheet.get('properties', {}).get('title')
    LOGGER.info('sheet_id = {}, sheet_title = {}'.format(sheet_id, sheet_title))

    stream_name = 'sheet_metadata'
    stream_metadata = STREAMS.get(stream_name)
    api = stream_metadata.get('api', 'sheets')
    params = stream_metadata.get('params', {})
    sheet_title_encoded = urllib.parse.quote_plus(sheet_title)
    sheet_title_escaped = re.escape(sheet_title)
    querystring = '&'.join(
        ['%s=%s' % (key, value) for (key, value) in params.items()]
    ).replace('{sheet_title}', sheet_title_encoded)
    path = '{}?{}'.format(
        stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id),
        querystring
    )
#

See the Footnotes for this response shape

    sheet_md_results = client.get(path=path, api=api, endpoint=sheet_title_escaped)
    sheet_metadata = sheet_md_results.get('sheets')[0]


    try:
#

Create sheet_json_schema (for discovery/catalog) and columns (for sheet_metadata results)

        sheet_json_schema, columns = get_sheet_schema_columns(sheet_metadata)
    except Exception as err:
        LOGGER.warning('{}'.format(err))
        LOGGER.warning('SKIPPING Malformed sheet: {}'.format(sheet_title))
        sheet_json_schema, columns = None, None

    return sheet_json_schema, columns
#
def get_abs_path(path):
    return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)
#

We initialize our return variables, schemas and field_metadata to empty dictionaries.

We loop over each stream in streams.py:STREAMS. We load the static JSON file into memory - all four streams currently have some static schema. We store this on our return variable schemas under the stream name.

We then call singer.metadata.get_standard_metadata() passing in whatever metadata we do have (key properties, valid replication keys, the replication method). The return value here is stored on our return variable field_metadata under the stream name.

def get_schemas(client, spreadsheet_id):
#
    schemas = {}
    field_metadata = {}

    for stream_name, stream_metadata in STREAMS.items():
        schema_path = get_abs_path('schemas/{}.json'.format(stream_name))
        with open(schema_path) as file:
            schema = json.load(file)
        schemas[stream_name] = schema
        mdata = metadata.new()

        mdata = metadata.get_standard_metadata(
            schema=schema,
            key_properties=stream_metadata.get('key_properties', None),
            valid_replication_keys=stream_metadata.get('replication_keys', None),
            replication_method=stream_metadata.get('replication_method', None)
        )
        field_metadata[stream_name] = mdata
#

If we are handling the "spreadsheet_metadata" stream, we do some extra work to build the dynamic schemas of each Sheet we want to sync. Otherwise, that’s it.

        if stream_name == 'spreadsheet_metadata':
#

We ultimately end up making a GET to

#
https://sheets.googleapis.com/v4/spreadsheets/my-spreadsheet-id?includeGridData=false
#

Notice this is base_url + path + query_string. There’s code here to figure out and properly format path and query_string. I’m not sure why we don’t let requests handle this.
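
For comparison, the stdlib can do the encoding for us (requests does the equivalent via its params argument, e.g. requests.get(url, params=params)). A sketch with urllib, assuming this simple single-param case:

```python
from urllib.parse import urlencode

spreadsheet_id = 'my-spreadsheet-id'  # hypothetical id
params = {'includeGridData': 'false'}

# urlencode handles the key=value joining and escaping
url = 'https://sheets.googleapis.com/v4/spreadsheets/{}?{}'.format(
    spreadsheet_id, urlencode(params))
print(url)
# https://sheets.googleapis.com/v4/spreadsheets/my-spreadsheet-id?includeGridData=false
```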

#

We assume this request is successful and we store the OrderedDict return value as spreadsheet_md_results.

            api = stream_metadata.get('api', 'sheets')
            params = stream_metadata.get('params', {})
            querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
            path = '{}?{}'.format(
                stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id),
                querystring
            )

            spreadsheet_md_results = client.get(
                path=path,
                params=querystring,
                api=api,
                endpoint=stream_name
            )
#

The response here is one of those “envelope” kinds. The data we care about is under the "sheets" key.

            sheets = spreadsheet_md_results.get('sheets')
            if sheets:
#

Looping over this array, we call schema.py:get_sheet_metadata. This gets the JSON schema of each sheet found in this spreadsheet. We use the sheet’s title as the stream name here.

                for sheet in sheets:
                    sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)

                    if sheet_json_schema and columns:
                        sheet_title = sheet.get('properties', {}).get('title')
                        schemas[sheet_title] = sheet_json_schema
                        sheet_mdata = metadata.new()
                        sheet_mdata = metadata.get_standard_metadata(
                            schema=sheet_json_schema,
                            key_properties=['__sdc_row'],
                            valid_replication_keys=None,
                            replication_method='FULL_TABLE'
                        )
                        field_metadata[sheet_title] = sheet_mdata

    return schemas, field_metadata
#

Footnotes

The shape of the response is as follows; note that the tap stores it in a recursive OrderedDict structure.

#
{
    "spreadsheetId": "my-id",
    "properties": {...},
    "sheets": [
        {
            "properties": {},
            "data": [
                {
                    "rowData": [
                        {
                            "values": [
                                {
                                    "userEnteredValue": {"stringValue": "time1"},
                                    "effectiveValue": {"stringValue": "time1"},
                                    "formattedValue": "time1",
                                    "userEnteredFormat": {...},
                                    "effectiveFormat": {}
                                },
                                ...
                            ],
                        },
                        ...
                    ]
                }
            ]
        },
    ]
}