aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets/sync.py
diff options
context:
space:
mode:
authorJeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com>2020-02-24 09:53:26 -0800
committerGitHub <noreply@github.com>2020-02-24 12:53:26 -0500
commit376f1145837541d4fff2ad0e499236761f9873c3 (patch)
treecc086f18b24bda8a86c16c3ec742b89947f382ae /tap_google_sheets/sync.py
parentf1d1d43c6b74a8705e91e908c582e39c68464c0c (diff)
downloadtap-google-sheets-376f1145837541d4fff2ad0e499236761f9873c3.tar.gz
tap-google-sheets-376f1145837541d4fff2ad0e499236761f9873c3.tar.zst
tap-google-sheets-376f1145837541d4fff2ad0e499236761f9873c3.zip
v.0.0.4 Logic to skip empty sheets (#4)v0.0.4
* v.0.0.2 schema and sync changes Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py * v.0.0.3 Sync activate version and error handling Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages. * v.0.0.4 Skip empty worksheets Add logic to skip empty worksheets in Discovery and Sync mode. * schema.py fix number datatype issue Number datatypes are being created as strings in targets. The JSON schema order needs to be adjusted so that order is null, number, string.
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r--tap_google_sheets/sync.py214
1 file changed, 109 insertions, 105 deletions
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 311281c..b77eab3 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -429,113 +429,117 @@ def sync(client, config, catalog, state):
429 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) 429 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
430 # LOGGER.info('sheet_schema: {}'.format(sheet_schema)) 430 # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
431 431
432 # Transform sheet_metadata 432 # SKIP empty sheets (where sheet_schema and columns are None)
433 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) 433 if not sheet_schema or not columns:
434 # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf)) 434 LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
435 sheet_metadata.append(sheet_metadata_tf) 435 else:
436 436 # Transform sheet_metadata
437 # SHEET_DATA 437 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
438 # Should this worksheet tab be synced? 438 # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
439 if sheet_title in selected_streams: 439 sheet_metadata.append(sheet_metadata_tf)
440 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title)) 440
441 update_currently_syncing(state, sheet_title) 441 # SHEET_DATA
442 selected_fields = get_selected_fields(catalog, sheet_title) 442 # Should this worksheet tab be synced?
443 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) 443 if sheet_title in selected_streams:
444 write_schema(catalog, sheet_title) 444 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
445 445 update_currently_syncing(state, sheet_title)
446 # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs) 446 selected_fields = get_selected_fields(catalog, sheet_title)
447 # everytime after each sheet sync is complete. 447 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
448 # This forces hard deletes on the data downstream if fewer records are sent. 448 write_schema(catalog, sheet_title)
449 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 449
450 last_integer = int(get_bookmark(state, sheet_title, 0)) 450 # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
451 activate_version = int(time.time() * 1000) 451 # everytime after each sheet sync is complete.
452 activate_version_message = singer.ActivateVersionMessage( 452 # This forces hard deletes on the data downstream if fewer records are sent.
453 stream=sheet_title, 453 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
454 version=activate_version) 454 last_integer = int(get_bookmark(state, sheet_title, 0))
455 if last_integer == 0: 455 activate_version = int(time.time() * 1000)
456 # initial load, send activate_version before AND after data sync 456 activate_version_message = singer.ActivateVersionMessage(
457 singer.write_message(activate_version_message) 457 stream=sheet_title,
458 LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) 458 version=activate_version)
459 459 if last_integer == 0:
460 # Determine max range of columns and rows for "paging" through the data 460 # initial load, send activate_version before AND after data sync
461 sheet_last_col_index = 1 461 singer.write_message(activate_version_message)
462 sheet_last_col_letter = 'A' 462 LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
463 for col in columns: 463
464 col_index = col.get('columnIndex') 464 # Determine max range of columns and rows for "paging" through the data
465 col_letter = col.get('columnLetter') 465 sheet_last_col_index = 1
466 if col_index > sheet_last_col_index: 466 sheet_last_col_letter = 'A'
467 sheet_last_col_index = col_index 467 for col in columns:
468 sheet_last_col_letter = col_letter 468 col_index = col.get('columnIndex')
469 sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount') 469 col_letter = col.get('columnLetter')
470 470 if col_index > sheet_last_col_index:
471 # Initialize paging for 1st batch 471 sheet_last_col_index = col_index
472 is_last_row = False 472 sheet_last_col_letter = col_letter
473 batch_rows = 200 473 sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
474 from_row = 2 474
475 if sheet_max_row < batch_rows: 475 # Initialize paging for 1st batch
476 to_row = sheet_max_row 476 is_last_row = False
477 else: 477 batch_rows = 200
478 to_row = batch_rows 478 from_row = 2
479 479 if sheet_max_row < batch_rows:
480 # Loop thru batches (each having 200 rows of data)
481 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
482 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
483
484 # GET sheet_data for a worksheet tab
485 sheet_data, time_extracted = get_data(
486 stream_name=sheet_title,
487 endpoint_config=sheets_loaded_config,
488 client=client,
489 spreadsheet_id=spreadsheet_id,
490 range_rows=range_rows)
491 # Data is returned as a list of arrays, an array of values for each row
492 sheet_data_rows = sheet_data.get('values')
493
494 # Transform batch of rows to JSON with keys for each column
495 sheet_data_tf, row_num = transform_sheet_data(
496 spreadsheet_id=spreadsheet_id,
497 sheet_id=sheet_id,
498 sheet_title=sheet_title,
499 from_row=from_row,
500 columns=columns,
501 sheet_data_rows=sheet_data_rows)
502 if row_num < to_row:
503 is_last_row = True
504
505 # Process records, send batch of records to target
506 record_count = process_records(
507 catalog=catalog,
508 stream_name=sheet_title,
509 records=sheet_data_tf,
510 time_extracted=ss_time_extracted,
511 version=activate_version)
512 LOGGER.info('Sheet: {}, records processed: {}'.format(
513 sheet_title, record_count))
514
515 # Update paging from/to_row for next batch
516 from_row = to_row + 1
517 if to_row + batch_rows > sheet_max_row:
518 to_row = sheet_max_row 480 to_row = sheet_max_row
519 else: 481 else:
520 to_row = to_row + batch_rows 482 to_row = batch_rows
521 483
522 # End of Stream: Send Activate Version and update State 484 # Loop thru batches (each having 200 rows of data)
523 singer.write_message(activate_version_message) 485 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
524 write_bookmark(state, sheet_title, activate_version) 486 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
525 LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) 487
526 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( 488 # GET sheet_data for a worksheet tab
527 sheet_title, row_num - 2)) # subtract 1 for header row 489 sheet_data, time_extracted = get_data(
528 update_currently_syncing(state, None) 490 stream_name=sheet_title,
529 491 endpoint_config=sheets_loaded_config,
530 # SHEETS_LOADED 492 client=client,
531 # Add sheet to sheets_loaded 493 spreadsheet_id=spreadsheet_id,
532 sheet_loaded = {} 494 range_rows=range_rows)
533 sheet_loaded['spreadsheetId'] = spreadsheet_id 495 # Data is returned as a list of arrays, an array of values for each row
534 sheet_loaded['sheetId'] = sheet_id 496 sheet_data_rows = sheet_data.get('values')
535 sheet_loaded['title'] = sheet_title 497
536 sheet_loaded['loadDate'] = strftime(utils.now()) 498 # Transform batch of rows to JSON with keys for each column
537 sheet_loaded['lastRowNumber'] = row_num 499 sheet_data_tf, row_num = transform_sheet_data(
538 sheets_loaded.append(sheet_loaded) 500 spreadsheet_id=spreadsheet_id,
501 sheet_id=sheet_id,
502 sheet_title=sheet_title,
503 from_row=from_row,
504 columns=columns,
505 sheet_data_rows=sheet_data_rows)
506 if row_num < to_row:
507 is_last_row = True
508
509 # Process records, send batch of records to target
510 record_count = process_records(
511 catalog=catalog,
512 stream_name=sheet_title,
513 records=sheet_data_tf,
514 time_extracted=ss_time_extracted,
515 version=activate_version)
516 LOGGER.info('Sheet: {}, records processed: {}'.format(
517 sheet_title, record_count))
518
519 # Update paging from/to_row for next batch
520 from_row = to_row + 1
521 if to_row + batch_rows > sheet_max_row:
522 to_row = sheet_max_row
523 else:
524 to_row = to_row + batch_rows
525
526 # End of Stream: Send Activate Version and update State
527 singer.write_message(activate_version_message)
528 write_bookmark(state, sheet_title, activate_version)
529 LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
530 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
531 sheet_title, row_num - 2)) # subtract 1 for header row
532 update_currently_syncing(state, None)
533
534 # SHEETS_LOADED
535 # Add sheet to sheets_loaded
536 sheet_loaded = {}
537 sheet_loaded['spreadsheetId'] = spreadsheet_id
538 sheet_loaded['sheetId'] = sheet_id
539 sheet_loaded['title'] = sheet_title
540 sheet_loaded['loadDate'] = strftime(utils.now())
541 sheet_loaded['lastRowNumber'] = row_num
542 sheets_loaded.append(sheet_loaded)
539 543
540 stream_name = 'sheet_metadata' 544 stream_name = 'sheet_metadata'
541 # Sync sheet_metadata if selected 545 # Sync sheet_metadata if selected