aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets
diff options
context:
space:
mode:
authorJeff Huth <jeff.huth@bytecode.io>2019-11-15 01:58:55 -0800
committerJeff Huth <jeff.huth@bytecode.io>2019-11-15 01:58:55 -0800
commit99424fee5ba6ff830df39be8f47c3e3d685b444a (patch)
tree08ace54622092921ab7253946515ce5d3dcf0a66 /tap_google_sheets
parentda690bda91ea6a14964a2378e5dbb5d4de91a7e2 (diff)
downloadtap-google-sheets-3ad2e43c5411e0e2717d03d6dde577ef53fc3425.tar.gz
tap-google-sheets-3ad2e43c5411e0e2717d03d6dde577ef53fc3425.tar.zst
tap-google-sheets-3ad2e43c5411e0e2717d03d6dde577ef53fc3425.zip
pylint and testingv0.0.1
pylint and testing
Diffstat (limited to 'tap_google_sheets')
-rw-r--r--tap_google_sheets/client.py8
-rw-r--r--tap_google_sheets/discover.py6
-rw-r--r--tap_google_sheets/schema.py56
-rw-r--r--tap_google_sheets/streams.py7
-rw-r--r--tap_google_sheets/sync.py65
5 files changed, 75 insertions, 67 deletions
diff --git a/tap_google_sheets/client.py b/tap_google_sheets/client.py
index 0a0ce5a..4f38352 100644
--- a/tap_google_sheets/client.py
+++ b/tap_google_sheets/client.py
@@ -1,8 +1,7 @@
1from datetime import datetime, timedelta 1from datetime import datetime, timedelta
2from collections import OrderedDict
2import backoff 3import backoff
3import requests 4import requests
4from collections import OrderedDict
5
6import singer 5import singer
7from singer import metrics 6from singer import metrics
8from singer import utils 7from singer import utils
@@ -123,8 +122,7 @@ def raise_for_error(response):
123 error_code = response.get('error', {}).get('code') 122 error_code = response.get('error', {}).get('code')
124 ex = get_exception_for_error_code(error_code) 123 ex = get_exception_for_error_code(error_code)
125 raise ex(message) 124 raise ex(message)
126 else: 125 raise GoogleError(error)
127 raise GoogleError(error)
128 except (ValueError, TypeError): 126 except (ValueError, TypeError):
129 raise GoogleError(error) 127 raise GoogleError(error)
130 128
@@ -196,9 +194,7 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
196 factor=3) 194 factor=3)
197 @utils.ratelimit(100, 100) 195 @utils.ratelimit(100, 100)
198 def request(self, method, path=None, url=None, api=None, **kwargs): 196 def request(self, method, path=None, url=None, api=None, **kwargs):
199
200 self.get_access_token() 197 self.get_access_token()
201
202 self.base_url = 'https://sheets.googleapis.com/v4' 198 self.base_url = 'https://sheets.googleapis.com/v4'
203 if api == 'files': 199 if api == 'files':
204 self.base_url = 'https://www.googleapis.com/drive/v3' 200 self.base_url = 'https://www.googleapis.com/drive/v3'
diff --git a/tap_google_sheets/discover.py b/tap_google_sheets/discover.py
index 6477a5f..6cf0d09 100644
--- a/tap_google_sheets/discover.py
+++ b/tap_google_sheets/discover.py
@@ -10,11 +10,11 @@ def discover(client, spreadsheet_id):
10 schema = Schema.from_dict(schema_dict) 10 schema = Schema.from_dict(schema_dict)
11 mdata = field_metadata[stream_name] 11 mdata = field_metadata[stream_name]
12 key_properties = None 12 key_properties = None
13 for md in mdata: 13 for mdt in mdata:
14 table_key_properties = md.get('metadata', {}).get('table-key-properties') 14 table_key_properties = mdt.get('metadata', {}).get('table-key-properties')
15 if table_key_properties: 15 if table_key_properties:
16 key_properties = table_key_properties 16 key_properties = table_key_properties
17 17
18 catalog.streams.append(CatalogEntry( 18 catalog.streams.append(CatalogEntry(
19 stream=stream_name, 19 stream=stream_name,
20 tap_stream_id=stream_name, 20 tap_stream_id=stream_name,
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index 237ab06..d4fead5 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -11,19 +11,19 @@ LOGGER = singer.get_logger()
11# https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata 11# https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata
12 12
13# Convert column index to column letter 13# Convert column index to column letter
14def colnum_string(n): 14def colnum_string(num):
15 string = "" 15 string = ""
16 while n > 0: 16 while num > 0:
17 n, remainder = divmod(n - 1, 26) 17 num, remainder = divmod(num - 1, 26)
18 string = chr(65 + remainder) + string 18 string = chr(65 + remainder) + string
19 return string 19 return string
20 20
21 21
22# Create sheet_metadata_json with columns from sheet 22# Create sheet_metadata_json with columns from sheet
23def get_sheet_schema_columns(sheet, spreadsheet_id, client): 23def get_sheet_schema_columns(sheet):
24 sheet_json_schema = OrderedDict() 24 sheet_json_schema = OrderedDict()
25 data = next(iter(sheet.get('data', [])), {}) 25 data = next(iter(sheet.get('data', [])), {})
26 row_data = data.get('rowData',[]) 26 row_data = data.get('rowData', [])
27 # spreadsheet is an OrderedDict, with orderd sheets and rows in the repsonse 27 # spreadsheet is an OrderedDict, with orderd sheets and rows in the repsonse
28 28
29 headers = row_data[0].get('values', []) 29 headers = row_data[0].get('values', [])
@@ -65,33 +65,32 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
65 column_name = '{}'.format(header_value) 65 column_name = '{}'.format(header_value)
66 if column_name in header_list: 66 if column_name in header_list:
67 raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name)) 67 raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name))
68 else: 68 header_list.append(column_name)
69 header_list.append(column_name)
70 69
71 first_value = first_values[i] 70 first_value = first_values[i]
72 # LOGGER.info('first_value[{}] = {}'.format(i, json.dumps(first_value, indent=2, sort_keys=True)))
73 71
74 column_effective_value = first_value.get('effectiveValue', {}) 72 column_effective_value = first_value.get('effectiveValue', {})
75 for key in column_effective_value.keys(): 73 for key in column_effective_value.keys():
76 if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'): 74 if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'):
77 column_effective_value_type = key 75 column_effective_value_type = key
78 76
79 column_number_format = first_values[i].get('effectiveFormat', {}).get('numberFormat', {}) 77 column_number_format = first_values[i].get('effectiveFormat', {}).get(
78 'numberFormat', {})
80 column_number_format_type = column_number_format.get('type') 79 column_number_format_type = column_number_format.get('type')
81 80
82 # Determine datatype for sheet_json_schema 81 # Determine datatype for sheet_json_schema
83 # 82 #
84 # column_effective_value_type = numberValue, stringValue, boolValue; INVALID: errorType, formulaType 83 # column_effective_value_type = numberValue, stringValue, boolValue;
85 # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue 84 # INVALID: errorType, formulaType
85 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
86 # 86 #
87 # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, SCIENTIFIC 87 # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
88 # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType 88 # TIME, DATE_TIME, SCIENTIFIC
89 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
89 # 90 #
90 column_format = None # Default 91 column_format = None # Default
91 # column_multiple_of = None # Default 92 # column_multiple_of = None # Default
92 if column_effective_value_type in ('formulaValue', 'errorValue'): 93 if column_effective_value_type == 'stringValue':
93 raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name))
94 elif column_effective_value_type == 'stringValue':
95 column_type = ['null', 'string'] 94 column_type = ['null', 'string']
96 column_gs_type = 'stringValue' 95 column_gs_type = 'stringValue'
97 elif column_effective_value_type == 'boolValue': 96 elif column_effective_value_type == 'boolValue':
@@ -116,7 +115,9 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
116 else: 115 else:
117 column_type = ['null', 'number', 'string'] 116 column_type = ['null', 'number', 'string']
118 column_gs_type = 'numberType' 117 column_gs_type = 'numberType'
119 118 elif column_effective_value_type in ('formulaValue', 'errorValue'):
119 raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name, \
120 column_effective_value_type))
120 else: # skipped 121 else: # skipped
121 column_is_skipped = True 122 column_is_skipped = True
122 skipped = skipped + 1 123 skipped = skipped + 1
@@ -130,7 +131,6 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
130 # skipped = 2 consecutive skipped headers 131 # skipped = 2 consecutive skipped headers
131 # Remove prior_header column_name 132 # Remove prior_header column_name
132 sheet_json_schema['properties'].pop(prior_header, None) 133 sheet_json_schema['properties'].pop(prior_header, None)
133 column_count = i - 1
134 break 134 break
135 135
136 else: 136 else:
@@ -164,12 +164,14 @@ def get_sheet_metadata(sheet, spreadsheet_id, client):
164 stream_metadata = STREAMS.get(stream_name) 164 stream_metadata = STREAMS.get(stream_name)
165 api = stream_metadata.get('api', 'sheets') 165 api = stream_metadata.get('api', 'sheets')
166 params = stream_metadata.get('params', {}) 166 params = stream_metadata.get('params', {})
167 querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace('{sheet_title}', sheet_title) 167 querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in \
168 path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring) 168 params.items()]).replace('{sheet_title}', sheet_title)
169 path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
170 spreadsheet_id), querystring)
169 171
170 sheet_md_results = client.get(path=path, api=api, endpoint=stream_name) 172 sheet_md_results = client.get(path=path, api=api, endpoint=stream_name)
171 sheet_cols = sheet_md_results.get('sheets')[0] 173 sheet_cols = sheet_md_results.get('sheets')[0]
172 sheet_schema, columns = get_sheet_schema_columns(sheet_cols, spreadsheet_id, client) 174 sheet_schema, columns = get_sheet_schema_columns(sheet_cols)
173 175
174 return sheet_schema, columns 176 return sheet_schema, columns
175 177
@@ -199,20 +201,22 @@ def get_schemas(client, spreadsheet_id):
199 replication_method=stream_metadata.get('replication_method', None) 201 replication_method=stream_metadata.get('replication_method', None)
200 ) 202 )
201 field_metadata[stream_name] = mdata 203 field_metadata[stream_name] = mdata
202 204
203 if stream_name == 'spreadsheet_metadata': 205 if stream_name == 'spreadsheet_metadata':
204 api = stream_metadata.get('api', 'sheets') 206 api = stream_metadata.get('api', 'sheets')
205 params = stream_metadata.get('params', {}) 207 params = stream_metadata.get('params', {})
206 querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]) 208 querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
207 path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring) 209 path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
210 spreadsheet_id), querystring)
208 211
209 spreadsheet_md_results = client.get(path=path, params=querystring, api=api, endpoint=stream_name) 212 spreadsheet_md_results = client.get(path=path, params=querystring, api=api, \
213 endpoint=stream_name)
210 214
211 sheets = spreadsheet_md_results.get('sheets') 215 sheets = spreadsheet_md_results.get('sheets')
212 if sheets: 216 if sheets:
213 for sheet in sheets: 217 for sheet in sheets:
214 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) 218 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
215 # LOGGER.info('sheet_schema = {}'.format(json.dumps(sheet_schema, indent=2, sort_keys=True))) 219 LOGGER.info('columns = {}'.format(columns))
216 220
217 sheet_title = sheet.get('properties', {}).get('title') 221 sheet_title = sheet.get('properties', {}).get('title')
218 schemas[sheet_title] = sheet_schema 222 schemas[sheet_title] = sheet_schema
@@ -224,5 +228,5 @@ def get_schemas(client, spreadsheet_id):
224 replication_method='FULL_TABLE' 228 replication_method='FULL_TABLE'
225 ) 229 )
226 field_metadata[sheet_title] = sheet_mdata 230 field_metadata[sheet_title] = sheet_mdata
227 231
228 return schemas, field_metadata 232 return schemas, field_metadata
diff --git a/tap_google_sheets/streams.py b/tap_google_sheets/streams.py
index 231a41d..b8e3eff 100644
--- a/tap_google_sheets/streams.py
+++ b/tap_google_sheets/streams.py
@@ -8,11 +8,10 @@ from collections import OrderedDict
8# key_properties: Primary key fields for identifying an endpoint record. 8# key_properties: Primary key fields for identifying an endpoint record.
9# replication_method: INCREMENTAL or FULL_TABLE 9# replication_method: INCREMENTAL or FULL_TABLE
10# replication_keys: bookmark_field(s), typically a date-time, used for filtering the results 10# replication_keys: bookmark_field(s), typically a date-time, used for filtering the results
11# and setting the state 11# and setting the state
12# params: Query, sort, and other endpoint specific parameters; default = {} 12# params: Query, sort, and other endpoint specific parameters; default = {}
13# data_key: JSON element containing the results list for the endpoint; default = root (no data_key) 13# data_key: JSON element containing the results list for the endpoint;
14# bookmark_query_field: From date-time field used for filtering the query 14# default = root (no data_key)
15# bookmark_type: Data type for bookmark, integer or datetime
16 15
17FILE_METADATA = { 16FILE_METADATA = {
18 "api": "files", 17 "api": "files",
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 79e05f9..d7d7184 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -1,10 +1,9 @@
1import time 1import time
2import math 2import math
3import singer
4import json 3import json
5import pytz
6from datetime import datetime, timedelta 4from datetime import datetime, timedelta
7from collections import OrderedDict 5import pytz
6import singer
8from singer import metrics, metadata, Transformer, utils 7from singer import metrics, metadata, Transformer, utils
9from singer.utils import strptime_to_utc, strftime 8from singer.utils import strptime_to_utc, strftime
10from tap_google_sheets.streams import STREAMS 9from tap_google_sheets.streams import STREAMS
@@ -71,17 +70,21 @@ def process_records(catalog,
71 return counter.value 70 return counter.value
72 71
73 72
74def sync_stream(stream_name, selected_streams, catalog, state, records): 73def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
75 # Should sheets_loaded be synced? 74 # Should sheets_loaded be synced?
76 if stream_name in selected_streams: 75 if stream_name in selected_streams:
77 LOGGER.info('STARTED Syncing {}'.format(stream_name)) 76 LOGGER.info('STARTED Syncing {}'.format(stream_name))
78 update_currently_syncing(state, stream_name) 77 update_currently_syncing(state, stream_name)
78 selected_fields = get_selected_fields(catalog, stream_name)
79 LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
79 write_schema(catalog, stream_name) 80 write_schema(catalog, stream_name)
81 if not time_extracted:
82 time_extracted = utils.now()
80 record_count = process_records( 83 record_count = process_records(
81 catalog=catalog, 84 catalog=catalog,
82 stream_name=stream_name, 85 stream_name=stream_name,
83 records=records, 86 records=records,
84 time_extracted=utils.now()) 87 time_extracted=time_extracted)
85 LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count)) 88 LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
86 update_currently_syncing(state, None) 89 update_currently_syncing(state, None)
87 90
@@ -105,9 +108,9 @@ def get_selected_fields(catalog, stream_name):
105 mdata_list = singer.metadata.to_list(mdata) 108 mdata_list = singer.metadata.to_list(mdata)
106 selected_fields = [] 109 selected_fields = []
107 for entry in mdata_list: 110 for entry in mdata_list:
108 field = None 111 field = None
109 try: 112 try:
110 field = entry['breadcrumb'][1] 113 field = entry['breadcrumb'][1]
111 if entry.get('metadata', {}).get('selected', False): 114 if entry.get('metadata', {}).get('selected', False):
112 selected_fields.append(field) 115 selected_fields.append(field)
113 except IndexError: 116 except IndexError:
@@ -172,7 +175,7 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
172def transform_sheet_metadata(spreadsheet_id, sheet, columns): 175def transform_sheet_metadata(spreadsheet_id, sheet, columns):
173 # Convert to properties to dict 176 # Convert to properties to dict
174 sheet_metadata = sheet.get('properties') 177 sheet_metadata = sheet.get('properties')
175 sheet_metadata_tf = json.loads(json.dumps(sheet_metadata)) 178 sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
176 sheet_id = sheet_metadata_tf.get('sheetId') 179 sheet_id = sheet_metadata_tf.get('sheetId')
177 sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format( 180 sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
178 spreadsheet_id, sheet_id) 181 spreadsheet_id, sheet_id)
@@ -187,13 +190,13 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns):
187def excel_to_dttm_str(excel_date_sn, timezone_str=None): 190def excel_to_dttm_str(excel_date_sn, timezone_str=None):
188 if not timezone_str: 191 if not timezone_str:
189 timezone_str = 'UTC' 192 timezone_str = 'UTC'
190 tz = pytz.timezone(timezone_str) 193 tzn = pytz.timezone(timezone_str)
191 sec_per_day = 86400 194 sec_per_day = 86400
192 excel_epoch = 25569 # 1970-01-01T00:00:00Z 195 excel_epoch = 25569 # 1970-01-01T00:00:00Z
193 epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day) 196 epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
194 epoch_dttm = datetime(1970, 1, 1) 197 epoch_dttm = datetime(1970, 1, 1)
195 excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec) 198 excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
196 utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc) 199 utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
197 utc_dttm_str = strftime(utc_dttm) 200 utc_dttm_str = strftime(utc_dttm)
198 return utc_dttm_str 201 return utc_dttm_str
199 202
@@ -205,7 +208,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
205 is_last_row = False 208 is_last_row = False
206 row_num = from_row 209 row_num = from_row
207 # Create sorted list of columns based on columnIndex 210 # Create sorted list of columns based on columnIndex
208 cols = sorted(columns, key = lambda i: i['columnIndex']) 211 cols = sorted(columns, key=lambda i: i['columnIndex'])
209 212
210 # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows)) 213 # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
211 for row in sheet_data_rows: 214 for row in sheet_data_rows:
@@ -228,17 +231,17 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
228 col_type = col.get('columnType') 231 col_type = col.get('columnType')
229 # Convert dates/times from Lotus Notes Serial Numbers 232 # Convert dates/times from Lotus Notes Serial Numbers
230 if col_type == 'numberType.DATE_TIME': 233 if col_type == 'numberType.DATE_TIME':
231 if isinstance(value, int) or isinstance(value, float): 234 if isinstance(value, (int, float)):
232 col_val = excel_to_dttm_str(value) 235 col_val = excel_to_dttm_str(value)
233 else: 236 else:
234 col_val = str(value) 237 col_val = str(value)
235 elif col_type == 'numberType.DATE': 238 elif col_type == 'numberType.DATE':
236 if isinstance(value, int) or isinstance(value, float): 239 if isinstance(value, (int, float)):
237 col_val = excel_to_dttm_str(value)[:10] 240 col_val = excel_to_dttm_str(value)[:10]
238 else: 241 else:
239 col_val = str(value) 242 col_val = str(value)
240 elif col_type == 'numberType.TIME': 243 elif col_type == 'numberType.TIME':
241 if isinstance(value, int) or isinstance(value, float): 244 if isinstance(value, (int, float)):
242 try: 245 try:
243 total_secs = value * 86400 # seconds in day 246 total_secs = value * 86400 # seconds in day
244 col_val = str(timedelta(seconds=total_secs)) 247 col_val = str(timedelta(seconds=total_secs))
@@ -267,7 +270,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
267 else: 270 else:
268 col_val = str(value) 271 col_val = str(value)
269 elif isinstance(value, int): 272 elif isinstance(value, int):
270 if value == 1 or value == -1: 273 if value in (1, -1):
271 col_val = True 274 col_val = True
272 elif value == 0: 275 elif value == 0:
273 col_val = False 276 col_val = False
@@ -321,11 +324,10 @@ def sync(client, config, catalog, state):
321 LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime)) 324 LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
322 if this_datetime <= last_datetime: 325 if this_datetime <= last_datetime:
323 LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.') 326 LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
324 return 0 327 return
325 else: 328 # Sync file_metadata if selected
326 # Sync file_metadata if selected 329 sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
327 sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf) 330 write_bookmark(state, stream_name, strftime(this_datetime))
328 write_bookmark(state, stream_name, strftime(this_datetime))
329 331
330 # SPREADSHEET_METADATA 332 # SPREADSHEET_METADATA
331 spreadsheet_metadata = {} 333 spreadsheet_metadata = {}
@@ -334,7 +336,7 @@ def sync(client, config, catalog, state):
334 336
335 # GET spreadsheet_metadata 337 # GET spreadsheet_metadata
336 LOGGER.info('GET spreadsheet_meatadata') 338 LOGGER.info('GET spreadsheet_meatadata')
337 spreadsheet_metadata, ss_time_extracted = get_data( 339 spreadsheet_metadata, ss_time_extracted = get_data(
338 stream_name=stream_name, 340 stream_name=stream_name,
339 endpoint_config=spreadsheet_metadata_config, 341 endpoint_config=spreadsheet_metadata_config,
340 client=client, 342 client=client,
@@ -345,7 +347,8 @@ def sync(client, config, catalog, state):
345 spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata) 347 spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
346 348
347 # Sync spreadsheet_metadata if selected 349 # Sync spreadsheet_metadata if selected
348 sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf) 350 sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
351 ss_time_extracted)
349 352
350 # SHEET_METADATA and SHEET_DATA 353 # SHEET_METADATA and SHEET_DATA
351 sheets = spreadsheet_metadata.get('sheets') 354 sheets = spreadsheet_metadata.get('sheets')
@@ -360,6 +363,7 @@ def sync(client, config, catalog, state):
360 363
361 # GET sheet_metadata and columns 364 # GET sheet_metadata and columns
362 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) 365 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
366 LOGGER.info('sheet_schema: {}'.format(sheet_schema))
363 367
364 # Transform sheet_metadata 368 # Transform sheet_metadata
365 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) 369 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
@@ -371,6 +375,8 @@ def sync(client, config, catalog, state):
371 if sheet_title in selected_streams: 375 if sheet_title in selected_streams:
372 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title)) 376 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
373 update_currently_syncing(state, sheet_title) 377 update_currently_syncing(state, sheet_title)
378 selected_fields = get_selected_fields(catalog, sheet_title)
379 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
374 write_schema(catalog, sheet_title) 380 write_schema(catalog, sheet_title)
375 381
376 # Determine max range of columns and rows for "paging" through the data 382 # Determine max range of columns and rows for "paging" through the data
@@ -396,7 +402,7 @@ def sync(client, config, catalog, state):
396 # Loop thru batches (each having 200 rows of data) 402 # Loop thru batches (each having 200 rows of data)
397 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row: 403 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
398 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row) 404 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
399 405
400 # GET sheet_data for a worksheet tab 406 # GET sheet_data for a worksheet tab
401 sheet_data, time_extracted = get_data( 407 sheet_data, time_extracted = get_data(
402 stream_name=sheet_title, 408 stream_name=sheet_title,
@@ -423,7 +429,9 @@ def sync(client, config, catalog, state):
423 stream_name=sheet_title, 429 stream_name=sheet_title,
424 records=sheet_data_tf, 430 records=sheet_data_tf,
425 time_extracted=ss_time_extracted) 431 time_extracted=ss_time_extracted)
426 432 LOGGER.info('Sheet: {}, ecords processed: {}'.format(
433 sheet_title, record_count))
434
427 # Update paging from/to_row for next batch 435 # Update paging from/to_row for next batch
428 from_row = to_row + 1 436 from_row = to_row + 1
429 if to_row + batch_rows > sheet_max_row: 437 if to_row + batch_rows > sheet_max_row:
@@ -442,8 +450,8 @@ def sync(client, config, catalog, state):
442 sheets_loaded.append(sheet_loaded) 450 sheets_loaded.append(sheet_loaded)
443 451
444 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete. 452 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
445 # This forces hard deletes on the data downstream if fewer records are sent for a sheet. 453 # This forces hard deletes on the data downstream if fewer records are sent.
446 # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167 454 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
447 activate_version_message = singer.ActivateVersionMessage( 455 activate_version_message = singer.ActivateVersionMessage(
448 stream=sheet_title, 456 stream=sheet_title,
449 version=int(time.time() * 1000)) 457 version=int(time.time() * 1000))
@@ -451,12 +459,13 @@ def sync(client, config, catalog, state):
451 459
452 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( 460 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
453 sheet_title, row_num - 1)) 461 sheet_title, row_num - 1))
454 update_currently_syncing(state, None)
455 462
456 stream_name = 'sheet_metadata' 463 stream_name = 'sheet_metadata'
457 # Sync sheet_metadata if selected 464 # Sync sheet_metadata if selected
458 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) 465 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
459 466
460 stream_name = 'sheets_loaded' 467 stream_name = 'sheets_loaded'
461 # Sync sheet_metadata if selected 468 # Sync sheet_metadata if selected
462 sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded) 469 sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
470
471 return