import os
import json
import re
import urllib.parse
from collections import OrderedDict
import singer
from singer import metadata
from tap_google_sheets.streams import STREAMS
LOGGER = singer.get_logger()
Convert a 1-based column index to its spreadsheet column letter (e.g. 1 → A, 26 → Z, 27 → AA).
def colnum_string(num):
    """Convert a 1-based column index to its spreadsheet column letter.

    Examples: 1 -> 'A', 26 -> 'Z', 27 -> 'AA'. Returns '' for num <= 0.
    """
    string = ""
    while num > 0:
        # Base-26 digits, but "bijective" (A..Z maps to 1..26, no zero digit),
        # hence the num - 1 before each divmod.
        num, remainder = divmod(num - 1, 26)
        string = chr(65 + remainder) + string
    return string
The goal of this function is to get the JSON schema of the sheet you pass in. Our return values here
are sheet_json_schema
and columns
, an OrderedDict
and a list respectively.
This function is massive and we will discuss it in the following parts:
Part 1 is just setting up constants and variables. We can skim through this part.
Part 2 is split into two parts because it is a loop over the columns, and there are two ways to handle a column.
We’ll consider 2A to be the “skip this column” case.
We’ll consider 2B as the “not skipped” case. In which we determine a field’s type (Part 3) and then use the type to decide the JSON Schema (Part 4).
def get_sheet_schema_columns(sheet):
    """Build the JSON schema for a single sheet.

    The input `sheet` is one element of the spreadsheet metadata response:
        {
            "properties": {...},
            "data": [
                {"rowData": [
                    {"values": [...]},   # row 1: the header cells
                    {"values": [...]}    # row 2: the first data row
                ]}
            ]
        }
    Each cell object in "values" has the shape:
        {"userEnteredValue": {...}, "effectiveValue": {...},
         "formattedValue": "...", "userEnteredFormat": {...},
         "effectiveFormat": {...}}

    Returns a tuple (sheet_json_schema, columns):
      - sheet_json_schema: the JSON schema of the sheet (one property per
        column, plus the `__sdc_*` bookkeeping fields).
      - columns: list of dicts with keys "columnIndex", "columnLetter",
        "columnName", "columnType", and "columnSkipped".
    Returns (None, None) for an empty sheet.

    Raises:
        Exception: on a duplicate header name, or when the second-row value
            is an error/formula type from which no datatype can be inferred.
    """
    sheet_title = sheet.get('properties', {}).get('title')
    sheet_json_schema = OrderedDict()
    data = next(iter(sheet.get('data', [])), {})
    row_data = data.get('rowData', [])
    if row_data == []:
        LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
        return None, None

    # Row 1 holds the headers; row 2 holds the first row of data, which is
    # used to infer each column's datatype.
    headers = row_data[0].get('values', [])
    first_values = row_data[1].get('values', [])

    # The base sheet schema: every sheet gets these Singer bookkeeping fields.
    sheet_json_schema = {
        'type': 'object',
        'additionalProperties': False,
        'properties': {
            '__sdc_spreadsheet_id': {
                'type': ['null', 'string']
            },
            '__sdc_sheet_id': {
                'type': ['null', 'integer']
            },
            '__sdc_row': {
                'type': ['null', 'integer']
            }
        }
    }

    header_list = []  # used for checking uniqueness
    columns = []
    prior_header = None
    i = 0
    skipped = 0

    # Loop over the header cells, accumulating an entry in each return value.
    for header in headers:
        column_index = i + 1
        column_letter = colnum_string(column_index)
        header_value = header.get('formattedValue')
        if header_value:  # NOT skipped
            column_is_skipped = False
            # Reset the counter for consecutive skipped columns.
            skipped = 0
            # The column name is the rendered value of the header cell.
            column_name = '{}'.format(header_value)
            if column_name in header_list:
                raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
                    sheet_title, column_name, column_letter))
            header_list.append(column_name)

            # Grab the row-2 value for this column; when the data row is
            # shorter than the header row, pad first_values with an empty
            # cell so later first_values[i] lookups stay in range.
            first_value = None
            try:
                first_value = first_values[i]
            except IndexError as err:
                LOGGER.info('NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
                    sheet_title, column_name, column_letter, err))
                first_value = {}
                first_values.append(first_value)

            column_effective_value = first_value.get('effectiveValue', {})

            col_val = None
            # "effectiveValue" is either empty or holds a single key such as
            # "numberValue", "stringValue" or "boolValue". An empty dict is
            # forced to a string column. Reset the type each iteration so an
            # unrecognized key falls through to the unsupportedValue branch
            # instead of crashing or reusing the prior column's type.
            column_effective_value_type = None
            if column_effective_value == {}:
                column_effective_value_type = 'stringValue'
                LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
                    sheet_title, column_name, column_letter))
                LOGGER.info(' Setting column datatype to STRING')
            else:
                for key, val in column_effective_value.items():
                    if key in ('numberValue', 'stringValue', 'boolValue'):
                        column_effective_value_type = key
                        col_val = str(val)
                    elif key in ('errorType', 'formulaType'):
                        # Error/formula cells give no usable datatype; abort
                        # this sheet immediately.
                        col_val = str(val)
                        raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
                            sheet_title, column_name, column_letter, key, col_val))

            column_number_format = first_values[i].get('effectiveFormat', {}).get(
                'numberFormat', {})
            # column_number_format_type: UNSPECIFIED, TEXT, NUMBER, PERCENT,
            # CURRENCY, DATE, TIME, DATE_TIME, ...
            column_number_format_type = column_number_format.get('type')

            # Derive col_properties (the column's JSON schema) and
            # column_gs_type (the Google Sheets type tag) from
            # column_effective_value_type and column_number_format_type.
            if column_effective_value == {}:
                col_properties = {'type': ['null', 'string']}
                column_gs_type = 'stringValue'
                LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
                    sheet_title, column_name, column_letter))
                LOGGER.info(' Setting column datatype to STRING')
            # column_effective_value_type: numberValue, stringValue, boolValue
            elif column_effective_value_type == 'stringValue':
                col_properties = {'type': ['null', 'string']}
                column_gs_type = 'stringValue'
            elif column_effective_value_type == 'boolValue':
                col_properties = {'type': ['null', 'boolean', 'string']}
                column_gs_type = 'boolValue'
            elif column_effective_value_type == 'numberValue':
                if column_number_format_type == 'DATE_TIME':
                    col_properties = {
                        'type': ['null', 'string'],
                        'format': 'date-time'
                    }
                    column_gs_type = 'numberType.DATE_TIME'
                elif column_number_format_type == 'DATE':
                    col_properties = {
                        'type': ['null', 'string'],
                        'format': 'date'
                    }
                    column_gs_type = 'numberType.DATE'
                elif column_number_format_type == 'TIME':
                    col_properties = {
                        'type': ['null', 'string'],
                        'format': 'time'
                    }
                    column_gs_type = 'numberType.TIME'
                elif column_number_format_type == 'TEXT':
                    col_properties = {'type': ['null', 'string']}
                    column_gs_type = 'stringValue'
                else:
                    # Plain numbers: allow up to 15 decimal digits.
                    col_properties = {'type': 'number', 'multipleOf': 1e-15}
                    column_gs_type = 'numberType'
            else:
                col_properties = {'type': ['null', 'string']}
                column_gs_type = 'unsupportedValue'
                LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
                    sheet_title, column_name, column_letter, column_effective_value_type, col_val))
                LOGGER.info('Converting to string.')
        else:  # skipped
            # Headerless columns are still added to the schema, as string
            # fields named "__sdc_skip_col_XY" (XY = zero-padded index).
            column_is_skipped = True
            skipped = skipped + 1
            column_index_str = str(column_index).zfill(2)
            column_name = '__sdc_skip_col_{}'.format(column_index_str)
            col_properties = {'type': ['null', 'string']}
            column_gs_type = 'stringValue'
            LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
                sheet_title, column_name, column_letter))
            LOGGER.info(' This column will be skipped during data loading.')

        if skipped >= 2:
            # Two consecutive headerless columns end the scan; remove the
            # first skipped column that was already added to the schema.
            sheet_json_schema['properties'].pop(prior_header, None)
            LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
                sheet_title, column_name, column_letter))
            break

        else:
            column = {
                'columnIndex': column_index,
                'columnLetter': column_letter,
                'columnName': column_name,
                'columnType': column_gs_type,
                'columnSkipped': column_is_skipped
            }
            columns.append(column)

            # Number/date/time columns may also arrive as strings, so wrap
            # the typed schema in anyOf with a nullable-string alternative.
            if column_gs_type in {'numberType.DATE_TIME', 'numberType.DATE', 'numberType.TIME', 'numberType'}:
                col_properties = {
                    'anyOf': [
                        col_properties,
                        {'type': ['null', 'string']}
                    ]
                }

            sheet_json_schema['properties'][column_name] = col_properties

            prior_header = column_name
            i = i + 1

    return sheet_json_schema, columns
The point of this function seems to be (1) make a request to get a sheet (2) return the schema
generated for this sheet by schema.py:get_sheet_schema_columns
.
get_sheet_metadata()
sets up a lot of variables to ultimately make a request to
https://sheets.googleapis.com/v4/spreadsheets/my-spreadsheet-id?includeGridData=true&ranges='my-sheet-title'!1:2
Let’s dissect the query params here a bit.
includeGridData
is false by default and setting this to true lets us get “Grid data”. If you
compare the same request but with that value flipped, then you’ll notice the includeGridData=false
gives you a relatively small response with no data in it. It seems like just a bunch of metadata.
ranges
controls the rows returned.
def get_sheet_metadata(sheet, spreadsheet_id, client):
    """Request rows 1-2 of a sheet and return its discovered schema.

    Makes a GET to
      .../v4/spreadsheets/{spreadsheet_id}?includeGridData=true&ranges='{title}'!1:2
    (via the `sheet_metadata` stream config) and feeds the result to
    get_sheet_schema_columns.

    Returns (sheet_json_schema, columns), or (None, None) when the sheet is
    empty or malformed.
    """
    # Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on the
    # Spreadsheet with the sheet_metadata query.
    sheet_id = sheet.get('properties', {}).get('sheetId')
    sheet_title = sheet.get('properties', {}).get('title')
    LOGGER.info('sheet_id = {}, sheet_title = {}'.format(sheet_id, sheet_title))

    stream_name = 'sheet_metadata'
    stream_metadata = STREAMS.get(stream_name)
    api = stream_metadata.get('api', 'sheets')
    params = stream_metadata.get('params', {})
    # URL-encode the title for the querystring; regex-escape it for the
    # endpoint tag used in request metrics/logging.
    sheet_title_encoded = urllib.parse.quote_plus(sheet_title)
    sheet_title_escaped = re.escape(sheet_title)
    querystring = '&'.join(
        ['%s=%s' % (key, value) for (key, value) in params.items()]
    ).replace('{sheet_title}', sheet_title_encoded)
    path = '{}?{}'.format(
        stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id),
        querystring
    )

    sheet_md_results = client.get(path=path, api=api, endpoint=sheet_title_escaped)
    # The response is an envelope; the requested sheet is the only element
    # of "sheets".
    sheet_metadata = sheet_md_results.get('sheets')[0]

    try:
        # Create sheet_json_schema (for discovery/catalog) and columns
        # (for sheet_metadata results).
        sheet_json_schema, columns = get_sheet_schema_columns(sheet_metadata)
    except Exception as err:
        # Schema discovery failures skip the sheet rather than abort the run.
        LOGGER.warning('{}'.format(err))
        LOGGER.warning('SKIPPING Malformed sheet: {}'.format(sheet_title))
        sheet_json_schema, columns = None, None

    return sheet_json_schema, columns
def get_abs_path(path):
    """Return `path` resolved relative to this module's directory."""
    return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)
We initialize our return variables, schemas
and field_metadata
to empty dictionaries.
We loop over each stream in streams.py:STREAMS
. We load the static JSON file into memory - all
four streams currently have some static schema. We store this on our return variable schemas
under the stream name.
We then call singer.metadata.get_standard_metadata()
passing in whatever metadata we do have
(key properties, valid replication keys, the replication method). The return value here is
stored on our return variable field_metadata
under the stream name.
def get_schemas(client, spreadsheet_id):
    """Build the discovery schemas and field metadata for every stream.

    For each stream in STREAMS, loads its static schema from
    schemas/<stream>.json and computes standard Singer metadata. For the
    `spreadsheet_metadata` stream, additionally fetches the spreadsheet
    (includeGridData=false) and discovers a dynamic schema for each sheet,
    keyed by the sheet's title.

    Returns (schemas, field_metadata), both dicts keyed by stream name.
    """
    schemas = {}
    field_metadata = {}

    for stream_name, stream_metadata in STREAMS.items():
        schema_path = get_abs_path('schemas/{}.json'.format(stream_name))
        with open(schema_path) as file:
            schema = json.load(file)
        schemas[stream_name] = schema

        mdata = metadata.get_standard_metadata(
            schema=schema,
            key_properties=stream_metadata.get('key_properties', None),
            valid_replication_keys=stream_metadata.get('replication_keys', None),
            replication_method=stream_metadata.get('replication_method', None)
        )
        field_metadata[stream_name] = mdata

        if stream_name == 'spreadsheet_metadata':
            # Extra work for this stream: GET
            # .../v4/spreadsheets/{spreadsheet_id}?includeGridData=false
            # to learn which sheets exist, then build each sheet's dynamic
            # schema. The path and querystring are assembled by hand from the
            # stream config.
            api = stream_metadata.get('api', 'sheets')
            params = stream_metadata.get('params', {})
            querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
            path = '{}?{}'.format(
                stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id),
                querystring
            )

            spreadsheet_md_results = client.get(
                path=path,
                params=querystring,
                api=api,
                endpoint=stream_name
            )

            # The response is an envelope; the data we care about is under
            # the "sheets" key.
            sheets = spreadsheet_md_results.get('sheets')
            if sheets:
                # One dynamic stream per sheet, named by the sheet's title.
                for sheet in sheets:
                    sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
                    if sheet_json_schema and columns:
                        sheet_title = sheet.get('properties', {}).get('title')
                        schemas[sheet_title] = sheet_json_schema
                        sheet_mdata = metadata.get_standard_metadata(
                            schema=sheet_json_schema,
                            key_properties=['__sdc_row'],
                            valid_replication_keys=None,
                            replication_method='FULL_TABLE'
                        )
                        field_metadata[sheet_title] = sheet_mdata

    return schemas, field_metadata
The shape of the response is shown below; note the tap stores it in a nested OrderedDict
structure.
{
"spreadsheetid": "my-id",
"properties": {...},
"sheets": [
{
"properties": {},
"data": [
{
"rowData": [
{
"values": [
{
"userEnteredValue": {"stringValue": "time1"},
"effectiveValue": {"stringValue": "time1"},
"formattedValue": "time1",
"userEnteredFormat": {...},
"effectiveFormat": {}
},
...
],
},
...
]
}
]
},
]
}