first_values = row_data[1].get('values', [])
# LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
- sheet_json_schema['type'] = 'object'
- sheet_json_schema['additionalProperties'] = False
+ sheet_json_schema = {
+ 'type': 'object',
+ 'additionalProperties': False,
# https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
#
column_format = None # Default
- # column_multiple_of = None # Default
if column_effective_value_type == 'stringValue':
- column_type = ['null', 'string']
+ col_properties = {'type': ['null', 'string']}
column_gs_type = 'stringValue'
elif column_effective_value_type == 'boolValue':
- column_type = ['null', 'boolean', 'string']
+ col_properties = {'type': ['null', 'boolean', 'string']}
column_gs_type = 'boolValue'
elif column_effective_value_type == 'numberValue':
if column_number_format_type == 'DATE_TIME':
- column_type = ['null', 'string']
- column_format = 'date-time'
+ col_properties = {
+ 'type': ['null', 'string'],
+ 'format': 'date-time'
+ }
column_gs_type = 'numberType.DATE_TIME'
elif column_number_format_type == 'DATE':
- column_type = ['null', 'string']
- column_format = 'date'
+ col_properties = {
+ 'type': ['null', 'string'],
+ 'format': 'date'
+ }
column_gs_type = 'numberType.DATE'
elif column_number_format_type == 'TIME':
- column_type = ['null', 'string']
- column_format = 'time'
+ col_properties = {
+ 'type': ['null', 'string'],
+ 'format': 'time'
+ }
column_gs_type = 'numberType.TIME'
elif column_number_format_type == 'TEXT':
- column_type = ['null', 'string']
+ col_properties = {'type': ['null', 'string']}
column_gs_type = 'stringValue'
else:
- column_type = ['null', 'number', 'string']
+ # NOTE: Order within the anyOf matters. The number type w/ multipleOf
+ # must be listed last, otherwise validation errors occur.
+ col_properties = {
+ 'anyOf': [
+ {
+ 'type': 'string'
+ },
+ {
+ 'type': 'null'
+ },
+ {
+ 'type': 'number',
+ 'multipleOf': 1e-15
+ }
+ ]
+ }
column_gs_type = 'numberType'
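+ # multipleOf: 1e-15 caps numbers at 15 decimal digits; floats w/ more
+ # precision are rounded to match in transform_sheet_data below.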
- elif column_effective_value_type in ('formulaValue', 'errorValue'):
- raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name, \
+ # Catch-all to deal with other types and set to string
+ # column_effective_value_type: formulaValue, errorValue, or other
+ else:
+ col_properties = {'type': ['null', 'string']}
+ column_gs_type = 'unsupportedValue'
+ LOGGER.info('Unsupported data type: {}, value: {}'.format(column_name, \
column_effective_value_type))
+ LOGGER.info('Converting to string.')
else: # skipped
column_is_skipped = True
skipped = skipped + 1
column_index_str = str(column_index).zfill(2)
column_name = '__sdc_skip_col_{}'.format(column_index_str)
- column_type = ['null', 'string']
- column_format = None
+ col_properties = {'type': ['null', 'string']}
column_gs_type = 'stringValue'
if skipped >= 2:
}
columns.append(column)
- sheet_json_schema['properties'][column_name] = column
- sheet_json_schema['properties'][column_name]['type'] = column_type
- if column_format:
- sheet_json_schema['properties'][column_name]['format'] = column_format
+ sheet_json_schema['properties'][column_name] = col_properties
prior_header = column_name
i = i + 1
return sheet_json_schema, columns
+# Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query
+# endpoint: spreadsheets/{spreadsheet_id}
+# params: includeGridData = true, ranges = '{sheet_title}'!1:2
+# This endpoint includes detailed metadata about each cell - incl. data type, formatting, etc.
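+ # Illustrative request this produces (hypothetical sheet title 'Sheet1'):
+ # GET https://sheets.googleapis.com/v4/spreadsheets/{spreadsheet_id}?includeGridData=true&ranges='Sheet1'!1:2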
def get_sheet_metadata(sheet, spreadsheet_id, client):
sheet_id = sheet.get('properties', {}).get('sheetId')
sheet_title = sheet.get('properties', {}).get('title')
path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
spreadsheet_id), querystring)
sheet_md_results = client.get(path=path, api=api, endpoint=stream_name)
- sheet_cols = sheet_md_results.get('sheets')[0]
- sheet_schema, columns = get_sheet_schema_columns(sheet_cols)
+ # sheet_metadata: 1st `sheets` node in results
+ sheet_metadata = sheet_md_results.get('sheets')[0]
- return sheet_schema, columns
+ # Create sheet_json_schema (for discovery/catalog) and columns (for sheet_metadata results)
+ sheet_json_schema, columns = get_sheet_schema_columns(sheet_metadata)
+
+ return sheet_json_schema, columns
def get_abs_path(path):
path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
spreadsheet_id), querystring)
+ # GET spreadsheet_metadata, which incl. sheets (basic metadata for each worksheet)
spreadsheet_md_results = client.get(path=path, params=querystring, api=api, \
endpoint=stream_name)
sheets = spreadsheet_md_results.get('sheets')
if sheets:
+ # Loop thru each worksheet in spreadsheet
for sheet in sheets:
- sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+ # GET sheet_json_schema for each worksheet (from function above)
+ sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
LOGGER.info('columns = {}'.format(columns))
sheet_title = sheet.get('properties', {}).get('title')
- schemas[sheet_title] = sheet_schema
+ schemas[sheet_title] = sheet_json_schema
sheet_mdata = metadata.new()
sheet_mdata = metadata.get_standard_metadata(
- schema=sheet_schema,
+ schema=sheet_json_schema,
key_properties=['__sdc_row'],
valid_replication_keys=None,
replication_method='FULL_TABLE'
range_rows=None):
if not range_rows:
range_rows = ''
+ # Replace {placeholder} variables in path
path = endpoint_config.get('path', stream_name).replace(
'{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
'{range_rows}', range_rows)
params = endpoint_config.get('params', {})
api = endpoint_config.get('api', 'sheets')
+ # Add querystring parameters and replace {placeholder} variables.
+ # The querystring is built manually so values are NOT url-encoded; encoding them causes API errors.
querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
'{sheet_title}', stream_name)
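+ # Illustrative result w/ hypothetical params, for a sheet named 'Customers':
+ # params = {'majorDimension': 'ROWS', 'ranges': "'{sheet_title}'!2:201"}
+ # -> "majorDimension=ROWS&ranges='Customers'!2:201"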
data = {}
timezone_str = 'UTC'
tzn = pytz.timezone(timezone_str)
sec_per_day = 86400
- excel_epoch = 25569 # 1970-01-01T00:00:00Z
+ excel_epoch = 25569 # 1970-01-01T00:00:00Z as a Lotus 1-2-3/Excel date serial number
epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
epoch_dttm = datetime(1970, 1, 1)
excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
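+ # Worked example: excel_date_sn = 43831.0
+ # epoch_sec = (43831 - 25569) * 86400 = 1577836800
+ # excel_dttm = 1970-01-01 + 1577836800 sec = 2020-01-01T00:00:00Z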
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
sheet_data_tf = []
- is_last_row = False
row_num = from_row
# Create sorted list of columns based on columnIndex
cols = sorted(columns, key=lambda i: i['columnIndex'])
# LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
for row in sheet_data_rows:
- # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
+ # If empty row, SKIP
if row == []:
- is_last_row = True
- return sheet_data_tf, row_num - 1, is_last_row
- sheet_data_row_tf = {}
- # Add spreadsheet_id, sheet_id, and row
- sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
- sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
- sheet_data_row_tf['__sdc_row'] = row_num
- col_num = 1
- for value in row:
- # Select column metadata based on column index
- col = cols[col_num - 1]
- col_skipped = col.get('columnSkipped')
- if not col_skipped:
- col_name = col.get('columnName')
- col_type = col.get('columnType')
- # Convert dates/times from Lotus Notes Serial Numbers
- if col_type == 'numberType.DATE_TIME':
- if isinstance(value, (int, float)):
- col_val = excel_to_dttm_str(value)
- else:
- col_val = str(value)
- elif col_type == 'numberType.DATE':
- if isinstance(value, (int, float)):
- col_val = excel_to_dttm_str(value)[:10]
- else:
- col_val = str(value)
- elif col_type == 'numberType.TIME':
- if isinstance(value, (int, float)):
- try:
- total_secs = value * 86400 # seconds in day
- col_val = str(timedelta(seconds=total_secs))
- except ValueError:
+ LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
+ else:
+ sheet_data_row_tf = {}
+ # Add spreadsheet_id, sheet_id, and row
+ sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
+ sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
+ sheet_data_row_tf['__sdc_row'] = row_num
+ col_num = 1
+ for value in row:
+ # Select column metadata based on column index
+ col = cols[col_num - 1]
+ col_skipped = col.get('columnSkipped')
+ if not col_skipped:
+ col_name = col.get('columnName')
+ col_type = col.get('columnType')
+ # Convert dates/times from Lotus 1-2-3/Excel date serial numbers
+ # DATE-TIME
+ if col_type == 'numberType.DATE_TIME':
+ if isinstance(value, (int, float)):
+ col_val = excel_to_dttm_str(value)
+ else:
col_val = str(value)
- else:
- col_val = str(value)
- elif col_type == 'numberType':
- if isinstance(value, int):
- col_val = int(value)
- else:
- try:
- col_val = float(value)
- except ValueError:
+ # DATE
+ elif col_type == 'numberType.DATE':
+ if isinstance(value, (int, float)):
+ col_val = excel_to_dttm_str(value)[:10]
+ else:
col_val = str(value)
- elif col_type == 'stringValue':
- col_val = str(value)
- elif col_type == 'boolValue':
- if isinstance(value, bool):
- col_val = value
- elif isinstance(value, str):
- if value.lower() in ('true', 't', 'yes', 'y'):
- col_val = True
- elif value.lower() in ('false', 'f', 'no', 'n'):
- col_val = False
+ # TIME ONLY (NO DATE)
+ elif col_type == 'numberType.TIME':
+ if isinstance(value, (int, float)):
+ try:
+ total_secs = value * 86400 # seconds in day
+ # Create string formatted like HH:MM:SS
+ col_val = str(timedelta(seconds=total_secs))
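+ # e.g. value = 0.5 (half a day) -> total_secs = 43200 -> col_val = '12:00:00'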
+ except ValueError:
+ col_val = str(value)
else:
col_val = str(value)
- elif isinstance(value, int):
- if value in (1, -1):
- col_val = True
- elif value == 0:
- col_val = False
+ # NUMBER (INTEGER AND FLOAT)
+ elif col_type == 'numberType':
+ if isinstance(value, int):
+ col_val = int(value)
+ elif isinstance(value, float):
+ # Determine float decimal digits
+ decimal_digits = str(value)[::-1].find('.')
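+ # (reversing the string and finding '.' counts the digits after the decimal point)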
+ if decimal_digits > 15:
+ try:
+ # ROUND to multipleOf: 1e-15
+ col_val = float(round(value, 15))
+ except ValueError:
+ col_val = str(value)
+ else: # decimal_digits <= 15, no rounding
+ try:
+ col_val = float(value)
+ except ValueError:
+ col_val = str(value)
else:
col_val = str(value)
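+ # e.g. 0.12345678901234568 (17 decimal digits) -> round(value, 15) -> 0.123456789012346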
-
- else:
- col_val = value
- sheet_data_row_tf[col_name] = col_val
- col_num = col_num + 1
- sheet_data_tf.append(sheet_data_row_tf)
+ # STRING
+ elif col_type == 'stringValue':
+ col_val = str(value)
+ # BOOLEAN
+ elif col_type == 'boolValue':
+ if isinstance(value, bool):
+ col_val = value
+ elif isinstance(value, str):
+ if value.lower() in ('true', 't', 'yes', 'y'):
+ col_val = True
+ elif value.lower() in ('false', 'f', 'no', 'n'):
+ col_val = False
+ else:
+ col_val = str(value)
+ elif isinstance(value, int):
+ if value in (1, -1):
+ col_val = True
+ elif value == 0:
+ col_val = False
+ else:
+ col_val = str(value)
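+ # e.g. 'Yes' -> True, 'n' -> False, 1 -> True, 0 -> False; other values -> str(value)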
+ # OTHER: Convert everything else to a string
+ else:
+ col_val = str(value)
+ sheet_data_row_tf[col_name] = col_val
+ col_num = col_num + 1
+ # APPEND non-empty row
+ sheet_data_tf.append(sheet_data_row_tf)
row_num = row_num + 1
- return sheet_data_tf, row_num, is_last_row
+ return sheet_data_tf, row_num
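+ # Illustrative call, assuming hypothetical columns 'name' (stringValue) and
+ # 'created_dt' (numberType.DATE_TIME) in cols:
+ # transform_sheet_data('abc123', 0, 2, cols, [['Jane', 43831.0], []])
+ # -> ([{'__sdc_spreadsheet_id': 'abc123', '__sdc_sheet_id': 0, '__sdc_row': 2,
+ # 'name': 'Jane', 'created_dt': '2020-01-01T00:00:00Z'}], 4)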
def sync(client, config, catalog, state):
return
# Sync file_metadata if selected
sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
- write_bookmark(state, stream_name, strftime(this_datetime))
+ # file_metadata bookmark is updated at the end of sync
# SPREADSHEET_METADATA
spreadsheet_metadata = {}
# GET sheet_metadata and columns
sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
- LOGGER.info('sheet_schema: {}'.format(sheet_schema))
+ # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
# Transform sheet_metadata
sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
sheet_data_rows = sheet_data.get('values')
# Transform batch of rows to JSON with keys for each column
- sheet_data_tf, row_num, is_last_row = transform_sheet_data(
+ sheet_data_tf, row_num = transform_sheet_data(
spreadsheet_id=spreadsheet_id,
sheet_id=sheet_id,
from_row=from_row,
stream_name=sheet_title,
records=sheet_data_tf,
time_extracted=ss_time_extracted)
- LOGGER.info('Sheet: {}, ecords processed: {}'.format(
+ LOGGER.info('Sheet: {}, records processed: {}'.format(
sheet_title, record_count))
# Update paging from/to_row for next batch
singer.write_message(activate_version_message)
LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
- sheet_title, row_num - 1))
sheet_title, row_num - 2)) # subtract 1 for the header row, 1 more because row_num points past the last row
stream_name = 'sheet_metadata'
# Sync sheet_metadata if selected
sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
+ # Update file_metadata bookmark
+ write_bookmark(state, 'file_metadata', strftime(this_datetime))
+
return