aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets/sync.py
diff options
context:
space:
mode:
author: Jeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com>, 2020-01-09 07:30:53 -0800
committer: Kyle Allan <KAllan357@gmail.com>, 2020-01-09 10:30:53 -0500
commit: 43a24cbab1dbc35b893c35b86e34adc0f2fb84e7 (patch)
tree: bcbaae860aad0a94bcc4d27f4804504691401438 /tap_google_sheets/sync.py
parent: 5890b89c1aa7c554235b3cef156b5a5a2c594bec (diff)
downloadtap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.tar.gz
tap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.tar.zst
tap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.zip
v.0.0.3 Sync error handling, activate version, documentation (#2) (tag: v0.0.3)
* v.0.0.2 schema and sync changes: Change number JSON schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py.
* v.0.0.3 Sync activate version and error handling: Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages.
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r--tap_google_sheets/sync.py94
1 files changed, 75 insertions, 19 deletions
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 76b2e59..311281c 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -6,6 +6,7 @@ import pytz
6import singer 6import singer
7from singer import metrics, metadata, Transformer, utils 7from singer import metrics, metadata, Transformer, utils
8from singer.utils import strptime_to_utc, strftime 8from singer.utils import strptime_to_utc, strftime
9from singer.messages import RecordMessage
9from tap_google_sheets.streams import STREAMS 10from tap_google_sheets.streams import STREAMS
10from tap_google_sheets.schema import get_sheet_metadata 11from tap_google_sheets.schema import get_sheet_metadata
11 12
@@ -17,14 +18,26 @@ def write_schema(catalog, stream_name):
17 schema = stream.schema.to_dict() 18 schema = stream.schema.to_dict()
18 try: 19 try:
19 singer.write_schema(stream_name, schema, stream.key_properties) 20 singer.write_schema(stream_name, schema, stream.key_properties)
21 LOGGER.info('Writing schema for: {}'.format(stream_name))
20 except OSError as err: 22 except OSError as err:
21 LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) 23 LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
22 raise err 24 raise err
23 25
24 26
25def write_record(stream_name, record, time_extracted): 27def write_record(stream_name, record, time_extracted, version=None):
26 try: 28 try:
27 singer.messages.write_record(stream_name, record, time_extracted=time_extracted) 29 if version:
30 singer.messages.write_message(
31 RecordMessage(
32 stream=stream_name,
33 record=record,
34 version=version,
35 time_extracted=time_extracted))
36 else:
37 singer.messages.write_record(
38 stream_name=stream_name,
39 record=record,
40 time_extracted=time_extracted)
28 except OSError as err: 41 except OSError as err:
29 LOGGER.info('OS Error writing record for: {}'.format(stream_name)) 42 LOGGER.info('OS Error writing record for: {}'.format(stream_name))
30 LOGGER.info('record: {}'.format(record)) 43 LOGGER.info('record: {}'.format(record))
@@ -53,7 +66,8 @@ def write_bookmark(state, stream, value):
53def process_records(catalog, 66def process_records(catalog,
54 stream_name, 67 stream_name,
55 records, 68 records,
56 time_extracted): 69 time_extracted,
70 version=None):
57 stream = catalog.get_stream(stream_name) 71 stream = catalog.get_stream(stream_name)
58 schema = stream.schema.to_dict() 72 schema = stream.schema.to_dict()
59 stream_metadata = metadata.to_map(stream.metadata) 73 stream_metadata = metadata.to_map(stream.metadata)
@@ -65,7 +79,11 @@ def process_records(catalog,
65 record, 79 record,
66 schema, 80 schema,
67 stream_metadata) 81 stream_metadata)
68 write_record(stream_name, transformed_record, time_extracted=time_extracted) 82 write_record(
83 stream_name=stream_name,
84 record=transformed_record,
85 time_extracted=time_extracted,
86 version=version)
69 counter.increment() 87 counter.increment()
70 return counter.value 88 return counter.value
71 89
@@ -206,7 +224,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None):
206 224
207# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times 225# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
208# Convert from array of values to JSON with column names as keys 226# Convert from array of values to JSON with column names as keys
209def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): 227def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
210 sheet_data_tf = [] 228 sheet_data_tf = []
211 row_num = from_row 229 row_num = from_row
212 # Create sorted list of columns based on columnIndex 230 # Create sorted list of columns based on columnIndex
@@ -229,21 +247,32 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
229 col = cols[col_num - 1] 247 col = cols[col_num - 1]
230 col_skipped = col.get('columnSkipped') 248 col_skipped = col.get('columnSkipped')
231 if not col_skipped: 249 if not col_skipped:
250 # Get column metadata
232 col_name = col.get('columnName') 251 col_name = col.get('columnName')
233 col_type = col.get('columnType') 252 col_type = col.get('columnType')
253 col_letter = col.get('columnLetter')
254
255 # NULL values
256 if value is None or value == '':
257 col_val = None
258
234 # Convert dates/times from Lotus Notes Serial Numbers 259 # Convert dates/times from Lotus Notes Serial Numbers
235 # DATE-TIME 260 # DATE-TIME
236 if col_type == 'numberType.DATE_TIME': 261 elif col_type == 'numberType.DATE_TIME':
237 if isinstance(value, (int, float)): 262 if isinstance(value, (int, float)):
238 col_val = excel_to_dttm_str(value) 263 col_val = excel_to_dttm_str(value)
239 else: 264 else:
240 col_val = str(value) 265 col_val = str(value)
266 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
267 sheet_title, col_name, col_letter, row_num, col_type, value))
241 # DATE 268 # DATE
242 elif col_type == 'numberType.DATE': 269 elif col_type == 'numberType.DATE':
243 if isinstance(value, (int, float)): 270 if isinstance(value, (int, float)):
244 col_val = excel_to_dttm_str(value)[:10] 271 col_val = excel_to_dttm_str(value)[:10]
245 else: 272 else:
246 col_val = str(value) 273 col_val = str(value)
274 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
275 sheet_title, col_name, col_letter, row_num, col_type, value))
247 # TIME ONLY (NO DATE) 276 # TIME ONLY (NO DATE)
248 elif col_type == 'numberType.TIME': 277 elif col_type == 'numberType.TIME':
249 if isinstance(value, (int, float)): 278 if isinstance(value, (int, float)):
@@ -253,6 +282,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
253 col_val = str(timedelta(seconds=total_secs)) 282 col_val = str(timedelta(seconds=total_secs))
254 except ValueError: 283 except ValueError:
255 col_val = str(value) 284 col_val = str(value)
285 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
286 sheet_title, col_name, col_letter, row_num, col_type, value))
256 else: 287 else:
257 col_val = str(value) 288 col_val = str(value)
258 # NUMBER (INTEGER AND FLOAT) 289 # NUMBER (INTEGER AND FLOAT)
@@ -268,13 +299,19 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
268 col_val = float(round(value, 15)) 299 col_val = float(round(value, 15))
269 except ValueError: 300 except ValueError:
270 col_val = str(value) 301 col_val = str(value)
302 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
303 sheet_title, col_name, col_letter, row_num, col_type, value))
271 else: # decimal_digits <= 15, no rounding 304 else: # decimal_digits <= 15, no rounding
272 try: 305 try:
273 col_val = float(value) 306 col_val = float(value)
274 except ValueError: 307 except ValueError:
275 col_val = str(value) 308 col_val = str(value)
309 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
310 sheet_title, col_name, col_letter, row_num, col_type, value))
276 else: 311 else:
277 col_val = str(value) 312 col_val = str(value)
313 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
314 sheet_title, col_name, col_letter, row_num, col_type, value))
278 # STRING 315 # STRING
279 elif col_type == 'stringValue': 316 elif col_type == 'stringValue':
280 col_val = str(value) 317 col_val = str(value)
@@ -289,6 +326,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
289 col_val = False 326 col_val = False
290 else: 327 else:
291 col_val = str(value) 328 col_val = str(value)
329 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
330 sheet_title, col_name, col_letter, row, col_type, value))
292 elif isinstance(value, int): 331 elif isinstance(value, int):
293 if value in (1, -1): 332 if value in (1, -1):
294 col_val = True 333 col_val = True
@@ -296,9 +335,13 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
296 col_val = False 335 col_val = False
297 else: 336 else:
298 col_val = str(value) 337 col_val = str(value)
338 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
339 sheet_title, col_name, col_letter, row, col_type, value))
299 # OTHER: Convert everything else to a string 340 # OTHER: Convert everything else to a string
300 else: 341 else:
301 col_val = str(value) 342 col_val = str(value)
343 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
344 sheet_title, col_name, col_letter, row, col_type, value))
302 sheet_data_row_tf[col_name] = col_val 345 sheet_data_row_tf[col_name] = col_val
303 col_num = col_num + 1 346 col_num = col_num + 1
304 # APPEND non-empty row 347 # APPEND non-empty row
@@ -400,6 +443,20 @@ def sync(client, config, catalog, state):
400 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) 443 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
401 write_schema(catalog, sheet_title) 444 write_schema(catalog, sheet_title)
402 445
446 # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not before
447 # subsequent syncs), and every time after a sheet sync is complete.
448 # This forces hard deletes on the data downstream if fewer records are sent.
449 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
450 last_integer = int(get_bookmark(state, sheet_title, 0))
451 activate_version = int(time.time() * 1000)
452 activate_version_message = singer.ActivateVersionMessage(
453 stream=sheet_title,
454 version=activate_version)
455 if last_integer == 0:
456 # initial load, send activate_version before AND after data sync
457 singer.write_message(activate_version_message)
458 LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
459
403 # Determine max range of columns and rows for "paging" through the data 460 # Determine max range of columns and rows for "paging" through the data
404 sheet_last_col_index = 1 461 sheet_last_col_index = 1
405 sheet_last_col_letter = 'A' 462 sheet_last_col_letter = 'A'
@@ -438,6 +495,7 @@ def sync(client, config, catalog, state):
438 sheet_data_tf, row_num = transform_sheet_data( 495 sheet_data_tf, row_num = transform_sheet_data(
439 spreadsheet_id=spreadsheet_id, 496 spreadsheet_id=spreadsheet_id,
440 sheet_id=sheet_id, 497 sheet_id=sheet_id,
498 sheet_title=sheet_title,
441 from_row=from_row, 499 from_row=from_row,
442 columns=columns, 500 columns=columns,
443 sheet_data_rows=sheet_data_rows) 501 sheet_data_rows=sheet_data_rows)
@@ -449,10 +507,11 @@ def sync(client, config, catalog, state):
449 catalog=catalog, 507 catalog=catalog,
450 stream_name=sheet_title, 508 stream_name=sheet_title,
451 records=sheet_data_tf, 509 records=sheet_data_tf,
452 time_extracted=ss_time_extracted) 510 time_extracted=ss_time_extracted,
511 version=activate_version)
453 LOGGER.info('Sheet: {}, records processed: {}'.format( 512 LOGGER.info('Sheet: {}, records processed: {}'.format(
454 sheet_title, record_count)) 513 sheet_title, record_count))
455 514
456 # Update paging from/to_row for next batch 515 # Update paging from/to_row for next batch
457 from_row = to_row + 1 516 from_row = to_row + 1
458 if to_row + batch_rows > sheet_max_row: 517 if to_row + batch_rows > sheet_max_row:
@@ -460,6 +519,14 @@ def sync(client, config, catalog, state):
460 else: 519 else:
461 to_row = to_row + batch_rows 520 to_row = to_row + batch_rows
462 521
522 # End of Stream: Send Activate Version and update State
523 singer.write_message(activate_version_message)
524 write_bookmark(state, sheet_title, activate_version)
525 LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
526 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
527 sheet_title, row_num - 2)) # subtract 1 for header row
528 update_currently_syncing(state, None)
529
463 # SHEETS_LOADED 530 # SHEETS_LOADED
464 # Add sheet to sheets_loaded 531 # Add sheet to sheets_loaded
465 sheet_loaded = {} 532 sheet_loaded = {}
@@ -470,17 +537,6 @@ def sync(client, config, catalog, state):
470 sheet_loaded['lastRowNumber'] = row_num 537 sheet_loaded['lastRowNumber'] = row_num
471 sheets_loaded.append(sheet_loaded) 538 sheets_loaded.append(sheet_loaded)
472 539
473 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
474 # This forces hard deletes on the data downstream if fewer records are sent.
475 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
476 activate_version_message = singer.ActivateVersionMessage(
477 stream=sheet_title,
478 version=int(time.time() * 1000))
479 singer.write_message(activate_version_message)
480
481 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
482 sheet_title, row_num - 2)) # subtract 1 for header row
483
484 stream_name = 'sheet_metadata' 540 stream_name = 'sheet_metadata'
485 # Sync sheet_metadata if selected 541 # Sync sheet_metadata if selected
486 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) 542 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)