Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r--  tap_google_sheets/sync.py | 214
1 file changed, 109 insertions(+), 105 deletions(-)
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 311281c..b77eab3 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -429,113 +429,117 @@ def sync(client, config, catalog, state):
         sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
         # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
-        # Transform sheet_metadata
-        sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
-        # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
-        sheet_metadata.append(sheet_metadata_tf)
-
-        # SHEET_DATA
-        # Should this worksheet tab be synced?
-        if sheet_title in selected_streams:
-            LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
-            update_currently_syncing(state, sheet_title)
-            selected_fields = get_selected_fields(catalog, sheet_title)
-            LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
-            write_schema(catalog, sheet_title)
-
-            # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
-            # everytime after each sheet sync is complete.
-            # This forces hard deletes on the data downstream if fewer records are sent.
-            # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
-            last_integer = int(get_bookmark(state, sheet_title, 0))
-            activate_version = int(time.time() * 1000)
-            activate_version_message = singer.ActivateVersionMessage(
-                stream=sheet_title,
-                version=activate_version)
-            if last_integer == 0:
-                # initial load, send activate_version before AND after data sync
-                singer.write_message(activate_version_message)
-                LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-
-            # Determine max range of columns and rows for "paging" through the data
-            sheet_last_col_index = 1
-            sheet_last_col_letter = 'A'
-            for col in columns:
-                col_index = col.get('columnIndex')
-                col_letter = col.get('columnLetter')
-                if col_index > sheet_last_col_index:
-                    sheet_last_col_index = col_index
-                    sheet_last_col_letter = col_letter
-            sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
-
-            # Initialize paging for 1st batch
-            is_last_row = False
-            batch_rows = 200
-            from_row = 2
-            if sheet_max_row < batch_rows:
-                to_row = sheet_max_row
-            else:
-                to_row = batch_rows
-
-            # Loop thru batches (each having 200 rows of data)
-            while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
-                range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
-                # GET sheet_data for a worksheet tab
-                sheet_data, time_extracted = get_data(
-                    stream_name=sheet_title,
-                    endpoint_config=sheets_loaded_config,
-                    client=client,
-                    spreadsheet_id=spreadsheet_id,
-                    range_rows=range_rows)
-                # Data is returned as a list of arrays, an array of values for each row
-                sheet_data_rows = sheet_data.get('values')
-
-                # Transform batch of rows to JSON with keys for each column
-                sheet_data_tf, row_num = transform_sheet_data(
-                    spreadsheet_id=spreadsheet_id,
-                    sheet_id=sheet_id,
-                    sheet_title=sheet_title,
-                    from_row=from_row,
-                    columns=columns,
-                    sheet_data_rows=sheet_data_rows)
-                if row_num < to_row:
-                    is_last_row = True
-
-                # Process records, send batch of records to target
-                record_count = process_records(
-                    catalog=catalog,
-                    stream_name=sheet_title,
-                    records=sheet_data_tf,
-                    time_extracted=ss_time_extracted,
-                    version=activate_version)
-                LOGGER.info('Sheet: {}, records processed: {}'.format(
-                    sheet_title, record_count))
-
-                # Update paging from/to_row for next batch
-                from_row = to_row + 1
-                if to_row + batch_rows > sheet_max_row:
-                    to_row = sheet_max_row
-                else:
-                    to_row = to_row + batch_rows
-
-            # End of Stream: Send Activate Version and update State
-            singer.write_message(activate_version_message)
-            write_bookmark(state, sheet_title, activate_version)
-            LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-            LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
-                sheet_title, row_num - 2)) # subtract 1 for header row
-            update_currently_syncing(state, None)
-
-            # SHEETS_LOADED
-            # Add sheet to sheets_loaded
-            sheet_loaded = {}
-            sheet_loaded['spreadsheetId'] = spreadsheet_id
-            sheet_loaded['sheetId'] = sheet_id
-            sheet_loaded['title'] = sheet_title
-            sheet_loaded['loadDate'] = strftime(utils.now())
-            sheet_loaded['lastRowNumber'] = row_num
-            sheets_loaded.append(sheet_loaded)
+        # SKIP empty sheets (where sheet_schema and columns are None)
+        if not sheet_schema or not columns:
+            LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+        else:
+            # Transform sheet_metadata
+            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+            sheet_metadata.append(sheet_metadata_tf)
+
+            # SHEET_DATA
+            # Should this worksheet tab be synced?
+            if sheet_title in selected_streams:
+                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                update_currently_syncing(state, sheet_title)
+                selected_fields = get_selected_fields(catalog, sheet_title)
+                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+                write_schema(catalog, sheet_title)
+
+                # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
+                # everytime after each sheet sync is complete.
+                # This forces hard deletes on the data downstream if fewer records are sent.
+                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+                last_integer = int(get_bookmark(state, sheet_title, 0))
+                activate_version = int(time.time() * 1000)
+                activate_version_message = singer.ActivateVersionMessage(
+                    stream=sheet_title,
+                    version=activate_version)
+                if last_integer == 0:
+                    # initial load, send activate_version before AND after data sync
+                    singer.write_message(activate_version_message)
+                    LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+                # Determine max range of columns and rows for "paging" through the data
+                sheet_last_col_index = 1
+                sheet_last_col_letter = 'A'
+                for col in columns:
+                    col_index = col.get('columnIndex')
+                    col_letter = col.get('columnLetter')
+                    if col_index > sheet_last_col_index:
+                        sheet_last_col_index = col_index
+                        sheet_last_col_letter = col_letter
+                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+                # Initialize paging for 1st batch
+                is_last_row = False
+                batch_rows = 200
+                from_row = 2
+                if sheet_max_row < batch_rows:
+                    to_row = sheet_max_row
+                else:
+                    to_row = batch_rows
+
+                # Loop thru batches (each having 200 rows of data)
+                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+                    # GET sheet_data for a worksheet tab
+                    sheet_data, time_extracted = get_data(
+                        stream_name=sheet_title,
+                        endpoint_config=sheets_loaded_config,
+                        client=client,
+                        spreadsheet_id=spreadsheet_id,
+                        range_rows=range_rows)
+                    # Data is returned as a list of arrays, an array of values for each row
+                    sheet_data_rows = sheet_data.get('values')
+
+                    # Transform batch of rows to JSON with keys for each column
+                    sheet_data_tf, row_num = transform_sheet_data(
+                        spreadsheet_id=spreadsheet_id,
+                        sheet_id=sheet_id,
+                        sheet_title=sheet_title,
+                        from_row=from_row,
+                        columns=columns,
+                        sheet_data_rows=sheet_data_rows)
+                    if row_num < to_row:
+                        is_last_row = True
+
+                    # Process records, send batch of records to target
+                    record_count = process_records(
+                        catalog=catalog,
+                        stream_name=sheet_title,
+                        records=sheet_data_tf,
+                        time_extracted=ss_time_extracted,
+                        version=activate_version)
+                    LOGGER.info('Sheet: {}, records processed: {}'.format(
+                        sheet_title, record_count))
+
+                    # Update paging from/to_row for next batch
+                    from_row = to_row + 1
+                    if to_row + batch_rows > sheet_max_row:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = to_row + batch_rows
+
+                # End of Stream: Send Activate Version and update State
+                singer.write_message(activate_version_message)
+                write_bookmark(state, sheet_title, activate_version)
+                LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                    sheet_title, row_num - 2)) # subtract 1 for header row
+                update_currently_syncing(state, None)
+
+                # SHEETS_LOADED
+                # Add sheet to sheets_loaded
+                sheet_loaded = {}
+                sheet_loaded['spreadsheetId'] = spreadsheet_id
+                sheet_loaded['sheetId'] = sheet_id
+                sheet_loaded['title'] = sheet_title
+                sheet_loaded['loadDate'] = strftime(utils.now())
+                sheet_loaded['lastRowNumber'] = row_num
+                sheets_loaded.append(sheet_loaded)
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
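
The whole change hinges on the truthiness of the pair returned by get_sheet_metadata: an empty worksheet yields no schema and no columns, and either falsy value means there is nothing to transform or sync. A minimal sketch of the guard in isolation (the tuples below are made-up stand-ins for what get_sheet_metadata might return, not values from the tap):

    # Hypothetical return values; only the last pair represents a non-empty sheet.
    examples = [(None, None), ({}, []), ({'type': 'object'}, [{'columnIndex': 1}])]
    for sheet_schema, columns in examples:
        if not sheet_schema or not columns:
            print('SKIPPING Empty Sheet')  # the new short-circuit path
        else:
            print('syncing')               # metadata transform + data sync
    # prints: SKIPPING Empty Sheet, SKIPPING Empty Sheet, syncing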
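
The ACTIVATE_VERSION block is unchanged apart from its indentation, but its ordering is easy to miss in the re-indented hunk. Below is a self-contained sketch of the handshake, assuming singer-python's ActivateVersionMessage/write_message and simplified bookmark helpers standing in for the tap's get_bookmark/write_bookmark:

    import time
    import singer

    def get_bookmark(state, stream, default):
        # simplified stand-in for the tap's helper
        return state.get('bookmarks', {}).get(stream, default)

    def write_bookmark(state, stream, value):
        state.setdefault('bookmarks', {})[stream] = value

    def sync_stream(state, stream_name, emit_records):
        last_integer = int(get_bookmark(state, stream_name, 0))
        version = int(time.time() * 1000)
        message = singer.ActivateVersionMessage(stream=stream_name, version=version)
        if last_integer == 0:
            # initial sync only: activate the version BEFORE the data
            singer.write_message(message)
        emit_records(version)  # RECORD messages are stamped with this version
        # every sync: activate AFTER the data, so a version-aware target can
        # hard-delete rows that were not re-sent under the new version
        singer.write_message(message)
        write_bookmark(state, stream_name, version)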
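
Likewise for the paging math: the loop walks the sheet in 200-row A1-notation slices, starting at row 2 because row 1 holds the headers. A standalone sketch of the same arithmetic (sheet_max_row and the last column letter here are made-up inputs; the tap reads them from gridProperties and the column metadata):

    def batch_ranges(sheet_max_row, last_col_letter, batch_rows=200):
        from_row = 2  # row 1 is the header row
        to_row = min(batch_rows, sheet_max_row)
        while from_row < sheet_max_row:
            yield 'A{}:{}{}'.format(from_row, last_col_letter, to_row)
            from_row = to_row + 1
            to_row = min(to_row + batch_rows, sheet_max_row)

    # a 450-row sheet whose last column is F pages as:
    print(list(batch_ranges(450, 'F')))
    # ['A2:F200', 'A201:F400', 'A401:F450']

The tap additionally sets is_last_row when transform_sheet_data returns fewer rows than requested, so a sheet whose grid is taller than its data stops after the first short batch instead of paging through empty rows.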