```
Pylint test resulted in the following score:
```bash
- TBD
+ Your code has been rated at 9.78/10
```
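For reference, a score like this would typically come from running pylint over the package; the exact set of disabled checks below is an assumption, not taken from this repo's CI:
```bash
# Assumed invocation; adjust the disabled checks to match the project's CI setup.
pylint tap_google_sheets -d missing-docstring,line-too-long,too-many-locals,too-many-branches
```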
To [check the tap](https://github.com/singer-io/singer-tools#singer-check-tap) and verify working:
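A typical invocation looks like the following; the config and catalog file names are assumptions:
```bash
# Assumed file names; run the tap in sync mode and pipe its output to singer-check-tap.
tap-google-sheets --config tap_config.json --catalog catalog.json | singer-check-tap > state.json
```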
Check tap resulted in the following:
```bash
- TBD
+ The output is valid.
+ It contained 3881 messages for 13 streams.
+
+ 13 schema messages
+ 3841 record messages
+ 27 state messages
+
+ Details by stream:
+ +----------------------+---------+---------+
+ | stream | records | schemas |
+ +----------------------+---------+---------+
+ | file_metadata | 1 | 1 |
+ | spreadsheet_metadata | 1 | 1 |
+ | Test-1 | 9 | 1 |
+ | Test 2 | 2 | 1 |
+ | SKU COGS | 218 | 1 |
+ | Item Master | 216 | 1 |
+ | Retail Price | 273 | 1 |
+ | Retail Price NEW | 284 | 1 |
+ | Forecast Scenarios | 2681 | 1 |
+ | Promo Type | 91 | 1 |
+ | Shipping Method | 47 | 1 |
+ | sheet_metadata | 9 | 1 |
+ | sheets_loaded | 9 | 1 |
+ +----------------------+---------+---------+
```
---
from datetime import datetime, timedelta
+from collections import OrderedDict
import backoff
import requests
-from collections import OrderedDict
-
import singer
from singer import metrics
from singer import utils
error_code = response.get('error', {}).get('code')
ex = get_exception_for_error_code(error_code)
raise ex(message)
- else:
- raise GoogleError(error)
+ raise GoogleError(error)
except (ValueError, TypeError):
raise GoogleError(error)
factor=3)
@utils.ratelimit(100, 100)
def request(self, method, path=None, url=None, api=None, **kwargs):
-
self.get_access_token()
-
self.base_url = 'https://sheets.googleapis.com/v4'
if api == 'files':
self.base_url = 'https://www.googleapis.com/drive/v3'
schema = Schema.from_dict(schema_dict)
mdata = field_metadata[stream_name]
key_properties = None
- for md in mdata:
- table_key_properties = md.get('metadata', {}).get('table-key-properties')
+ for mdt in mdata:
+ table_key_properties = mdt.get('metadata', {}).get('table-key-properties')
if table_key_properties:
key_properties = table_key_properties
-
+
catalog.streams.append(CatalogEntry(
stream=stream_name,
tap_stream_id=stream_name,
# https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata
# Convert column index to column letter
-def colnum_string(n):
+def colnum_string(num):
string = ""
- while n > 0:
- n, remainder = divmod(n - 1, 26)
+ while num > 0:
+ num, remainder = divmod(num - 1, 26)
string = chr(65 + remainder) + string
return string
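A quick standalone sanity check of the 1-based column arithmetic (the function is copied here so the snippet runs on its own):
```python
# Copied from above so the example is self-contained.
def colnum_string(num):
    string = ""
    while num > 0:
        num, remainder = divmod(num - 1, 26)
        string = chr(65 + remainder) + string
    return string

assert colnum_string(1) == "A"     # first column
assert colnum_string(26) == "Z"    # last single-letter column
assert colnum_string(27) == "AA"   # wraps to two letters
assert colnum_string(703) == "AAA"
```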
# Create sheet_metadata_json with columns from sheet
-def get_sheet_schema_columns(sheet, spreadsheet_id, client):
+def get_sheet_schema_columns(sheet):
sheet_json_schema = OrderedDict()
data = next(iter(sheet.get('data', [])), {})
- row_data = data.get('rowData',[])
+ row_data = data.get('rowData', [])
# spreadsheet is an OrderedDict, with ordered sheets and rows in the response
headers = row_data[0].get('values', [])
column_name = '{}'.format(header_value)
if column_name in header_list:
raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name))
- else:
- header_list.append(column_name)
+ header_list.append(column_name)
first_value = first_values[i]
- # LOGGER.info('first_value[{}] = {}'.format(i, json.dumps(first_value, indent=2, sort_keys=True)))
column_effective_value = first_value.get('effectiveValue', {})
for key in column_effective_value.keys():
if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'):
column_effective_value_type = key
- column_number_format = first_values[i].get('effectiveFormat', {}).get('numberFormat', {})
+ column_number_format = first_values[i].get('effectiveFormat', {}).get(
+ 'numberFormat', {})
column_number_format_type = column_number_format.get('type')
# Determine datatype for sheet_json_schema
#
- # column_effective_value_type = numberValue, stringValue, boolValue; INVALID: errorType, formulaType
- # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
+ # column_effective_value_type = numberValue, stringValue, boolValue;
+ # INVALID: errorType, formulaType
+ # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
#
- # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, SCIENTIFIC
- # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
+ # column_number_format_type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
+ # TIME, DATE_TIME, SCIENTIFIC
+ # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
#
column_format = None # Default
# column_multiple_of = None # Default
- if column_effective_value_type in ('formulaValue', 'errorValue'):
- raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name))
- elif column_effective_value_type == 'stringValue':
+ if column_effective_value_type == 'stringValue':
column_type = ['null', 'string']
column_gs_type = 'stringValue'
elif column_effective_value_type == 'boolValue':
else:
column_type = ['null', 'number', 'string']
column_gs_type = 'numberType'
-
+ elif column_effective_value_type in ('formulaValue', 'errorValue'):
+ raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name, \
+ column_effective_value_type))
else: # skipped
column_is_skipped = True
skipped = skipped + 1
# skipped = 2 consecutive skipped headers
# Remove prior_header column_name
sheet_json_schema['properties'].pop(prior_header, None)
- column_count = i - 1
break
else:
stream_metadata = STREAMS.get(stream_name)
api = stream_metadata.get('api', 'sheets')
params = stream_metadata.get('params', {})
- querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace('{sheet_title}', sheet_title)
- path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring)
+ querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in \
+ params.items()]).replace('{sheet_title}', sheet_title)
+ path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
+ spreadsheet_id), querystring)
sheet_md_results = client.get(path=path, api=api, endpoint=stream_name)
sheet_cols = sheet_md_results.get('sheets')[0]
- sheet_schema, columns = get_sheet_schema_columns(sheet_cols, spreadsheet_id, client)
+ sheet_schema, columns = get_sheet_schema_columns(sheet_cols)
return sheet_schema, columns
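A standalone illustration of the querystring and path construction used above; the parameter names, spreadsheet id, and sheet title are invented for the example:
```python
# Hypothetical params and ids, mirroring the string handling above.
params = {"includeGridData": "true", "ranges": "'{sheet_title}'!1:2"}
spreadsheet_id, sheet_title = "abc123", "Sheet1"
querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in
                        params.items()]).replace('{sheet_title}', sheet_title)
path = '{}?{}'.format('spreadsheets/{spreadsheet_id}'.replace(
    '{spreadsheet_id}', spreadsheet_id), querystring)
print(path)  # spreadsheets/abc123?includeGridData=true&ranges='Sheet1'!1:2
```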
replication_method=stream_metadata.get('replication_method', None)
)
field_metadata[stream_name] = mdata
-
+
if stream_name == 'spreadsheet_metadata':
api = stream_metadata.get('api', 'sheets')
params = stream_metadata.get('params', {})
querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
- path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring)
+ path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
+ spreadsheet_id), querystring)
- spreadsheet_md_results = client.get(path=path, params=querystring, api=api, endpoint=stream_name)
+ spreadsheet_md_results = client.get(path=path, params=querystring, api=api, \
+ endpoint=stream_name)
sheets = spreadsheet_md_results.get('sheets')
if sheets:
for sheet in sheets:
sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
- # LOGGER.info('sheet_schema = {}'.format(json.dumps(sheet_schema, indent=2, sort_keys=True)))
+ LOGGER.info('columns = {}'.format(columns))
sheet_title = sheet.get('properties', {}).get('title')
schemas[sheet_title] = sheet_schema
replication_method='FULL_TABLE'
)
field_metadata[sheet_title] = sheet_mdata
-
+
return schemas, field_metadata
# key_properties: Primary key fields for identifying an endpoint record.
# replication_method: INCREMENTAL or FULL_TABLE
# replication_keys: bookmark_field(s), typically a date-time, used for filtering the results
-# and setting the state
+# and setting the state
# params: Query, sort, and other endpoint specific parameters; default = {}
-# data_key: JSON element containing the results list for the endpoint; default = root (no data_key)
-# bookmark_query_field: From date-time field used for filtering the query
-# bookmark_type: Data type for bookmark, integer or datetime
+# data_key: JSON element containing the results list for the endpoint;
+# default = root (no data_key)
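For illustration only, a hypothetical entry combining these keys might look like this (the names and values are invented, not the tap's actual configuration; data_key is omitted, so results are read from the response root):
```python
# Hypothetical stream definition illustrating the keys documented above.
EXAMPLE_STREAM = {
    "api": "files",
    "path": "files/{spreadsheet_id}",
    "key_properties": ["id"],
    "replication_method": "INCREMENTAL",
    "replication_keys": ["modifiedTime"],
    "params": {"fields": "id,name,modifiedTime"}
}
```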
FILE_METADATA = {
"api": "files",
import time
import math
-import singer
import json
-import pytz
from datetime import datetime, timedelta
-from collections import OrderedDict
+import pytz
+import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from tap_google_sheets.streams import STREAMS
return counter.value
-def sync_stream(stream_name, selected_streams, catalog, state, records):
+def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
# Should sheets_loaded be synced?
if stream_name in selected_streams:
LOGGER.info('STARTED Syncing {}'.format(stream_name))
update_currently_syncing(state, stream_name)
+ selected_fields = get_selected_fields(catalog, stream_name)
+ LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
write_schema(catalog, stream_name)
+ if not time_extracted:
+ time_extracted = utils.now()
record_count = process_records(
catalog=catalog,
stream_name=stream_name,
records=records,
- time_extracted=utils.now())
+ time_extracted=time_extracted)
LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
update_currently_syncing(state, None)
mdata_list = singer.metadata.to_list(mdata)
selected_fields = []
for entry in mdata_list:
- field = None
+ field = None
try:
- field = entry['breadcrumb'][1]
+ field = entry['breadcrumb'][1]
if entry.get('metadata', {}).get('selected', False):
selected_fields.append(field)
except IndexError:
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
# Convert properties to dict
sheet_metadata = sheet.get('properties')
- sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
+ sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
sheet_id = sheet_metadata_tf.get('sheetId')
sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
spreadsheet_id, sheet_id)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
if not timezone_str:
timezone_str = 'UTC'
- tz = pytz.timezone(timezone_str)
+ tzn = pytz.timezone(timezone_str)
sec_per_day = 86400
excel_epoch = 25569 # 1970-01-01T00:00:00Z
epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
epoch_dttm = datetime(1970, 1, 1)
excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
- utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
+ utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
utc_dttm_str = strftime(utc_dttm)
return utc_dttm_str
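A standalone worked example of the serial-number arithmetic (the sample serial value is illustrative):
```python
import math
from datetime import datetime, timedelta

# Sheets/Excel serial dates count days; serial 25569 is 1970-01-01T00:00:00Z.
excel_date_sn = 44197.5  # illustrative value: mid-day on 2021-01-01
epoch_sec = math.floor((excel_date_sn - 25569) * 86400)
print(datetime(1970, 1, 1) + timedelta(seconds=epoch_sec))
# -> 2021-01-01 12:00:00  (then localized to UTC and formatted by strftime above)
```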
is_last_row = False
row_num = from_row
# Create sorted list of columns based on columnIndex
- cols = sorted(columns, key = lambda i: i['columnIndex'])
+ cols = sorted(columns, key=lambda i: i['columnIndex'])
# LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
for row in sheet_data_rows:
col_type = col.get('columnType')
# Convert dates/times from Lotus Notes Serial Numbers
if col_type == 'numberType.DATE_TIME':
- if isinstance(value, int) or isinstance(value, float):
+ if isinstance(value, (int, float)):
col_val = excel_to_dttm_str(value)
else:
col_val = str(value)
elif col_type == 'numberType.DATE':
- if isinstance(value, int) or isinstance(value, float):
+ if isinstance(value, (int, float)):
col_val = excel_to_dttm_str(value)[:10]
else:
col_val = str(value)
elif col_type == 'numberType.TIME':
- if isinstance(value, int) or isinstance(value, float):
+ if isinstance(value, (int, float)):
try:
total_secs = value * 86400 # seconds in day
col_val = str(timedelta(seconds=total_secs))
else:
col_val = str(value)
elif isinstance(value, int):
- if value == 1 or value == -1:
+ if value in (1, -1):
col_val = True
elif value == 0:
col_val = False
LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
if this_datetime <= last_datetime:
LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
- return 0
- else:
- # Sync file_metadata if selected
- sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
- write_bookmark(state, stream_name, strftime(this_datetime))
+ return
+ # Sync file_metadata if selected
+ sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
+ write_bookmark(state, stream_name, strftime(this_datetime))
# SPREADSHEET_METADATA
spreadsheet_metadata = {}
# GET spreadsheet_metadata
LOGGER.info('GET spreadsheet_metadata')
- spreadsheet_metadata, ss_time_extracted = get_data(
+ spreadsheet_metadata, ss_time_extracted = get_data(
stream_name=stream_name,
endpoint_config=spreadsheet_metadata_config,
client=client,
spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
# Sync spreadsheet_metadata if selected
- sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)
+ sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
+ ss_time_extracted)
# SHEET_METADATA and SHEET_DATA
sheets = spreadsheet_metadata.get('sheets')
# GET sheet_metadata and columns
sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+ LOGGER.info('sheet_schema: {}'.format(sheet_schema))
# Transform sheet_metadata
sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
if sheet_title in selected_streams:
LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
update_currently_syncing(state, sheet_title)
+ selected_fields = get_selected_fields(catalog, sheet_title)
+ LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
write_schema(catalog, sheet_title)
# Determine max range of columns and rows for "paging" through the data
# Loop thru batches (each having 200 rows of data)
while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
+
# GET sheet_data for a worksheet tab
sheet_data, time_extracted = get_data(
stream_name=sheet_title,
stream_name=sheet_title,
records=sheet_data_tf,
time_extracted=ss_time_extracted)
-
+ LOGGER.info('Sheet: {}, records processed: {}'.format(
+ sheet_title, record_count))
+
# Update paging from/to_row for next batch
from_row = to_row + 1
if to_row + batch_rows > sheet_max_row:
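A minimal standalone sketch of the batching arithmetic; the 200-row batch size, row counts, and column letter here are illustrative assumptions:
```python
# Page a sheet with a header row plus 450 data rows, columns A..J, 200 rows per batch.
batch_rows, sheet_max_row, last_col = 200, 451, 'J'
from_row, to_row = 2, min(batch_rows + 1, sheet_max_row)
while from_row < sheet_max_row and to_row <= sheet_max_row:
    print('A{}:{}{}'.format(from_row, last_col, to_row))  # A2:J201, A202:J401, A402:J451
    from_row = to_row + 1
    to_row = min(to_row + batch_rows, sheet_max_row)
```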
sheets_loaded.append(sheet_loaded)
# Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
- # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
- # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
+ # This forces hard deletes on the data downstream if fewer records are sent.
+ # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
activate_version_message = singer.ActivateVersionMessage(
stream=sheet_title,
version=int(time.time() * 1000))
LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
sheet_title, row_num - 1))
- update_currently_syncing(state, None)
stream_name = 'sheet_metadata'
# Sync sheet_metadata if selected
sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
-
+
stream_name = 'sheets_loaded'
# Sync sheets_loaded if selected
sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
+
+ return