aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets/sync.py
blob: 26c2d19cebd1a4c174eeffa3a0be78797006d8a4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
import time
import math
import json
import re
import urllib.parse
from datetime import datetime, timedelta
import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

# Module-level logger obtained from singer; used by the functions in this file.
LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    """Emit the Singer SCHEMA message for one stream of the catalog.

    The stream is looked up in ``catalog``, its schema is converted to a
    dict and written together with the stream's key properties. An
    ``OSError`` raised while writing is logged and re-raised.
    """
    catalog_entry = catalog.get_stream(stream_name)
    schema_dict = catalog_entry.schema.to_dict()
    try:
        singer.write_schema(
            stream_name,
            schema_dict,
            catalog_entry.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted, version=None):
    """Emit one Singer RECORD message.

    When ``version`` is truthy the record is wrapped in a ``RecordMessage``
    carrying that version; otherwise the plain ``write_record`` helper is
    used. An ``OSError`` raised while writing is logged (together with the
    record) and re-raised.
    """
    try:
        if not version:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
        else:
            versioned_message = RecordMessage(
                stream=stream_name,
                record=record,
                version=version,
                time_extracted=time_extracted)
            singer.messages.write_message(versioned_message)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    """Return the bookmark stored for ``stream``, or ``default``.

    ``default`` is returned when ``state`` is None, when it has no
    'bookmarks' entry, or when 'bookmarks' has no entry for ``stream``.
    """
    if state is not None and 'bookmarks' in state:
        return state.get('bookmarks', {}).get(stream, default)
    return default


def write_bookmark(state, stream, value):
    """Store ``value`` as the bookmark for ``stream`` and emit the state.

    The 'bookmarks' dict is created in ``state`` if it is missing; ``state``
    is mutated in place and then written with ``singer.write_state``.
    """
    bookmarks = state.setdefault('bookmarks', {})
    bookmarks[stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


# Transform/validate a batch of records against the schema and send them to the target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    """Transform each record against the stream schema and write it out.

    Every record is run through a Singer ``Transformer`` using the stream's
    schema and metadata, then written with ``write_record``. Any exception
    raised by the transform is logged and re-raised as ``RuntimeError``.

    Returns the value of the record counter after all records were written.
    """
    catalog_entry = catalog.get_stream(stream_name)
    schema_dict = catalog_entry.schema.to_dict()
    mdata_map = metadata.to_map(catalog_entry.metadata)
    with metrics.record_counter(stream_name) as counter:
        for raw_record in records:
            # A fresh Transformer is opened for each record
            with Transformer() as transformer:
                try:
                    clean_record = transformer.transform(
                        raw_record, schema_dict, mdata_map)
                except Exception as err:
                    LOGGER.error('{}'.format(err))
                    raise RuntimeError(err)
                write_record(
                    stream_name=stream_name,
                    record=clean_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    """Write schema and records for ``stream_name`` if that stream is selected.

    Does nothing when ``stream_name`` is not in ``selected_streams``.
    Otherwise the stream is marked as currently syncing, its schema and
    ``records`` are written, and the currently-syncing marker is cleared.
    ``time_extracted`` defaults to the current time when not supplied.
    """
    if stream_name not in selected_streams:
        return

    LOGGER.info('STARTED Syncing {}'.format(stream_name))
    update_currently_syncing(state, stream_name)
    selected_fields = get_selected_fields(catalog, stream_name)
    LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
    write_schema(catalog, stream_name)
    extracted_at = time_extracted if time_extracted else utils.now()
    record_count = process_records(
        catalog=catalog,
        stream_name=stream_name,
        records=records,
        time_extracted=extracted_at)
    LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
    update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
#  the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    """Set or clear the 'currently_syncing' marker in ``state`` and emit it.

    With ``stream_name`` None and the marker present, the marker is deleted;
    in every other case ``singer.set_currently_syncing`` is called with
    ``stream_name``. The state is written afterwards in both cases.
    """
    if stream_name is not None or 'currently_syncing' not in state:
        singer.set_currently_syncing(state, stream_name)
    else:
        del state['currently_syncing']
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    """Return the names of the fields marked ``selected`` for a stream.

    The stream's metadata is converted to a list of breadcrumb entries.
    The field name is the second breadcrumb element; entries whose
    breadcrumb is too short to have one are ignored.
    """
    catalog_entry = catalog.get_stream(stream_name)
    entries = singer.metadata.to_list(metadata.to_map(catalog_entry.metadata))
    chosen = []
    for entry in entries:
        try:
            field_name = entry['breadcrumb'][1]
        except IndexError:
            # Breadcrumb has no field component
            continue
        if entry.get('metadata', {}).get('selected', False):
            chosen.append(field_name)
    return chosen


def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    """Fetch one endpoint through ``client`` and return ``(data, time_extracted)``.

    The endpoint path and query parameters come from ``endpoint_config``;
    the ``{spreadsheet_id}``, ``{sheet_title}`` and ``{range_rows}``
    placeholders are substituted before the request. ``time_extracted`` is
    taken just before the request is made.
    """
    range_rows = range_rows or ''
    # The URL-encoded title handles special characters in sheet names;
    # the regex-escaped name is what gets passed as the `endpoint` argument.
    endpoint_tag = re.escape(stream_name)
    encoded_title = urllib.parse.quote_plus(stream_name)

    path = endpoint_config.get('path', stream_name)
    path = path.replace('{spreadsheet_id}', spreadsheet_id)
    path = path.replace('{sheet_title}', encoded_title)
    path = path.replace('{range_rows}', range_rows)

    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # The querystring is assembled by hand so parameter values are passed
    # through without being URL-encoded.
    pairs = ['%s=%s' % (key, value) for (key, value) in params.items()]
    querystring = '&'.join(pairs).replace('{sheet_title}', encoded_title)
    LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))

    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=endpoint_tag)
    return data, time_extracted


# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    """Return a one-element list holding a cleaned copy of ``file_metadata``.

    The copy is made with a JSON round-trip, so the input is not modified.
    'photoLink', 'me' and 'permissionId' are dropped from
    'lastModifyingUser' when that node is present and non-empty.
    """
    cleaned = json.loads(json.dumps(file_metadata))
    modifying_user = cleaned.get('lastModifyingUser')
    if modifying_user:
        for unwanted_key in ('photoLink', 'me', 'permissionId'):
            modifying_user.pop(unwanted_key, None)
    return [cleaned]


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    """Return a one-element list holding a cleaned copy of the metadata.

    The copy is made with a JSON round-trip, so the input is not modified.
    'defaultFormat' is removed from 'properties' (when present and
    non-empty) and the top-level 'sheets' node is removed.
    """
    cleaned = json.loads(json.dumps(spreadsheet_metadata))
    properties = cleaned.get('properties')
    if properties:
        properties.pop('defaultFormat', None)
    # sheets are reported separately via sheet_metadata
    cleaned.pop('sheets', None)
    return [cleaned]


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    """Build the sheet_metadata record for one worksheet.

    Starts from a JSON-round-trip copy of ``sheet['properties']`` and adds
    'spreadsheetId', 'sheetUrl' (built from the spreadsheet id and the
    sheet's 'sheetId') and 'columns' (the ``columns`` object as passed in).
    """
    record = json.loads(json.dumps(sheet.get('properties')))
    url_template = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'
    record['spreadsheetId'] = spreadsheet_id
    record['sheetUrl'] = url_template.format(spreadsheet_id, record.get('sheetId'))
    record['columns'] = columns
    return record


# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    """Convert a spreadsheet date serial number to a UTC datetime string.

    The serial number is a day count in which 25569 corresponds to
    1970-01-01T00:00:00; the fractional part is the time of day and is
    floored to whole seconds. The resulting naive datetime is localized to
    ``timezone_str`` (UTC when not given), converted to UTC and formatted
    with singer's ``strftime``.
    """
    source_tz = pytz.timezone(timezone_str or 'UTC')
    seconds_per_day = 86400
    epoch_serial = 25569  # serial number of 1970-01-01
    seconds_since_epoch = math.floor(
        (excel_date_sn - epoch_serial) * seconds_per_day)
    naive_dttm = datetime(1970, 1, 1) + timedelta(seconds=seconds_since_epoch)
    return strftime(source_tz.localize(naive_dttm).astimezone(pytz.utc))


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
#  Convert from array of values to JSON with column names as keys
def _log_type_warning(sheet_title, col, row_num, value, sep=';'):
    """Log a possible data-type mismatch for a single cell.

    ``sep`` is the punctuation printed after 'ERROR'. The messages have
    historically used both ';' and ':'; each call site keeps its spelling.
    """
    LOGGER.info(
        'WARNING: POSSIBLE DATA TYPE ERROR{} SHEET: {}, COL: {}, CELL: {}{}, '
        'TYPE: {}, VALUE: {}'.format(
            sep, sheet_title, col.get('columnName'), col.get('columnLetter'),
            row_num, col.get('columnType'), value))


def _convert_sheet_cell(value, col, sheet_title, row_num):
    """Convert one raw cell value according to its column's 'columnType'.

    Values that do not fit the column type are returned as strings and
    (except for non-numeric TIME values) reported via ``_log_type_warning``.
    """
    # NULL values
    if value is None or value == '':
        return None

    col_type = col.get('columnType')
    is_number = isinstance(value, (int, float))

    # DATE-TIME: serial number -> UTC datetime string
    if col_type == 'numberType.DATE_TIME':
        if is_number:
            return excel_to_dttm_str(value)
        _log_type_warning(sheet_title, col, row_num, value)
        return str(value)

    # DATE: serial number -> first 10 characters of the datetime string
    if col_type == 'numberType.DATE':
        if is_number:
            return excel_to_dttm_str(value)[:10]
        _log_type_warning(sheet_title, col, row_num, value)
        return str(value)

    # TIME ONLY (NO DATE): fraction of a day -> string formatted like HH:MM:SS
    if col_type == 'numberType.TIME':
        if not is_number:
            return str(value)
        try:
            return str(timedelta(seconds=value * 86400))  # seconds in day
        except ValueError:
            _log_type_warning(sheet_title, col, row_num, value)
            return str(value)

    # NUMBER (INTEGER AND FLOAT)
    if col_type == 'numberType':
        if isinstance(value, int):
            return int(value)
        if isinstance(value, float):
            # Count the digits after the decimal point in the float's repr
            decimal_digits = str(value)[::-1].find('.')
            if decimal_digits > 15:
                # ROUND to multipleOf: 1e-15
                return float(round(value, 15))
            return float(value)
        _log_type_warning(sheet_title, col, row_num, value, sep=':')
        return str(value)

    # STRING
    if col_type == 'stringValue':
        return str(value)

    # BOOLEAN
    if col_type == 'boolValue':
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            lowered = value.lower()
            if lowered in ('true', 't', 'yes', 'y'):
                return True
            if lowered in ('false', 'f', 'no', 'n'):
                return False
        elif is_number:
            if value in (1, -1):
                return True
            if value == 0:
                return False
        # Unrecognized text, number, or any other type: always produce a
        # value, so a cell can never reuse the previous column's result.
        _log_type_warning(sheet_title, col, row_num, value)
        return str(value)

    # OTHER: Convert everything else to a string
    _log_type_warning(sheet_title, col, row_num, value)
    return str(value)


def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    """Turn a batch of raw sheet rows into records keyed by column name.

    Each non-empty row becomes a dict holding '__sdc_spreadsheet_id',
    '__sdc_sheet_id', '__sdc_row' and one entry per non-skipped column,
    with dates/times, numbers and booleans converted by column type. Empty
    rows (``[]``) are logged and skipped but still advance the row number.

    Args:
        spreadsheet_id: value stored in '__sdc_spreadsheet_id'.
        sheet_id: value stored in '__sdc_sheet_id'.
        sheet_title: sheet name, used only in log messages.
        from_row: sheet row number of the first entry in ``sheet_data_rows``.
        columns: column metadata dicts; matched to row values positionally
            after sorting by 'columnIndex'.
        sheet_data_rows: list of rows, each a list of cell values.

    Returns:
        ``(records, next_row)`` where ``next_row`` is ``from_row`` plus the
        number of rows in ``sheet_data_rows``.

    Raises:
        IndexError: if a row holds more values than there are columns.
    """
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    for row in sheet_data_rows:
        # If empty row, SKIP
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {
                '__sdc_spreadsheet_id': spreadsheet_id,
                '__sdc_sheet_id': sheet_id,
                '__sdc_row': row_num,
            }
            for col_idx, value in enumerate(row):
                # Select column metadata based on the value's position
                col = cols[col_idx]
                if not col.get('columnSkipped'):
                    sheet_data_row_tf[col.get('columnName')] = _convert_sheet_cell(
                        value, col, sheet_title, row_num)
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num


def sync(client, config, catalog, state):
    """Extract the configured spreadsheet and emit all selected streams.

    Steps, in order:
      1. Fetch file_metadata. If its 'modifiedTime' is not later than the
         'file_metadata' bookmark (or ``start_date`` from ``config``), only
         the bookmark is rewritten and the function returns.
      2. Sync file_metadata and spreadsheet_metadata (if selected).
      3. For every non-empty worksheet: collect its sheet_metadata and, if
         the worksheet is selected, page through its rows in batches,
         writing versioned records followed by an ACTIVATE_VERSION message
         and a per-sheet bookmark.
      4. Sync sheet_metadata and sheets_loaded (if selected) and write the
         'file_metadata' bookmark.

    Returns None. Nothing is fetched when no stream is selected.
    """
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog
    #   last_stream = Previous currently synced stream, if the load was interrupted
    #   (it is only logged here)
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = [stream.stream for stream in catalog.get_selected_streams(state)]
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_meatadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_meatadata')
    file_metadata_tf = transform_file_metadata(file_metadata)

    # Check if file has changed, if not return early
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        # Update file_metadata bookmark
        write_bookmark(state, 'file_metadata', strftime(this_datetime))
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_meatadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_meatadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf,
                ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    batch_rows = 200  # paging step, in sheet rows

    # Loop thru sheets (worksheet tabs) in spreadsheet
    for sheet in sheets or []:
        sheet_title = sheet.get('properties', {}).get('title')
        sheet_id = sheet.get('properties', {}).get('sheetId')

        # GET sheet_metadata and columns
        sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)

        # SKIP empty sheets (where sheet_schema and columns are None)
        if not sheet_schema or not columns:
            LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
            continue

        # Transform sheet_metadata
        sheet_metadata.append(transform_sheet_metadata(spreadsheet_id, sheet, columns))

        # SHEET_DATA
        # Should this worksheet tab be synced?
        if sheet_title not in selected_streams:
            continue

        LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
        update_currently_syncing(state, sheet_title)
        selected_fields = get_selected_fields(catalog, sheet_title)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
        write_schema(catalog, sheet_title)

        # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not
        # before subsequent syncs) and after every completed sheet sync.
        # This forces hard deletes on the data downstream if fewer records are sent.
        # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
        last_integer = int(get_bookmark(state, sheet_title, 0))
        activate_version = int(time.time() * 1000)
        activate_version_message = singer.ActivateVersionMessage(
            stream=sheet_title,
            version=activate_version)
        if last_integer == 0:
            # initial load, send activate_version before AND after data sync
            singer.write_message(activate_version_message)
            LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))

        # Determine max range of columns and rows for "paging" through the data
        sheet_last_col_index = 1
        sheet_last_col_letter = 'A'
        for col in columns:
            col_index = col.get('columnIndex')
            col_letter = col.get('columnLetter')
            if col_index > sheet_last_col_index:
                sheet_last_col_index = col_index
                sheet_last_col_letter = col_letter
        sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

        # Initialize paging for 1st batch; row 1 is the header, data starts at row 2
        is_last_row = False
        from_row = 2
        to_row = min(sheet_max_row, batch_rows)
        # Initialised here so the totals below are defined (and not left over
        # from a previous sheet) when the grid has no data rows at all.
        row_num = from_row

        # Loop thru batches. `<=` so that a batch consisting of exactly the
        # last grid row (from_row == sheet_max_row) is still fetched.
        while not is_last_row and from_row <= sheet_max_row and to_row <= sheet_max_row:
            range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

            # GET sheet_data for a worksheet tab
            sheet_data, time_extracted = get_data(
                stream_name=sheet_title,
                endpoint_config=sheets_loaded_config,
                client=client,
                spreadsheet_id=spreadsheet_id,
                range_rows=range_rows)
            # Data is returned as a list of arrays, an array of values for each row
            sheet_data_rows = sheet_data.get('values', [])

            # Transform batch of rows to JSON with keys for each column;
            # row_num comes back as the row following the last one returned
            sheet_data_tf, row_num = transform_sheet_data(
                spreadsheet_id=spreadsheet_id,
                sheet_id=sheet_id,
                sheet_title=sheet_title,
                from_row=from_row,
                columns=columns,
                sheet_data_rows=sheet_data_rows)
            if row_num < to_row:
                # Fewer rows came back than were requested: stop paging
                is_last_row = True

            # Process records, send batch of records to target
            # NOTE(review): records are stamped with the spreadsheet_metadata
            # extraction time, not this batch's time_extracted -- confirm intended.
            record_count = process_records(
                catalog=catalog,
                stream_name=sheet_title,
                records=sheet_data_tf,
                time_extracted=ss_time_extracted,
                version=activate_version)
            LOGGER.info('Sheet: {}, records processed: {}'.format(
                sheet_title, record_count))

            # Update paging from/to_row for next batch
            from_row = to_row + 1
            to_row = min(to_row + batch_rows, sheet_max_row)

        # End of Stream: Send Activate Version and update State
        singer.write_message(activate_version_message)
        write_bookmark(state, sheet_title, activate_version)
        LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
        # row_num is the row after the last one read, and row 1 is the header,
        # hence the subtraction of 2
        LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
            sheet_title, row_num - 2))
        update_currently_syncing(state, None)

        # SHEETS_LOADED
        # Add sheet to sheets_loaded
        sheets_loaded.append({
            'spreadsheetId': spreadsheet_id,
            'sheetId': sheet_id,
            'title': sheet_title,
            'loadDate': strftime(utils.now()),
            'lastRowNumber': row_num,
        })

    # Sync sheet_metadata if selected
    sync_stream('sheet_metadata', selected_streams, catalog, state, sheet_metadata)

    # Sync sheets_loaded if selected
    sync_stream('sheets_loaded', selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark
    write_bookmark(state, 'file_metadata', strftime(this_datetime))