From 376f1145837541d4fff2ad0e499236761f9873c3 Mon Sep 17 00:00:00 2001
From: Jeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com>
Date: Mon, 24 Feb 2020 09:53:26 -0800
Subject: v.0.0.4 Logic to skip empty sheets (#4)

* v.0.0.2 schema and sync changes

Change number JSON schema to anyOf with multipleOf; skip empty rows;
move write_bookmark to end of sync.py.

* v.0.0.3 Sync activate version and error handling

Update README.md documentation. Improved logging and handling of errors
and warnings. Better null handling in Discovery and Sync. Fix issues
with activate version messages.

* v.0.0.4 Skip empty worksheets

Add logic to skip empty worksheets in Discovery and Sync mode.

* schema.py fix number datatype issue

Number datatypes are being created as strings in targets. The JSON
schema anyOf order needs to be adjusted so that the order is null,
number, string.
---
 CHANGELOG.md                |   3 +
 setup.py                    |   2 +-
 tap_google_sheets/schema.py | 365 ++++++++++++++++++++++----------------------
 tap_google_sheets/sync.py   | 214 +++++++++++++-------------
 4 files changed, 298 insertions(+), 286 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3c47260..81057ee 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.0.4
+  * Add logic to skip empty worksheets in Discovery and Sync mode.
+
 ## 0.0.3
   * Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages.
 
diff --git a/setup.py b/setup.py
index 80c2c10..dde2bad 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup, find_packages
 
 setup(name='tap-google-sheets',
-      version='0.0.3',
+      version='0.0.4',
       description='Singer.io tap for extracting data from the Google Sheets v4 API',
       author='jeff.huth@bytecode.io',
       classifiers=['Programming Language :: Python :: 3 :: Only'],
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index e319c03..c229d72 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -25,184 +25,188 @@ def get_sheet_schema_columns(sheet):
     sheet_json_schema = OrderedDict()
     data = next(iter(sheet.get('data', [])), {})
     row_data = data.get('rowData', [])
-    # spreadsheet is an OrderedDict, with ordered sheets and rows in the response
-
-    headers = row_data[0].get('values', [])
-    first_values = row_data[1].get('values', [])
-    # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
-
-    sheet_json_schema = {
-        'type': 'object',
-        'additionalProperties': False,
-        'properties': {
-            '__sdc_spreadsheet_id': {
-                'type': ['null', 'string']
-            },
-            '__sdc_sheet_id': {
-                'type': ['null', 'integer']
-            },
-            '__sdc_row': {
-                'type': ['null', 'integer']
-            }
-        }
-    }
-
-    header_list = [] # used for checking uniqueness
-    columns = []
-    prior_header = None
-    i = 0
-    skipped = 0
-    # Read column headers until end or 2 consecutive skipped headers
-    for header in headers:
-        # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True)))
-        column_index = i + 1
-        column_letter = colnum_string(column_index)
-        header_value = header.get('formattedValue')
-        if header_value: # NOT skipped
-            column_is_skipped = False
-            skipped = 0
-            column_name = '{}'.format(header_value)
-            if column_name in header_list:
-                raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
-                    sheet_title, column_name, column_letter))
-            header_list.append(column_name)
-
-            first_value = None
-            try:
-                first_value = first_values[i]
-            except IndexError as err:
-                raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
-                    sheet_title, column_name, column_letter, err))
-
-            column_effective_value = first_value.get('effectiveValue', {})
-
-            col_val = None
-            if column_effective_value == {}:
-                column_effective_value_type = 'stringValue'
-                LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
-                    sheet_title, column_name, column_letter))
-                LOGGER.info('  Setting column datatype to STRING')
-            else:
-                for key, val in column_effective_value.items():
-                    if key in ('numberValue', 'stringValue', 'boolValue'):
-                        column_effective_value_type = key
-                        col_val = str(val)
-                    elif key in ('errorType', 'formulaType'):
-                        col_val = str(val)
-                        raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
-                            sheet_title, column_name, column_letter, key, col_val))
-
-            column_number_format = first_values[i].get('effectiveFormat', {}).get(
-                'numberFormat', {})
-            column_number_format_type = column_number_format.get('type')
-
-            # Determine datatype for sheet_json_schema
-            #
-            # column_effective_value_type = numberValue, stringValue, boolValue;
-            #   INVALID: errorType, formulaType
-            #   https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
-            #
-            # column_number_format_type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
-            #   TIME, DATE_TIME, SCIENTIFIC
-            #   https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
-            #
-            column_format = None # Default
-            if column_effective_value == {}:
-                col_properties = {'type': ['null', 'string']}
-                column_gs_type = 'stringValue'
-                LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
-                    sheet_title, column_name, column_letter))
-                LOGGER.info('  Setting column datatype to STRING')
-            elif column_effective_value_type == 'stringValue':
-                col_properties = {'type': ['null', 'string']}
-                column_gs_type = 'stringValue'
-            elif column_effective_value_type == 'boolValue':
-                col_properties = {'type': ['null', 'boolean', 'string']}
-                column_gs_type = 'boolValue'
-            elif column_effective_value_type == 'numberValue':
-                if column_number_format_type == 'DATE_TIME':
-                    col_properties = {
-                        'type': ['null', 'string'],
-                        'format': 'date-time'
-                    }
-                    column_gs_type = 'numberType.DATE_TIME'
-                elif column_number_format_type == 'DATE':
-                    col_properties = {
-                        'type': ['null', 'string'],
-                        'format': 'date'
-                    }
-                    column_gs_type = 'numberType.DATE'
-                elif column_number_format_type == 'TIME':
-                    col_properties = {
-                        'type': ['null', 'string'],
-                        'format': 'time'
-                    }
-                    column_gs_type = 'numberType.TIME'
-                elif column_number_format_type == 'TEXT':
-                    col_properties = {'type': ['null', 'string']}
-                    column_gs_type = 'stringValue'
-                else:
-                    # Interesting - order in the anyOf makes a difference.
-                    # Number w/ multipleOf must be listed last, otherwise errors occur.
-                    col_properties = {
-                        'anyOf': [
-                            {
-                                'type': 'string'
-                            },
-                            {
-                                'type': 'null'
-                            },
-                            {
-                                'type': 'number',
-                                'multipleOf': 1e-15
-                            }
-                        ]
-                    }
-                    column_gs_type = 'numberType'
-            # Catch-all to deal with other types and set to string
-            # column_effective_value_type: formulaValue, errorValue, or other
-            else:
-                col_properties = {'type': ['null', 'string']}
-                column_gs_type = 'unsupportedValue'
-                LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
-                    sheet_title, column_name, column_letter, column_effective_value_type, col_val))
-                LOGGER.info('Converting to string.')
-        else: # skipped
-            column_is_skipped = True
-            skipped = skipped + 1
-            column_index_str = str(column_index).zfill(2)
-            column_name = '__sdc_skip_col_{}'.format(column_index_str)
-            col_properties = {'type': ['null', 'string']}
-            column_gs_type = 'stringValue'
-            LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
-                sheet_title, column_name, column_letter))
-            LOGGER.info('  This column will be skipped during data loading.')
-
-        if skipped >= 2:
-            # skipped = 2 consecutive skipped headers
-            # Remove prior_header column_name
-            sheet_json_schema['properties'].pop(prior_header, None)
-            LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
-                sheet_title, column_name, column_letter))
-            break
-
-        else:
-            column = {}
-            column = {
-                'columnIndex': column_index,
-                'columnLetter': column_letter,
-                'columnName': column_name,
-                'columnType': column_gs_type,
-                'columnSkipped': column_is_skipped
-            }
-            columns.append(column)
-
-            sheet_json_schema['properties'][column_name] = col_properties
-
-        prior_header = column_name
-        i = i + 1
-
-    return sheet_json_schema, columns
+    if row_data == []:
+        # Empty sheet, SKIP
+        LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+        return None, None
+    else:
+        # spreadsheet is an OrderedDict, with ordered sheets and rows in the response
+        headers = row_data[0].get('values', [])
+        first_values = row_data[1].get('values', [])
+        # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
+
+        sheet_json_schema = {
+            'type': 'object',
+            'additionalProperties': False,
+            'properties': {
+                '__sdc_spreadsheet_id': {
+                    'type': ['null', 'string']
+                },
+                '__sdc_sheet_id': {
+                    'type': ['null', 'integer']
+                },
+                '__sdc_row': {
+                    'type': ['null', 'integer']
+                }
+            }
+        }
+
+        header_list = [] # used for checking uniqueness
+        columns = []
+        prior_header = None
+        i = 0
+        skipped = 0
+        # Read column headers until end or 2 consecutive skipped headers
+        for header in headers:
+            # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True)))
+            column_index = i + 1
+            column_letter = colnum_string(column_index)
+            header_value = header.get('formattedValue')
+            if header_value: # NOT skipped
+                column_is_skipped = False
+                skipped = 0
+                column_name = '{}'.format(header_value)
+                if column_name in header_list:
+                    raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
+                        sheet_title, column_name, column_letter))
+                header_list.append(column_name)
+
+                first_value = None
+                try:
+                    first_value = first_values[i]
+                except IndexError as err:
+                    raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
+                        sheet_title, column_name, column_letter, err))
+
+                column_effective_value = first_value.get('effectiveValue', {})
+
+                col_val = None
+                if column_effective_value == {}:
+                    column_effective_value_type = 'stringValue'
+                    LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
+                        sheet_title, column_name, column_letter))
+                    LOGGER.info('  Setting column datatype to STRING')
+                else:
+                    for key, val in column_effective_value.items():
+                        if key in ('numberValue', 'stringValue', 'boolValue'):
+                            column_effective_value_type = key
+                            col_val = str(val)
+                        elif key in ('errorType', 'formulaType'):
+                            col_val = str(val)
+                            raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, column_name, column_letter, key, col_val))
+
+                column_number_format = first_values[i].get('effectiveFormat', {}).get(
+                    'numberFormat', {})
+                column_number_format_type = column_number_format.get('type')
+
+                # Determine datatype for sheet_json_schema
+                #
+                # column_effective_value_type = numberValue, stringValue, boolValue;
+                #   INVALID: errorType, formulaType
+                #   https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
+                #
+                # column_number_format_type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
+                #   TIME, DATE_TIME, SCIENTIFIC
+                #   https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
+                #
+                column_format = None # Default
+                if column_effective_value == {}:
+                    col_properties = {'type': ['null', 'string']}
+                    column_gs_type = 'stringValue'
+                    LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
+                        sheet_title, column_name, column_letter))
+                    LOGGER.info('  Setting column datatype to STRING')
+                elif column_effective_value_type == 'stringValue':
+                    col_properties = {'type': ['null', 'string']}
+                    column_gs_type = 'stringValue'
+                elif column_effective_value_type == 'boolValue':
+                    col_properties = {'type': ['null', 'boolean', 'string']}
+                    column_gs_type = 'boolValue'
+                elif column_effective_value_type == 'numberValue':
+                    if column_number_format_type == 'DATE_TIME':
+                        col_properties = {
+                            'type': ['null', 'string'],
+                            'format': 'date-time'
+                        }
+                        column_gs_type = 'numberType.DATE_TIME'
+                    elif column_number_format_type == 'DATE':
+                        col_properties = {
+                            'type': ['null', 'string'],
+                            'format': 'date'
+                        }
+                        column_gs_type = 'numberType.DATE'
+                    elif column_number_format_type == 'TIME':
+                        col_properties = {
+                            'type': ['null', 'string'],
+                            'format': 'time'
+                        }
+                        column_gs_type = 'numberType.TIME'
+                    elif column_number_format_type == 'TEXT':
+                        col_properties = {'type': ['null', 'string']}
+                        column_gs_type = 'stringValue'
+                    else:
+                        # Interesting - order in the anyOf makes a difference.
+                        # The order must be null, number, string; otherwise
+                        # number datatypes are created as strings in targets.
+                        col_properties = {
+                            'anyOf': [
+                                {
+                                    'type': 'null'
+                                },
+                                {
+                                    'type': 'number',
+                                    'multipleOf': 1e-15
+                                },
+                                {
+                                    'type': 'string'
+                                }
+                            ]
+                        }
+                        column_gs_type = 'numberType'
+                # Catch-all to deal with other types and set to string
+                # column_effective_value_type: formulaValue, errorValue, or other
+                else:
+                    col_properties = {'type': ['null', 'string']}
+                    column_gs_type = 'unsupportedValue'
+                    LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+                        sheet_title, column_name, column_letter, column_effective_value_type, col_val))
+                    LOGGER.info('Converting to string.')
+            else: # skipped
+                column_is_skipped = True
+                skipped = skipped + 1
+                column_index_str = str(column_index).zfill(2)
+                column_name = '__sdc_skip_col_{}'.format(column_index_str)
+                col_properties = {'type': ['null', 'string']}
+                column_gs_type = 'stringValue'
+                LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
+                    sheet_title, column_name, column_letter))
+                LOGGER.info('  This column will be skipped during data loading.')
+
+            if skipped >= 2:
+                # skipped = 2 consecutive skipped headers
+                # Remove prior_header column_name
+                sheet_json_schema['properties'].pop(prior_header, None)
+                LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
+                    sheet_title, column_name, column_letter))
+                break
+
+            else:
+                column = {}
+                column = {
+                    'columnIndex': column_index,
+                    'columnLetter': column_letter,
+                    'columnName': column_name,
+                    'columnType': column_gs_type,
+                    'columnSkipped': column_is_skipped
+                }
+                columns.append(column)
+
+                sheet_json_schema['properties'][column_name] = col_properties
+
+            prior_header = column_name
+            i = i + 1
+
+        return sheet_json_schema, columns
 
 
 # Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query
@@ -276,17 +280,18 @@ def get_schemas(client, spreadsheet_id):
     for sheet in sheets:
         # GET sheet_json_schema for each worksheet (from function above)
         sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-        # LOGGER.info('columns = {}'.format(columns))
-
-        sheet_title = sheet.get('properties', {}).get('title')
-        schemas[sheet_title] = sheet_json_schema
-        sheet_mdata = metadata.new()
-        sheet_mdata = metadata.get_standard_metadata(
-            schema=sheet_json_schema,
-            key_properties=['__sdc_row'],
-            valid_replication_keys=None,
-            replication_method='FULL_TABLE'
-        )
-        field_metadata[sheet_title] = sheet_mdata
+
+        # SKIP empty sheets (where sheet_json_schema and columns are None)
+        if sheet_json_schema and columns:
+            sheet_title = sheet.get('properties', {}).get('title')
+            schemas[sheet_title] = sheet_json_schema
+            sheet_mdata = metadata.new()
+            sheet_mdata = metadata.get_standard_metadata(
+                schema=sheet_json_schema,
+                key_properties=['__sdc_row'],
+                valid_replication_keys=None,
+                replication_method='FULL_TABLE'
+            )
+            field_metadata[sheet_title] = sheet_mdata
 
     return schemas, field_metadata
 
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 311281c..b77eab3 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -429,113 +429,117 @@ def sync(client, config, catalog, state):
         sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
         # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
-        # Transform sheet_metadata
-        sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
-        # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
-        sheet_metadata.append(sheet_metadata_tf)
-
-        # SHEET_DATA
-        # Should this worksheet tab be synced?
-        if sheet_title in selected_streams:
-            LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
-            update_currently_syncing(state, sheet_title)
-            selected_fields = get_selected_fields(catalog, sheet_title)
-            LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
-            write_schema(catalog, sheet_title)
-
-            # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not subsequent syncs)
-            # and every time after each sheet sync is complete.
-            # This forces hard deletes on the data downstream if fewer records are sent.
-            # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
-            last_integer = int(get_bookmark(state, sheet_title, 0))
-            activate_version = int(time.time() * 1000)
-            activate_version_message = singer.ActivateVersionMessage(
-                stream=sheet_title,
-                version=activate_version)
-            if last_integer == 0:
-                # initial load, send activate_version before AND after data sync
-                singer.write_message(activate_version_message)
-                LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-
-            # Determine max range of columns and rows for "paging" through the data
-            sheet_last_col_index = 1
-            sheet_last_col_letter = 'A'
-            for col in columns:
-                col_index = col.get('columnIndex')
-                col_letter = col.get('columnLetter')
-                if col_index > sheet_last_col_index:
-                    sheet_last_col_index = col_index
-                    sheet_last_col_letter = col_letter
-            sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
-
-            # Initialize paging for 1st batch
-            is_last_row = False
-            batch_rows = 200
-            from_row = 2
-            if sheet_max_row < batch_rows:
-                to_row = sheet_max_row
-            else:
-                to_row = batch_rows
-
-            # Loop through batches (each having 200 rows of data)
-            while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
-                range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
-                # GET sheet_data for a worksheet tab
-                sheet_data, time_extracted = get_data(
-                    stream_name=sheet_title,
-                    endpoint_config=sheets_loaded_config,
-                    client=client,
-                    spreadsheet_id=spreadsheet_id,
-                    range_rows=range_rows)
-                # Data is returned as a list of arrays, an array of values for each row
-                sheet_data_rows = sheet_data.get('values')
-
-                # Transform batch of rows to JSON with keys for each column
-                sheet_data_tf, row_num = transform_sheet_data(
-                    spreadsheet_id=spreadsheet_id,
-                    sheet_id=sheet_id,
-                    sheet_title=sheet_title,
-                    from_row=from_row,
-                    columns=columns,
-                    sheet_data_rows=sheet_data_rows)
-                if row_num < to_row:
-                    is_last_row = True
-
-                # Process records, send batch of records to target
-                record_count = process_records(
-                    catalog=catalog,
-                    stream_name=sheet_title,
-                    records=sheet_data_tf,
-                    time_extracted=ss_time_extracted,
-                    version=activate_version)
-                LOGGER.info('Sheet: {}, records processed: {}'.format(
-                    sheet_title, record_count))
-
-                # Update paging from/to_row for next batch
-                from_row = to_row + 1
-                if to_row + batch_rows > sheet_max_row:
-                    to_row = sheet_max_row
-                else:
-                    to_row = to_row + batch_rows
-
-            # End of Stream: Send Activate Version and update State
-            singer.write_message(activate_version_message)
-            write_bookmark(state, sheet_title, activate_version)
-            LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-            LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
-                sheet_title, row_num - 2)) # subtract 2 for header row and 1-based row offset
-            update_currently_syncing(state, None)
-
-            # SHEETS_LOADED
-            # Add sheet to sheets_loaded
-            sheet_loaded = {}
-            sheet_loaded['spreadsheetId'] = spreadsheet_id
-            sheet_loaded['sheetId'] = sheet_id
-            sheet_loaded['title'] = sheet_title
-            sheet_loaded['loadDate'] = strftime(utils.now())
-            sheet_loaded['lastRowNumber'] = row_num
-            sheets_loaded.append(sheet_loaded)
+        # SKIP empty sheets (where sheet_schema and columns are None)
+        if not sheet_schema or not columns:
+            LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+        else:
+            # Transform sheet_metadata
+            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+            sheet_metadata.append(sheet_metadata_tf)
+
+            # SHEET_DATA
+            # Should this worksheet tab be synced?
+            if sheet_title in selected_streams:
+                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                update_currently_syncing(state, sheet_title)
+                selected_fields = get_selected_fields(catalog, sheet_title)
+                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+                write_schema(catalog, sheet_title)
+
+                # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not subsequent syncs)
+                # and every time after each sheet sync is complete.
+                # This forces hard deletes on the data downstream if fewer records are sent.
+                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+                last_integer = int(get_bookmark(state, sheet_title, 0))
+                activate_version = int(time.time() * 1000)
+                activate_version_message = singer.ActivateVersionMessage(
+                    stream=sheet_title,
+                    version=activate_version)
+                if last_integer == 0:
+                    # initial load, send activate_version before AND after data sync
+                    singer.write_message(activate_version_message)
+                    LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+                # Determine max range of columns and rows for "paging" through the data
+                sheet_last_col_index = 1
+                sheet_last_col_letter = 'A'
+                for col in columns:
+                    col_index = col.get('columnIndex')
+                    col_letter = col.get('columnLetter')
+                    if col_index > sheet_last_col_index:
+                        sheet_last_col_index = col_index
+                        sheet_last_col_letter = col_letter
+                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+                # Initialize paging for 1st batch
+                is_last_row = False
+                batch_rows = 200
+                from_row = 2
+                if sheet_max_row < batch_rows:
+                    to_row = sheet_max_row
+                else:
+                    to_row = batch_rows
+
+                # Loop through batches (each having 200 rows of data)
+                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+                    # GET sheet_data for a worksheet tab
+                    sheet_data, time_extracted = get_data(
+                        stream_name=sheet_title,
+                        endpoint_config=sheets_loaded_config,
+                        client=client,
+                        spreadsheet_id=spreadsheet_id,
+                        range_rows=range_rows)
+                    # Data is returned as a list of arrays, an array of values for each row
+                    sheet_data_rows = sheet_data.get('values')
+
+                    # Transform batch of rows to JSON with keys for each column
+                    sheet_data_tf, row_num = transform_sheet_data(
+                        spreadsheet_id=spreadsheet_id,
+                        sheet_id=sheet_id,
+                        sheet_title=sheet_title,
+                        from_row=from_row,
+                        columns=columns,
+                        sheet_data_rows=sheet_data_rows)
+                    if row_num < to_row:
+                        is_last_row = True
+
+                    # Process records, send batch of records to target
+                    record_count = process_records(
+                        catalog=catalog,
+                        stream_name=sheet_title,
+                        records=sheet_data_tf,
+                        time_extracted=ss_time_extracted,
+                        version=activate_version)
+                    LOGGER.info('Sheet: {}, records processed: {}'.format(
+                        sheet_title, record_count))
+
+                    # Update paging from/to_row for next batch
+                    from_row = to_row + 1
+                    if to_row + batch_rows > sheet_max_row:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = to_row + batch_rows
+
+                # End of Stream: Send Activate Version and update State
+                singer.write_message(activate_version_message)
+                write_bookmark(state, sheet_title, activate_version)
+                LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                    sheet_title, row_num - 2)) # subtract 2 for header row and 1-based row offset
+                update_currently_syncing(state, None)
+
+                # SHEETS_LOADED
+                # Add sheet to sheets_loaded
+                sheet_loaded = {}
+                sheet_loaded['spreadsheetId'] = spreadsheet_id
+                sheet_loaded['sheetId'] = sheet_id
+                sheet_loaded['title'] = sheet_title
+                sheet_loaded['loadDate'] = strftime(utils.now())
+                sheet_loaded['lastRowNumber'] = row_num
+                sheets_loaded.append(sheet_loaded)
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
-- cgit v1.2.3
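
Note on the number-datatype fix in schema.py: the corrected anyOf ordering can be sanity-checked outside the tap. The sketch below is illustrative only; it is not part of tap-google-sheets, and it assumes the third-party jsonschema package is installed. It builds a number-column schema in the order this commit emits (null, then number with multipleOf, then string) and confirms that null, numeric, and string cell values all validate.

    # Illustrative sketch, not tap code: requires `pip install jsonschema`.
    from jsonschema import validate

    # Number-column schema as emitted after this commit: the anyOf order
    # is null, then number (with multipleOf), then string.
    number_col_properties = {
        'anyOf': [
            {'type': 'null'},
            {'type': 'number', 'multipleOf': 1e-15},
            {'type': 'string'}
        ]
    }

    # A number column may legitimately hold any of these values;
    # validate() raises ValidationError if no anyOf branch matches.
    for value in (None, 123.45, 'N/A'):
        validate(instance=value, schema=number_col_properties)

The multipleOf of 1e-15 effectively limits numeric values to 15 decimal places, and, per the commit message, placing the number branch ahead of the generic string branch is what keeps targets from creating these columns as strings.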