import time
import math
-import singer
import json
-from collections import OrderedDict
+import re
+import urllib.parse
+from datetime import datetime, timedelta
+import pytz
+import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
-from tap_google_sheets.transform import transform_json
+from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata
schema = stream.schema.to_dict()
try:
singer.write_schema(stream_name, schema, stream.key_properties)
+ LOGGER.info('Writing schema for: {}'.format(stream_name))
except OSError as err:
LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
raise err
-def write_record(stream_name, record, time_extracted):
+def write_record(stream_name, record, time_extracted, version=None):
try:
- singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
+ if version:
+ singer.messages.write_message(
+ RecordMessage(
+ stream=stream_name,
+ record=record,
+ version=version,
+ time_extracted=time_extracted))
+ else:
+ singer.messages.write_record(
+ stream_name=stream_name,
+ record=record,
+ time_extracted=time_extracted)
except OSError as err:
LOGGER.info('OS Error writing record for: {}'.format(stream_name))
LOGGER.info('record: {}'.format(record))
singer.write_state(state)
-# def transform_datetime(this_dttm):
-def transform_datetime(this_dttm):
- with Transformer() as transformer:
- new_dttm = transformer._transform_datetime(this_dttm)
- return new_dttm
-
-
# Transform/validate batch of records w/ schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    """Transform a batch of records against the stream schema and emit them.

    Args:
        catalog: Catalog object; the stream is looked up via
            ``catalog.get_stream(stream_name)``.
        stream_name: Name of the stream the records belong to.
        records: Iterable of raw record dicts.
        time_extracted: Extraction timestamp forwarded to ``write_record``.
        version: Optional table version forwarded to ``write_record``.

    Returns:
        The record counter's value, i.e. the number of records written.

    Raises:
        RuntimeError: If a record fails schema transformation. The original
            exception is attached as ``__cause__``.
    """
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                try:
                    transformed_record = transformer.transform(
                        record,
                        schema,
                        stream_metadata)
                except Exception as err:
                    LOGGER.error('{}'.format(err))
                    # Chain explicitly so the traceback shows the real cause.
                    raise RuntimeError(err) from err
                write_record(
                    stream_name=stream_name,
                    record=transformed_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value
+
+
def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    """Write the schema and records for one stream, if that stream is selected.

    Does nothing when ``stream_name`` is not in ``selected_streams``.
    Otherwise the stream is marked as currently syncing in ``state``, its
    schema is written, ``records`` are passed to ``process_records``, and the
    currently-syncing marker is cleared again.

    Args:
        stream_name: Name of the stream to sync.
        selected_streams: Collection of selected stream names.
        catalog: Catalog passed on to the schema/record helpers.
        state: State object passed to ``update_currently_syncing``.
        records: Records to emit for the stream.
        time_extracted: Optional extraction timestamp; ``utils.now()`` is
            used when it is falsy.
    """
    # Guard clause: unselected streams are skipped entirely.
    if stream_name not in selected_streams:
        return

    LOGGER.info('STARTED Syncing {}'.format(stream_name))
    update_currently_syncing(state, stream_name)

    fields = get_selected_fields(catalog, stream_name)
    LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, fields))
    write_schema(catalog, stream_name)

    extracted_at = time_extracted if time_extracted else utils.now()
    total = process_records(
        catalog=catalog,
        stream_name=stream_name,
        records=records,
        time_extracted=extracted_at)

    LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, total))
    update_currently_syncing(state, None)
# Currently syncing sets the stream currently being delivered in the state.
mdata_list = singer.metadata.to_list(mdata)
selected_fields = []
for entry in mdata_list:
- field = None
+ field = None
try:
- field = entry['breadcrumb'][1]
+ field = entry['breadcrumb'][1]
if entry.get('metadata', {}).get('selected', False):
selected_fields.append(field)
except IndexError:
range_rows=None):
if not range_rows:
range_rows = ''
+ # Replace {placeholder} variables in path
+ # Encode stream_name: fixes issue w/ special characters in sheet name
+ stream_name_escaped = re.escape(stream_name)
+ stream_name_encoded = urllib.parse.quote_plus(stream_name)
path = endpoint_config.get('path', stream_name).replace(
- '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
+ '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
'{range_rows}', range_rows)
params = endpoint_config.get('params', {})
api = endpoint_config.get('api', 'sheets')
+ # Add in querystring parameters and replace {placeholder} variables
+ # querystring function ensures parameters are added but not encoded causing API errors
querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
- '{sheet_title}', stream_name)
+ '{sheet_title}', stream_name_encoded)
+ LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))
data = {}
+ time_extracted = utils.now()
data = client.get(
path=path,
api=api,
params=querystring,
- endpoint=stream_name)
- return data
+ endpoint=stream_name_escaped)
+ return data, time_extracted
+# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
# Convert to dict
file_metadata_tf = json.loads(json.dumps(file_metadata))
return file_metadata_arr
+# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
# Convert to dict
spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
- # Remove keys
+ # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
if spreadsheet_metadata_tf.get('properties'):
spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
spreadsheet_metadata_tf.pop('sheets', None)
return spreadsheet_metadata_arr
+# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
# Convert to properties to dict
sheet_metadata = sheet.get('properties')
- sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
+ sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
sheet_id = sheet_metadata_tf.get('sheetId')
sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
spreadsheet_id, sheet_id)
return sheet_metadata_tf
# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    """Convert a date serial number (days since the serial epoch) to a UTC string.

    Args:
        excel_date_sn: Numeric serial number; the integer part counts days and
            the fractional part is the time of day.
        timezone_str: Name of the timezone the serial number is expressed in.
            Defaults to ``'UTC'`` when falsy.

    Returns:
        The datetime, converted to UTC and formatted by ``strftime``.
    """
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # serial number of 1970-01-01T00:00:00
    # Round to the nearest second instead of flooring: the day fraction is a
    # binary float, so an exact time such as 12:34:56 can come out as
    # ...095.9999997 seconds and flooring would silently lose one second.
    epoch_sec = round((excel_date_sn - excel_epoch) * sec_per_day)
    excel_dttm = datetime(1970, 1, 1) + timedelta(seconds=epoch_sec)
    # Interpret the naive datetime in the given timezone, then convert to UTC.
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    return strftime(utc_dttm)
+
+
# Template for data-type warnings. The first placeholder is the punctuation
# after "ERROR"; the original messages used ';' in most places and ':' in one.
_TYPE_WARNING_FMT = (
    'WARNING: POSSIBLE DATA TYPE ERROR{} SHEET: {}, COL: {}, CELL: {}{}, '
    'TYPE: {}, VALUE: {}')
_TRUE_STRINGS = ('true', 't', 'yes', 'y')
_FALSE_STRINGS = ('false', 'f', 'no', 'n')


def _log_type_warning(sheet_title, col, row_num, value, sep=';'):
    """Log that a cell value did not match its column's declared type."""
    LOGGER.info(_TYPE_WARNING_FMT.format(
        sep, sheet_title, col.get('columnName'), col.get('columnLetter'),
        row_num, col.get('columnType'), value))


def _convert_cell_value(value, col, sheet_title, row_num):
    """Convert one raw cell value according to the column's ``columnType``.

    Values that do not fit the declared type are returned as ``str(value)``,
    in most cases with a logged warning.
    """
    col_type = col.get('columnType')

    # NULL values
    if value is None or value == '':
        return None

    # DATE-TIME: serial number -> UTC datetime string
    if col_type == 'numberType.DATE_TIME':
        if isinstance(value, (int, float)):
            return excel_to_dttm_str(value)
        _log_type_warning(sheet_title, col, row_num, value)
        return str(value)

    # DATE: serial number -> first 10 characters of the datetime string
    if col_type == 'numberType.DATE':
        if isinstance(value, (int, float)):
            return excel_to_dttm_str(value)[:10]
        _log_type_warning(sheet_title, col, row_num, value)
        return str(value)

    # TIME ONLY (NO DATE): fraction of a day -> H:MM:SS string
    if col_type == 'numberType.TIME':
        if not isinstance(value, (int, float)):
            return str(value)
        try:
            return str(timedelta(seconds=value * 86400))  # seconds in day
        except (ValueError, OverflowError):
            _log_type_warning(sheet_title, col, row_num, value)
            return str(value)

    # NUMBER (INTEGER AND FLOAT)
    if col_type == 'numberType':
        if isinstance(value, int):
            return int(value)
        if isinstance(value, float):
            # Count digits after the decimal point in the repr
            decimal_digits = str(value)[::-1].find('.')
            if decimal_digits > 15:
                # ROUND to multipleOf: 1e-15
                return float(round(value, 15))
            return float(value)
        _log_type_warning(sheet_title, col, row_num, value, sep=':')
        return str(value)

    # STRING
    if col_type == 'stringValue':
        return str(value)

    # BOOLEAN
    if col_type == 'boolValue':
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            lowered = value.lower()
            if lowered in _TRUE_STRINGS:
                return True
            if lowered in _FALSE_STRINGS:
                return False
        elif isinstance(value, (int, float)):
            # Floats are accepted too, so 1.0 / 0.0 behave like 1 / 0.
            if value in (1, -1):
                return True
            if value == 0:
                return False
        # Anything unrecognized (including other types) always yields a value,
        # so a stale result from the previous cell can never leak through.
        _log_type_warning(sheet_title, col, row_num, value)
        return str(value)

    # OTHER: Convert everything else to a string
    _log_type_warning(sheet_title, col, row_num, value)
    return str(value)


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    """Turn a batch of row arrays into dicts keyed by column name.

    Args:
        spreadsheet_id: Stored in each record as ``__sdc_spreadsheet_id``.
        sheet_id: Stored in each record as ``__sdc_sheet_id``.
        sheet_title: Sheet name, used only in log messages.
        from_row: Row number of the first row in ``sheet_data_rows``.
        columns: List of column dicts; reads ``columnIndex``,
            ``columnSkipped``, ``columnName``, ``columnType`` and
            ``columnLetter``. Cell position ``i`` in a row is matched to the
            ``i``-th column after sorting by ``columnIndex``.
        sheet_data_rows: List of rows, each a list of cell values.

    Returns:
        Tuple ``(records, next_row)``: the transformed non-empty rows and the
        row number following the last row seen (empty rows are counted).

    Raises:
        IndexError: If a row has more cells than there are columns.
    """
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    for row in sheet_data_rows:
        # If empty row, SKIP
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {
                '__sdc_spreadsheet_id': spreadsheet_id,
                '__sdc_sheet_id': sheet_id,
                '__sdc_row': row_num,
            }
            for col_pos, value in enumerate(row):
                # Select column metadata based on cell position
                col = cols[col_pos]
                if not col.get('columnSkipped'):
                    sheet_data_row_tf[col.get('columnName')] = _convert_cell_value(
                        value, col, sheet_title, row_num)
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num
+
+
def sync(client, config, catalog, state):
start_date = config.get('start_date')
spreadsheet_id = config.get('spreadsheet_id')
if not selected_streams:
return
- # Get file_metadata
+ # FILE_METADATA
file_metadata = {}
- file_metadata_config = STREAMS.get('file_metadata')
- file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id)
+ stream_name = 'file_metadata'
+ file_metadata_config = STREAMS.get(stream_name)
+
+ # GET file_metadata
+ LOGGER.info('GET file_meatadata')
+ file_metadata, time_extracted = get_data(stream_name=stream_name,
+ endpoint_config=file_metadata_config,
+ client=client,
+ spreadsheet_id=spreadsheet_id)
+ # Transform file_metadata
+ LOGGER.info('Transform file_meatadata')
file_metadata_tf = transform_file_metadata(file_metadata)
# LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
- last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date))
+
+ # Check if file has changed, if not break (return to __init__)
+ last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
if this_datetime <= last_datetime:
LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
- return 0
-
- # Get spreadsheet_metadata
+ # Update file_metadata bookmark
+ write_bookmark(state, 'file_metadata', strftime(this_datetime))
+ return
+ # Sync file_metadata if selected
+ sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
+ # file_metadata bookmark is updated at the end of sync
+
+ # SPREADSHEET_METADATA
spreadsheet_metadata = {}
- spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata')
- spreadsheet_metadata = get_data('spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id)
+ stream_name = 'spreadsheet_metadata'
+ spreadsheet_metadata_config = STREAMS.get(stream_name)
+
+ # GET spreadsheet_metadata
+ LOGGER.info('GET spreadsheet_meatadata')
+ spreadsheet_metadata, ss_time_extracted = get_data(
+ stream_name=stream_name,
+ endpoint_config=spreadsheet_metadata_config,
+ client=client,
+ spreadsheet_id=spreadsheet_id)
+
+ # Transform spreadsheet_metadata
+ LOGGER.info('Transform spreadsheet_meatadata')
spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
- # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))
- # Get sheet_metadata
+ # Sync spreadsheet_metadata if selected
+ sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
+ ss_time_extracted)
+
+ # SHEET_METADATA and SHEET_DATA
sheets = spreadsheet_metadata.get('sheets')
sheet_metadata = []
sheets_loaded = []
sheets_loaded_config = STREAMS['sheets_loaded']
if sheets:
+ # Loop thru sheets (worksheet tabs) in spreadsheet
for sheet in sheets:
sheet_title = sheet.get('properties', {}).get('title')
+ sheet_id = sheet.get('properties', {}).get('sheetId')
+
+ # GET sheet_metadata and columns
sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
- sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
- # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
- sheet_metadata.append(sheet_metadata_tf)
-
- # Determine range of rows and columns for "paging" through batch rows of data
- sheet_last_col_index = 1
- sheet_last_col_letter = 'A'
- for col in columns:
- col_index = col.get('columnIndex')
- col_letter = col.get('columnLetter')
- if col_index > sheet_last_col_index:
- sheet_last_col_index = col_index
- sheet_last_col_letter = col_letter
- sheet_max_row = sheet.get('gridProperties', {}).get('rowCount')
- is_empty_row = False
- batch_rows = 200
- from_row = 2
- if sheet_max_row < batch_rows:
- to_row = sheet_max_row
+ # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
+
+ # SKIP empty sheets (where sheet_schema and columns are None)
+ if not sheet_schema or not columns:
+ LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
else:
- to_row = batch_rows
-
- while not is_empty_row and to_row <= sheet_max_row:
- range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row)
-
- sheet_data = get_data(
- stream_name=sheet_title,
- endpoint_config=sheets_loaded_config,
- client=client,
- spreadsheet_id=spreadsheet_id,
- range_rows=range_rows)
+ # Transform sheet_metadata
+ sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+ # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+ sheet_metadata.append(sheet_metadata_tf)
+
+ # SHEET_DATA
+ # Should this worksheet tab be synced?
+ if sheet_title in selected_streams:
+ LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+ update_currently_syncing(state, sheet_title)
+ selected_fields = get_selected_fields(catalog, sheet_title)
+ LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+ write_schema(catalog, sheet_title)
+
+ # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
+ # everytime after each sheet sync is complete.
+ # This forces hard deletes on the data downstream if fewer records are sent.
+ # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+ last_integer = int(get_bookmark(state, sheet_title, 0))
+ activate_version = int(time.time() * 1000)
+ activate_version_message = singer.ActivateVersionMessage(
+ stream=sheet_title,
+ version=activate_version)
+ if last_integer == 0:
+ # initial load, send activate_version before AND after data sync
+ singer.write_message(activate_version_message)
+ LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+ # Determine max range of columns and rows for "paging" through the data
+ sheet_last_col_index = 1
+ sheet_last_col_letter = 'A'
+ for col in columns:
+ col_index = col.get('columnIndex')
+ col_letter = col.get('columnLetter')
+ if col_index > sheet_last_col_index:
+ sheet_last_col_index = col_index
+ sheet_last_col_letter = col_letter
+ sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+ # Initialize paging for 1st batch
+ is_last_row = False
+ batch_rows = 200
+ from_row = 2
+ if sheet_max_row < batch_rows:
+ to_row = sheet_max_row
+ else:
+ to_row = batch_rows
+
+ # Loop thru batches (each having 200 rows of data)
+ while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+ range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+ # GET sheet_data for a worksheet tab
+ sheet_data, time_extracted = get_data(
+ stream_name=sheet_title,
+ endpoint_config=sheets_loaded_config,
+ client=client,
+ spreadsheet_id=spreadsheet_id,
+ range_rows=range_rows)
+ # Data is returned as a list of arrays, an array of values for each row
+ sheet_data_rows = sheet_data.get('values', [])
+
+ # Transform batch of rows to JSON with keys for each column
+ sheet_data_tf, row_num = transform_sheet_data(
+ spreadsheet_id=spreadsheet_id,
+ sheet_id=sheet_id,
+ sheet_title=sheet_title,
+ from_row=from_row,
+ columns=columns,
+ sheet_data_rows=sheet_data_rows)
+ if row_num < to_row:
+ is_last_row = True
+
+ # Process records, send batch of records to target
+ record_count = process_records(
+ catalog=catalog,
+ stream_name=sheet_title,
+ records=sheet_data_tf,
+ time_extracted=ss_time_extracted,
+ version=activate_version)
+ LOGGER.info('Sheet: {}, records processed: {}'.format(
+ sheet_title, record_count))
+
+ # Update paging from/to_row for next batch
+ from_row = to_row + 1
+ if to_row + batch_rows > sheet_max_row:
+ to_row = sheet_max_row
+ else:
+ to_row = to_row + batch_rows
+
+ # End of Stream: Send Activate Version and update State
+ singer.write_message(activate_version_message)
+ write_bookmark(state, sheet_title, activate_version)
+ LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+ LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+ sheet_title, row_num - 2)) # subtract 1 for header row
+ update_currently_syncing(state, None)
+
+ # SHEETS_LOADED
+ # Add sheet to sheets_loaded
+ sheet_loaded = {}
+ sheet_loaded['spreadsheetId'] = spreadsheet_id
+ sheet_loaded['sheetId'] = sheet_id
+ sheet_loaded['title'] = sheet_title
+ sheet_loaded['loadDate'] = strftime(utils.now())
+ sheet_loaded['lastRowNumber'] = row_num
+ sheets_loaded.append(sheet_loaded)
+
+ stream_name = 'sheet_metadata'
+ # Sync sheet_metadata if selected
+ sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
+
+ stream_name = 'sheets_loaded'
+ # Sync sheets_loaded if selected
+ sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
+
+ # Update file_metadata bookmark
+ write_bookmark(state, 'file_metadata', strftime(this_datetime))
+
+ return