Bump to v1.1.0, update changelog (#26)

diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index a8b02d0c91194e59b3e06f7c313897d0fccfcffa..26c2d19cebd1a4c174eeffa3a0be78797006d8a4 100644
@@ -1,11 +1,14 @@
 import time
 import math
-import singer
 import json
-from collections import OrderedDict
+import re
+import urllib.parse
+from datetime import datetime, timedelta
+import pytz
+import singer
 from singer import metrics, metadata, Transformer, utils
 from singer.utils import strptime_to_utc, strftime
-from tap_google_sheets.transform import transform_json
+from singer.messages import RecordMessage
 from tap_google_sheets.streams import STREAMS
 from tap_google_sheets.schema import get_sheet_metadata
 
@@ -17,14 +20,26 @@ def write_schema(catalog, stream_name):
     schema = stream.schema.to_dict()
     try:
         singer.write_schema(stream_name, schema, stream.key_properties)
+        LOGGER.info('Writing schema for: {}'.format(stream_name))
     except OSError as err:
         LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
         raise err
 
 
-def write_record(stream_name, record, time_extracted):
+def write_record(stream_name, record, time_extracted, version=None):
     try:
-        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
+        if version:
+            singer.messages.write_message(
+                RecordMessage(
+                    stream=stream_name,
+                    record=record,
+                    version=version,
+                    time_extracted=time_extracted))
+        else:
+            singer.messages.write_record(
+                stream_name=stream_name,
+                record=record,
+                time_extracted=time_extracted)
     except OSError as err:
         LOGGER.info('OS Error writing record for: {}'.format(stream_name))
         LOGGER.info('record: {}'.format(record))
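The new version parameter lets sheet records participate in Singer's full-table ACTIVATE_VERSION semantics (used in sync() below). A minimal sketch of the two emit paths, assuming the standard singer-python messages API; the stream names and values are illustrative:

    # Versioned path: the RECORD message carries the version that pairs with
    # an ACTIVATE_VERSION message emitted elsewhere in the sync
    write_record('Sheet1', {'__sdc_row': 2}, utils.now(), version=1573504685000)
    # Unversioned path: plain RECORD message (used for the metadata streams)
    write_record('file_metadata', {'id': 'abc'}, utils.now())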
@@ -49,66 +64,53 @@ def write_bookmark(state, stream, value):
     singer.write_state(state)
 
 
-# def transform_datetime(this_dttm):
-def transform_datetime(this_dttm):
-    with Transformer() as transformer:
-        new_dttm = transformer._transform_datetime(this_dttm)
-    return new_dttm
-
-
-def process_records(catalog, #pylint: disable=too-many-branches
+# Transform/validate batch of records w/ schema and send to target
+def process_records(catalog,
                     stream_name,
                     records,
                     time_extracted,
-                    bookmark_field=None,
-                    bookmark_type=None,
-                    max_bookmark_value=None,
-                    last_datetime=None,
-                    last_integer=None,
-                    parent=None,
-                    parent_id=None):
+                    version=None):
     stream = catalog.get_stream(stream_name)
     schema = stream.schema.to_dict()
     stream_metadata = metadata.to_map(stream.metadata)
-
     with metrics.record_counter(stream_name) as counter:
         for record in records:
-            # If child object, add parent_id to record
-            if parent_id and parent:
-                record[parent + '_id'] = parent_id
-
             # Transform record for Singer.io
             with Transformer() as transformer:
-                transformed_record = transformer.transform(
-                    record,
-                    schema,
-                    stream_metadata)
-                # Reset max_bookmark_value to new value if higher
-                if transformed_record.get(bookmark_field):
-                    if max_bookmark_value is None or \
-                        transformed_record[bookmark_field] > transform_datetime(max_bookmark_value):
-                        max_bookmark_value = transformed_record[bookmark_field]
-
-                if bookmark_field and (bookmark_field in transformed_record):
-                    if bookmark_type == 'integer':
-                        # Keep only records whose bookmark is after the last_integer
-                        if transformed_record[bookmark_field] >= last_integer:
-                            write_record(stream_name, transformed_record, \
-                                time_extracted=time_extracted)
-                            counter.increment()
-                    elif bookmark_type == 'datetime':
-                        last_dttm = transform_datetime(last_datetime)
-                        bookmark_dttm = transform_datetime(transformed_record[bookmark_field])
-                        # Keep only records whose bookmark is after the last_datetime
-                        if bookmark_dttm >= last_dttm:
-                            write_record(stream_name, transformed_record, \
-                                time_extracted=time_extracted)
-                            counter.increment()
-                else:
-                    write_record(stream_name, transformed_record, time_extracted=time_extracted)
-                    counter.increment()
-
-        return max_bookmark_value, counter.value
+                try:
+                    transformed_record = transformer.transform(
+                        record,
+                        schema,
+                        stream_metadata)
+                except Exception as err:
+                    LOGGER.error('{}'.format(err))
+                    raise RuntimeError(err)
+                write_record(
+                    stream_name=stream_name,
+                    record=transformed_record,
+                    time_extracted=time_extracted,
+                    version=version)
+                counter.increment()
+        return counter.value
+
+
+def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
+    # Should this stream be synced?
+    if stream_name in selected_streams:
+        LOGGER.info('STARTED Syncing {}'.format(stream_name))
+        update_currently_syncing(state, stream_name)
+        selected_fields = get_selected_fields(catalog, stream_name)
+        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
+        write_schema(catalog, stream_name)
+        if not time_extracted:
+            time_extracted = utils.now()
+        record_count = process_records(
+            catalog=catalog,
+            stream_name=stream_name,
+            records=records,
+            time_extracted=time_extracted)
+        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
+        update_currently_syncing(state, None)
 
 
 # Currently syncing sets the stream currently being delivered in the state.
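For illustration, a sketch of how sync_stream is driven for one of the metadata streams in sync() below; the record content is invented for the example:

    selected_streams = ['file_metadata', 'Sheet1']
    records = [{'id': 'abc123', 'name': 'my-spreadsheet'}]  # transformed file_metadata
    sync_stream('file_metadata', selected_streams, catalog, state, records)
    # logs: STARTED Syncing file_metadata ... FINISHED Syncing file_metadata, Total Records: 1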
@@ -130,9 +132,9 @@ def get_selected_fields(catalog, stream_name):
     mdata_list = singer.metadata.to_list(mdata)
     selected_fields = []
     for entry in mdata_list:
-        field =  None
+        field = None
         try:
-            field =  entry['breadcrumb'][1]
+            field = entry['breadcrumb'][1]
             if entry.get('metadata', {}).get('selected', False):
                 selected_fields.append(field)
         except IndexError:
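get_selected_fields relies on the Singer metadata breadcrumb convention: the stream-level entry has an empty breadcrumb (hence the IndexError guard above), while field entries use ('properties', <field_name>). A sketch of the expected input shape and result, with invented field names:

    mdata_list = [
        {'breadcrumb': [], 'metadata': {'selected': True}},  # stream level -> IndexError, skipped
        {'breadcrumb': ['properties', 'id'], 'metadata': {'selected': True}},
        {'breadcrumb': ['properties', 'name'], 'metadata': {'selected': False}},
    ]
    # get_selected_fields(...) -> ['id']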
@@ -147,22 +149,31 @@ def get_data(stream_name,
              range_rows=None):
     if not range_rows:
         range_rows = ''
+    # Replace {placeholder} variables in path
+    # Encode stream_name: fixes issue w/ special characters in sheet name
+    stream_name_escaped = re.escape(stream_name)
+    stream_name_encoded = urllib.parse.quote_plus(stream_name)
     path = endpoint_config.get('path', stream_name).replace(
-        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
+        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
             '{range_rows}', range_rows)
     params = endpoint_config.get('params', {})
     api = endpoint_config.get('api', 'sheets')
+    # Add in querystring parameters and replace {placeholder} variables
+    # Build the querystring by hand so parameter values are not URL-encoded (encoding them causes API errors)
     querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
-        '{sheet_title}', stream_name)
+        '{sheet_title}', stream_name_encoded)
+    LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))
     data = {}
+    time_extracted = utils.now()
     data = client.get(
         path=path,
         api=api,
         params=querystring,
-        endpoint=stream_name)
-    return data
+        endpoint=stream_name_escaped)
+    return data, time_extracted
 
 
+# Transform file_metadata: remove nodes from lastModifyingUser, format as array
 def transform_file_metadata(file_metadata):
     # Convert to dict
     file_metadata_tf = json.loads(json.dumps(file_metadata))
@@ -177,10 +188,11 @@ def transform_file_metadata(file_metadata):
     return file_metadata_arr
 
 
+# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
 def transform_spreadsheet_metadata(spreadsheet_metadata):
     # Convert to dict
     spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
-    # Remove keys
+    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
     if spreadsheet_metadata_tf.get('properties'):
         spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
     spreadsheet_metadata_tf.pop('sheets', None)
@@ -190,10 +202,11 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
     return spreadsheet_metadata_arr
 
 
+# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
 def transform_sheet_metadata(spreadsheet_id, sheet, columns):
     # Convert to properties to dict
     sheet_metadata = sheet.get('properties')
-    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata)) 
+    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
     sheet_id = sheet_metadata_tf.get('sheetId')
     sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
         spreadsheet_id, sheet_id)
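As a worked example of the templating in get_data: a sheet title containing spaces is percent-encoded with quote_plus, while the hand-built querystring is left unencoded. The base URL and path template below are illustrative, not taken from this diff:

    import urllib.parse
    urllib.parse.quote_plus('Order Items')  # -> 'Order+Items'
    # The logged request for one batch of rows might look like:
    # URL: https://sheets.googleapis.com/v4/spreadsheets/<spreadsheet_id>/values/Order+Items!A2:D200?dateTimeRenderOption=SERIAL_NUMBER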
@@ -203,6 +216,150 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns):
     return sheet_metadata_tf
 
 
+# Convert Excel Date Serial Number (excel_date_sn) to datetime string
+# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
+def excel_to_dttm_str(excel_date_sn, timezone_str=None):
+    if not timezone_str:
+        timezone_str = 'UTC'
+    tzn = pytz.timezone(timezone_str)
+    sec_per_day = 86400
+    excel_epoch = 25569 # 1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date
+    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
+    epoch_dttm = datetime(1970, 1, 1)
+    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
+    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
+    utc_dttm_str = strftime(utc_dttm)
+    return utc_dttm_str
+
+
+# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
+#  Convert from array of values to JSON with column names as keys
+def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
+    sheet_data_tf = []
+    row_num = from_row
+    # Create sorted list of columns based on columnIndex
+    cols = sorted(columns, key=lambda i: i['columnIndex'])
+
+    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
+    for row in sheet_data_rows:
+        # If empty row, SKIP
+        if row == []:
+            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
+        else:
+            sheet_data_row_tf = {}
+            # Add spreadsheet_id, sheet_id, and row
+            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
+            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
+            sheet_data_row_tf['__sdc_row'] = row_num
+            col_num = 1
+            for value in row:
+                # Select column metadata based on column index
+                col = cols[col_num - 1]
+                col_skipped = col.get('columnSkipped')
+                if not col_skipped:
+                    # Get column metadata
+                    col_name = col.get('columnName')
+                    col_type = col.get('columnType')
+                    col_letter = col.get('columnLetter')
+
+                    # NULL values
+                    if value is None or value == '':
+                        col_val = None
+
+                    # Convert dates/times from Lotus Notes Serial Numbers
+                    # DATE-TIME
+                    elif col_type == 'numberType.DATE_TIME':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)
+                        else:
+                            col_val = str(value)
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
+                    # DATE
+                    elif col_type == 'numberType.DATE':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)[:10]
+                        else:
+                            col_val = str(value)
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
+                    # TIME ONLY (NO DATE)
+                    elif col_type == 'numberType.TIME':
+                        if isinstance(value, (int, float)):
+                            try:
+                                total_secs = value * 86400 # seconds in day
+                                # Create string formatted like HH:MM:SS
+                                col_val = str(timedelta(seconds=total_secs))
+                            except ValueError:
+                                col_val = str(value)
+                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
+                        else:
+                            col_val = str(value)
+                    # NUMBER (INTEGER AND FLOAT)
+                    elif col_type == 'numberType':
+                        if isinstance(value, int):
+                            col_val = int(value)
+                        elif isinstance(value, float):
+                            # Determine float decimal digits
+                            decimal_digits = str(value)[::-1].find('.')
+                            if decimal_digits > 15:
+                                try:
+                                    # ROUND to multipleOf: 1e-15
+                                    col_val = float(round(value, 15))
+                                except ValueError:
+                                    col_val = str(value)
+                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                        sheet_title, col_name, col_letter, row_num, col_type, value))
+                            else: # decimal_digits <= 15, no rounding
+                                try:
+                                    col_val = float(value)
+                                except ValueError:
+                                    col_val = str(value)
+                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                        sheet_title, col_name, col_letter, row_num, col_type, value))
+                        else:
+                            col_val = str(value)
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
+                    # STRING
+                    elif col_type == 'stringValue':
+                        col_val = str(value)
+                    # BOOLEAN
+                    elif col_type == 'boolValue':
+                        if isinstance(value, bool):
+                            col_val = value
+                        elif isinstance(value, str):
+                            if value.lower() in ('true', 't', 'yes', 'y'):
+                                col_val = True
+                            elif value.lower() in ('false', 'f', 'no', 'n'):
+                                col_val = False
+                            else:
+                                col_val = str(value)
+                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
+                        elif isinstance(value, int):
+                            if value in (1, -1):
+                                col_val = True
+                            elif value == 0:
+                                col_val = False
+                            else:
+                                col_val = str(value)
+                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
+                    # OTHER: Convert everything else to a string
+                    else:
+                        col_val = str(value)
+                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                            sheet_title, col_name, col_letter, row_num, col_type, value))
+                    sheet_data_row_tf[col_name] = col_val
+                col_num = col_num + 1
+            # APPEND non-empty row
+            sheet_data_tf.append(sheet_data_row_tf)
+        row_num = row_num + 1
+    return sheet_data_tf, row_num
+
+
 def sync(client, config, catalog, state):
     start_date = config.get('start_date')
     spreadsheet_id = config.get('spreadsheet_id')
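To make the serial-number conversion in excel_to_dttm_str above concrete: serial 25569 maps to 1970-01-01T00:00:00Z, so 43831.5 lands 18,262.5 days later (arithmetic worked by hand; output format per singer's strftime):

    excel_to_dttm_str(43831.5)
    # (43831.5 - 25569) * 86400 = 1,577,880,000 epoch seconds
    # -> '2020-01-01T12:00:00.000000Z'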
@@ -219,63 +376,192 @@ def sync(client, config, catalog, state):
     if not selected_streams:
         return
 
-    # Get file_metadata
+    # FILE_METADATA
     file_metadata = {}
-    file_metadata_config = STREAMS.get('file_metadata')
-    file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id)
+    stream_name = 'file_metadata'
+    file_metadata_config = STREAMS.get(stream_name)
+
+    # GET file_metadata
+    LOGGER.info('GET file_metadata')
+    file_metadata, time_extracted = get_data(stream_name=stream_name,
+                                             endpoint_config=file_metadata_config,
+                                             client=client,
+                                             spreadsheet_id=spreadsheet_id)
+    # Transform file_metadata
+    LOGGER.info('Transform file_metadata')
     file_metadata_tf = transform_file_metadata(file_metadata)
     # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
-    last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date))
+
+    # Check if the file has changed; if not, update the bookmark and return (back to __init__)
+    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
     this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
     LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
     if this_datetime <= last_datetime:
         LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
-        return 0
-    
-    # Get spreadsheet_metadata
+        # Update file_metadata bookmark
+        write_bookmark(state, 'file_metadata', strftime(this_datetime))
+        return
+    # Sync file_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
+    # file_metadata bookmark is updated at the end of sync
+
+    # SPREADSHEET_METADATA
     spreadsheet_metadata = {}
-    spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata')
-    spreadsheet_metadata = get_data('spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id)
+    stream_name = 'spreadsheet_metadata'
+    spreadsheet_metadata_config = STREAMS.get(stream_name)
+
+    # GET spreadsheet_metadata
+    LOGGER.info('GET spreadsheet_metadata')
+    spreadsheet_metadata, ss_time_extracted = get_data(
+        stream_name=stream_name,
+        endpoint_config=spreadsheet_metadata_config,
+        client=client,
+        spreadsheet_id=spreadsheet_id)
+
+    # Transform spreadsheet_metadata
+    LOGGER.info('Transform spreadsheet_metadata')
     spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
-    # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))
 
-    # Get sheet_metadata
+    # Sync spreadsheet_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
+        ss_time_extracted)
+
+    # SHEET_METADATA and SHEET_DATA
     sheets = spreadsheet_metadata.get('sheets')
     sheet_metadata = []
     sheets_loaded = []
     sheets_loaded_config = STREAMS['sheets_loaded']
     if sheets:
+        # Loop through the sheets (worksheet tabs) in the spreadsheet
         for sheet in sheets:
             sheet_title = sheet.get('properties', {}).get('title')
+            sheet_id = sheet.get('properties', {}).get('sheetId')
+
+            # GET sheet_metadata and columns
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
-            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
-            sheet_metadata.append(sheet_metadata_tf)
-
-            # Determine range of rows and columns for "paging" through batch rows of data
-            sheet_last_col_index = 1
-            sheet_last_col_letter = 'A'
-            for col in columns:
-                col_index = col.get('columnIndex')
-                col_letter = col.get('columnLetter')
-                if col_index > sheet_last_col_index:
-                    sheet_last_col_index = col_index
-                    sheet_last_col_letter = col_letter
-            sheet_max_row = sheet.get('gridProperties', {}).get('rowCount')
-            is_empty_row = False
-            batch_rows = 200
-            from_row = 2
-            if sheet_max_row < batch_rows:
-                to_row = sheet_max_row
+            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
+
+            # SKIP empty sheets (where sheet_schema and columns are None)
+            if not sheet_schema or not columns:
+                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
             else:
-                to_row = batch_rows
-
-            while not is_empty_row and to_row <= sheet_max_row:
-                range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row)
-                
-                sheet_data = get_data(
-                    stream_name=sheet_title,
-                    endpoint_config=sheets_loaded_config,
-                    client=client,
-                    spreadsheet_id=spreadsheet_id,
-                    range_rows=range_rows)
+                # Transform sheet_metadata
+                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+                sheet_metadata.append(sheet_metadata_tf)
+
+                # SHEET_DATA
+                # Should this worksheet tab be synced?
+                if sheet_title in selected_streams:
+                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                    update_currently_syncing(state, sheet_title)
+                    selected_fields = get_selected_fields(catalog, sheet_title)
+                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+                    write_schema(catalog, sheet_title)
+
+                    # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not on
+                    # subsequent syncs), and again after every sheet sync completes.
+                    # This forces hard deletes on the data downstream if fewer records are sent.
+                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+                    last_integer = int(get_bookmark(state, sheet_title, 0))
+                    activate_version = int(time.time() * 1000)
+                    activate_version_message = singer.ActivateVersionMessage(
+                            stream=sheet_title,
+                            version=activate_version)
+                    if last_integer == 0:
+                        # initial load, send activate_version before AND after data sync
+                        singer.write_message(activate_version_message)
+                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+                    # Determine max range of columns and rows for "paging" through the data
+                    sheet_last_col_index = 1
+                    sheet_last_col_letter = 'A'
+                    for col in columns:
+                        col_index = col.get('columnIndex')
+                        col_letter = col.get('columnLetter')
+                        if col_index > sheet_last_col_index:
+                            sheet_last_col_index = col_index
+                            sheet_last_col_letter = col_letter
+                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+                    # Initialize paging for 1st batch
+                    is_last_row = False
+                    batch_rows = 200
+                    from_row = 2
+                    if sheet_max_row < batch_rows:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = batch_rows
+
+                    # Loop through batches (each having 200 rows of data)
+                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+                        # GET sheet_data for a worksheet tab
+                        sheet_data, time_extracted = get_data(
+                            stream_name=sheet_title,
+                            endpoint_config=sheets_loaded_config,
+                            client=client,
+                            spreadsheet_id=spreadsheet_id,
+                            range_rows=range_rows)
+                        # Data is returned as a list of arrays, an array of values for each row
+                        sheet_data_rows = sheet_data.get('values', [])
+
+                        # Transform batch of rows to JSON with keys for each column
+                        sheet_data_tf, row_num = transform_sheet_data(
+                            spreadsheet_id=spreadsheet_id,
+                            sheet_id=sheet_id,
+                            sheet_title=sheet_title,
+                            from_row=from_row,
+                            columns=columns,
+                            sheet_data_rows=sheet_data_rows)
+                        if row_num < to_row:
+                            is_last_row = True
+
+                        # Process records, send batch of records to target
+                        record_count = process_records(
+                            catalog=catalog,
+                            stream_name=sheet_title,
+                            records=sheet_data_tf,
+                            time_extracted=ss_time_extracted,
+                            version=activate_version)
+                        LOGGER.info('Sheet: {}, records processed: {}'.format(
+                            sheet_title, record_count))
+
+                        # Update paging from/to_row for next batch
+                        from_row = to_row + 1
+                        if to_row + batch_rows > sheet_max_row:
+                            to_row = sheet_max_row
+                        else:
+                            to_row = to_row + batch_rows
+
+                    # End of Stream: Send Activate Version and update State
+                    singer.write_message(activate_version_message)
+                    write_bookmark(state, sheet_title, activate_version)
+                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                        sheet_title, row_num - 2)) # subtract 2: the header row, plus row_num is 1 past the last data row
+                    update_currently_syncing(state, None)
+
+                    # SHEETS_LOADED
+                    # Add sheet to sheets_loaded
+                    sheet_loaded = {}
+                    sheet_loaded['spreadsheetId'] = spreadsheet_id
+                    sheet_loaded['sheetId'] = sheet_id
+                    sheet_loaded['title'] = sheet_title
+                    sheet_loaded['loadDate'] = strftime(utils.now())
+                    sheet_loaded['lastRowNumber'] = row_num
+                    sheets_loaded.append(sheet_loaded)
+
+    stream_name = 'sheet_metadata'
+    # Sync sheet_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
+
+    stream_name = 'sheets_loaded'
+    # Sync sheets_loaded if selected
+    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
+
+    # Update file_metadata bookmark
+    write_bookmark(state, 'file_metadata', strftime(this_datetime))
+
+    return
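After a successful run, state holds a datetime bookmark for file_metadata and, per worksheet tab, the activate_version from that sheet's last full-table sync. The shape below follows the usual Singer bookmarks convention and is an assumption, with illustrative values:

    state = {
        'currently_syncing': None,
        'bookmarks': {
            'file_metadata': '2019-11-11T13:32:22.000000Z',  # file modifiedTime
            'Sheet1': 1573504685000  # activate_version (ms since epoch)
        }
    }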