import time
import math
import json
from datetime import datetime, timedelta
import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted, version=None):
    try:
        if version:
            singer.messages.write_message(
                RecordMessage(
                    stream=stream_name,
                    record=record,
                    version=version,
                    time_extracted=time_extracted))
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


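# Example (illustrative, not from the original source; stream names and values
# are hypothetical): after write_bookmark, the emitted STATE message looks
# roughly like
#     {'currently_syncing': 'Sheet1',
#      'bookmarks': {'Sheet1': 1573504186000,
#                    'file_metadata': '2019-11-11T13:32:22.000000Z'}}
# where sheet streams store the integer activate_version of their last
# completed sync and file_metadata stores the spreadsheet's last modifiedTime.

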
# Transform/validate batch of records w/ schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
            write_record(
                stream_name=stream_name,
                record=transformed_record,
                time_extracted=time_extracted,
                version=version)
            counter.increment()
        return counter.value


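# Note (assumed singer-python behavior, not stated in the original source):
# Transformer.transform validates each record against the stream's JSON schema,
# coerces values to the declared types, and drops fields that are not marked
# selected in the stream metadata before write_record emits the RECORD message.

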
def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields


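# Example (illustrative, assumed catalog shape; the field name is hypothetical):
# a field-level metadata entry looks like
#     {'breadcrumb': ('properties', 'Order Date'), 'metadata': {'selected': True}}
# so entry['breadcrumb'][1] is the field name. Stream-level entries have an
# empty breadcrumb, which raises the IndexError handled above.

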
def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    # Replace {placeholder} variables in path
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # Add in querystring parameters and replace {placeholder} variables;
    # the querystring is built manually so parameter values are not
    # URL-encoded (encoding them causes API errors)
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    data = {}
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted


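# Example (illustrative; the endpoint_config values shown are assumptions about
# STREAMS in tap_google_sheets/streams.py): a config like
#     {'api': 'sheets',
#      'path': 'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}',
#      'params': {'dateTimeRenderOption': 'SERIAL_NUMBER'}}
# called with range_rows='A2:J200' resolves to the request path
#     spreadsheets/<spreadsheet_id>/values/<sheet_title>!A2:J200
# with querystring 'dateTimeRenderOption=SERIAL_NUMBER'.

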
# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Convert to dict
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


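# Example (illustrative Drive API response; all values hypothetical):
#     {'id': '1aBc...', 'name': 'Orders', 'modifiedTime': '2019-11-11T13:32:22.000Z',
#      'lastModifyingUser': {'displayName': 'Jane Doe', 'photoLink': 'https://...',
#                            'me': True, 'permissionId': '0123456789'}}
# is returned as a single-element list with photoLink, me, and permissionId
# removed from lastModifyingUser.

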
# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Convert to dict
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


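# Example (illustrative; property names per the Sheets API, values hypothetical):
# for sheet properties {'sheetId': 12345, 'title': 'Orders', ...} the transformed
# record adds:
#     'spreadsheetId': '<spreadsheet_id>'
#     'sheetUrl': 'https://docs.google.com/spreadsheets/d/<spreadsheet_id>/edit#gid=12345'
#     'columns': the column metadata list from get_sheet_metadata

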
# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str


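# Worked example (illustrative): serial number 44197.5 is 18628.5 days past the
# epoch serial 25569 (1970-01-01), i.e. 2021-01-01 plus 12 hours, so
#     excel_to_dttm_str(44197.5)  ->  '2021-01-01T12:00:00.000000Z'
# (output format per singer.utils.strftime).

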
# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, SKIP
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')

                    # NULL values
                    if value is None or value == '':
                        col_val = None

                    # Convert dates/times from Lotus Notes Serial Numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # DATE
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # TIME ONLY (NO DATE)
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400  # seconds in day
                                # Create string formatted like HH:MM:SS
                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
                                    # ROUND to multipleOf: 1e-15
                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                            else:  # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)
                    # BOOLEAN
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                            sheet_title, col_name, col_letter, row_num, col_type, value))
                    sheet_data_row_tf[col_name] = col_val
                col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num


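# Example (illustrative; column metadata shape per get_sheet_metadata, values
# hypothetical): with columns
#     [{'columnIndex': 1, 'columnLetter': 'A', 'columnName': 'Order Date',
#       'columnType': 'numberType.DATE'},
#      {'columnIndex': 2, 'columnLetter': 'B', 'columnName': 'Qty',
#       'columnType': 'numberType'}]
# the raw row [43831, 3] at row_num=2 transforms to
#     {'__sdc_spreadsheet_id': '<spreadsheet_id>', '__sdc_sheet_id': 12345,
#      '__sdc_row': 2, 'Order Date': '2020-01-01', 'Qty': 3}

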
def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    #   last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed, if not break (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf,
                ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # Transform sheet_metadata
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # SHEET_DATA
            # Should this worksheet tab be synced?
            if sheet_title in selected_streams:
                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                update_currently_syncing(state, sheet_title)
                selected_fields = get_selected_fields(catalog, sheet_title)
                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                write_schema(catalog, sheet_title)

                # Emit a Singer ACTIVATE_VERSION message before the initial sync
                # (but not subsequent syncs) and after each sheet sync completes.
                # This forces hard deletes on the data downstream if fewer records are sent.
                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                last_integer = int(get_bookmark(state, sheet_title, 0))
                activate_version = int(time.time() * 1000)
                activate_version_message = singer.ActivateVersionMessage(
                    stream=sheet_title,
                    version=activate_version)
                if last_integer == 0:
                    # Initial load: send activate_version before AND after data sync
                    singer.write_message(activate_version_message)
                    LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))

                # Determine max range of columns and rows for "paging" through the data
                sheet_last_col_index = 1
                sheet_last_col_letter = 'A'
                for col in columns:
                    col_index = col.get('columnIndex')
                    col_letter = col.get('columnLetter')
                    if col_index > sheet_last_col_index:
                        sheet_last_col_index = col_index
                        sheet_last_col_letter = col_letter
                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                # Initialize paging for 1st batch
                is_last_row = False
                batch_rows = 200
                from_row = 2
                if sheet_max_row < batch_rows:
                    to_row = sheet_max_row
                else:
                    to_row = batch_rows

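                # Example (illustrative, values hypothetical): a sheet whose last
                # column letter is 'J' and whose grid has 450 rows is fetched in
                # batches of 200 data rows below the header row:
                #     batch 1: range_rows = 'A2:J200'
                #     batch 2: range_rows = 'A201:J400'
                #     batch 3: range_rows = 'A401:J450'
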
                # Loop thru batches (each having 200 rows of data)
                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                    # GET sheet_data for a worksheet tab
                    sheet_data, time_extracted = get_data(
                        stream_name=sheet_title,
                        endpoint_config=sheets_loaded_config,
                        client=client,
                        spreadsheet_id=spreadsheet_id,
                        range_rows=range_rows)
                    # Data is returned as a list of arrays, an array of values for each row
                    sheet_data_rows = sheet_data.get('values')

                    # Transform batch of rows to JSON with keys for each column
                    sheet_data_tf, row_num = transform_sheet_data(
                        spreadsheet_id=spreadsheet_id,
                        sheet_id=sheet_id,
                        sheet_title=sheet_title,
                        from_row=from_row,
                        columns=columns,
                        sheet_data_rows=sheet_data_rows)
                    if row_num < to_row:
                        is_last_row = True

                    # Process records, send batch of records to target
                    record_count = process_records(
                        catalog=catalog,
                        stream_name=sheet_title,
                        records=sheet_data_tf,
                        time_extracted=ss_time_extracted,
                        version=activate_version)
                    LOGGER.info('Sheet: {}, records processed: {}'.format(
                        sheet_title, record_count))

                    # Update paging from/to_row for next batch
                    from_row = to_row + 1
                    if to_row + batch_rows > sheet_max_row:
                        to_row = sheet_max_row
                    else:
                        to_row = to_row + batch_rows

                # End of Stream: Send Activate Version and update State
                singer.write_message(activate_version_message)
                write_bookmark(state, sheet_title, activate_version)
                LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                    sheet_title, row_num - 2))  # subtract 2: the header row, plus row_num points past the last row
                update_currently_syncing(state, None)

                # SHEETS_LOADED
                # Add sheet to sheets_loaded
                sheet_loaded = {}
                sheet_loaded['spreadsheetId'] = spreadsheet_id
                sheet_loaded['sheetId'] = sheet_id
                sheet_loaded['title'] = sheet_title
                sheet_loaded['loadDate'] = strftime(utils.now())
                sheet_loaded['lastRowNumber'] = row_num
                sheets_loaded.append(sheet_loaded)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark
    write_bookmark(state, 'file_metadata', strftime(this_datetime))

    return