aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r--tap_google_sheets/sync.py354
1 files changed, 268 insertions, 86 deletions
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 5b57e77..79e05f9 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -2,6 +2,8 @@ import time
2import math 2import math
3import singer 3import singer
4import json 4import json
5import pytz
6from datetime import datetime, timedelta
5from collections import OrderedDict 7from collections import OrderedDict
6from singer import metrics, metadata, Transformer, utils 8from singer import metrics, metadata, Transformer, utils
7from singer.utils import strptime_to_utc, strftime 9from singer.utils import strptime_to_utc, strftime
@@ -48,66 +50,40 @@ def write_bookmark(state, stream, value):
48 singer.write_state(state) 50 singer.write_state(state)
49 51
50 52
51# def transform_datetime(this_dttm): 53# Transform/validate batch of records w/ schema and sent to target
52def transform_datetime(this_dttm): 54def process_records(catalog,
53 with Transformer() as transformer:
54 new_dttm = transformer._transform_datetime(this_dttm)
55 return new_dttm
56
57
58def process_records(catalog, #pylint: disable=too-many-branches
59 stream_name, 55 stream_name,
60 records, 56 records,
61 time_extracted, 57 time_extracted):
62 bookmark_field=None,
63 bookmark_type=None,
64 max_bookmark_value=None,
65 last_datetime=None,
66 last_integer=None,
67 parent=None,
68 parent_id=None):
69 stream = catalog.get_stream(stream_name) 58 stream = catalog.get_stream(stream_name)
70 schema = stream.schema.to_dict() 59 schema = stream.schema.to_dict()
71 stream_metadata = metadata.to_map(stream.metadata) 60 stream_metadata = metadata.to_map(stream.metadata)
72
73 with metrics.record_counter(stream_name) as counter: 61 with metrics.record_counter(stream_name) as counter:
74 for record in records: 62 for record in records:
75 # If child object, add parent_id to record
76 if parent_id and parent:
77 record[parent + '_id'] = parent_id
78
79 # Transform record for Singer.io 63 # Transform record for Singer.io
80 with Transformer() as transformer: 64 with Transformer() as transformer:
81 transformed_record = transformer.transform( 65 transformed_record = transformer.transform(
82 record, 66 record,
83 schema, 67 schema,
84 stream_metadata) 68 stream_metadata)
85 # Reset max_bookmark_value to new value if higher 69 write_record(stream_name, transformed_record, time_extracted=time_extracted)
86 if transformed_record.get(bookmark_field): 70 counter.increment()
87 if max_bookmark_value is None or \ 71 return counter.value
88 transformed_record[bookmark_field] > transform_datetime(max_bookmark_value): 72
89 max_bookmark_value = transformed_record[bookmark_field] 73
90 74def sync_stream(stream_name, selected_streams, catalog, state, records):
91 if bookmark_field and (bookmark_field in transformed_record): 75 # Should sheets_loaded be synced?
92 if bookmark_type == 'integer': 76 if stream_name in selected_streams:
93 # Keep only records whose bookmark is after the last_integer 77 LOGGER.info('STARTED Syncing {}'.format(stream_name))
94 if transformed_record[bookmark_field] >= last_integer: 78 update_currently_syncing(state, stream_name)
95 write_record(stream_name, transformed_record, \ 79 write_schema(catalog, stream_name)
96 time_extracted=time_extracted) 80 record_count = process_records(
97 counter.increment() 81 catalog=catalog,
98 elif bookmark_type == 'datetime': 82 stream_name=stream_name,
99 last_dttm = transform_datetime(last_datetime) 83 records=records,
100 bookmark_dttm = transform_datetime(transformed_record[bookmark_field]) 84 time_extracted=utils.now())
101 # Keep only records whose bookmark is after the last_datetime 85 LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
102 if bookmark_dttm >= last_dttm: 86 update_currently_syncing(state, None)
103 write_record(stream_name, transformed_record, \
104 time_extracted=time_extracted)
105 counter.increment()
106 else:
107 write_record(stream_name, transformed_record, time_extracted=time_extracted)
108 counter.increment()
109
110 return max_bookmark_value, counter.value
111 87
112 88
113# Currently syncing sets the stream currently being delivered in the state. 89# Currently syncing sets the stream currently being delivered in the state.
@@ -154,14 +130,16 @@ def get_data(stream_name,
154 querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace( 130 querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
155 '{sheet_title}', stream_name) 131 '{sheet_title}', stream_name)
156 data = {} 132 data = {}
133 time_extracted = utils.now()
157 data = client.get( 134 data = client.get(
158 path=path, 135 path=path,
159 api=api, 136 api=api,
160 params=querystring, 137 params=querystring,
161 endpoint=stream_name) 138 endpoint=stream_name)
162 return data 139 return data, time_extracted
163 140
164 141
142# Tranform file_metadata: remove nodes from lastModifyingUser, format as array
165def transform_file_metadata(file_metadata): 143def transform_file_metadata(file_metadata):
166 # Convert to dict 144 # Convert to dict
167 file_metadata_tf = json.loads(json.dumps(file_metadata)) 145 file_metadata_tf = json.loads(json.dumps(file_metadata))
@@ -176,10 +154,11 @@ def transform_file_metadata(file_metadata):
176 return file_metadata_arr 154 return file_metadata_arr
177 155
178 156
157# Tranform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
179def transform_spreadsheet_metadata(spreadsheet_metadata): 158def transform_spreadsheet_metadata(spreadsheet_metadata):
180 # Convert to dict 159 # Convert to dict
181 spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata)) 160 spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
182 # Remove keys 161 # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
183 if spreadsheet_metadata_tf.get('properties'): 162 if spreadsheet_metadata_tf.get('properties'):
184 spreadsheet_metadata_tf['properties'].pop('defaultFormat', None) 163 spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
185 spreadsheet_metadata_tf.pop('sheets', None) 164 spreadsheet_metadata_tf.pop('sheets', None)
@@ -189,6 +168,7 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
189 return spreadsheet_metadata_arr 168 return spreadsheet_metadata_arr
190 169
191 170
171# Tranform spreadsheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
192def transform_sheet_metadata(spreadsheet_id, sheet, columns): 172def transform_sheet_metadata(spreadsheet_id, sheet, columns):
193 # Convert to properties to dict 173 # Convert to properties to dict
194 sheet_metadata = sheet.get('properties') 174 sheet_metadata = sheet.get('properties')
@@ -202,6 +182,107 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns):
202 return sheet_metadata_tf 182 return sheet_metadata_tf
203 183
204 184
185# Convert Excel Date Serial Number (excel_date_sn) to datetime string
186# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
187def excel_to_dttm_str(excel_date_sn, timezone_str=None):
188 if not timezone_str:
189 timezone_str = 'UTC'
190 tz = pytz.timezone(timezone_str)
191 sec_per_day = 86400
192 excel_epoch = 25569 # 1970-01-01T00:00:00Z
193 epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
194 epoch_dttm = datetime(1970, 1, 1)
195 excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
196 utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
197 utc_dttm_str = strftime(utc_dttm)
198 return utc_dttm_str
199
200
201# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
202# Convert from array of values to JSON with column names as keys
203def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
204 sheet_data_tf = []
205 is_last_row = False
206 row_num = from_row
207 # Create sorted list of columns based on columnIndex
208 cols = sorted(columns, key = lambda i: i['columnIndex'])
209
210 # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
211 for row in sheet_data_rows:
212 # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
213 if row == []:
214 is_last_row = True
215 return sheet_data_tf, row_num - 1, is_last_row
216 sheet_data_row_tf = {}
217 # Add spreadsheet_id, sheet_id, and row
218 sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
219 sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
220 sheet_data_row_tf['__sdc_row'] = row_num
221 col_num = 1
222 for value in row:
223 # Select column metadata based on column index
224 col = cols[col_num - 1]
225 col_skipped = col.get('columnSkipped')
226 if not col_skipped:
227 col_name = col.get('columnName')
228 col_type = col.get('columnType')
229 # Convert dates/times from Lotus Notes Serial Numbers
230 if col_type == 'numberType.DATE_TIME':
231 if isinstance(value, int) or isinstance(value, float):
232 col_val = excel_to_dttm_str(value)
233 else:
234 col_val = str(value)
235 elif col_type == 'numberType.DATE':
236 if isinstance(value, int) or isinstance(value, float):
237 col_val = excel_to_dttm_str(value)[:10]
238 else:
239 col_val = str(value)
240 elif col_type == 'numberType.TIME':
241 if isinstance(value, int) or isinstance(value, float):
242 try:
243 total_secs = value * 86400 # seconds in day
244 col_val = str(timedelta(seconds=total_secs))
245 except ValueError:
246 col_val = str(value)
247 else:
248 col_val = str(value)
249 elif col_type == 'numberType':
250 if isinstance(value, int):
251 col_val = int(value)
252 else:
253 try:
254 col_val = float(value)
255 except ValueError:
256 col_val = str(value)
257 elif col_type == 'stringValue':
258 col_val = str(value)
259 elif col_type == 'boolValue':
260 if isinstance(value, bool):
261 col_val = value
262 elif isinstance(value, str):
263 if value.lower() in ('true', 't', 'yes', 'y'):
264 col_val = True
265 elif value.lower() in ('false', 'f', 'no', 'n'):
266 col_val = False
267 else:
268 col_val = str(value)
269 elif isinstance(value, int):
270 if value == 1 or value == -1:
271 col_val = True
272 elif value == 0:
273 col_val = False
274 else:
275 col_val = str(value)
276
277 else:
278 col_val = value
279 sheet_data_row_tf[col_name] = col_val
280 col_num = col_num + 1
281 sheet_data_tf.append(sheet_data_row_tf)
282 row_num = row_num + 1
283 return sheet_data_tf, row_num, is_last_row
284
285
205def sync(client, config, catalog, state): 286def sync(client, config, catalog, state):
206 start_date = config.get('start_date') 287 start_date = config.get('start_date')
207 spreadsheet_id = config.get('spreadsheet_id') 288 spreadsheet_id = config.get('spreadsheet_id')
@@ -218,63 +299,164 @@ def sync(client, config, catalog, state):
218 if not selected_streams: 299 if not selected_streams:
219 return 300 return
220 301
221 # Get file_metadata 302 # FILE_METADATA
222 file_metadata = {} 303 file_metadata = {}
223 file_metadata_config = STREAMS.get('file_metadata') 304 stream_name = 'file_metadata'
224 file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id) 305 file_metadata_config = STREAMS.get(stream_name)
306
307 # GET file_metadata
308 LOGGER.info('GET file_meatadata')
309 file_metadata, time_extracted = get_data(stream_name=stream_name,
310 endpoint_config=file_metadata_config,
311 client=client,
312 spreadsheet_id=spreadsheet_id)
313 # Transform file_metadata
314 LOGGER.info('Transform file_meatadata')
225 file_metadata_tf = transform_file_metadata(file_metadata) 315 file_metadata_tf = transform_file_metadata(file_metadata)
226 # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf)) 316 # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
227 last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date)) 317
318 # Check if file has changed, if not break (return to __init__)
319 last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
228 this_datetime = strptime_to_utc(file_metadata.get('modifiedTime')) 320 this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
229 LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime)) 321 LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
230 if this_datetime <= last_datetime: 322 if this_datetime <= last_datetime:
231 LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.') 323 LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
232 return 0 324 return 0
233 325 else:
234 # Get spreadsheet_metadata 326 # Sync file_metadata if selected
327 sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
328 write_bookmark(state, stream_name, strftime(this_datetime))
329
330 # SPREADSHEET_METADATA
235 spreadsheet_metadata = {} 331 spreadsheet_metadata = {}
236 spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata') 332 stream_name = 'spreadsheet_metadata'
237 spreadsheet_metadata = get_data('spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id) 333 spreadsheet_metadata_config = STREAMS.get(stream_name)
334
335 # GET spreadsheet_metadata
336 LOGGER.info('GET spreadsheet_meatadata')
337 spreadsheet_metadata, ss_time_extracted = get_data(
338 stream_name=stream_name,
339 endpoint_config=spreadsheet_metadata_config,
340 client=client,
341 spreadsheet_id=spreadsheet_id)
342
343 # Transform spreadsheet_metadata
344 LOGGER.info('Transform spreadsheet_meatadata')
238 spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata) 345 spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
239 # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))
240 346
241 # Get sheet_metadata 347 # Sync spreadsheet_metadata if selected
348 sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)
349
350 # SHEET_METADATA and SHEET_DATA
242 sheets = spreadsheet_metadata.get('sheets') 351 sheets = spreadsheet_metadata.get('sheets')
243 sheet_metadata = [] 352 sheet_metadata = []
244 sheets_loaded = [] 353 sheets_loaded = []
245 sheets_loaded_config = STREAMS['sheets_loaded'] 354 sheets_loaded_config = STREAMS['sheets_loaded']
246 if sheets: 355 if sheets:
356 # Loop thru sheets (worksheet tabs) in spreadsheet
247 for sheet in sheets: 357 for sheet in sheets:
248 sheet_title = sheet.get('properties', {}).get('title') 358 sheet_title = sheet.get('properties', {}).get('title')
359 sheet_id = sheet.get('properties', {}).get('sheetId')
360
361 # GET sheet_metadata and columns
249 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) 362 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
363
364 # Transform sheet_metadata
250 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) 365 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
251 # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf)) 366 # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
252 sheet_metadata.append(sheet_metadata_tf) 367 sheet_metadata.append(sheet_metadata_tf)
253 368
254 # Determine range of rows and columns for "paging" through batch rows of data 369 # SHEET_DATA
255 sheet_last_col_index = 1 370 # Should this worksheet tab be synced?
256 sheet_last_col_letter = 'A' 371 if sheet_title in selected_streams:
257 for col in columns: 372 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
258 col_index = col.get('columnIndex') 373 update_currently_syncing(state, sheet_title)
259 col_letter = col.get('columnLetter') 374 write_schema(catalog, sheet_title)
260 if col_index > sheet_last_col_index: 375
261 sheet_last_col_index = col_index 376 # Determine max range of columns and rows for "paging" through the data
262 sheet_last_col_letter = col_letter 377 sheet_last_col_index = 1
263 sheet_max_row = sheet.get('gridProperties', {}).get('rowCount') 378 sheet_last_col_letter = 'A'
264 is_empty_row = False 379 for col in columns:
265 batch_rows = 200 380 col_index = col.get('columnIndex')
266 from_row = 2 381 col_letter = col.get('columnLetter')
267 if sheet_max_row < batch_rows: 382 if col_index > sheet_last_col_index:
268 to_row = sheet_max_row 383 sheet_last_col_index = col_index
269 else: 384 sheet_last_col_letter = col_letter
270 to_row = batch_rows 385 sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
271 386
272 while not is_empty_row and to_row <= sheet_max_row: 387 # Initialize paging for 1st batch
273 range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row) 388 is_last_row = False
274 389 batch_rows = 200
275 sheet_data = get_data( 390 from_row = 2
276 stream_name=sheet_title, 391 if sheet_max_row < batch_rows:
277 endpoint_config=sheets_loaded_config, 392 to_row = sheet_max_row
278 client=client, 393 else:
279 spreadsheet_id=spreadsheet_id, 394 to_row = batch_rows
280 range_rows=range_rows) 395
396 # Loop thru batches (each having 200 rows of data)
397 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
398 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
399
400 # GET sheet_data for a worksheet tab
401 sheet_data, time_extracted = get_data(
402 stream_name=sheet_title,
403 endpoint_config=sheets_loaded_config,
404 client=client,
405 spreadsheet_id=spreadsheet_id,
406 range_rows=range_rows)
407 # Data is returned as a list of arrays, an array of values for each row
408 sheet_data_rows = sheet_data.get('values')
409
410 # Transform batch of rows to JSON with keys for each column
411 sheet_data_tf, row_num, is_last_row = transform_sheet_data(
412 spreadsheet_id=spreadsheet_id,
413 sheet_id=sheet_id,
414 from_row=from_row,
415 columns=columns,
416 sheet_data_rows=sheet_data_rows)
417 if row_num < to_row:
418 is_last_row = True
419
420 # Process records, send batch of records to target
421 record_count = process_records(
422 catalog=catalog,
423 stream_name=sheet_title,
424 records=sheet_data_tf,
425 time_extracted=ss_time_extracted)
426
427 # Update paging from/to_row for next batch
428 from_row = to_row + 1
429 if to_row + batch_rows > sheet_max_row:
430 to_row = sheet_max_row
431 else:
432 to_row = to_row + batch_rows
433
434 # SHEETS_LOADED
435 # Add sheet to sheets_loaded
436 sheet_loaded = {}
437 sheet_loaded['spreadsheetId'] = spreadsheet_id
438 sheet_loaded['sheetId'] = sheet_id
439 sheet_loaded['title'] = sheet_title
440 sheet_loaded['loadDate'] = strftime(utils.now())
441 sheet_loaded['lastRowNumber'] = row_num
442 sheets_loaded.append(sheet_loaded)
443
444 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
445 # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
446 # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
447 activate_version_message = singer.ActivateVersionMessage(
448 stream=sheet_title,
449 version=int(time.time() * 1000))
450 singer.write_message(activate_version_message)
451
452 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
453 sheet_title, row_num - 1))
454 update_currently_syncing(state, None)
455
456 stream_name = 'sheet_metadata'
457 # Sync sheet_metadata if selected
458 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
459
460 stream_name = 'sheets_loaded'
461 # Sync sheet_metadata if selected
462 sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)