author     Jeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com>   2020-02-24 09:53:26 -0800
committer  GitHub <noreply@github.com>                                       2020-02-24 12:53:26 -0500
commit     376f1145837541d4fff2ad0e499236761f9873c3 (patch)
tree       cc086f18b24bda8a86c16c3ec742b89947f382ae /tap_google_sheets/sync.py
parent     f1d1d43c6b74a8705e91e908c582e39c68464c0c (diff)
v.0.0.4 Logic to skip empty sheets (#4)
* v.0.0.2 schema and sync changes
Change the number JSON schema to anyOf with multipleOf; skip empty rows; move write_bookmark to the end of sync.py
* v.0.0.3 Sync activate version and error handling
Update README.md documentation. Improve logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages.
* v.0.0.4 Skip empty worksheets
Add logic to skip empty worksheets in Discovery and Sync mode.
* schema.py fix number datatype issue
Number datatypes are being created as strings in targets. The JSON schema type order needs to be adjusted so that the order is null, number, string.
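For context, a minimal sketch of the number-column schema these two schema changes describe, assuming schema.py builds per-column JSON schemas as Python dicts. The variable name and the multipleOf precision below are illustrative, not taken from the actual schema.py:

# Illustrative sketch only (not the actual schema.py code): a number
# column's JSON schema with the corrected type order. 'null' comes first
# so empty cells stay null, 'number' (as anyOf with multipleOf for
# decimal precision) comes before 'string', so targets no longer resolve
# number cells to strings.
number_column_schema = {
    'anyOf': [
        {'type': 'null'},
        {'type': 'number', 'multipleOf': 1e-15},
        {'type': 'string'}
    ]
}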
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r--   tap_google_sheets/sync.py   214
1 file changed, 109 insertions, 105 deletions
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 311281c..b77eab3 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -429,113 +429,117 @@ def sync(client, config, catalog, state):
         sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
         # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
-        # Transform sheet_metadata
-        sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
-        # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
-        sheet_metadata.append(sheet_metadata_tf)
-
-        # SHEET_DATA
-        # Should this worksheet tab be synced?
-        if sheet_title in selected_streams:
-            LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
-            update_currently_syncing(state, sheet_title)
-            selected_fields = get_selected_fields(catalog, sheet_title)
-            LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
-            write_schema(catalog, sheet_title)
-
-            # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
-            # everytime after each sheet sync is complete.
-            # This forces hard deletes on the data downstream if fewer records are sent.
-            # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
-            last_integer = int(get_bookmark(state, sheet_title, 0))
-            activate_version = int(time.time() * 1000)
-            activate_version_message = singer.ActivateVersionMessage(
-                stream=sheet_title,
-                version=activate_version)
-            if last_integer == 0:
-                # initial load, send activate_version before AND after data sync
-                singer.write_message(activate_version_message)
-                LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-
-            # Determine max range of columns and rows for "paging" through the data
-            sheet_last_col_index = 1
-            sheet_last_col_letter = 'A'
-            for col in columns:
-                col_index = col.get('columnIndex')
-                col_letter = col.get('columnLetter')
-                if col_index > sheet_last_col_index:
-                    sheet_last_col_index = col_index
-                    sheet_last_col_letter = col_letter
-            sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
-
-            # Initialize paging for 1st batch
-            is_last_row = False
-            batch_rows = 200
-            from_row = 2
-            if sheet_max_row < batch_rows:
-                to_row = sheet_max_row
-            else:
-                to_row = batch_rows
-
-            # Loop thru batches (each having 200 rows of data)
-            while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
-                range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
-                # GET sheet_data for a worksheet tab
-                sheet_data, time_extracted = get_data(
-                    stream_name=sheet_title,
-                    endpoint_config=sheets_loaded_config,
-                    client=client,
-                    spreadsheet_id=spreadsheet_id,
-                    range_rows=range_rows)
-                # Data is returned as a list of arrays, an array of values for each row
-                sheet_data_rows = sheet_data.get('values')
-
-                # Transform batch of rows to JSON with keys for each column
-                sheet_data_tf, row_num = transform_sheet_data(
-                    spreadsheet_id=spreadsheet_id,
-                    sheet_id=sheet_id,
-                    sheet_title=sheet_title,
-                    from_row=from_row,
-                    columns=columns,
-                    sheet_data_rows=sheet_data_rows)
-                if row_num < to_row:
-                    is_last_row = True
-
-                # Process records, send batch of records to target
-                record_count = process_records(
-                    catalog=catalog,
-                    stream_name=sheet_title,
-                    records=sheet_data_tf,
-                    time_extracted=ss_time_extracted,
-                    version=activate_version)
-                LOGGER.info('Sheet: {}, records processed: {}'.format(
-                    sheet_title, record_count))
-
-                # Update paging from/to_row for next batch
-                from_row = to_row + 1
-                if to_row + batch_rows > sheet_max_row:
-                    to_row = sheet_max_row
-                else:
-                    to_row = to_row + batch_rows
-
-            # End of Stream: Send Activate Version and update State
-            singer.write_message(activate_version_message)
-            write_bookmark(state, sheet_title, activate_version)
-            LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-            LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
-                sheet_title, row_num - 2)) # subtract 1 for header row
-            update_currently_syncing(state, None)
-
-            # SHEETS_LOADED
-            # Add sheet to sheets_loaded
-            sheet_loaded = {}
-            sheet_loaded['spreadsheetId'] = spreadsheet_id
-            sheet_loaded['sheetId'] = sheet_id
-            sheet_loaded['title'] = sheet_title
-            sheet_loaded['loadDate'] = strftime(utils.now())
-            sheet_loaded['lastRowNumber'] = row_num
-            sheets_loaded.append(sheet_loaded)
+        # SKIP empty sheets (where sheet_schema and columns are None)
+        if not sheet_schema or not columns:
+            LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+        else:
+            # Transform sheet_metadata
+            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+            sheet_metadata.append(sheet_metadata_tf)
+
+            # SHEET_DATA
+            # Should this worksheet tab be synced?
+            if sheet_title in selected_streams:
+                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                update_currently_syncing(state, sheet_title)
+                selected_fields = get_selected_fields(catalog, sheet_title)
+                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+                write_schema(catalog, sheet_title)
+
+                # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
+                # everytime after each sheet sync is complete.
+                # This forces hard deletes on the data downstream if fewer records are sent.
+                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+                last_integer = int(get_bookmark(state, sheet_title, 0))
+                activate_version = int(time.time() * 1000)
+                activate_version_message = singer.ActivateVersionMessage(
+                    stream=sheet_title,
+                    version=activate_version)
+                if last_integer == 0:
+                    # initial load, send activate_version before AND after data sync
+                    singer.write_message(activate_version_message)
+                    LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+                # Determine max range of columns and rows for "paging" through the data
+                sheet_last_col_index = 1
+                sheet_last_col_letter = 'A'
+                for col in columns:
+                    col_index = col.get('columnIndex')
+                    col_letter = col.get('columnLetter')
+                    if col_index > sheet_last_col_index:
+                        sheet_last_col_index = col_index
+                        sheet_last_col_letter = col_letter
+                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+                # Initialize paging for 1st batch
+                is_last_row = False
+                batch_rows = 200
+                from_row = 2
+                if sheet_max_row < batch_rows:
+                    to_row = sheet_max_row
+                else:
+                    to_row = batch_rows
+
+                # Loop thru batches (each having 200 rows of data)
+                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+                    # GET sheet_data for a worksheet tab
+                    sheet_data, time_extracted = get_data(
+                        stream_name=sheet_title,
+                        endpoint_config=sheets_loaded_config,
+                        client=client,
+                        spreadsheet_id=spreadsheet_id,
+                        range_rows=range_rows)
+                    # Data is returned as a list of arrays, an array of values for each row
+                    sheet_data_rows = sheet_data.get('values')
+
+                    # Transform batch of rows to JSON with keys for each column
+                    sheet_data_tf, row_num = transform_sheet_data(
+                        spreadsheet_id=spreadsheet_id,
+                        sheet_id=sheet_id,
+                        sheet_title=sheet_title,
+                        from_row=from_row,
+                        columns=columns,
+                        sheet_data_rows=sheet_data_rows)
+                    if row_num < to_row:
+                        is_last_row = True
+
+                    # Process records, send batch of records to target
+                    record_count = process_records(
+                        catalog=catalog,
+                        stream_name=sheet_title,
+                        records=sheet_data_tf,
+                        time_extracted=ss_time_extracted,
+                        version=activate_version)
+                    LOGGER.info('Sheet: {}, records processed: {}'.format(
+                        sheet_title, record_count))
+
+                    # Update paging from/to_row for next batch
+                    from_row = to_row + 1
+                    if to_row + batch_rows > sheet_max_row:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = to_row + batch_rows
+
+                # End of Stream: Send Activate Version and update State
+                singer.write_message(activate_version_message)
+                write_bookmark(state, sheet_title, activate_version)
+                LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                    sheet_title, row_num - 2)) # subtract 1 for header row
+                update_currently_syncing(state, None)
+
+                # SHEETS_LOADED
+                # Add sheet to sheets_loaded
+                sheet_loaded = {}
+                sheet_loaded['spreadsheetId'] = spreadsheet_id
+                sheet_loaded['sheetId'] = sheet_id
+                sheet_loaded['title'] = sheet_title
+                sheet_loaded['loadDate'] = strftime(utils.now())
+                sheet_loaded['lastRowNumber'] = row_num
+                sheets_loaded.append(sheet_loaded)
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
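For readers unfamiliar with the ACTIVATE_VERSION pattern the diff relies on, here is a condensed, self-contained sketch using the singer-python API (singer.ActivateVersionMessage and singer.write_message are real; the stream name and bookmark value below are placeholders, not part of the tap):

import time
import singer

# Placeholders standing in for the tap's sheet_title and get_bookmark() result
stream = 'my_sheet'
last_integer = 0  # previous bookmark; 0 means this stream was never synced

version = int(time.time() * 1000)  # epoch millis used as the table version
message = singer.ActivateVersionMessage(stream=stream, version=version)

if last_integer == 0:
    # Initial sync only: emit ACTIVATE_VERSION before any records so the
    # target opens a fresh table version.
    singer.write_message(message)

# ... RECORD messages for each row would be emitted here, tagged with `version` ...

# Every sync ends with ACTIVATE_VERSION; the target may then hard-delete
# rows that still carry an older version (rows no longer in the sheet).
singer.write_message(message)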