aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r--tap_google_sheets/sync.py65
1 files changed, 37 insertions, 28 deletions
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 79e05f9..d7d7184 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -1,10 +1,9 @@
1import time 1import time
2import math 2import math
3import singer
4import json 3import json
5import pytz
6from datetime import datetime, timedelta 4from datetime import datetime, timedelta
7from collections import OrderedDict 5import pytz
6import singer
8from singer import metrics, metadata, Transformer, utils 7from singer import metrics, metadata, Transformer, utils
9from singer.utils import strptime_to_utc, strftime 8from singer.utils import strptime_to_utc, strftime
10from tap_google_sheets.streams import STREAMS 9from tap_google_sheets.streams import STREAMS
@@ -71,17 +70,21 @@ def process_records(catalog,
71 return counter.value 70 return counter.value
72 71
73 72
74def sync_stream(stream_name, selected_streams, catalog, state, records): 73def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
75 # Should sheets_loaded be synced? 74 # Should sheets_loaded be synced?
76 if stream_name in selected_streams: 75 if stream_name in selected_streams:
77 LOGGER.info('STARTED Syncing {}'.format(stream_name)) 76 LOGGER.info('STARTED Syncing {}'.format(stream_name))
78 update_currently_syncing(state, stream_name) 77 update_currently_syncing(state, stream_name)
78 selected_fields = get_selected_fields(catalog, stream_name)
79 LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
79 write_schema(catalog, stream_name) 80 write_schema(catalog, stream_name)
81 if not time_extracted:
82 time_extracted = utils.now()
80 record_count = process_records( 83 record_count = process_records(
81 catalog=catalog, 84 catalog=catalog,
82 stream_name=stream_name, 85 stream_name=stream_name,
83 records=records, 86 records=records,
84 time_extracted=utils.now()) 87 time_extracted=time_extracted)
85 LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count)) 88 LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
86 update_currently_syncing(state, None) 89 update_currently_syncing(state, None)
87 90
@@ -105,9 +108,9 @@ def get_selected_fields(catalog, stream_name):
105 mdata_list = singer.metadata.to_list(mdata) 108 mdata_list = singer.metadata.to_list(mdata)
106 selected_fields = [] 109 selected_fields = []
107 for entry in mdata_list: 110 for entry in mdata_list:
108 field = None 111 field = None
109 try: 112 try:
110 field = entry['breadcrumb'][1] 113 field = entry['breadcrumb'][1]
111 if entry.get('metadata', {}).get('selected', False): 114 if entry.get('metadata', {}).get('selected', False):
112 selected_fields.append(field) 115 selected_fields.append(field)
113 except IndexError: 116 except IndexError:
@@ -172,7 +175,7 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
172def transform_sheet_metadata(spreadsheet_id, sheet, columns): 175def transform_sheet_metadata(spreadsheet_id, sheet, columns):
173 # Convert to properties to dict 176 # Convert to properties to dict
174 sheet_metadata = sheet.get('properties') 177 sheet_metadata = sheet.get('properties')
175 sheet_metadata_tf = json.loads(json.dumps(sheet_metadata)) 178 sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
176 sheet_id = sheet_metadata_tf.get('sheetId') 179 sheet_id = sheet_metadata_tf.get('sheetId')
177 sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format( 180 sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
178 spreadsheet_id, sheet_id) 181 spreadsheet_id, sheet_id)
@@ -187,13 +190,13 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns):
187def excel_to_dttm_str(excel_date_sn, timezone_str=None): 190def excel_to_dttm_str(excel_date_sn, timezone_str=None):
188 if not timezone_str: 191 if not timezone_str:
189 timezone_str = 'UTC' 192 timezone_str = 'UTC'
190 tz = pytz.timezone(timezone_str) 193 tzn = pytz.timezone(timezone_str)
191 sec_per_day = 86400 194 sec_per_day = 86400
192 excel_epoch = 25569 # 1970-01-01T00:00:00Z 195 excel_epoch = 25569 # 1970-01-01T00:00:00Z
193 epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day) 196 epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
194 epoch_dttm = datetime(1970, 1, 1) 197 epoch_dttm = datetime(1970, 1, 1)
195 excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec) 198 excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
196 utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc) 199 utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
197 utc_dttm_str = strftime(utc_dttm) 200 utc_dttm_str = strftime(utc_dttm)
198 return utc_dttm_str 201 return utc_dttm_str
199 202
@@ -205,7 +208,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
205 is_last_row = False 208 is_last_row = False
206 row_num = from_row 209 row_num = from_row
207 # Create sorted list of columns based on columnIndex 210 # Create sorted list of columns based on columnIndex
208 cols = sorted(columns, key = lambda i: i['columnIndex']) 211 cols = sorted(columns, key=lambda i: i['columnIndex'])
209 212
210 # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows)) 213 # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
211 for row in sheet_data_rows: 214 for row in sheet_data_rows:
@@ -228,17 +231,17 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
228 col_type = col.get('columnType') 231 col_type = col.get('columnType')
229 # Convert dates/times from Lotus Notes Serial Numbers 232 # Convert dates/times from Lotus Notes Serial Numbers
230 if col_type == 'numberType.DATE_TIME': 233 if col_type == 'numberType.DATE_TIME':
231 if isinstance(value, int) or isinstance(value, float): 234 if isinstance(value, (int, float)):
232 col_val = excel_to_dttm_str(value) 235 col_val = excel_to_dttm_str(value)
233 else: 236 else:
234 col_val = str(value) 237 col_val = str(value)
235 elif col_type == 'numberType.DATE': 238 elif col_type == 'numberType.DATE':
236 if isinstance(value, int) or isinstance(value, float): 239 if isinstance(value, (int, float)):
237 col_val = excel_to_dttm_str(value)[:10] 240 col_val = excel_to_dttm_str(value)[:10]
238 else: 241 else:
239 col_val = str(value) 242 col_val = str(value)
240 elif col_type == 'numberType.TIME': 243 elif col_type == 'numberType.TIME':
241 if isinstance(value, int) or isinstance(value, float): 244 if isinstance(value, (int, float)):
242 try: 245 try:
243 total_secs = value * 86400 # seconds in day 246 total_secs = value * 86400 # seconds in day
244 col_val = str(timedelta(seconds=total_secs)) 247 col_val = str(timedelta(seconds=total_secs))
@@ -267,7 +270,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
267 else: 270 else:
268 col_val = str(value) 271 col_val = str(value)
269 elif isinstance(value, int): 272 elif isinstance(value, int):
270 if value == 1 or value == -1: 273 if value in (1, -1):
271 col_val = True 274 col_val = True
272 elif value == 0: 275 elif value == 0:
273 col_val = False 276 col_val = False
@@ -321,11 +324,10 @@ def sync(client, config, catalog, state):
321 LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime)) 324 LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
322 if this_datetime <= last_datetime: 325 if this_datetime <= last_datetime:
323 LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.') 326 LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
324 return 0 327 return
325 else: 328 # Sync file_metadata if selected
326 # Sync file_metadata if selected 329 sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
327 sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf) 330 write_bookmark(state, stream_name, strftime(this_datetime))
328 write_bookmark(state, stream_name, strftime(this_datetime))
329 331
330 # SPREADSHEET_METADATA 332 # SPREADSHEET_METADATA
331 spreadsheet_metadata = {} 333 spreadsheet_metadata = {}
@@ -334,7 +336,7 @@ def sync(client, config, catalog, state):
334 336
335 # GET spreadsheet_metadata 337 # GET spreadsheet_metadata
336 LOGGER.info('GET spreadsheet_meatadata') 338 LOGGER.info('GET spreadsheet_meatadata')
337 spreadsheet_metadata, ss_time_extracted = get_data( 339 spreadsheet_metadata, ss_time_extracted = get_data(
338 stream_name=stream_name, 340 stream_name=stream_name,
339 endpoint_config=spreadsheet_metadata_config, 341 endpoint_config=spreadsheet_metadata_config,
340 client=client, 342 client=client,
@@ -345,7 +347,8 @@ def sync(client, config, catalog, state):
345 spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata) 347 spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
346 348
347 # Sync spreadsheet_metadata if selected 349 # Sync spreadsheet_metadata if selected
348 sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf) 350 sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
351 ss_time_extracted)
349 352
350 # SHEET_METADATA and SHEET_DATA 353 # SHEET_METADATA and SHEET_DATA
351 sheets = spreadsheet_metadata.get('sheets') 354 sheets = spreadsheet_metadata.get('sheets')
@@ -360,6 +363,7 @@ def sync(client, config, catalog, state):
360 363
361 # GET sheet_metadata and columns 364 # GET sheet_metadata and columns
362 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) 365 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
366 LOGGER.info('sheet_schema: {}'.format(sheet_schema))
363 367
364 # Transform sheet_metadata 368 # Transform sheet_metadata
365 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) 369 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
@@ -371,6 +375,8 @@ def sync(client, config, catalog, state):
371 if sheet_title in selected_streams: 375 if sheet_title in selected_streams:
372 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title)) 376 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
373 update_currently_syncing(state, sheet_title) 377 update_currently_syncing(state, sheet_title)
378 selected_fields = get_selected_fields(catalog, sheet_title)
379 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
374 write_schema(catalog, sheet_title) 380 write_schema(catalog, sheet_title)
375 381
376 # Determine max range of columns and rows for "paging" through the data 382 # Determine max range of columns and rows for "paging" through the data
@@ -396,7 +402,7 @@ def sync(client, config, catalog, state):
396 # Loop thru batches (each having 200 rows of data) 402 # Loop thru batches (each having 200 rows of data)
397 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row: 403 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
398 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row) 404 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
399 405
400 # GET sheet_data for a worksheet tab 406 # GET sheet_data for a worksheet tab
401 sheet_data, time_extracted = get_data( 407 sheet_data, time_extracted = get_data(
402 stream_name=sheet_title, 408 stream_name=sheet_title,
@@ -423,7 +429,9 @@ def sync(client, config, catalog, state):
423 stream_name=sheet_title, 429 stream_name=sheet_title,
424 records=sheet_data_tf, 430 records=sheet_data_tf,
425 time_extracted=ss_time_extracted) 431 time_extracted=ss_time_extracted)
426 432 LOGGER.info('Sheet: {}, ecords processed: {}'.format(
433 sheet_title, record_count))
434
427 # Update paging from/to_row for next batch 435 # Update paging from/to_row for next batch
428 from_row = to_row + 1 436 from_row = to_row + 1
429 if to_row + batch_rows > sheet_max_row: 437 if to_row + batch_rows > sheet_max_row:
@@ -442,8 +450,8 @@ def sync(client, config, catalog, state):
442 sheets_loaded.append(sheet_loaded) 450 sheets_loaded.append(sheet_loaded)
443 451
444 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete. 452 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
445 # This forces hard deletes on the data downstream if fewer records are sent for a sheet. 453 # This forces hard deletes on the data downstream if fewer records are sent.
446 # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167 454 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
447 activate_version_message = singer.ActivateVersionMessage( 455 activate_version_message = singer.ActivateVersionMessage(
448 stream=sheet_title, 456 stream=sheet_title,
449 version=int(time.time() * 1000)) 457 version=int(time.time() * 1000))
@@ -451,12 +459,13 @@ def sync(client, config, catalog, state):
451 459
452 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( 460 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
453 sheet_title, row_num - 1)) 461 sheet_title, row_num - 1))
454 update_currently_syncing(state, None)
455 462
456 stream_name = 'sheet_metadata' 463 stream_name = 'sheet_metadata'
457 # Sync sheet_metadata if selected 464 # Sync sheet_metadata if selected
458 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) 465 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
459 466
460 stream_name = 'sheets_loaded' 467 stream_name = 'sheets_loaded'
461 # Sync sheet_metadata if selected 468 # Sync sheet_metadata if selected
462 sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded) 469 sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
470
471 return