sync.py

#

This module contains the logic to sync data from the API.


Syncable streams: The tap seems to care about syncing the streams in this order.

  1. file_metadata
  2. spreadsheet_metadata
  3. N Sheets
  4. sheet_metadata
  5. sheets_loaded

The flow through this module is:

  1. Entrypoint: sync()
  2. Sync file_metadata
    1. get_data()
    2. transform_file_metadata()
    3. Maybe exit the sync
    4. sync_stream()
  3. Sync spreadsheet_metadata
    1. get_data()
    2. transform_spreadsheet_metadata()
    3. sync_stream()
  4. Sync all of the Sheets. Here’s the process for a single Sheet
    1. get_sheet_metadata()
    2. transform_sheet_metadata()
    3. get_data()
    4. transform_sheet_data()
    5. process_records()
  5. Sync sheet_metadata
    1. sync_stream()
  6. Sync sheets_loaded
    1. sync_stream()
import time
import math
import json
import re
import urllib.parse
from datetime import datetime, timedelta
import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()
#

#

Helper Functions

#

Write the stream's schema via singer.write_schema and log that we did

def write_schema(catalog, stream_name):
#
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
#

QUESTION: When do we encounter an OSError?

        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err
#

Write a RecordMessage, with the given version if it was passed in

def write_record(stream_name, record, time_extracted, version=None):
#
    try:
        if version:
            singer.messages.write_message(
                RecordMessage(
                    stream=stream_name,
                    record=record,
                    version=version,
                    time_extracted=time_extracted))
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
#

QUESTION: When do we encounter an OSError?

        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err
#

Safely get a bookmark from state.

def get_bookmark(state, stream, default):
#

This does hide an error, though, if state turns out to be None

    if (state is None) or ('bookmarks' not in state):
        return default
#

This is also short enough for one line; is the multi-line form supposed to be more readable?

    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )
#

Updates the bookmark and writes state

def write_bookmark(state, stream, value):
#
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)
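
For reference, here is roughly the shape of the state these two helpers read and write (stream names and values are made up; the file_metadata bookmark is a modifiedTime string and the per-sheet bookmarks are activate_version millisecond timestamps, per the sync() code further down; the import path assumes this module is importable as tap_google_sheets.sync):

from tap_google_sheets.sync import get_bookmark

state = {
    'currently_syncing': 'Sheet1',
    'bookmarks': {
        'file_metadata': '2019-06-01T00:00:00.000000Z',  # modifiedTime of the file
        'Sheet1': 1578345600000,                         # activate_version (epoch millis)
    },
}

print(get_bookmark(state, 'Sheet1', 0))  # 1578345600000
print(get_bookmark(state, 'Sheet2', 0))  # 0 (falls back to the default)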
#

Upserts or deletes the ‘currently_syncing’ stream

def update_currently_syncing(state, stream_name):
#

Why do we care whether stream_name was passed in before deleting currently_syncing?

    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)
#

Get a list of selected, top-level fields for stream_name

def get_selected_fields(catalog, stream_name):
#
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
#

Swallow the error for the Stream level metadata

            pass
    return selected_fields
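
To make that IndexError concrete, here is the shape of the metadata list this function walks (stream and field names made up, but the breadcrumb convention is standard Singer catalog metadata):

mdata_list = [
    {'breadcrumb': [], 'metadata': {'inclusion': 'available', 'selected': True}},   # stream-level
    {'breadcrumb': ['properties', 'id'], 'metadata': {'inclusion': 'automatic'}},
    {'breadcrumb': ['properties', 'name'], 'metadata': {'selected': True}},
    {'breadcrumb': ['properties', 'notes'], 'metadata': {'selected': False}},
]

entry['breadcrumb'][1] is the field name for the field-level entries, and it raises IndexError for the stream-level entry because its breadcrumb is empty; that is the error being swallowed.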
#

Construct the request we want to make, make the request, and return the Response

def get_data(stream_name, endpoint_config, client, spreadsheet_id, range_rows=None):
#
#

Build the query

    stream_name_escaped = re.escape(stream_name)
#

Encode stream_name to fix issues with special characters in stream_name. QUESTION: If there are special characters here, how do databases handle them?

    stream_name_encoded = urllib.parse.quote_plus(stream_name)

    if not range_rows:
        range_rows = ''
#

QUESTION: Why is this not a str.format() call with keyword arguments? (A rough equivalent is sketched right after this function.)

    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name_encoded)
    LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))
    data = {}
    time_extracted = utils.now()
#

Make the query

    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name_escaped)
#

Return the Response.json()

    return data, time_extracted
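
Back to the QUESTION above about str.format(): the chain of .replace() calls could be a single format call with keyword arguments. A standalone sketch (the template and values here are made up for illustration; the real templates live in STREAMS):

import urllib.parse

path_template = 'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}'
path = path_template.format(
    spreadsheet_id='1abc',
    sheet_title=urllib.parse.quote_plus('My Sheet'),
    range_rows='A2:E200')
print(path)  # spreadsheets/1abc/values/My+Sheet!A2:E200

One difference worth noting: str.format() would blow up on any stray '{' or '}' in the template, while the .replace() chain just ignores anything it does not recognize.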
#

Transform Functions

There’s a line of code that shows up in several of these transform functions that is a bit confusing:

#
json.loads(json.dumps(some_object))
#

I don’t see the use here. We turn a Python object into a JSON string and back again. The only thing I could see in the REPL is that integer keys get stringified.
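
Here it is in a REPL. Besides the key stringification, the round trip hands back a brand new object, which presumably is the point: the transforms below pop keys off the result, so this way they never mutate the caller's dict.

import json

original = {1: 'a', 'nested': {'b': 2}}
round_tripped = json.loads(json.dumps(original))

print(round_tripped)                                  # {'1': 'a', 'nested': {'b': 2}}
print(round_tripped['nested'] is original['nested'])  # False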

#

In general, the transform functions just look like “maybe pop some stuff”, “maybe add some stuff”, and return the input in a list

#

#

remove nodes from lastModifyingUser, format as array

def transform_file_metadata(file_metadata):
#
    file_metadata_tf = json.loads(json.dumps(file_metadata))

    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)

    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr
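
For concreteness, roughly what goes in and what comes out (field values made up; the import assumes this module is importable as tap_google_sheets.sync):

from tap_google_sheets.sync import transform_file_metadata

raw = {
    'id': '1abc',
    'modifiedTime': '2019-06-01T00:00:00.000Z',
    'lastModifyingUser': {
        'displayName': 'Jane Doe',
        'photoLink': 'https://example.com/photo.jpg',
        'me': False,
        'permissionId': '12345',
    },
}

print(transform_file_metadata(raw))
# [{'id': '1abc',
#   'modifiedTime': '2019-06-01T00:00:00.000Z',
#   'lastModifyingUser': {'displayName': 'Jane Doe'}}]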
#

remove defaultFormat and sheets nodes, format as array

def transform_spreadsheet_metadata(spreadsheet_metadata):
#
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))

    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)

    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr
#

add spreadsheetId, sheetUrl, and columns metadata

def transform_sheet_metadata(spreadsheet_id, sheet, columns):
#
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf
#

Convert Excel Date Serial Number (excel_date_sn) to a datetime string. timezone_str defaults to

def excel_to_dttm_str(excel_date_sn, timezone_str=None):
#

UTC (which we assume is the timezone for ALL datetimes)

    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    epoch_dttm = datetime(1970, 1, 1)

    sec_per_day = 86400
#

25569 is the Lotus Notes serial number for the Epoch start date, 1970-01-01T00:00:00Z

    excel_epoch = 25569
#

Days since the Epoch, times the seconds per day => seconds since the Epoch

    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)

    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = singer.utils.strftime(utc_dttm)
    return utc_dttm_str
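
A quick sanity check of the conversion: serial 25569 is the Unix epoch itself, and 44197.5 is noon on 2021-01-01 (the exact sub-second formatting depends on singer.utils.strftime; the import assumes this module is importable as tap_google_sheets.sync):

from tap_google_sheets.sync import excel_to_dttm_str

print(excel_to_dttm_str(25569))    # 1970-01-01T00:00:00.000000Z
print(excel_to_dttm_str(44197.5))  # 2021-01-01T12:00:00.000000Z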
#

WARNING: This next function is confusing

#

In general, the point of the function is to transform the field based on the data type that the API tells us. It loops over every row and then every column in the row.

#

For the TIME fields, there’s no obvious reason it should work. And in some cases, the value returned is just wrong.

#

You can look at the code for timedelta and you would see that this constructor normalizes its seven input units down to three (you can create the object with weeks, days, hours, minutes, seconds, milliseconds, and microseconds, but it stores the value as just days, seconds, and microseconds).

#

Disclaimer: check the datetime docs for the exact internals, but the spirit of the idea is here.

#

When we pass in seconds as the value we get from the API times the number of seconds in a day, the way timedelta does its normalization is what makes this work, and also what makes it wrong sometimes. The constructor takes our seconds input and passes it to divmod(), which returns a 2-tuple: the first element is our input integer-divided by the number of seconds in a day, and the second element is our input mod the number of seconds in a day. That second element is exactly the seconds since midnight, so for values under one day the normalization hands the correct time value back out. Since we don’t pass in a days argument, our divmod’s days output is just added to zero. The __str__() for timedelta must be something like "{my_days} days, {time_since_midnight(my_seconds)}" (with the days part dropped when it’s zero), which is essentially what we get after this transform function, so anything that works out to a day or more comes back with a "N days," prefix stuck on the front.
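
A quick REPL check of the behavior described above (standard library only):

from datetime import timedelta

print(timedelta(seconds=0.4375 * 86400))  # 10:30:00  (serial 0.4375 is 10:30 AM)
print(timedelta(seconds=0.75 * 86400))    # 18:00:00
print(timedelta(seconds=1.5 * 86400))     # 1 day, 12:00:00  (over 24 hours picks up the days prefix)

Serials that don't land exactly on a whole second can also come back with a fractional tail (something like '...:00.086400'), since timedelta keeps microseconds.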

#

#

add spreadsheet_id, sheet_id, and row; convert dates/times; convert from an array of values to

def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
#

JSON with column names as keys

    sheet_data_tf = []
    row_num = from_row
#

Create sorted list of columns based on columnIndex

    cols = sorted(columns, key=lambda i: i['columnIndex'])

    for row in sheet_data_rows:
#

If empty row, SKIP

        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
#

Add spreadsheet_id, sheet_id, and row

            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
#

Select column metadata based on column index

                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
#

Get column metadata

                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')
#

NULL values

                    if value is None or value == '':
                        col_val = None
#

Convert dates/times from Lotus Notes Serial Numbers DATE-TIME

                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
#

DATE

                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
#

TIME ONLY (NO DATE)

                    elif col_type == 'numberType.TIME':

                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400
#

Create string formatted like HH:MM:SS

                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
#

NUMBER (INTEGER AND FLOAT)

                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
#

Determine float decimal digits

                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
#

ROUND to multipleOf: 1e-15

                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                            else: # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                   sheet_title, col_name, col_letter, row_num, col_type, value))
#

STRING

                    elif col_type == 'stringValue':
                        col_val = str(value)
#

BOOLEAN

                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
#

OTHER: Convert everything else to a string

                    else:
                        col_val = str(value)
                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                            sheet_title, col_name, col_letter, row_num, col_type, value))
                    sheet_data_row_tf[col_name] = col_val
                col_num = col_num + 1
#

APPEND non-empty row

            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num
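
An illustrative call, with made-up columns metadata and rows whose shapes match the code above (again assuming the module is importable as tap_google_sheets.sync). Note the empty second row is logged and skipped, but still advances the row counter:

from tap_google_sheets.sync import transform_sheet_data

columns = [
    {'columnIndex': 1, 'columnLetter': 'A', 'columnName': 'name', 'columnType': 'stringValue'},
    {'columnIndex': 2, 'columnLetter': 'B', 'columnName': 'start', 'columnType': 'numberType.DATE'},
]
rows = [['Alpha', 43831], []]   # serial 43831 is 2020-01-01; the second row is empty

records, next_row = transform_sheet_data(
    spreadsheet_id='1abc', sheet_id=123, sheet_title='Tab1',
    from_row=2, columns=columns, sheet_data_rows=rows)

print(records)   # [{'__sdc_spreadsheet_id': '1abc', '__sdc_sheet_id': 123,
                 #   '__sdc_row': 2, 'name': 'Alpha', 'start': '2020-01-01'}]
print(next_row)  # 4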
#

#

Main Functions

#

#

Transform/validate batch of records w/ schema and send to target

def process_records(catalog, stream_name, records, time_extracted, version=None):
#
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            with Transformer() as transformer:
                try:
                    transformed_record = transformer.transform(record, schema, stream_metadata)
                except Exception as err:
                    LOGGER.error('{}'.format(err))
                    raise RuntimeError(err)
                write_record(
                    stream_name=stream_name,
                    record=transformed_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value
#

This is mostly a pass-through to process_records(), plus writing the schema and updating currently_syncing

def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
#
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)
#

See top of file for notes

def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))

    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return
#

FILE_METADATA

    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)
#

GET file_metadata

    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
#

Transform file_metadata

    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
#

Check if file has changed, if not exit

    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        write_bookmark(state, 'file_metadata', strftime(this_datetime))
        return
#

Write file_metadata records if selected

    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
#

SPREADSHEET_METADATA

    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)
#

GET spreadsheet_metadata

    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)
#

Transform spreadsheet_metadata

    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
#

Write spreadsheet_metadata records if selected

    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
        ss_time_extracted)
#

SHEET_METADATA and SHEET_DATA

    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
#

Loop thru sheets (worksheet tabs) in spreadsheet

        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')
#

Sheet_Metadata

GET sheet_metadata and columns

            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)

            if not sheet_schema or not columns:
                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
            else:
#

Transform sheet_metadata

                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
                sheet_metadata.append(sheet_metadata_tf)
#

SHEET_DATA

                if sheet_title in selected_streams:
                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                    update_currently_syncing(state, sheet_title)
                    selected_fields = get_selected_fields(catalog, sheet_title)
                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                    write_schema(catalog, sheet_title)
#

Emit a Singer ACTIVATE_VERSION message before the initial sync (but not subsequent syncs) and every time after each sheet sync is complete. This forces hard deletes on the data downstream if fewer records are sent. https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137

                    last_integer = int(get_bookmark(state, sheet_title, 0))
                    activate_version = int(time.time() * 1000)
                    activate_version_message = singer.ActivateVersionMessage(
                            stream=sheet_title,
                            version=activate_version)
                    if last_integer == 0:
#

initial load, send activate_version before AND after data sync

                        singer.write_message(activate_version_message)
                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
#

Determine max range of columns and rows for “paging” through the data

                    sheet_last_col_index = 1
                    sheet_last_col_letter = 'A'
                    for col in columns:
                        col_index = col.get('columnIndex')
                        col_letter = col.get('columnLetter')
                        if col_index > sheet_last_col_index:
                            sheet_last_col_index = col_index
                            sheet_last_col_letter = col_letter
                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
#

Initialize paging for 1st batch

                    is_last_row = False
                    batch_rows = 200
                    from_row = 2
                    if sheet_max_row < batch_rows:
                        to_row = sheet_max_row
                    else:
                        to_row = batch_rows
#

Loop thru batches (each having 200 rows of data)

                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
#

GET sheet_data for a worksheet tab

                        sheet_data, time_extracted = get_data(
                            stream_name=sheet_title,
                            endpoint_config=sheets_loaded_config,
                            client=client,
                            spreadsheet_id=spreadsheet_id,
                            range_rows=range_rows)
#

Data is returned as a list of arrays, an array of values for each row

                        sheet_data_rows = sheet_data.get('values', [])
#

Transform batch of rows to JSON with keys for each column

                        sheet_data_tf, row_num = transform_sheet_data(
                            spreadsheet_id=spreadsheet_id,
                            sheet_id=sheet_id,
                            sheet_title=sheet_title,
                            from_row=from_row,
                            columns=columns,
                            sheet_data_rows=sheet_data_rows)
                        if row_num < to_row:
                            is_last_row = True
#

Process records, send batch of records to target

                        record_count = process_records(
                            catalog=catalog,
                            stream_name=sheet_title,
                            records=sheet_data_tf,
                            time_extracted=ss_time_extracted,
                            version=activate_version)
                        LOGGER.info('Sheet: {}, records processed: {}'.format(
                            sheet_title, record_count))
#

Update paging from/to_row for next batch

                        from_row = to_row + 1
                        if to_row + batch_rows > sheet_max_row:
                            to_row = sheet_max_row
                        else:
                            to_row = to_row + batch_rows
#

End of Stream: Send Activate Version and update State

                    singer.write_message(activate_version_message)
                    write_bookmark(state, sheet_title, activate_version)
                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                        sheet_title, row_num - 2)) # subtract 1 for header row
                    update_currently_syncing(state, None)
#

SHEETS_LOADED: Add sheet to sheets_loaded

                    sheet_loaded = {}
                    sheet_loaded['spreadsheetId'] = spreadsheet_id
                    sheet_loaded['sheetId'] = sheet_id
                    sheet_loaded['title'] = sheet_title
                    sheet_loaded['loadDate'] = strftime(utils.now())
                    sheet_loaded['lastRowNumber'] = row_num
                    sheets_loaded.append(sheet_loaded)

    stream_name = 'sheet_metadata'
#

Write sheet_metadata records if selected

    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
#

Write sheets_loaded records if selected

    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
#

Update file_metadata bookmark

    write_bookmark(state, 'file_metadata', strftime(this_datetime))

    return
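
One closing aside: the batching in the Sheet loop above is easier to see with concrete numbers. Here is a standalone re-creation of just the from_row/to_row arithmetic for a hypothetical sheet with 450 rows and last populated column 'E' (the is_last_row short-circuit, which fires when a batch comes back short, is left out):

sheet_max_row, batch_rows = 450, 200
sheet_last_col_letter = 'E'

from_row = 2
to_row = min(batch_rows, sheet_max_row)
while from_row < sheet_max_row and to_row <= sheet_max_row:
    print('A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row))
    from_row = to_row + 1
    to_row = min(to_row + batch_rows, sheet_max_row)

# Prints: A2:E200, then A201:E400, then A401:E450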