import time
import math
-import singer
import json
-import pytz
+import re
+import urllib.parse
from datetime import datetime, timedelta
-from collections import OrderedDict
+import pytz
+import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
+from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata
schema = stream.schema.to_dict()
try:
singer.write_schema(stream_name, schema, stream.key_properties)
+ LOGGER.info('Writing schema for: {}'.format(stream_name))
except OSError as err:
LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
raise err
def write_record(stream_name, record, time_extracted, version=None):
    """Write a single Singer RECORD message to stdout.

    Parameters:
        stream_name: name of the stream the record belongs to.
        record: already-transformed record dict.
        time_extracted: datetime the record was extracted from the source.
        version: optional table version; when provided, a versioned
            RecordMessage is emitted so it pairs with ACTIVATE_VERSION
            messages (targets can then hard-delete rows absent from
            the new version).

    Raises:
        OSError: re-raised after logging, matching write_schema's
            error handling, so records are not silently lost.
    """
    try:
        if version:
            singer.messages.write_message(
                RecordMessage(
                    stream=stream_name,
                    record=record,
                    version=version,
                    time_extracted=time_extracted))
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        # Fix: previously the OSError was swallowed here, silently dropping
        # the record; re-raise for consistency with write_schema.
        raise err
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    """Transform and emit a batch of records for one stream.

    Each record is run through the singer Transformer against the stream's
    catalog schema/metadata, then written with write_record.

    Parameters:
        catalog: singer Catalog holding the stream's schema and metadata.
        stream_name: stream to emit records for.
        records: iterable of raw record dicts.
        time_extracted: datetime stamped onto every RECORD message.
        version: optional table version forwarded to write_record.

    Returns:
        Number of records emitted.

    Raises:
        RuntimeError: if the Transformer rejects a record.
    """
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    # Fix: `counter` was referenced without ever being defined; wrap the
    # loop in the singer metrics record counter that provides it.
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                try:
                    transformed_record = transformer.transform(
                        record,
                        schema,
                        stream_metadata)
                except Exception as err:
                    LOGGER.error('{}'.format(err))
                    raise RuntimeError(err)
                write_record(
                    stream_name=stream_name,
                    record=transformed_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value
def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    """Sync one stream's records if the stream is selected in the catalog.

    Logs the selected fields, writes the stream schema, processes the
    records, and maintains the currently-syncing marker in state.
    """
    # Should sheets_loaded be synced?
    if stream_name not in selected_streams:
        return
    LOGGER.info('STARTED Syncing {}'.format(stream_name))
    update_currently_syncing(state, stream_name)
    selected_fields = get_selected_fields(catalog, stream_name)
    LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
    write_schema(catalog, stream_name)
    # Default the extraction timestamp to "now" when the caller omits it.
    record_count = process_records(
        catalog=catalog,
        stream_name=stream_name,
        records=records,
        time_extracted=time_extracted or utils.now())
    LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
    update_currently_syncing(state, None)
mdata_list = singer.metadata.to_list(mdata)
selected_fields = []
for entry in mdata_list:
- field = None
+ field = None
try:
- field = entry['breadcrumb'][1]
+ field = entry['breadcrumb'][1]
if entry.get('metadata', {}).get('selected', False):
selected_fields.append(field)
except IndexError:
range_rows=None):
if not range_rows:
range_rows = ''
+ # Replace {placeholder} variables in path
+ # Encode stream_name: fixes issue w/ special characters in sheet name
+ stream_name_escaped = re.escape(stream_name)
+ stream_name_encoded = urllib.parse.quote_plus(stream_name)
path = endpoint_config.get('path', stream_name).replace(
- '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
+ '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
'{range_rows}', range_rows)
params = endpoint_config.get('params', {})
api = endpoint_config.get('api', 'sheets')
+ # Add in querystring parameters and replace {placeholder} variables
+ # querystring function ensures parameters are added but not encoded causing API errors
querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
- '{sheet_title}', stream_name)
+ '{sheet_title}', stream_name_encoded)
+ LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))
data = {}
time_extracted = utils.now()
data = client.get(
path=path,
api=api,
params=querystring,
- endpoint=stream_name)
+ endpoint=stream_name_escaped)
return data, time_extracted
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
# Convert to properties to dict
sheet_metadata = sheet.get('properties')
- sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
+ sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
sheet_id = sheet_metadata_tf.get('sheetId')
sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
spreadsheet_id, sheet_id)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    """Convert an Excel/Lotus serial date number to an ISO-8601 UTC string.

    The serial number is interpreted in the sheet's timezone (default UTC)
    and then converted to UTC.
    """
    zone = pytz.timezone(timezone_str or 'UTC')
    # Serial number 25569 is 1970-01-01T00:00:00Z (the Unix epoch);
    # one serial unit equals one day (86400 seconds).
    seconds_since_epoch = math.floor((excel_date_sn - 25569) * 86400)
    naive_dttm = datetime(1970, 1, 1) + timedelta(seconds=seconds_since_epoch)
    utc_dttm = zone.localize(naive_dttm).astimezone(pytz.utc)
    return strftime(utc_dttm)
# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def _warn_type_error(sheet_title, col_name, col_letter, row_num, col_type, value):
    """Log one consistently-formatted possible-data-type warning for a cell."""
    # Fix: previously some branches logged the raw `row` list instead of
    # row_num in the CELL position, and two used 'ERROR:' instead of 'ERROR;'.
    LOGGER.info(
        'WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
            sheet_title, col_name, col_letter, row_num, col_type, value))


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    """Transform a batch of raw worksheet rows into record dicts.

    Adds __sdc_spreadsheet_id, __sdc_sheet_id and __sdc_row to every record
    and converts each cell according to its column's declared type
    (dates/times from Lotus Notes serial numbers, numbers, booleans,
    strings). Empty rows are skipped but still advance the row counter.

    Parameters:
        spreadsheet_id: Google spreadsheet ID.
        sheet_id: numeric worksheet (tab) ID.
        sheet_title: worksheet title; used only in warning log messages.
        from_row: 1-based row number of the first row in this batch.
        columns: column metadata dicts with columnIndex, columnName,
            columnType, columnLetter and optional columnSkipped.
        sheet_data_rows: list of rows, each a list of raw cell values.

    Returns:
        Tuple of (list of transformed records, next row number after batch).
    """
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])
    for row in sheet_data_rows:
        # If empty row, SKIP
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf = {
                '__sdc_spreadsheet_id': spreadsheet_id,
                '__sdc_sheet_id': sheet_id,
                '__sdc_row': row_num,
            }
            col_num = 1
            for value in row:
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')

                    # NULL values
                    if value is None or value == '':
                        col_val = None

                    # Convert dates/times from Lotus Notes Serial Numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            _warn_type_error(sheet_title, col_name, col_letter, row_num, col_type, value)

                    # DATE (date portion of the converted timestamp only)
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            _warn_type_error(sheet_title, col_name, col_letter, row_num, col_type, value)

                    # TIME ONLY (NO DATE): fraction of a day -> HH:MM:SS string
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400  # seconds in day
                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                _warn_type_error(sheet_title, col_name, col_letter, row_num, col_type, value)
                        else:
                            col_val = str(value)

                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits; schema multipleOf is 1e-15
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
                                    # ROUND to multipleOf: 1e-15
                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    _warn_type_error(sheet_title, col_name, col_letter, row_num, col_type, value)
                            else:  # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    _warn_type_error(sheet_title, col_name, col_letter, row_num, col_type, value)
                        else:
                            col_val = str(value)
                            _warn_type_error(sheet_title, col_name, col_letter, row_num, col_type, value)

                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)

                    # BOOLEAN: accept real bools, common truthy/falsy strings,
                    # and 0 / 1 / -1 integers
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                _warn_type_error(sheet_title, col_name, col_letter, row_num, col_type, value)
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                _warn_type_error(sheet_title, col_name, col_letter, row_num, col_type, value)
                        else:
                            # Fix: there was no fallback for other value types,
                            # leaving col_val stale or undefined.
                            col_val = str(value)
                            _warn_type_error(sheet_title, col_name, col_letter, row_num, col_type, value)

                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        _warn_type_error(sheet_title, col_name, col_letter, row_num, col_type, value)

                    # Fix: only emit values for non-skipped columns; previously
                    # this assignment also ran for skipped columns, re-writing a
                    # stale col_name/col_val.
                    sheet_data_row_tf[col_name] = col_val
                # Always advance so col_num stays aligned with the row's cells,
                # including cells belonging to skipped columns.
                col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num
def sync(client, config, catalog, state):
LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
if this_datetime <= last_datetime:
LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
- return 0
- else:
- # Sync file_metadata if selected
- sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
- write_bookmark(state, stream_name, strftime(this_datetime))
+ # Update file_metadata bookmark
+ write_bookmark(state, 'file_metadata', strftime(this_datetime))
+ return
+ # Sync file_metadata if selected
+ sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
+ # file_metadata bookmark is updated at the end of sync
# SPREADSHEET_METADATA
spreadsheet_metadata = {}
# GET spreadsheet_metadata
LOGGER.info('GET spreadsheet_meatadata')
- spreadsheet_metadata, ss_time_extracted = get_data(
+ spreadsheet_metadata, ss_time_extracted = get_data(
stream_name=stream_name,
endpoint_config=spreadsheet_metadata_config,
client=client,
spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
# Sync spreadsheet_metadata if selected
- sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)
+ sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
+ ss_time_extracted)
# SHEET_METADATA and SHEET_DATA
sheets = spreadsheet_metadata.get('sheets')
# GET sheet_metadata and columns
sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-
- # Transform sheet_metadata
- sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
- # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
- sheet_metadata.append(sheet_metadata_tf)
-
- # SHEET_DATA
- # Should this worksheet tab be synced?
- if sheet_title in selected_streams:
- LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
- update_currently_syncing(state, sheet_title)
- write_schema(catalog, sheet_title)
-
- # Determine max range of columns and rows for "paging" through the data
- sheet_last_col_index = 1
- sheet_last_col_letter = 'A'
- for col in columns:
- col_index = col.get('columnIndex')
- col_letter = col.get('columnLetter')
- if col_index > sheet_last_col_index:
- sheet_last_col_index = col_index
- sheet_last_col_letter = col_letter
- sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
-
- # Initialize paging for 1st batch
- is_last_row = False
- batch_rows = 200
- from_row = 2
- if sheet_max_row < batch_rows:
- to_row = sheet_max_row
- else:
- to_row = batch_rows
-
- # Loop thru batches (each having 200 rows of data)
- while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
- range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
- # GET sheet_data for a worksheet tab
- sheet_data, time_extracted = get_data(
- stream_name=sheet_title,
- endpoint_config=sheets_loaded_config,
- client=client,
- spreadsheet_id=spreadsheet_id,
- range_rows=range_rows)
- # Data is returned as a list of arrays, an array of values for each row
- sheet_data_rows = sheet_data.get('values')
-
- # Transform batch of rows to JSON with keys for each column
- sheet_data_tf, row_num, is_last_row = transform_sheet_data(
- spreadsheet_id=spreadsheet_id,
- sheet_id=sheet_id,
- from_row=from_row,
- columns=columns,
- sheet_data_rows=sheet_data_rows)
- if row_num < to_row:
- is_last_row = True
-
- # Process records, send batch of records to target
- record_count = process_records(
- catalog=catalog,
- stream_name=sheet_title,
- records=sheet_data_tf,
- time_extracted=ss_time_extracted)
-
- # Update paging from/to_row for next batch
- from_row = to_row + 1
- if to_row + batch_rows > sheet_max_row:
+ # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
+
+ # SKIP empty sheets (where sheet_schema and columns are None)
+ if not sheet_schema or not columns:
+ LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+ else:
+ # Transform sheet_metadata
+ sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+ # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+ sheet_metadata.append(sheet_metadata_tf)
+
+ # SHEET_DATA
+ # Should this worksheet tab be synced?
+ if sheet_title in selected_streams:
+ LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+ update_currently_syncing(state, sheet_title)
+ selected_fields = get_selected_fields(catalog, sheet_title)
+ LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+ write_schema(catalog, sheet_title)
+
+ # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
+ # everytime after each sheet sync is complete.
+ # This forces hard deletes on the data downstream if fewer records are sent.
+ # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+ last_integer = int(get_bookmark(state, sheet_title, 0))
+ activate_version = int(time.time() * 1000)
+ activate_version_message = singer.ActivateVersionMessage(
+ stream=sheet_title,
+ version=activate_version)
+ if last_integer == 0:
+ # initial load, send activate_version before AND after data sync
+ singer.write_message(activate_version_message)
+ LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+ # Determine max range of columns and rows for "paging" through the data
+ sheet_last_col_index = 1
+ sheet_last_col_letter = 'A'
+ for col in columns:
+ col_index = col.get('columnIndex')
+ col_letter = col.get('columnLetter')
+ if col_index > sheet_last_col_index:
+ sheet_last_col_index = col_index
+ sheet_last_col_letter = col_letter
+ sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+ # Initialize paging for 1st batch
+ is_last_row = False
+ batch_rows = 200
+ from_row = 2
+ if sheet_max_row < batch_rows:
to_row = sheet_max_row
else:
- to_row = to_row + batch_rows
-
- # SHEETS_LOADED
- # Add sheet to sheets_loaded
- sheet_loaded = {}
- sheet_loaded['spreadsheetId'] = spreadsheet_id
- sheet_loaded['sheetId'] = sheet_id
- sheet_loaded['title'] = sheet_title
- sheet_loaded['loadDate'] = strftime(utils.now())
- sheet_loaded['lastRowNumber'] = row_num
- sheets_loaded.append(sheet_loaded)
-
- # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
- # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
- # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
- activate_version_message = singer.ActivateVersionMessage(
- stream=sheet_title,
- version=int(time.time() * 1000))
- singer.write_message(activate_version_message)
-
- LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
- sheet_title, row_num - 1))
- update_currently_syncing(state, None)
+ to_row = batch_rows
+
+ # Loop thru batches (each having 200 rows of data)
+ while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+ range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+ # GET sheet_data for a worksheet tab
+ sheet_data, time_extracted = get_data(
+ stream_name=sheet_title,
+ endpoint_config=sheets_loaded_config,
+ client=client,
+ spreadsheet_id=spreadsheet_id,
+ range_rows=range_rows)
+ # Data is returned as a list of arrays, an array of values for each row
+ sheet_data_rows = sheet_data.get('values', [])
+
+ # Transform batch of rows to JSON with keys for each column
+ sheet_data_tf, row_num = transform_sheet_data(
+ spreadsheet_id=spreadsheet_id,
+ sheet_id=sheet_id,
+ sheet_title=sheet_title,
+ from_row=from_row,
+ columns=columns,
+ sheet_data_rows=sheet_data_rows)
+ if row_num < to_row:
+ is_last_row = True
+
+ # Process records, send batch of records to target
+ record_count = process_records(
+ catalog=catalog,
+ stream_name=sheet_title,
+ records=sheet_data_tf,
+ time_extracted=ss_time_extracted,
+ version=activate_version)
+ LOGGER.info('Sheet: {}, records processed: {}'.format(
+ sheet_title, record_count))
+
+ # Update paging from/to_row for next batch
+ from_row = to_row + 1
+ if to_row + batch_rows > sheet_max_row:
+ to_row = sheet_max_row
+ else:
+ to_row = to_row + batch_rows
+
+ # End of Stream: Send Activate Version and update State
+ singer.write_message(activate_version_message)
+ write_bookmark(state, sheet_title, activate_version)
+ LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+ LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+ sheet_title, row_num - 2)) # subtract 1 for header row
+ update_currently_syncing(state, None)
+
+ # SHEETS_LOADED
+ # Add sheet to sheets_loaded
+ sheet_loaded = {}
+ sheet_loaded['spreadsheetId'] = spreadsheet_id
+ sheet_loaded['sheetId'] = sheet_id
+ sheet_loaded['title'] = sheet_title
+ sheet_loaded['loadDate'] = strftime(utils.now())
+ sheet_loaded['lastRowNumber'] = row_num
+ sheets_loaded.append(sheet_loaded)
stream_name = 'sheet_metadata'
# Sync sheet_metadata if selected
sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
-
+
stream_name = 'sheets_loaded'
# Sync sheet_metadata if selected
sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
+
+ # Update file_metadata bookmark
+ write_bookmark(state, 'file_metadata', strftime(this_datetime))
+
+ return