Diffstat (limited to 'tap_google_sheets/sync.py')
 -rw-r--r--  tap_google_sheets/sync.py | 214
 1 file changed, 109 insertions(+), 105 deletions(-)
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 311281c..b77eab3 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -429,113 +429,117 @@ def sync(client, config, catalog, state):
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
             # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
-            # Transform sheet_metadata
-            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
-            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
-            sheet_metadata.append(sheet_metadata_tf)
-
-            # SHEET_DATA
-            # Should this worksheet tab be synced?
-            if sheet_title in selected_streams:
-                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
-                update_currently_syncing(state, sheet_title)
-                selected_fields = get_selected_fields(catalog, sheet_title)
-                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
-                write_schema(catalog, sheet_title)
-
-                # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
-                # everytime after each sheet sync is complete.
-                # This forces hard deletes on the data downstream if fewer records are sent.
-                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
-                last_integer = int(get_bookmark(state, sheet_title, 0))
-                activate_version = int(time.time() * 1000)
-                activate_version_message = singer.ActivateVersionMessage(
-                    stream=sheet_title,
-                    version=activate_version)
-                if last_integer == 0:
-                    # initial load, send activate_version before AND after data sync
-                    singer.write_message(activate_version_message)
-                    LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-
-                # Determine max range of columns and rows for "paging" through the data
-                sheet_last_col_index = 1
-                sheet_last_col_letter = 'A'
-                for col in columns:
-                    col_index = col.get('columnIndex')
-                    col_letter = col.get('columnLetter')
-                    if col_index > sheet_last_col_index:
-                        sheet_last_col_index = col_index
-                        sheet_last_col_letter = col_letter
-                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
-
-                # Initialize paging for 1st batch
-                is_last_row = False
-                batch_rows = 200
-                from_row = 2
-                if sheet_max_row < batch_rows:
-                    to_row = sheet_max_row
-                else:
-                    to_row = batch_rows
-
-                # Loop thru batches (each having 200 rows of data)
-                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
-                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
-                    # GET sheet_data for a worksheet tab
-                    sheet_data, time_extracted = get_data(
-                        stream_name=sheet_title,
-                        endpoint_config=sheets_loaded_config,
-                        client=client,
-                        spreadsheet_id=spreadsheet_id,
-                        range_rows=range_rows)
-                    # Data is returned as a list of arrays, an array of values for each row
-                    sheet_data_rows = sheet_data.get('values')
-
-                    # Transform batch of rows to JSON with keys for each column
-                    sheet_data_tf, row_num = transform_sheet_data(
-                        spreadsheet_id=spreadsheet_id,
-                        sheet_id=sheet_id,
-                        sheet_title=sheet_title,
-                        from_row=from_row,
-                        columns=columns,
-                        sheet_data_rows=sheet_data_rows)
-                    if row_num < to_row:
-                        is_last_row = True
-
-                    # Process records, send batch of records to target
-                    record_count = process_records(
-                        catalog=catalog,
-                        stream_name=sheet_title,
-                        records=sheet_data_tf,
-                        time_extracted=ss_time_extracted,
-                        version=activate_version)
-                    LOGGER.info('Sheet: {}, records processed: {}'.format(
-                        sheet_title, record_count))
-
-                    # Update paging from/to_row for next batch
-                    from_row = to_row + 1
-                    if to_row + batch_rows > sheet_max_row:
-                        to_row = sheet_max_row
-                    else:
-                        to_row = to_row + batch_rows
-
-                # End of Stream: Send Activate Version and update State
-                singer.write_message(activate_version_message)
-                write_bookmark(state, sheet_title, activate_version)
-                LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
-                    sheet_title, row_num - 2)) # subtract 1 for header row
-                update_currently_syncing(state, None)
-
-                # SHEETS_LOADED
-                # Add sheet to sheets_loaded
-                sheet_loaded = {}
-                sheet_loaded['spreadsheetId'] = spreadsheet_id
-                sheet_loaded['sheetId'] = sheet_id
-                sheet_loaded['title'] = sheet_title
-                sheet_loaded['loadDate'] = strftime(utils.now())
-                sheet_loaded['lastRowNumber'] = row_num
-                sheets_loaded.append(sheet_loaded)
+            # SKIP empty sheets (where sheet_schema and columns are None)
+            if not sheet_schema or not columns:
+                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+            else:
+                # Transform sheet_metadata
+                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+                sheet_metadata.append(sheet_metadata_tf)
+
+                # SHEET_DATA
+                # Should this worksheet tab be synced?
+                if sheet_title in selected_streams:
+                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                    update_currently_syncing(state, sheet_title)
+                    selected_fields = get_selected_fields(catalog, sheet_title)
+                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+                    write_schema(catalog, sheet_title)
+
+                    # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
+                    # everytime after each sheet sync is complete.
+                    # This forces hard deletes on the data downstream if fewer records are sent.
+                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+                    last_integer = int(get_bookmark(state, sheet_title, 0))
+                    activate_version = int(time.time() * 1000)
+                    activate_version_message = singer.ActivateVersionMessage(
+                        stream=sheet_title,
+                        version=activate_version)
+                    if last_integer == 0:
+                        # initial load, send activate_version before AND after data sync
+                        singer.write_message(activate_version_message)
+                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+                    # Determine max range of columns and rows for "paging" through the data
+                    sheet_last_col_index = 1
+                    sheet_last_col_letter = 'A'
+                    for col in columns:
+                        col_index = col.get('columnIndex')
+                        col_letter = col.get('columnLetter')
+                        if col_index > sheet_last_col_index:
+                            sheet_last_col_index = col_index
+                            sheet_last_col_letter = col_letter
+                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+                    # Initialize paging for 1st batch
+                    is_last_row = False
+                    batch_rows = 200
+                    from_row = 2
+                    if sheet_max_row < batch_rows:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = batch_rows
+
+                    # Loop thru batches (each having 200 rows of data)
+                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+                        # GET sheet_data for a worksheet tab
+                        sheet_data, time_extracted = get_data(
+                            stream_name=sheet_title,
+                            endpoint_config=sheets_loaded_config,
+                            client=client,
+                            spreadsheet_id=spreadsheet_id,
+                            range_rows=range_rows)
+                        # Data is returned as a list of arrays, an array of values for each row
+                        sheet_data_rows = sheet_data.get('values')
+
+                        # Transform batch of rows to JSON with keys for each column
+                        sheet_data_tf, row_num = transform_sheet_data(
+                            spreadsheet_id=spreadsheet_id,
+                            sheet_id=sheet_id,
+                            sheet_title=sheet_title,
+                            from_row=from_row,
+                            columns=columns,
+                            sheet_data_rows=sheet_data_rows)
+                        if row_num < to_row:
+                            is_last_row = True
+
+                        # Process records, send batch of records to target
+                        record_count = process_records(
+                            catalog=catalog,
+                            stream_name=sheet_title,
+                            records=sheet_data_tf,
+                            time_extracted=ss_time_extracted,
+                            version=activate_version)
+                        LOGGER.info('Sheet: {}, records processed: {}'.format(
+                            sheet_title, record_count))
+
+                        # Update paging from/to_row for next batch
+                        from_row = to_row + 1
+                        if to_row + batch_rows > sheet_max_row:
+                            to_row = sheet_max_row
+                        else:
+                            to_row = to_row + batch_rows
+
+                    # End of Stream: Send Activate Version and update State
+                    singer.write_message(activate_version_message)
+                    write_bookmark(state, sheet_title, activate_version)
+                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                        sheet_title, row_num - 2)) # subtract 1 for header row
+                    update_currently_syncing(state, None)
+
+                    # SHEETS_LOADED
+                    # Add sheet to sheets_loaded
+                    sheet_loaded = {}
+                    sheet_loaded['spreadsheetId'] = spreadsheet_id
+                    sheet_loaded['sheetId'] = sheet_id
+                    sheet_loaded['title'] = sheet_title
+                    sheet_loaded['loadDate'] = strftime(utils.now())
+                    sheet_loaded['lastRowNumber'] = row_num
+                    sheets_loaded.append(sheet_loaded)
 
         stream_name = 'sheet_metadata'
         # Sync sheet_metadata if selected
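
Nearly all of the hunk above is re-indentation: the existing per-sheet sync body moves unchanged into a new else branch. A minimal sketch of the added control flow, with the ~100-line else branch collapsed into a hypothetical sync_sheet() helper (not a function in the tap), assuming get_sheet_metadata() returns (None, None) when a sheet has no usable header row:

# Sketch of the guard added in this commit; not the tap's actual code.
for sheet in sheets:
    sheet_title = sheet.get('properties', {}).get('title')

    # Assumption: (None, None) is returned for an empty worksheet tab.
    sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)

    if not sheet_schema or not columns:
        # New behavior: log and skip the tab, instead of falling through to
        # code such as 'for col in columns:' that would fail on None.
        LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
    else:
        sync_sheet(sheet)  # hypothetical stand-in: metadata transform, batching, records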