v.0.0.4 Logic to skip empty sheets (#4)

diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 311281c3811836ad67dad1f9bb3d4b9d10bd65c1..b77eab38af8c8bc0da5f27caba6057c6e4110d54 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -429,113 +429,117 @@ def sync(client, config, catalog, state):
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
             # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
-            # Transform sheet_metadata
-            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
-            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
-            sheet_metadata.append(sheet_metadata_tf)
-
-            # SHEET_DATA
-            # Should this worksheet tab be synced?
-            if sheet_title in selected_streams:
-                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
-                update_currently_syncing(state, sheet_title)
-                selected_fields = get_selected_fields(catalog, sheet_title)
-                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
-                write_schema(catalog, sheet_title)
-
-                # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not on
-                # subsequent syncs), and again every time a sheet sync completes.
-                # This forces hard deletes on the data downstream if fewer records are sent.
-                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
-                last_integer = int(get_bookmark(state, sheet_title, 0))
-                activate_version = int(time.time() * 1000)
-                activate_version_message = singer.ActivateVersionMessage(
-                        stream=sheet_title,
-                        version=activate_version)
-                if last_integer == 0:
-                    # initial load, send activate_version before AND after data sync
-                    singer.write_message(activate_version_message)
-                    LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-
-                # Determine max range of columns and rows for "paging" through the data
-                sheet_last_col_index = 1
-                sheet_last_col_letter = 'A'
-                for col in columns:
-                    col_index = col.get('columnIndex')
-                    col_letter = col.get('columnLetter')
-                    if col_index > sheet_last_col_index:
-                        sheet_last_col_index = col_index
-                        sheet_last_col_letter = col_letter
-                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
-
-                # Initialize paging for 1st batch
-                is_last_row = False
-                batch_rows = 200
-                from_row = 2
-                if sheet_max_row < batch_rows:
-                    to_row = sheet_max_row
-                else:
-                    to_row = batch_rows
-
-                # Loop through batches (each containing up to 200 rows of data)
-                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
-                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
-                    # GET sheet_data for a worksheet tab
-                    sheet_data, time_extracted = get_data(
-                        stream_name=sheet_title,
-                        endpoint_config=sheets_loaded_config,
-                        client=client,
-                        spreadsheet_id=spreadsheet_id,
-                        range_rows=range_rows)
-                    # Data is returned as a list of arrays, an array of values for each row
-                    sheet_data_rows = sheet_data.get('values')
-
-                    # Transform batch of rows to JSON with keys for each column
-                    sheet_data_tf, row_num = transform_sheet_data(
-                        spreadsheet_id=spreadsheet_id,
-                        sheet_id=sheet_id,
-                        sheet_title=sheet_title,
-                        from_row=from_row,
-                        columns=columns,
-                        sheet_data_rows=sheet_data_rows)
-                    if row_num < to_row:
-                        is_last_row = True
-
-                    # Process records, send batch of records to target
-                    record_count = process_records(
-                        catalog=catalog,
-                        stream_name=sheet_title,
-                        records=sheet_data_tf,
-                        time_extracted=ss_time_extracted,
-                        version=activate_version)
-                    LOGGER.info('Sheet: {}, records processed: {}'.format(
-                        sheet_title, record_count))
-                    
-                    # Update paging from/to_row for next batch
-                    from_row = to_row + 1
-                    if to_row + batch_rows > sheet_max_row:
+            # SKIP empty sheets (where sheet_schema or columns is missing)
+            if not sheet_schema or not columns:
+                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+            else:
+                # Transform sheet_metadata
+                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+                sheet_metadata.append(sheet_metadata_tf)
+
+                # SHEET_DATA
+                # Should this worksheet tab be synced?
+                if sheet_title in selected_streams:
+                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                    update_currently_syncing(state, sheet_title)
+                    selected_fields = get_selected_fields(catalog, sheet_title)
+                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+                    write_schema(catalog, sheet_title)
+
+                    # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not on
+                    # subsequent syncs), and again every time a sheet sync completes.
+                    # This forces hard deletes on the data downstream if fewer records are sent.
+                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+                    last_integer = int(get_bookmark(state, sheet_title, 0))
+                    activate_version = int(time.time() * 1000)
+                    activate_version_message = singer.ActivateVersionMessage(
+                            stream=sheet_title,
+                            version=activate_version)
+                    if last_integer == 0:
+                        # initial load, send activate_version before AND after data sync
+                        singer.write_message(activate_version_message)
+                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+                    # Determine max range of columns and rows for "paging" through the data
+                    sheet_last_col_index = 1
+                    sheet_last_col_letter = 'A'
+                    for col in columns:
+                        col_index = col.get('columnIndex')
+                        col_letter = col.get('columnLetter')
+                        if col_index > sheet_last_col_index:
+                            sheet_last_col_index = col_index
+                            sheet_last_col_letter = col_letter
+                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+                    # Initialize paging for 1st batch
+                    is_last_row = False
+                    batch_rows = 200
+                    from_row = 2
+                    if sheet_max_row < batch_rows:
                         to_row = sheet_max_row
                     else:
-                        to_row = to_row + batch_rows
-
-                # End of Stream: Send Activate Version and update State
-                singer.write_message(activate_version_message)
-                write_bookmark(state, sheet_title, activate_version)
-                LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
-                    sheet_title, row_num - 2)) # subtract 2: the header row, plus row_num points 1 past the last data row
-                update_currently_syncing(state, None)
-
-                # SHEETS_LOADED
-                # Add sheet to sheets_loaded
-                sheet_loaded = {}
-                sheet_loaded['spreadsheetId'] = spreadsheet_id
-                sheet_loaded['sheetId'] = sheet_id
-                sheet_loaded['title'] = sheet_title
-                sheet_loaded['loadDate'] = strftime(utils.now())
-                sheet_loaded['lastRowNumber'] = row_num
-                sheets_loaded.append(sheet_loaded)
+                        to_row = batch_rows
+
+                    # Loop through batches (each containing up to 200 rows of data)
+                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+                        # GET sheet_data for a worksheet tab
+                        sheet_data, time_extracted = get_data(
+                            stream_name=sheet_title,
+                            endpoint_config=sheets_loaded_config,
+                            client=client,
+                            spreadsheet_id=spreadsheet_id,
+                            range_rows=range_rows)
+                        # Data is returned as a list of arrays, an array of values for each row
+                        sheet_data_rows = sheet_data.get('values')
+
+                        # Transform batch of rows to JSON with keys for each column
+                        sheet_data_tf, row_num = transform_sheet_data(
+                            spreadsheet_id=spreadsheet_id,
+                            sheet_id=sheet_id,
+                            sheet_title=sheet_title,
+                            from_row=from_row,
+                            columns=columns,
+                            sheet_data_rows=sheet_data_rows)
+                        if row_num < to_row:
+                            is_last_row = True
+
+                        # Process records, send batch of records to target
+                        record_count = process_records(
+                            catalog=catalog,
+                            stream_name=sheet_title,
+                            records=sheet_data_tf,
+                            time_extracted=ss_time_extracted,
+                            version=activate_version)
+                        LOGGER.info('Sheet: {}, records processed: {}'.format(
+                            sheet_title, record_count))
+                        
+                        # Update paging from/to_row for next batch
+                        from_row = to_row + 1
+                        if to_row + batch_rows > sheet_max_row:
+                            to_row = sheet_max_row
+                        else:
+                            to_row = to_row + batch_rows
+
+                    # End of Stream: Send Activate Version and update State
+                    singer.write_message(activate_version_message)
+                    write_bookmark(state, sheet_title, activate_version)
+                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                        sheet_title, row_num - 2)) # subtract 2: the header row, plus row_num points 1 past the last data row
+                    update_currently_syncing(state, None)
+
+                    # SHEETS_LOADED
+                    # Add sheet to sheets_loaded
+                    sheet_loaded = {}
+                    sheet_loaded['spreadsheetId'] = spreadsheet_id
+                    sheet_loaded['sheetId'] = sheet_id
+                    sheet_loaded['title'] = sheet_title
+                    sheet_loaded['loadDate'] = strftime(utils.now())
+                    sheet_loaded['lastRowNumber'] = row_num
+                    sheets_loaded.append(sheet_loaded)
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
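
The paging logic in the hunk above can be read in isolation. The sketch below is a minimal, standalone rendering of it, assuming the same column-metadata shape ({'columnIndex': ..., 'columnLetter': ...}); batch_ranges is a hypothetical helper, not a function in the tap, and it simplifies the loop condition slightly (the tap also stops early via is_last_row when a batch comes back short).

    # Hypothetical helper illustrating the batch paging above; not part of
    # tap-google-sheets itself.
    def batch_ranges(columns, sheet_max_row, batch_rows=200):
        """Yield A1-notation ranges covering data rows 2..sheet_max_row."""
        # Find the right-most column letter, defaulting to 'A' as the sync does.
        last_col_index, last_col_letter = 1, 'A'
        for col in columns:
            if col.get('columnIndex', 0) > last_col_index:
                last_col_index = col['columnIndex']
                last_col_letter = col['columnLetter']
        from_row = 2  # row 1 is the header row
        to_row = min(batch_rows, sheet_max_row)
        while from_row <= sheet_max_row:
            yield 'A{}:{}{}'.format(from_row, last_col_letter, to_row)
            from_row = to_row + 1
            to_row = min(to_row + batch_rows, sheet_max_row)

    # A 3-column sheet with 450 rows pages through three ranges:
    cols = [{'columnIndex': i, 'columnLetter': c} for i, c in enumerate('ABC', 1)]
    print(list(batch_ranges(cols, 450)))
    # ['A2:C200', 'A201:C400', 'A401:C450']

The ACTIVATE_VERSION handling follows the familiar singer-python full-table pattern: on the initial sync (bookmark of 0) the version message is emitted both before and after the data, and on later syncs only after. A compressed sketch using the same singer calls as the diff; emit_stream_with_version and its arguments are hypothetical:

    import time
    import singer

    def emit_stream_with_version(stream, records, is_initial_sync):
        version = int(time.time() * 1000)
        activate = singer.ActivateVersionMessage(stream=stream, version=version)
        if is_initial_sync:
            # Initial sync only: activate before the data as well.
            singer.write_message(activate)
        for record in records:
            singer.write_message(singer.RecordMessage(
                stream=stream, record=record, version=version))
        # Every sync: activate after the data, letting the target hard-delete
        # rows that are absent from this version.
        singer.write_message(activate)
        return version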