import time
import math
import json
import re
import urllib.parse
from datetime import datetime, timedelta

import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from singer.messages import RecordMessage

from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted, version=None):
    try:
        if version:
            singer.messages.write_message(
                RecordMessage(
                    stream=stream_name,
                    record=record,
                    version=version,
                    time_extracted=time_extracted))
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


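# Illustrative state shape read and written by the two bookmark helpers above
# (stream names and values are hypothetical):
#   {'currently_syncing': 'Sheet1',
#    'bookmarks': {'file_metadata': '2020-01-01T00:00:00Z',
#                  'Sheet1': 1577836800000}}

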
# Transform/validate batch of records w/ schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                try:
                    transformed_record = transformer.transform(
                        record,
                        schema,
                        stream_metadata)
                except Exception as err:
                    LOGGER.error('{}'.format(err))
                    raise RuntimeError(err)
                write_record(
                    stream_name=stream_name,
                    record=transformed_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields


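# Illustrative catalog metadata entry consumed above (field name is hypothetical;
# shape follows singer-python metadata conventions):
#   {'breadcrumb': ['properties', 'order_id'], 'metadata': {'selected': True}}

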
def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    # Replace {placeholder} variables in path
    # Encode stream_name: fixes issue w/ special characters in sheet name
    stream_name_escaped = re.escape(stream_name)
    stream_name_encoded = urllib.parse.quote_plus(stream_name)
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # Add in querystring parameters and replace {placeholder} variables;
    # the querystring is joined manually (not URL-encoded) because encoding
    # the parameters causes API errors
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name_encoded)
    LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name_escaped)
    return data, time_extracted


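# Illustrative resolved request (ids, sheet title, and params are hypothetical;
# actual templates and params live in the STREAMS config): a path template like
#   'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}'
# with sheet title 'Order Items' and range 'A2:D200' would resolve to
#   spreadsheets/abc123/values/Order+Items!A2:D200?valueRenderOption=UNFORMATTED_VALUE

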
# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Convert to dict
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Convert to dict
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


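# Illustrative output (ids are hypothetical): for spreadsheet_id 'abc123' and a
# sheet whose properties carry sheetId 0, the sheetUrl built above becomes
#   'https://docs.google.com/spreadsheets/d/abc123/edit#gid=0'

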
# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str


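# Illustrative conversion (serial value is hypothetical): Excel serial 43831.5
# is noon on 2020-01-01, so with the default UTC timezone
#   excel_to_dttm_str(43831.5) -> '2020-01-01T12:00:00.000000Z'

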
# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, SKIP
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')

                    # NULL values
                    if value is None or value == '':
                        col_val = None

                    # Convert dates/times from Lotus Notes Serial Numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # DATE
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # TIME ONLY (NO DATE)
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400  # seconds in day
                                # Create string formatted like HH:MM:SS
                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
                                    # ROUND to multipleOf: 1e-15
                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                            else:  # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)
                    # BOOLEAN
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                            sheet_title, col_name, col_letter, row_num, col_type, value))
                    sheet_data_row_tf[col_name] = col_val
                    col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num


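# Illustrative transform (hypothetical columns and one data row):
#   columns = [{'columnIndex': 1, 'columnLetter': 'A', 'columnName': 'name',
#               'columnType': 'stringValue'},
#              {'columnIndex': 2, 'columnLetter': 'B', 'columnName': 'due',
#               'columnType': 'numberType.DATE'}]
#   transform_sheet_data('abc123', 0, 'Sheet1', 2, columns, [['a', 43831]])
#   -> ([{'__sdc_spreadsheet_id': 'abc123', '__sdc_sheet_id': 0, '__sdc_row': 2,
#         'name': 'a', 'due': '2020-01-01'}], 3)

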
def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed; if not, return (back to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        # Update file_metadata bookmark
        write_bookmark(state, 'file_metadata', strftime(this_datetime))
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state,
                spreadsheet_metadata_tf, ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # SKIP empty sheets (where sheet_schema and columns are None)
            if not sheet_schema or not columns:
                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
            else:
                # Transform sheet_metadata
                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
                sheet_metadata.append(sheet_metadata_tf)

                # SHEET_DATA
                # Should this worksheet tab be synced?
                if sheet_title in selected_streams:
                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                    update_currently_syncing(state, sheet_title)
                    selected_fields = get_selected_fields(catalog, sheet_title)
                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                    write_schema(catalog, sheet_title)

                    # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not
                    # subsequent syncs) and again after each sheet sync completes.
                    # This forces hard deletes on the data downstream if fewer records are sent.
                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
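                    # Illustrative message sequence (version numbers are hypothetical
                    # epoch-millisecond values):
                    #   initial sync:    ACTIVATE_VERSION(v1), RECORD..., ACTIVATE_VERSION(v1)
                    #   subsequent sync: RECORD..., ACTIVATE_VERSION(v2)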
                    last_integer = int(get_bookmark(state, sheet_title, 0))
                    activate_version = int(time.time() * 1000)
                    activate_version_message = singer.ActivateVersionMessage(
                        stream=sheet_title,
                        version=activate_version)
                    if last_integer == 0:
                        # initial load, send activate_version before AND after data sync
                        singer.write_message(activate_version_message)
                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))

                    # Determine max range of columns and rows for "paging" through the data
                    sheet_last_col_index = 1
                    sheet_last_col_letter = 'A'
                    for col in columns:
                        col_index = col.get('columnIndex')
                        col_letter = col.get('columnLetter')
                        if col_index > sheet_last_col_index:
                            sheet_last_col_index = col_index
                            sheet_last_col_letter = col_letter
                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                    # Initialize paging for 1st batch
                    is_last_row = False
                    batch_rows = 200
                    from_row = 2
                    if sheet_max_row < batch_rows:
                        to_row = sheet_max_row
                    else:
                        to_row = batch_rows

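                    # Illustrative paging (4-column sheet assumed, so last column letter 'D'):
                    # the first batch requests range_rows 'A2:D200', the next 'A201:D400',
                    # and so on until sheet_max_row is reached.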
                    # Loop thru batches (each having up to 200 rows of data)
                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                        # GET sheet_data for a worksheet tab
                        sheet_data, time_extracted = get_data(
                            stream_name=sheet_title,
                            endpoint_config=sheets_loaded_config,
                            client=client,
                            spreadsheet_id=spreadsheet_id,
                            range_rows=range_rows)
                        # Data is returned as a list of arrays, an array of values for each row
                        sheet_data_rows = sheet_data.get('values')

                        # Transform batch of rows to JSON with keys for each column
                        sheet_data_tf, row_num = transform_sheet_data(
                            spreadsheet_id=spreadsheet_id,
                            sheet_id=sheet_id,
                            sheet_title=sheet_title,
                            from_row=from_row,
                            columns=columns,
                            sheet_data_rows=sheet_data_rows)
                        if row_num < to_row:
                            is_last_row = True

                        # Process records, send batch of records to target
                        record_count = process_records(
                            catalog=catalog,
                            stream_name=sheet_title,
                            records=sheet_data_tf,
                            time_extracted=ss_time_extracted,
                            version=activate_version)
                        LOGGER.info('Sheet: {}, records processed: {}'.format(
                            sheet_title, record_count))

                        # Update paging from/to_row for next batch
                        from_row = to_row + 1
                        if to_row + batch_rows > sheet_max_row:
                            to_row = sheet_max_row
                        else:
                            to_row = to_row + batch_rows

                    # End of Stream: Send Activate Version and update State
                    singer.write_message(activate_version_message)
                    write_bookmark(state, sheet_title, activate_version)
                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
                    # Total Rows = row_num - 2: row 1 is the header, and row_num
                    # was incremented one past the last data row
                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                        sheet_title, row_num - 2))
                    update_currently_syncing(state, None)

                # SHEETS_LOADED
                # Add sheet to sheets_loaded
                sheet_loaded = {}
                sheet_loaded['spreadsheetId'] = spreadsheet_id
                sheet_loaded['sheetId'] = sheet_id
                sheet_loaded['title'] = sheet_title
                sheet_loaded['loadDate'] = strftime(utils.now())
                sheet_loaded['lastRowNumber'] = row_num
                sheets_loaded.append(sheet_loaded)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark
    write_bookmark(state, 'file_metadata', strftime(this_datetime))

    return