# Changelog
+## 0.0.3
+ * Update README.md documentation; improve logging and handling of errors and warnings; better null handling in Discovery and Sync; fix issues with activate version messages.
+
## 0.0.2
* Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py
- [File Metadata](https://developers.google.com/drive/api/v3/reference/files/get)
- [Spreadsheet Metadata](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/get)
- [Spreadsheet Values](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get)
+- Outputs the following metadata streams:
+ - File Metadata: Name, audit/change info from Google Drive
+ - Spreadsheet Metadata: Basic metadata about the Spreadsheet: Title, Locale, URL, etc.
+ - Sheet Metadata: Title, URL, Area (max column and row), and Column Metadata
+ - Column Metadata: Column Header Name, Data type, Format
+ - Sheets Loaded: Sheet title, load date, number of rows
- For each Sheet:
- - Outputs the schema for each resource (based on the column header and datatypes of first row of data)
- - Outputs a record for all columns with column headers, and for each row of data until it reaches an empty row
+ - Outputs the schema for each resource (based on the column header and datatypes of row 2, the first row of data)
+ - Outputs a record for all columns that have column headers, and for each row of data
+ - Emits a Singer ACTIVATE_VERSION message after each sheet is complete. This forces hard deletes on the data downstream if fewer records are sent.
+ - Primary Key for each row in a Sheet is the Row Number: `__sdc_row`
+ - Each Row in a Sheet also includes Foreign Keys to the Spreadsheet Metadata, `__sdc_spreadsheet_id`, and Sheet Metadata, `__sdc_sheet_id`.
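
The key structure above can be sketched as follows. This is an illustrative helper only (`build_row_record` is not a tap function), assuming hypothetical header and value lists:

```python
def build_row_record(spreadsheet_id, sheet_id, row_num, headers, values):
    # Sketch of the shape of an emitted row record: injected keys first,
    # then one field per column header.
    record = {
        '__sdc_spreadsheet_id': spreadsheet_id,  # FK to Spreadsheet Metadata
        '__sdc_sheet_id': sheet_id,              # FK to Sheet Metadata
        '__sdc_row': row_num,                    # primary key (row number)
    }
    record.update(dict(zip(headers, values)))
    return record

# Row 2 is the first data row (row 1 holds the column headers).
rec = build_row_record('1aBcD', 123456, 2, ['name', 'qty'], ['widget', 5])
```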
## API Endpoints
[**file (GET)**](https://developers.google.com/drive/api/v3/reference/files/get)
- Endpoint: https://www.googleapis.com/drive/v3/files/${spreadsheet_id}?fields=id,name,createdTime,modifiedTime,version
- Primary keys: id
-- Replication strategy: Full (GET file audit data for spreadsheet_id in config)
+- Replication strategy: Incremental (GET file audit data for spreadsheet_id in config)
- Process/Transformations: Replicate Data if Modified
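
As a hedged illustration of the endpoint above, the request pieces can be assembled like this (`build_file_metadata_request` is a hypothetical helper; the actual HTTP call, e.g. via `requests.get`, is omitted):

```python
DRIVE_FILES_URL = 'https://www.googleapis.com/drive/v3/files/{}'

def build_file_metadata_request(spreadsheet_id, access_token):
    # access_token is assumed to come from the OAuth refresh flow described
    # in the Authentication section below.
    url = DRIVE_FILES_URL.format(spreadsheet_id)
    params = {'fields': 'id,name,createdTime,modifiedTime,version'}
    headers = {'Authorization': 'Bearer {}'.format(access_token)}
    return url, params, headers
```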
[**metadata (GET)**](https://developers.google.com/drive/api/v3/reference/files/get)
- Endpoint: https://sheets.googleapis.com/v4/spreadsheets/${spreadsheet_id}?includeGridData=true&ranges=1:2
- This endpoint returns spreadsheet metadata, sheet metadata, and value metadata (data type information)
-- Primary keys: spreadsheetId, title, field_name
+- Primary keys: Spreadsheet Id, Sheet Id, Column Index
- Foreign keys: None
- Replication strategy: Full (get and replace file metadata for spreadsheet_id in config)
- Process/Transformations:
- Verify Sheets: Check sheets exist (compared to catalog) and check gridProperties (available area)
- sheetId, title, index, gridProperties (rowCount, columnCount)
- - Verify Field Headers (1st row): Check field headers exist (compared to catalog), missing headers (columns to skip), column order/position, and column uniqueness
- - Header's field_name, position: data.rowData[0].values[i].formattedValue
- - Create/Verify Datatypes (2nd row):
- - Row 2's datatype, format: data.rowData[1].values[i]
+ - Verify Field Headers (1st row): Check field headers exist (compared to catalog), missing headers (columns to skip), column order/position, and column name uniqueness
+ - Create/Verify Datatypes based on 2nd row value and cell metadata
- First check:
- [effectiveValue: key](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue)
- Valid types: numberValue, stringValue, boolValue
- Then check:
- [effectiveFormat.numberFormat.type](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType)
 - Valid types: UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, SCIENTIFIC
- - If DATE or DATE_TIME, set JSON schema datatype = string and format = date-time
- - [effectiveFormat.numberFormat.pattern](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormat)
+ - Determine JSON schema column data type based on the value and the above cell metadata settings.
+ - If DATE, DATE_TIME, or TIME, set JSON schema format accordingly
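
The two-step type check above can be sketched as follows. `infer_json_schema` is illustrative and simplified; the tap's real discovery logic also handles number formats with `anyOf`/`multipleOf` schemas and error/formula value types:

```python
def infer_json_schema(cell):
    # Step 1: which effectiveValue key is present (numberValue, stringValue, boolValue)
    effective_value = cell.get('effectiveValue', {})
    # Step 2: effectiveFormat.numberFormat.type (DATE, TIME, DATE_TIME, ...)
    fmt_type = cell.get('effectiveFormat', {}).get('numberFormat', {}).get('type')

    if 'boolValue' in effective_value:
        return {'type': ['null', 'boolean']}
    if 'numberValue' in effective_value:
        if fmt_type == 'DATE_TIME':
            return {'type': ['null', 'string'], 'format': 'date-time'}
        if fmt_type == 'DATE':
            return {'type': ['null', 'string'], 'format': 'date'}
        if fmt_type == 'TIME':
            return {'type': ['null', 'string'], 'format': 'time'}
        return {'type': ['null', 'number']}
    # stringValue, blank cells, and anything unrecognized default to string
    return {'type': ['null', 'string']}
```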
[**values (GET)**](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get)
- Endpoint: https://sheets.googleapis.com/v4/spreadsheets/${spreadsheet_id}/values/'${sheet_name}'!${row_range}?dateTimeRenderOption=SERIAL_NUMBER&valueRenderOption=UNFORMATTED_VALUE&majorDimension=ROWS
- This endpoint loops through sheets and row ranges to get the [unformatted values](https://developers.google.com/sheets/api/reference/rest/v4/ValueRenderOption) (effective values only), dates and datetimes as [serial numbers](https://developers.google.com/sheets/api/reference/rest/v4/DateTimeRenderOption)
-- Primary keys: row
+- Primary keys: __sdc_row
- Replication strategy: Full (GET sheet values data for spreadsheet_id in config)
- Process/Transformations:
- Loop through sheets (compared to catalog selection)
- Send metadata for sheet
- - Loop through ranges of rows until reaching empty row or area max row (from sheet metadata)
- - Transform values, if necessary (dates, date-times, boolean, integer, numbers)
- - Process/send records
+ - Loop through ALL columns, skipping columns that have no column header
+ - Loop through ranges of rows for ALL rows up to the sheet's available area max row (from sheet metadata)
+ - Transform values, if necessary (dates, date-times, times, boolean).
+ - Google Sheets stores date/times in Lotus 1-2-3 [Serial Number](https://developers.google.com/sheets/api/reference/rest/v4/DateTimeRenderOption) format; these serial numbers are converted to normal UTC date, date-time, and time strings.
+ - Process/send records to target
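
The serial-number conversion described above can be sketched as follows (day 0 in this convention is 1899-12-30; `serial_to_datetime_str` is illustrative, playing the role the tap's `excel_to_dttm_str` helper fills):

```python
from datetime import datetime, timedelta, timezone

# Day 0 of the Lotus 1-2-3 / Google Sheets serial number system.
SERIAL_EPOCH = datetime(1899, 12, 30, tzinfo=timezone.utc)

def serial_to_datetime_str(serial):
    # Whole part = days since the epoch; fractional part = time of day.
    dttm = SERIAL_EPOCH + timedelta(days=serial)
    return dttm.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
```

For example, serial number 25569.5 converts to `1970-01-01T12:00:00.000000Z`; a date-only value can then be sliced to the first 10 characters, as the tap does with `[:10]`.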
## Authentication
The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=1FojlvtLwS0-BzGS37R0jEXtwSHqSiO1Uw-7RKQQO-C4) Google Doc provides instructions on how to configure the Google Cloud API credentials to enable the Google Drive and Google Sheets APIs, configure Google Cloud to authorize/verify your domain ownership, generate an API key (client_id, client_secret), authenticate and generate a refresh_token, and prepare your tap config.json with the necessary parameters.
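
For context on the refresh_token flow, here is a hedged sketch of the token refresh payload (standard OAuth 2.0 fields, POSTed to Google's token endpoint `https://oauth2.googleapis.com/token`; `build_token_refresh_payload` is an illustrative helper, not part of the tap):

```python
def build_token_refresh_payload(client_id, client_secret, refresh_token):
    # Exchange the long-lived refresh_token for a short-lived access_token.
    return {
        'grant_type': 'refresh_token',
        'client_id': client_id,
        'client_secret': client_secret,
        'refresh_token': refresh_token,
    }
```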
from setuptools import setup, find_packages
setup(name='tap-google-sheets',
- version='0.0.2',
+ version='0.0.3',
description='Singer.io tap for extracting data from the Google Sheets v4 API',
author='jeff.huth@bytecode.io',
classifiers=['Programming Language :: Python :: 3 :: Only'],
install_requires=[
'backoff==1.8.0',
'requests==2.22.0',
- 'singer-python==5.8.1'
+ 'singer-python==5.9.0'
],
entry_points='''
[console_scripts]
# Create sheet_metadata_json with columns from sheet
def get_sheet_schema_columns(sheet):
+ sheet_title = sheet.get('properties', {}).get('title')
sheet_json_schema = OrderedDict()
data = next(iter(sheet.get('data', [])), {})
row_data = data.get('rowData', [])
skipped = 0
column_name = '{}'.format(header_value)
if column_name in header_list:
- raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name))
+ raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
+ sheet_title, column_name, column_letter))
header_list.append(column_name)
- first_value = first_values[i]
-
+ first_value = None
+ try:
+ first_value = first_values[i]
+ except IndexError as err:
+ raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
+ sheet_title, column_name, column_letter, err))
+
column_effective_value = first_value.get('effectiveValue', {})
- for key in column_effective_value.keys():
- if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'):
- column_effective_value_type = key
+
+ col_val = None
+ if column_effective_value == {}:
+ column_effective_value_type = 'stringValue'
+ LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
+ sheet_title, column_name, column_letter))
+ LOGGER.info(' Setting column datatype to STRING')
+ else:
+ for key, val in column_effective_value.items():
+ if key in ('numberValue', 'stringValue', 'boolValue'):
+ column_effective_value_type = key
+ col_val = str(val)
+ elif key in ('errorType', 'formulaType'):
+ col_val = str(val)
+ raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+ sheet_title, column_name, column_letter, key, col_val))
column_number_format = first_values[i].get('effectiveFormat', {}).get(
'numberFormat', {})
# https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
#
column_format = None # Default
- if column_effective_value_type == 'stringValue':
+ if column_effective_value == {}:
+ col_properties = {'type': ['null', 'string']}
+ column_gs_type = 'stringValue'
+ LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
+ sheet_title, column_name, column_letter))
+ LOGGER.info(' Setting column datatype to STRING')
+ elif column_effective_value_type == 'stringValue':
col_properties = {'type': ['null', 'string']}
column_gs_type = 'stringValue'
elif column_effective_value_type == 'boolValue':
else:
col_properties = {'type': ['null', 'string']}
column_gs_type = 'unsupportedValue'
- LOGGER.info('Unsupported data type: {}, value: {}'.format(column_name, \
- column_effective_value_type))
+ LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+ sheet_title, column_name, column_letter, column_effective_value_type, col_val))
LOGGER.info('Converting to string.')
else: # skipped
column_is_skipped = True
column_name = '__sdc_skip_col_{}'.format(column_index_str)
col_properties = {'type': ['null', 'string']}
column_gs_type = 'stringValue'
+ LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
+ sheet_title, column_name, column_letter))
+ LOGGER.info(' This column will be skipped during data loading.')
if skipped >= 2:
# skipped = 2 consecutive skipped headers
# Remove prior_header column_name
sheet_json_schema['properties'].pop(prior_header, None)
+ LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
+ sheet_title, column_name, column_letter))
break
else:
for sheet in sheets:
# GET sheet_json_schema for each worksheet (from function above)
sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
- LOGGER.info('columns = {}'.format(columns))
+ # LOGGER.info('columns = {}'.format(columns))
sheet_title = sheet.get('properties', {}).get('title')
schemas[sheet_title] = sheet_json_schema
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
+from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata
schema = stream.schema.to_dict()
try:
singer.write_schema(stream_name, schema, stream.key_properties)
+ LOGGER.info('Writing schema for: {}'.format(stream_name))
except OSError as err:
LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
raise err
-def write_record(stream_name, record, time_extracted):
+def write_record(stream_name, record, time_extracted, version=None):
try:
- singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
+ if version:
+ singer.messages.write_message(
+ RecordMessage(
+ stream=stream_name,
+ record=record,
+ version=version,
+ time_extracted=time_extracted))
+ else:
+ singer.messages.write_record(
+ stream_name=stream_name,
+ record=record,
+ time_extracted=time_extracted)
except OSError as err:
LOGGER.info('OS Error writing record for: {}'.format(stream_name))
LOGGER.info('record: {}'.format(record))
def process_records(catalog,
stream_name,
records,
- time_extracted):
+ time_extracted,
+ version=None):
stream = catalog.get_stream(stream_name)
schema = stream.schema.to_dict()
stream_metadata = metadata.to_map(stream.metadata)
record,
schema,
stream_metadata)
- write_record(stream_name, transformed_record, time_extracted=time_extracted)
+ write_record(
+ stream_name=stream_name,
+ record=transformed_record,
+ time_extracted=time_extracted,
+ version=version)
counter.increment()
return counter.value
# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
-def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
+def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
sheet_data_tf = []
row_num = from_row
# Create sorted list of columns based on columnIndex
col = cols[col_num - 1]
col_skipped = col.get('columnSkipped')
if not col_skipped:
+ # Get column metadata
col_name = col.get('columnName')
col_type = col.get('columnType')
+ col_letter = col.get('columnLetter')
+
+ # NULL values
+ if value is None or value == '':
+ col_val = None
+
 # Convert dates/times from Lotus 1-2-3 serial numbers
# DATE-TIME
- if col_type == 'numberType.DATE_TIME':
+ elif col_type == 'numberType.DATE_TIME':
if isinstance(value, (int, float)):
col_val = excel_to_dttm_str(value)
else:
col_val = str(value)
+ LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+ sheet_title, col_name, col_letter, row_num, col_type, value))
# DATE
elif col_type == 'numberType.DATE':
if isinstance(value, (int, float)):
col_val = excel_to_dttm_str(value)[:10]
else:
col_val = str(value)
+ LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+ sheet_title, col_name, col_letter, row_num, col_type, value))
# TIME ONLY (NO DATE)
elif col_type == 'numberType.TIME':
if isinstance(value, (int, float)):
col_val = str(timedelta(seconds=total_secs))
except ValueError:
col_val = str(value)
+ LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+ sheet_title, col_name, col_letter, row_num, col_type, value))
else:
col_val = str(value)
# NUMBER (INTEGER AND FLOAT)
col_val = float(round(value, 15))
except ValueError:
col_val = str(value)
+ LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+ sheet_title, col_name, col_letter, row_num, col_type, value))
else: # decimal_digits <= 15, no rounding
try:
col_val = float(value)
except ValueError:
col_val = str(value)
+ LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+ sheet_title, col_name, col_letter, row_num, col_type, value))
else:
col_val = str(value)
+ LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+ sheet_title, col_name, col_letter, row_num, col_type, value))
# STRING
elif col_type == 'stringValue':
col_val = str(value)
col_val = False
else:
col_val = str(value)
+ LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+ sheet_title, col_name, col_letter, row_num, col_type, value))
elif isinstance(value, int):
if value in (1, -1):
col_val = True
col_val = False
else:
col_val = str(value)
+ LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+ sheet_title, col_name, col_letter, row_num, col_type, value))
# OTHER: Convert everything else to a string
else:
col_val = str(value)
+ LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+ sheet_title, col_name, col_letter, row_num, col_type, value))
sheet_data_row_tf[col_name] = col_val
col_num = col_num + 1
# APPEND non-empty row
LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
write_schema(catalog, sheet_title)
+ # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not before
+ # subsequent syncs), and again every time a sheet sync is complete.
+ # This forces hard deletes on the data downstream if fewer records are sent.
+ # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+ last_integer = int(get_bookmark(state, sheet_title, 0))
+ activate_version = int(time.time() * 1000)
+ activate_version_message = singer.ActivateVersionMessage(
+ stream=sheet_title,
+ version=activate_version)
+ if last_integer == 0:
+ # initial load, send activate_version before AND after data sync
+ singer.write_message(activate_version_message)
+ LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
# Determine max range of columns and rows for "paging" through the data
sheet_last_col_index = 1
sheet_last_col_letter = 'A'
sheet_data_tf, row_num = transform_sheet_data(
spreadsheet_id=spreadsheet_id,
sheet_id=sheet_id,
+ sheet_title=sheet_title,
from_row=from_row,
columns=columns,
sheet_data_rows=sheet_data_rows)
catalog=catalog,
stream_name=sheet_title,
records=sheet_data_tf,
- time_extracted=ss_time_extracted)
+ time_extracted=ss_time_extracted,
+ version=activate_version)
LOGGER.info('Sheet: {}, records processed: {}'.format(
sheet_title, record_count))
-
+
# Update paging from/to_row for next batch
from_row = to_row + 1
if to_row + batch_rows > sheet_max_row:
else:
to_row = to_row + batch_rows
+ # End of Stream: Send Activate Version and update State
+ singer.write_message(activate_version_message)
+ write_bookmark(state, sheet_title, activate_version)
+ LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+ LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+ sheet_title, row_num - 2)) # data starts at row 2; row_num ends 1 past the last row
+ update_currently_syncing(state, None)
+
# SHEETS_LOADED
# Add sheet to sheets_loaded
sheet_loaded = {}
sheet_loaded['lastRowNumber'] = row_num
sheets_loaded.append(sheet_loaded)
- # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
- # This forces hard deletes on the data downstream if fewer records are sent.
- # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
- activate_version_message = singer.ActivateVersionMessage(
- stream=sheet_title,
- version=int(time.time() * 1000))
- singer.write_message(activate_version_message)
-
- LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
- sheet_title, row_num - 2)) # subtract 1 for header row
-
stream_name = 'sheet_metadata'
# Sync sheet_metadata if selected
sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)