diff options
author | Jeff Huth <jeff.huth@bytecode.io> | 2019-11-15 01:58:55 -0800 |
---|---|---|
committer | Jeff Huth <jeff.huth@bytecode.io> | 2019-11-15 01:58:55 -0800 |
commit | 99424fee5ba6ff830df39be8f47c3e3d685b444a (patch) | |
tree | 08ace54622092921ab7253946515ce5d3dcf0a66 /tap_google_sheets/sync.py | |
parent | da690bda91ea6a14964a2378e5dbb5d4de91a7e2 (diff) | |
download | tap-google-sheets-99424fee5ba6ff830df39be8f47c3e3d685b444a.tar.gz tap-google-sheets-99424fee5ba6ff830df39be8f47c3e3d685b444a.tar.zst tap-google-sheets-99424fee5ba6ff830df39be8f47c3e3d685b444a.zip |
pylint and testingv0.0.1
pylint and testing
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r-- | tap_google_sheets/sync.py | 65 |
1 files changed, 37 insertions, 28 deletions
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py index 79e05f9..d7d7184 100644 --- a/tap_google_sheets/sync.py +++ b/tap_google_sheets/sync.py | |||
@@ -1,10 +1,9 @@ | |||
1 | import time | 1 | import time |
2 | import math | 2 | import math |
3 | import singer | ||
4 | import json | 3 | import json |
5 | import pytz | ||
6 | from datetime import datetime, timedelta | 4 | from datetime import datetime, timedelta |
7 | from collections import OrderedDict | 5 | import pytz |
6 | import singer | ||
8 | from singer import metrics, metadata, Transformer, utils | 7 | from singer import metrics, metadata, Transformer, utils |
9 | from singer.utils import strptime_to_utc, strftime | 8 | from singer.utils import strptime_to_utc, strftime |
10 | from tap_google_sheets.streams import STREAMS | 9 | from tap_google_sheets.streams import STREAMS |
@@ -71,17 +70,21 @@ def process_records(catalog, | |||
71 | return counter.value | 70 | return counter.value |
72 | 71 | ||
73 | 72 | ||
74 | def sync_stream(stream_name, selected_streams, catalog, state, records): | 73 | def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None): |
75 | # Should sheets_loaded be synced? | 74 | # Should sheets_loaded be synced? |
76 | if stream_name in selected_streams: | 75 | if stream_name in selected_streams: |
77 | LOGGER.info('STARTED Syncing {}'.format(stream_name)) | 76 | LOGGER.info('STARTED Syncing {}'.format(stream_name)) |
78 | update_currently_syncing(state, stream_name) | 77 | update_currently_syncing(state, stream_name) |
78 | selected_fields = get_selected_fields(catalog, stream_name) | ||
79 | LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields)) | ||
79 | write_schema(catalog, stream_name) | 80 | write_schema(catalog, stream_name) |
81 | if not time_extracted: | ||
82 | time_extracted = utils.now() | ||
80 | record_count = process_records( | 83 | record_count = process_records( |
81 | catalog=catalog, | 84 | catalog=catalog, |
82 | stream_name=stream_name, | 85 | stream_name=stream_name, |
83 | records=records, | 86 | records=records, |
84 | time_extracted=utils.now()) | 87 | time_extracted=time_extracted) |
85 | LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count)) | 88 | LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count)) |
86 | update_currently_syncing(state, None) | 89 | update_currently_syncing(state, None) |
87 | 90 | ||
@@ -105,9 +108,9 @@ def get_selected_fields(catalog, stream_name): | |||
105 | mdata_list = singer.metadata.to_list(mdata) | 108 | mdata_list = singer.metadata.to_list(mdata) |
106 | selected_fields = [] | 109 | selected_fields = [] |
107 | for entry in mdata_list: | 110 | for entry in mdata_list: |
108 | field = None | 111 | field = None |
109 | try: | 112 | try: |
110 | field = entry['breadcrumb'][1] | 113 | field = entry['breadcrumb'][1] |
111 | if entry.get('metadata', {}).get('selected', False): | 114 | if entry.get('metadata', {}).get('selected', False): |
112 | selected_fields.append(field) | 115 | selected_fields.append(field) |
113 | except IndexError: | 116 | except IndexError: |
@@ -172,7 +175,7 @@ def transform_spreadsheet_metadata(spreadsheet_metadata): | |||
172 | def transform_sheet_metadata(spreadsheet_id, sheet, columns): | 175 | def transform_sheet_metadata(spreadsheet_id, sheet, columns): |
173 | # Convert to properties to dict | 176 | # Convert to properties to dict |
174 | sheet_metadata = sheet.get('properties') | 177 | sheet_metadata = sheet.get('properties') |
175 | sheet_metadata_tf = json.loads(json.dumps(sheet_metadata)) | 178 | sheet_metadata_tf = json.loads(json.dumps(sheet_metadata)) |
176 | sheet_id = sheet_metadata_tf.get('sheetId') | 179 | sheet_id = sheet_metadata_tf.get('sheetId') |
177 | sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format( | 180 | sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format( |
178 | spreadsheet_id, sheet_id) | 181 | spreadsheet_id, sheet_id) |
@@ -187,13 +190,13 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns): | |||
187 | def excel_to_dttm_str(excel_date_sn, timezone_str=None): | 190 | def excel_to_dttm_str(excel_date_sn, timezone_str=None): |
188 | if not timezone_str: | 191 | if not timezone_str: |
189 | timezone_str = 'UTC' | 192 | timezone_str = 'UTC' |
190 | tz = pytz.timezone(timezone_str) | 193 | tzn = pytz.timezone(timezone_str) |
191 | sec_per_day = 86400 | 194 | sec_per_day = 86400 |
192 | excel_epoch = 25569 # 1970-01-01T00:00:00Z | 195 | excel_epoch = 25569 # 1970-01-01T00:00:00Z |
193 | epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day) | 196 | epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day) |
194 | epoch_dttm = datetime(1970, 1, 1) | 197 | epoch_dttm = datetime(1970, 1, 1) |
195 | excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec) | 198 | excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec) |
196 | utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc) | 199 | utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc) |
197 | utc_dttm_str = strftime(utc_dttm) | 200 | utc_dttm_str = strftime(utc_dttm) |
198 | return utc_dttm_str | 201 | return utc_dttm_str |
199 | 202 | ||
@@ -205,7 +208,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
205 | is_last_row = False | 208 | is_last_row = False |
206 | row_num = from_row | 209 | row_num = from_row |
207 | # Create sorted list of columns based on columnIndex | 210 | # Create sorted list of columns based on columnIndex |
208 | cols = sorted(columns, key = lambda i: i['columnIndex']) | 211 | cols = sorted(columns, key=lambda i: i['columnIndex']) |
209 | 212 | ||
210 | # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows)) | 213 | # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows)) |
211 | for row in sheet_data_rows: | 214 | for row in sheet_data_rows: |
@@ -228,17 +231,17 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
228 | col_type = col.get('columnType') | 231 | col_type = col.get('columnType') |
229 | # Convert dates/times from Lotus Notes Serial Numbers | 232 | # Convert dates/times from Lotus Notes Serial Numbers |
230 | if col_type == 'numberType.DATE_TIME': | 233 | if col_type == 'numberType.DATE_TIME': |
231 | if isinstance(value, int) or isinstance(value, float): | 234 | if isinstance(value, (int, float)): |
232 | col_val = excel_to_dttm_str(value) | 235 | col_val = excel_to_dttm_str(value) |
233 | else: | 236 | else: |
234 | col_val = str(value) | 237 | col_val = str(value) |
235 | elif col_type == 'numberType.DATE': | 238 | elif col_type == 'numberType.DATE': |
236 | if isinstance(value, int) or isinstance(value, float): | 239 | if isinstance(value, (int, float)): |
237 | col_val = excel_to_dttm_str(value)[:10] | 240 | col_val = excel_to_dttm_str(value)[:10] |
238 | else: | 241 | else: |
239 | col_val = str(value) | 242 | col_val = str(value) |
240 | elif col_type == 'numberType.TIME': | 243 | elif col_type == 'numberType.TIME': |
241 | if isinstance(value, int) or isinstance(value, float): | 244 | if isinstance(value, (int, float)): |
242 | try: | 245 | try: |
243 | total_secs = value * 86400 # seconds in day | 246 | total_secs = value * 86400 # seconds in day |
244 | col_val = str(timedelta(seconds=total_secs)) | 247 | col_val = str(timedelta(seconds=total_secs)) |
@@ -267,7 +270,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
267 | else: | 270 | else: |
268 | col_val = str(value) | 271 | col_val = str(value) |
269 | elif isinstance(value, int): | 272 | elif isinstance(value, int): |
270 | if value == 1 or value == -1: | 273 | if value in (1, -1): |
271 | col_val = True | 274 | col_val = True |
272 | elif value == 0: | 275 | elif value == 0: |
273 | col_val = False | 276 | col_val = False |
@@ -321,11 +324,10 @@ def sync(client, config, catalog, state): | |||
321 | LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime)) | 324 | LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime)) |
322 | if this_datetime <= last_datetime: | 325 | if this_datetime <= last_datetime: |
323 | LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.') | 326 | LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.') |
324 | return 0 | 327 | return |
325 | else: | 328 | # Sync file_metadata if selected |
326 | # Sync file_metadata if selected | 329 | sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted) |
327 | sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf) | 330 | write_bookmark(state, stream_name, strftime(this_datetime)) |
328 | write_bookmark(state, stream_name, strftime(this_datetime)) | ||
329 | 331 | ||
330 | # SPREADSHEET_METADATA | 332 | # SPREADSHEET_METADATA |
331 | spreadsheet_metadata = {} | 333 | spreadsheet_metadata = {} |
@@ -334,7 +336,7 @@ def sync(client, config, catalog, state): | |||
334 | 336 | ||
335 | # GET spreadsheet_metadata | 337 | # GET spreadsheet_metadata |
336 | LOGGER.info('GET spreadsheet_meatadata') | 338 | LOGGER.info('GET spreadsheet_meatadata') |
337 | spreadsheet_metadata, ss_time_extracted = get_data( | 339 | spreadsheet_metadata, ss_time_extracted = get_data( |
338 | stream_name=stream_name, | 340 | stream_name=stream_name, |
339 | endpoint_config=spreadsheet_metadata_config, | 341 | endpoint_config=spreadsheet_metadata_config, |
340 | client=client, | 342 | client=client, |
@@ -345,7 +347,8 @@ def sync(client, config, catalog, state): | |||
345 | spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata) | 347 | spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata) |
346 | 348 | ||
347 | # Sync spreadsheet_metadata if selected | 349 | # Sync spreadsheet_metadata if selected |
348 | sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf) | 350 | sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \ |
351 | ss_time_extracted) | ||
349 | 352 | ||
350 | # SHEET_METADATA and SHEET_DATA | 353 | # SHEET_METADATA and SHEET_DATA |
351 | sheets = spreadsheet_metadata.get('sheets') | 354 | sheets = spreadsheet_metadata.get('sheets') |
@@ -360,6 +363,7 @@ def sync(client, config, catalog, state): | |||
360 | 363 | ||
361 | # GET sheet_metadata and columns | 364 | # GET sheet_metadata and columns |
362 | sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) | 365 | sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) |
366 | LOGGER.info('sheet_schema: {}'.format(sheet_schema)) | ||
363 | 367 | ||
364 | # Transform sheet_metadata | 368 | # Transform sheet_metadata |
365 | sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) | 369 | sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) |
@@ -371,6 +375,8 @@ def sync(client, config, catalog, state): | |||
371 | if sheet_title in selected_streams: | 375 | if sheet_title in selected_streams: |
372 | LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title)) | 376 | LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title)) |
373 | update_currently_syncing(state, sheet_title) | 377 | update_currently_syncing(state, sheet_title) |
378 | selected_fields = get_selected_fields(catalog, sheet_title) | ||
379 | LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) | ||
374 | write_schema(catalog, sheet_title) | 380 | write_schema(catalog, sheet_title) |
375 | 381 | ||
376 | # Determine max range of columns and rows for "paging" through the data | 382 | # Determine max range of columns and rows for "paging" through the data |
@@ -396,7 +402,7 @@ def sync(client, config, catalog, state): | |||
396 | # Loop thru batches (each having 200 rows of data) | 402 | # Loop thru batches (each having 200 rows of data) |
397 | while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row: | 403 | while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row: |
398 | range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row) | 404 | range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row) |
399 | 405 | ||
400 | # GET sheet_data for a worksheet tab | 406 | # GET sheet_data for a worksheet tab |
401 | sheet_data, time_extracted = get_data( | 407 | sheet_data, time_extracted = get_data( |
402 | stream_name=sheet_title, | 408 | stream_name=sheet_title, |
@@ -423,7 +429,9 @@ def sync(client, config, catalog, state): | |||
423 | stream_name=sheet_title, | 429 | stream_name=sheet_title, |
424 | records=sheet_data_tf, | 430 | records=sheet_data_tf, |
425 | time_extracted=ss_time_extracted) | 431 | time_extracted=ss_time_extracted) |
426 | 432 | LOGGER.info('Sheet: {}, ecords processed: {}'.format( | |
433 | sheet_title, record_count)) | ||
434 | |||
427 | # Update paging from/to_row for next batch | 435 | # Update paging from/to_row for next batch |
428 | from_row = to_row + 1 | 436 | from_row = to_row + 1 |
429 | if to_row + batch_rows > sheet_max_row: | 437 | if to_row + batch_rows > sheet_max_row: |
@@ -442,8 +450,8 @@ def sync(client, config, catalog, state): | |||
442 | sheets_loaded.append(sheet_loaded) | 450 | sheets_loaded.append(sheet_loaded) |
443 | 451 | ||
444 | # Emit a Singer ACTIVATE_VERSION message after each sheet is complete. | 452 | # Emit a Singer ACTIVATE_VERSION message after each sheet is complete. |
445 | # This forces hard deletes on the data downstream if fewer records are sent for a sheet. | 453 | # This forces hard deletes on the data downstream if fewer records are sent. |
446 | # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167 | 454 | # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 |
447 | activate_version_message = singer.ActivateVersionMessage( | 455 | activate_version_message = singer.ActivateVersionMessage( |
448 | stream=sheet_title, | 456 | stream=sheet_title, |
449 | version=int(time.time() * 1000)) | 457 | version=int(time.time() * 1000)) |
@@ -451,12 +459,13 @@ def sync(client, config, catalog, state): | |||
451 | 459 | ||
452 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | 460 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( |
453 | sheet_title, row_num - 1)) | 461 | sheet_title, row_num - 1)) |
454 | update_currently_syncing(state, None) | ||
455 | 462 | ||
456 | stream_name = 'sheet_metadata' | 463 | stream_name = 'sheet_metadata' |
457 | # Sync sheet_metadata if selected | 464 | # Sync sheet_metadata if selected |
458 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) | 465 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) |
459 | 466 | ||
460 | stream_name = 'sheets_loaded' | 467 | stream_name = 'sheets_loaded' |
461 | # Sync sheet_metadata if selected | 468 | # Sync sheet_metadata if selected |
462 | sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded) | 469 | sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded) |
470 | |||
471 | return | ||