From 55ac5a86eae5ef9079e3312fd589f1bda20e43f2 Mon Sep 17 00:00:00 2001 From: Andy Lu Date: Sun, 21 Feb 2021 21:09:11 -0600 Subject: Add files from pycco --- docs/sync.html | 1680 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1680 insertions(+) create mode 100644 docs/sync.html (limited to 'docs/sync.html') diff --git a/docs/sync.html b/docs/sync.html new file mode 100644 index 0000000..97ef9fa --- /dev/null +++ b/docs/sync.html @@ -0,0 +1,1680 @@ + + + + + + + + +

+ # +

This module contains the logic to sync data from the API.


Syncable streams: The tap seems to care about syncing the streams in this order.

  1. file_metadata
  2. +
  3. spreadsheet_metadata
  4. +
  5. N Sheets
  6. +
  7. sheet_metadata
  8. +
  9. sheets_loaded
  10. +
  11. sheets_loaded
  12. +

The flow through this module is:

  1. Entrypoint: sync()
  2. +
  3. Sync file_metadata
    1. get_data()
    2. +
    3. transform_file_metadata()
    4. +
    5. Maybe exit the sync
    6. +
    7. sync_stream()
    8. +
  4. +
  5. Sync spreadsheet_metadata
    1. get_data()
    2. +
    3. transform_spreadsheet_metadata()
    4. +
    5. sync_stream()
    6. +
  6. +
  7. Sync all of the Sheets. Here’s the process for a single Sheet
    1. get_sheet_metadata()
    2. +
    3. transform_sheet_metadata()
    4. +
    5. get_data()
    6. +
    7. transform_sheet_data()
    8. +
    9. process_records()
    10. +
  8. +
  9. Sync sheet_metadata
    1. sync_stream()
    2. +
  10. +
  11. Sync sheets_loaded
    1. sync_stream()
    2. +
  12. +
  13. Sync sheets_loaded
    1. sync_stream()
    2. +
  14. +
import time
+import math
+import json
+import re
+import urllib.parse
+from datetime import datetime, timedelta
+import pytz
+import singer
+from singer import metrics, metadata, Transformer, utils
+from singer.utils import strptime_to_utc, strftime
+from singer.messages import RecordMessage
+from tap_google_sheets.streams import STREAMS
+from tap_google_sheets.schema import get_sheet_metadata
+LOGGER = singer.get_logger()
+ # +
+ # +

Helper Functions

+ # +

Log that we write a schema via singer.write_schema

def write_schema(catalog, stream_name):
+ # +
+ +
    stream = catalog.get_stream(stream_name)
+    schema = stream.schema.to_dict()
+    try:
+        singer.write_schema(stream_name, schema, stream.key_properties)
+'Writing schema for: {}'.format(stream_name))
+    except OSError as err:
+ # +

QUESTION: When do we encounter an OSError?

+'OS Error writing schema for: {}'.format(stream_name))
+        raise err
+ # +

Write a RecordMessage, with the given version if it was passed in

def write_record(stream_name, record, time_extracted, version=None):
+ # +
+ +
+        if version:
+            singer.messages.write_message(
+                RecordMessage(
+                    stream=stream_name,
+                    record=record,
+                    version=version,
+                    time_extracted=time_extracted))
+        else:
+            singer.messages.write_record(
+                stream_name=stream_name,
+                record=record,
+                time_extracted=time_extracted)
+    except OSError as err:
+ # +

QUESTION: When do we encounter an OSError?

+'OS Error writing record for: {}'.format(stream_name))
+'record: {}'.format(record))
+        raise err
+ # +

Safe get a bookmark from state.

def get_bookmark(state, stream, default):
+ # +

Hides an error though if state turns out to be None

    if (state is None) or ('bookmarks' not in state):
+        return default
+ # +

This is also short enough for one line, is this supposed to be more readable?

    return (
+        state
+        .get('bookmarks', {})
+        .get(stream, default)
+    )
+ # +

Updates and write state

def write_bookmark(state, stream, value):
+ # +
+ +
    if 'bookmarks' not in state:
+        state['bookmarks'] = {}
+    state['bookmarks'][stream] = value
+'Write state for stream: {}, value: {}'.format(stream, value))
+    singer.write_state(state)
+ # +

Upserts or deletes the ‘currently_syncing’ stream

def update_currently_syncing(state, stream_name):
+ # +

Why do we care if stream_name is passed in to delete currently_syncing?

    if (stream_name is None) and ('currently_syncing' in state):
+        del state['currently_syncing']
+    else:
+        singer.set_currently_syncing(state, stream_name)
+    singer.write_state(state)
+ # +

Get a list of selected, top-level fields for stream_name

def get_selected_fields(catalog, stream_name):
+ # +
+ +
    stream = catalog.get_stream(stream_name)
+    mdata = metadata.to_map(stream.metadata)
+    mdata_list = singer.metadata.to_list(mdata)
+    selected_fields = []
+    for entry in mdata_list:
+        field = None
+        try:
+            field = entry['breadcrumb'][1]
+            if entry.get('metadata', {}).get('selected', False):
+                selected_fields.append(field)
+        except IndexError:
+ # +

Swallow the error for the Stream level metadata

+    return selected_fields
+ # +

Construct the request we want to make, make the request, and return the Response

def get_data(stream_name, endpoint_config, client, spreadsheet_id, range_rows=None):
+ # +
+ +
+ # +

Build the query

    stream_name_escaped = re.escape(stream_name)
+ # +

Encode stream_name to fix issues with special characters in stream_name +QUESTION: If there’s special characters here how do databases handle it?

    stream_name_encoded = urllib.parse.quote_plus(stream_name)
+    if not range_rows:
+        range_rows = ''
+ # +

QUESTION: Why is this not a string.format() with keywords?

    path = endpoint_config.get('path', stream_name).replace(
+        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
+            '{range_rows}', range_rows)
+    params = endpoint_config.get('params', {})
+    api = endpoint_config.get('api', 'sheets')
+    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
+        '{sheet_title}', stream_name_encoded)
+'URL: {}/{}?{}'.format(client.base_url, path, querystring))
+    data = {}
+    time_extracted =
+ # +

Make the query

    data = client.get(
+        path=path,
+        api=api,
+        params=querystring,
+        endpoint=stream_name_escaped)
+ # +

Return the Response.json()

    return data, time_extracted
+ # +

Transform Functions


There’s this line of code that happens in these that is a bit confusing:

+ # +
+ # +

I don’t see the use here. We turn Python into a JSON string and back again. +The only thing I could see in the repl is that integer keys get stringified.

+ # +

In general, the transform functions just look like “maybe pop some +stuff”, “maybe add some stuff”, and return the input in a list

+ # +
+ # +

remove nodes from lastModifyingUser, format as array

def transform_file_metadata(file_metadata):
+ # +
+ +
    file_metadata_tf = json.loads(json.dumps(file_metadata))
+    if file_metadata_tf.get('lastModifyingUser'):
+        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
+        file_metadata_tf['lastModifyingUser'].pop('me', None)
+        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
+    file_metadata_arr = []
+    file_metadata_arr.append(file_metadata_tf)
+    return file_metadata_arr
+ # +

remove defaultFormat and sheets nodes, format as array

def transform_spreadsheet_metadata(spreadsheet_metadata):
+ # +
+ +
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
+    if spreadsheet_metadata_tf.get('properties'):
+        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
+    spreadsheet_metadata_tf.pop('sheets', None)
+    spreadsheet_metadata_arr = []
+    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
+    return spreadsheet_metadata_arr
+ # +

add spreadsheetId, sheetUrl, and columns metadata

def transform_sheet_metadata(spreadsheet_id, sheet, columns):
+ # +
+ +
    sheet_metadata = sheet.get('properties')
+    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
+    sheet_id = sheet_metadata_tf.get('sheetId')
+    sheet_url = '{}/edit#gid={}'.format(spreadsheet_id, sheet_id)
+    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
+    sheet_metadata_tf['sheetUrl'] = sheet_url
+    sheet_metadata_tf['columns'] = columns
+    return sheet_metadata_tf
+ # +

Convert Excel Date Serial Number (excel_date_sn) to datetime string timezone_str: defaults to

def excel_to_dttm_str(excel_date_sn, timezone_str=None):
+ # +

UTC (which we assume is the timezone for ALL datetimes)

    if not timezone_str:
+        timezone_str = 'UTC'
+    tzn = pytz.timezone(timezone_str)
+    epoch_dttm = datetime(1970, 1, 1)
+    sec_per_day = 86400
+ # +

1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date

    excel_epoch = 25569
+ # +

Seconds since Epoch, times the seconds per day => days since Epoch?

    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
+    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
+    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
+    utc_dttm_str = singer.utils.strftime(utc_dttm)
+    return utc_dttm_str
+ # +

WARNING This next function is confusing

+ # +

In general, the point of the function is to transform the field based on the data type that the +API tells us. It loops over every row and then every column in the row.

+ # +

For the TIME fields, there’s no reason it should work. And for some cases, the value returned is +just wrong.

+ # +

You can look at the code for timedelta and you would see that this constructor wants to +normalize the input of 6 units into 3 (you can create the object with years, months, days, +hours, minutes, and seconds. But it will convert values into just days, hours, and +seconds).

+ # +

Disclaimer I don’t have the exact units, but the spirit of +the idea is here.

+ # +

When we pass in seconds here as the value we get from the API times the number of seconds in a +day, how timedelta does its normalization gives us an incorrect value. It takes the input to +seconds and passes that to divmod() which returns a 2-ple as the result. The first element is +our input integer divided by the number of seconds in a day. The second element is our input mod +the number of seconds in a day. Then these results are added to the rest of the normalization and +we get the correct time value back out. It’s easy to imagine that since we don’t pass in a days +argument, our divmod‘s days output is just added to zero. The __str__() for timedelta must +be something like "{my_days} days, {time_since_midnight(my_seconds)}", which is essentially what +we get after this transform function.

+ # +
+ # +

add spreadsheet_id, sheet_id, and row, convert dates/times Convert from array of values to

def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
+ # +

JSON with column names as keys

    sheet_data_tf = []
+    row_num = from_row
+ # +

Create sorted list of columns based on columnIndex

    cols = sorted(columns, key=lambda i: i['columnIndex'])
+    for row in sheet_data_rows:
+ # +

If empty row, SKIP

        if row == []:
+  'EMPTY ROW: {}, SKIPPING'.format(row_num))
+        else:
+            sheet_data_row_tf = {}
+ # +

Add spreadsheet_id, sheet_id, and row

            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
+            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
+            sheet_data_row_tf['__sdc_row'] = row_num
+            col_num = 1
+            for value in row:
+ # +

Select column metadata based on column index

                col = cols[col_num - 1]
+                col_skipped = col.get('columnSkipped')
+                if not col_skipped:
+ # +

Get column metadata

                    col_name = col.get('columnName')
+                    col_type = col.get('columnType')
+                    col_letter = col.get('columnLetter')
+ # +

NULL values

                    if value is None or value == '':
+                        col_val = None
+ # +

Convert dates/times from Lotus Notes Serial Numbers +DATE-TIME

                    elif col_type == 'numberType.DATE_TIME':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)
+                        else:
+                            col_val = str(value)
+                  'WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
+ # +


                    elif col_type == 'numberType.DATE':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)[:10]
+                        else:
+                            col_val = str(value)
+                  'WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
+ # +


                    elif col_type == 'numberType.TIME':
+                        if isinstance(value, (int, float)):
+                            try:
+                                total_secs = value * 86400
+ # +

Create string formatted like HH:MM:SS

                                col_val = str(timedelta(seconds=total_secs))
+                            except ValueError:
+                                col_val = str(value)
+                      'WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
+                        else:
+                            col_val = str(value)
+ # +


                    elif col_type == 'numberType':
+                        if isinstance(value, int):
+                            col_val = int(value)
+                        elif isinstance(value, float):
+ # +

Determine float decimal digits

                            decimal_digits = str(value)[::-1].find('.')
+                            if decimal_digits > 15:
+                                try:
+ # +

ROUND to multipleOf: 1e-15

                                    col_val = float(round(value, 15))
+                                except ValueError:
+                                    col_val = str(value)
+                          'WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                        sheet_title, col_name, col_letter, row_num, col_type, value))
+                            else: # decimal_digits <= 15, no rounding
+                                try:
+                                    col_val = float(value)
+                                except ValueError:
+                                    col_val = str(value)
+                          'WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                        sheet_title, col_name, col_letter, row_num, col_type, value))
+                        else:
+                            col_val = str(value)
+                  'WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                   sheet_title, col_name, col_letter, row_num, col_type, value))
+ # +


                    elif col_type == 'stringValue':
+                        col_val = str(value)
+ # +


                    elif col_type == 'boolValue':
+                        if isinstance(value, bool):
+                            col_val = value
+                        elif isinstance(value, str):
+                            if value.lower() in ('true', 't', 'yes', 'y'):
+                                col_val = True
+                            elif value.lower() in ('false', 'f', 'no', 'n'):
+                                col_val = False
+                            else:
+                                col_val = str(value)
+                      'WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row, col_type, value))
+                        elif isinstance(value, int):
+                            if value in (1, -1):
+                                col_val = True
+                            elif value == 0:
+                                col_val = False
+                            else:
+                                col_val = str(value)
+                      'WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row, col_type, value))
+ # +

OTHER: Convert everything else to a string

+                        col_val = str(value)
+              'WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                            sheet_title, col_name, col_letter, row, col_type, value))
+                    sheet_data_row_tf[col_name] = col_val
+                col_num = col_num + 1
+ # +

APPEND non-empty row

+        row_num = row_num + 1
+    return sheet_data_tf, row_num
+ # +
+ # +

Main Functions

+ # +
+ # +

Transform/validate batch of records w/ schema and sent to target

def process_records(catalog, stream_name, records, time_extracted, version=None):
+ # +
+ +
    stream = catalog.get_stream(stream_name)
+    schema = stream.schema.to_dict()
+    stream_metadata = metadata.to_map(stream.metadata)
+    with metrics.record_counter(stream_name) as counter:
+        for record in records:
+            with Transformer() as transformer:
+                try:
+                    transformed_record = transformer.transform(record, schema, stream_metadata)
+                except Exception as err:
+                    LOGGER.error('{}'.format(err))
+                    raise RuntimeError(err)
+                write_record(
+                    stream_name=stream_name,
+                    record=transformed_record,
+                    time_extracted=time_extracted,
+                    version=version)
+                counter.increment()
+        return counter.value
+ # +

This is just a pass-through to process_records()

def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
+ # +
+ +
    if stream_name in selected_streams:
+'STARTED Syncing {}'.format(stream_name))
+        update_currently_syncing(state, stream_name)
+        selected_fields = get_selected_fields(catalog, stream_name)
+'Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
+        write_schema(catalog, stream_name)
+        if not time_extracted:
+            time_extracted =
+        record_count = process_records(
+            catalog=catalog,
+            stream_name=stream_name,
+            records=records,
+            time_extracted=time_extracted)
+'FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
+        update_currently_syncing(state, None)
+ # +

See top of file for notes

def sync(client, config, catalog, state):
+    start_date = config.get('start_date')
+    spreadsheet_id = config.get('spreadsheet_id')
+    last_stream = singer.get_currently_syncing(state)
+'last/currently syncing stream: {}'.format(last_stream))
+    selected_streams = []
+    for stream in catalog.get_selected_streams(state):
+        selected_streams.append(
+'selected_streams: {}'.format(selected_streams))
+    if not selected_streams:
+        return
+ # +


    file_metadata = {}
+    stream_name = 'file_metadata'
+    file_metadata_config = STREAMS.get(stream_name)
+ # +

GET file_metadata

+'GET file_meatadata')
+    file_metadata, time_extracted = get_data(stream_name=stream_name,
+                                             endpoint_config=file_metadata_config,
+                                             client=client,
+                                             spreadsheet_id=spreadsheet_id)
+ # +

Transform file_metadata

+'Transform file_meatadata')
+    file_metadata_tf = transform_file_metadata(file_metadata)
+ # +

Check if file has changed, if not exit

    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
+    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
+'last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
+    if this_datetime <= last_datetime:
+'this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
+        write_bookmark(state, 'file_metadata', strftime(this_datetime))
+        return
+ # +

Write file_metadata records if selected

    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
+ # +


    spreadsheet_metadata = {}
+    stream_name = 'spreadsheet_metadata'
+    spreadsheet_metadata_config = STREAMS.get(stream_name)
+ # +

GET spreadsheet_metadata

+'GET spreadsheet_meatadata')
+    spreadsheet_metadata, ss_time_extracted = get_data(
+        stream_name=stream_name,
+        endpoint_config=spreadsheet_metadata_config,
+        client=client,
+        spreadsheet_id=spreadsheet_id)
+ # +

Transform spreadsheet_metadata

+'Transform spreadsheet_meatadata')
+    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
+ # +

Write spreadsheet_metadata records if selected

    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
+        ss_time_extracted)
+ # +


    sheets = spreadsheet_metadata.get('sheets')
+    sheet_metadata = []
+    sheets_loaded = []
+    sheets_loaded_config = STREAMS['sheets_loaded']
+    if sheets:
+ # +

Loop thru sheets (worksheet tabs) in spreadsheet

        for sheet in sheets:
+            sheet_title = sheet.get('properties', {}).get('title')
+            sheet_id = sheet.get('properties', {}).get('sheetId')
+ # +



GET sheet_metadata and columns

            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+            if not sheet_schema or not columns:
+      'SKIPPING Empty Sheet: {}'.format(sheet_title))
+            else:
+ # +

Transform sheet_metadata

                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+                sheet_metadata.append(sheet_metadata_tf)
+ # +


                if sheet_title in selected_streams:
+          'STARTED Syncing Sheet {}'.format(sheet_title))
+                    update_currently_syncing(state, sheet_title)
+                    selected_fields = get_selected_fields(catalog, sheet_title)
+          'Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+                    write_schema(catalog, sheet_title)
+ # +

Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs) +everytime after each sheet sync is complete. +This forces hard deletes on the data downstream if fewer records are sent. +

                    last_integer = int(get_bookmark(state, sheet_title, 0))
+                    activate_version = int(time.time() * 1000)
+                    activate_version_message = singer.ActivateVersionMessage(
+                            stream=sheet_title,
+                            version=activate_version)
+                    if last_integer == 0:
+ # +

initial load, send activate_version before AND after data sync

+              'INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+ # +

Determine max range of columns and rows for “paging” through the data

                    sheet_last_col_index = 1
+                    sheet_last_col_letter = 'A'
+                    for col in columns:
+                        col_index = col.get('columnIndex')
+                        col_letter = col.get('columnLetter')
+                        if col_index > sheet_last_col_index:
+                            sheet_last_col_index = col_index
+                            sheet_last_col_letter = col_letter
+                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+ # +

Initialize paging for 1st batch

                    is_last_row = False
+                    batch_rows = 200
+                    from_row = 2
+                    if sheet_max_row < batch_rows:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = batch_rows
+ # +

Loop thru batches (each having 200 rows of data)

                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+ # +

GET sheet_data for a worksheet tab

                        sheet_data, time_extracted = get_data(
+                            stream_name=sheet_title,
+                            endpoint_config=sheets_loaded_config,
+                            client=client,
+                            spreadsheet_id=spreadsheet_id,
+                            range_rows=range_rows)
+ # +

Data is returned as a list of arrays, an array of values for each row

                        sheet_data_rows = sheet_data.get('values', [])
+ # +

Transform batch of rows to JSON with keys for each column

                        sheet_data_tf, row_num = transform_sheet_data(
+                            spreadsheet_id=spreadsheet_id,
+                            sheet_id=sheet_id,
+                            sheet_title=sheet_title,
+                            from_row=from_row,
+                            columns=columns,
+                            sheet_data_rows=sheet_data_rows)
+                        if row_num < to_row:
+                            is_last_row = True
+ # +

Process records, send batch of records to target

                        record_count = process_records(
+                            catalog=catalog,
+                            stream_name=sheet_title,
+                            records=sheet_data_tf,
+                            time_extracted=ss_time_extracted,
+                            version=activate_version)
+              'Sheet: {}, records processed: {}'.format(
+                            sheet_title, record_count))
+ # +

Update paging from/to_row for next batch

                        from_row = to_row + 1
+                        if to_row + batch_rows > sheet_max_row:
+                            to_row = sheet_max_row
+                        else:
+                            to_row = to_row + batch_rows
+ # +

End of Stream: Send Activate Version and update State

+                    write_bookmark(state, sheet_title, activate_version)
+          'COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+          'FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                        sheet_title, row_num - 2)) # subtract 1 for header row
+                    update_currently_syncing(state, None)
+ # +

SHEETS_LOADED +Add sheet to sheets_loaded

                    sheet_loaded = {}
+                    sheet_loaded['spreadsheetId'] = spreadsheet_id
+                    sheet_loaded['sheetId'] = sheet_id
+                    sheet_loaded['title'] = sheet_title
+                    sheet_loaded['loadDate'] = strftime(
+                    sheet_loaded['lastRowNumber'] = row_num
+                    sheets_loaded.append(sheet_loaded)
+    stream_name = 'sheet_metadata'
+ # +

Write sheet_metadata records if selected

    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
+    stream_name = 'sheets_loaded'
+ # +

Write sheet_metadata records if selected

    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
+ # +

Update file_metadata bookmark

    write_bookmark(state, 'file_metadata', strftime(this_datetime))
+    return
+ -- cgit v1.2.3