diff options
author | Jeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com> | 2020-01-09 07:30:53 -0800 |
---|---|---|
committer | Kyle Allan <KAllan357@gmail.com> | 2020-01-09 10:30:53 -0500 |
commit | 43a24cbab1dbc35b893c35b86e34adc0f2fb84e7 (patch) | |
tree | bcbaae860aad0a94bcc4d27f4804504691401438 /tap_google_sheets/sync.py | |
parent | 5890b89c1aa7c554235b3cef156b5a5a2c594bec (diff) | |
download | tap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.tar.gz tap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.tar.zst tap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.zip |
v.0.0.3 Sync error handling, activate version, documentation (#2)v0.0.3
* v.0.0.2 schema and sync changes
Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py
* v.0.0.3 Sync activate version and error handling
Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages.
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r-- | tap_google_sheets/sync.py | 94 |
1 files changed, 75 insertions, 19 deletions
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py index 76b2e59..311281c 100644 --- a/tap_google_sheets/sync.py +++ b/tap_google_sheets/sync.py | |||
@@ -6,6 +6,7 @@ import pytz | |||
6 | import singer | 6 | import singer |
7 | from singer import metrics, metadata, Transformer, utils | 7 | from singer import metrics, metadata, Transformer, utils |
8 | from singer.utils import strptime_to_utc, strftime | 8 | from singer.utils import strptime_to_utc, strftime |
9 | from singer.messages import RecordMessage | ||
9 | from tap_google_sheets.streams import STREAMS | 10 | from tap_google_sheets.streams import STREAMS |
10 | from tap_google_sheets.schema import get_sheet_metadata | 11 | from tap_google_sheets.schema import get_sheet_metadata |
11 | 12 | ||
@@ -17,14 +18,26 @@ def write_schema(catalog, stream_name): | |||
17 | schema = stream.schema.to_dict() | 18 | schema = stream.schema.to_dict() |
18 | try: | 19 | try: |
19 | singer.write_schema(stream_name, schema, stream.key_properties) | 20 | singer.write_schema(stream_name, schema, stream.key_properties) |
21 | LOGGER.info('Writing schema for: {}'.format(stream_name)) | ||
20 | except OSError as err: | 22 | except OSError as err: |
21 | LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) | 23 | LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) |
22 | raise err | 24 | raise err |
23 | 25 | ||
24 | 26 | ||
25 | def write_record(stream_name, record, time_extracted): | 27 | def write_record(stream_name, record, time_extracted, version=None): |
26 | try: | 28 | try: |
27 | singer.messages.write_record(stream_name, record, time_extracted=time_extracted) | 29 | if version: |
30 | singer.messages.write_message( | ||
31 | RecordMessage( | ||
32 | stream=stream_name, | ||
33 | record=record, | ||
34 | version=version, | ||
35 | time_extracted=time_extracted)) | ||
36 | else: | ||
37 | singer.messages.write_record( | ||
38 | stream_name=stream_name, | ||
39 | record=record, | ||
40 | time_extracted=time_extracted) | ||
28 | except OSError as err: | 41 | except OSError as err: |
29 | LOGGER.info('OS Error writing record for: {}'.format(stream_name)) | 42 | LOGGER.info('OS Error writing record for: {}'.format(stream_name)) |
30 | LOGGER.info('record: {}'.format(record)) | 43 | LOGGER.info('record: {}'.format(record)) |
@@ -53,7 +66,8 @@ def write_bookmark(state, stream, value): | |||
53 | def process_records(catalog, | 66 | def process_records(catalog, |
54 | stream_name, | 67 | stream_name, |
55 | records, | 68 | records, |
56 | time_extracted): | 69 | time_extracted, |
70 | version=None): | ||
57 | stream = catalog.get_stream(stream_name) | 71 | stream = catalog.get_stream(stream_name) |
58 | schema = stream.schema.to_dict() | 72 | schema = stream.schema.to_dict() |
59 | stream_metadata = metadata.to_map(stream.metadata) | 73 | stream_metadata = metadata.to_map(stream.metadata) |
@@ -65,7 +79,11 @@ def process_records(catalog, | |||
65 | record, | 79 | record, |
66 | schema, | 80 | schema, |
67 | stream_metadata) | 81 | stream_metadata) |
68 | write_record(stream_name, transformed_record, time_extracted=time_extracted) | 82 | write_record( |
83 | stream_name=stream_name, | ||
84 | record=transformed_record, | ||
85 | time_extracted=time_extracted, | ||
86 | version=version) | ||
69 | counter.increment() | 87 | counter.increment() |
70 | return counter.value | 88 | return counter.value |
71 | 89 | ||
@@ -206,7 +224,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None): | |||
206 | 224 | ||
207 | # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times | 225 | # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times |
208 | # Convert from array of values to JSON with column names as keys | 226 | # Convert from array of values to JSON with column names as keys |
209 | def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): | 227 | def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows): |
210 | sheet_data_tf = [] | 228 | sheet_data_tf = [] |
211 | row_num = from_row | 229 | row_num = from_row |
212 | # Create sorted list of columns based on columnIndex | 230 | # Create sorted list of columns based on columnIndex |
@@ -229,21 +247,32 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
229 | col = cols[col_num - 1] | 247 | col = cols[col_num - 1] |
230 | col_skipped = col.get('columnSkipped') | 248 | col_skipped = col.get('columnSkipped') |
231 | if not col_skipped: | 249 | if not col_skipped: |
250 | # Get column metadata | ||
232 | col_name = col.get('columnName') | 251 | col_name = col.get('columnName') |
233 | col_type = col.get('columnType') | 252 | col_type = col.get('columnType') |
253 | col_letter = col.get('columnLetter') | ||
254 | |||
255 | # NULL values | ||
256 | if value is None or value == '': | ||
257 | col_val = None | ||
258 | |||
234 | # Convert dates/times from Lotus Notes Serial Numbers | 259 | # Convert dates/times from Lotus Notes Serial Numbers |
235 | # DATE-TIME | 260 | # DATE-TIME |
236 | if col_type == 'numberType.DATE_TIME': | 261 | elif col_type == 'numberType.DATE_TIME': |
237 | if isinstance(value, (int, float)): | 262 | if isinstance(value, (int, float)): |
238 | col_val = excel_to_dttm_str(value) | 263 | col_val = excel_to_dttm_str(value) |
239 | else: | 264 | else: |
240 | col_val = str(value) | 265 | col_val = str(value) |
266 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
267 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
241 | # DATE | 268 | # DATE |
242 | elif col_type == 'numberType.DATE': | 269 | elif col_type == 'numberType.DATE': |
243 | if isinstance(value, (int, float)): | 270 | if isinstance(value, (int, float)): |
244 | col_val = excel_to_dttm_str(value)[:10] | 271 | col_val = excel_to_dttm_str(value)[:10] |
245 | else: | 272 | else: |
246 | col_val = str(value) | 273 | col_val = str(value) |
274 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
275 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
247 | # TIME ONLY (NO DATE) | 276 | # TIME ONLY (NO DATE) |
248 | elif col_type == 'numberType.TIME': | 277 | elif col_type == 'numberType.TIME': |
249 | if isinstance(value, (int, float)): | 278 | if isinstance(value, (int, float)): |
@@ -253,6 +282,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
253 | col_val = str(timedelta(seconds=total_secs)) | 282 | col_val = str(timedelta(seconds=total_secs)) |
254 | except ValueError: | 283 | except ValueError: |
255 | col_val = str(value) | 284 | col_val = str(value) |
285 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
286 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
256 | else: | 287 | else: |
257 | col_val = str(value) | 288 | col_val = str(value) |
258 | # NUMBER (INTEGER AND FLOAT) | 289 | # NUMBER (INTEGER AND FLOAT) |
@@ -268,13 +299,19 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
268 | col_val = float(round(value, 15)) | 299 | col_val = float(round(value, 15)) |
269 | except ValueError: | 300 | except ValueError: |
270 | col_val = str(value) | 301 | col_val = str(value) |
302 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
303 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
271 | else: # decimal_digits <= 15, no rounding | 304 | else: # decimal_digits <= 15, no rounding |
272 | try: | 305 | try: |
273 | col_val = float(value) | 306 | col_val = float(value) |
274 | except ValueError: | 307 | except ValueError: |
275 | col_val = str(value) | 308 | col_val = str(value) |
309 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
310 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
276 | else: | 311 | else: |
277 | col_val = str(value) | 312 | col_val = str(value) |
313 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
314 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
278 | # STRING | 315 | # STRING |
279 | elif col_type == 'stringValue': | 316 | elif col_type == 'stringValue': |
280 | col_val = str(value) | 317 | col_val = str(value) |
@@ -289,6 +326,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
289 | col_val = False | 326 | col_val = False |
290 | else: | 327 | else: |
291 | col_val = str(value) | 328 | col_val = str(value) |
329 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
330 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
292 | elif isinstance(value, int): | 331 | elif isinstance(value, int): |
293 | if value in (1, -1): | 332 | if value in (1, -1): |
294 | col_val = True | 333 | col_val = True |
@@ -296,9 +335,13 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
296 | col_val = False | 335 | col_val = False |
297 | else: | 336 | else: |
298 | col_val = str(value) | 337 | col_val = str(value) |
338 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
339 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
299 | # OTHER: Convert everything else to a string | 340 | # OTHER: Convert everything else to a string |
300 | else: | 341 | else: |
301 | col_val = str(value) | 342 | col_val = str(value) |
343 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
344 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
302 | sheet_data_row_tf[col_name] = col_val | 345 | sheet_data_row_tf[col_name] = col_val |
303 | col_num = col_num + 1 | 346 | col_num = col_num + 1 |
304 | # APPEND non-empty row | 347 | # APPEND non-empty row |
@@ -400,6 +443,20 @@ def sync(client, config, catalog, state): | |||
400 | LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) | 443 | LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) |
401 | write_schema(catalog, sheet_title) | 444 | write_schema(catalog, sheet_title) |
402 | 445 | ||
446 | # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs) | ||
447 | # everytime after each sheet sync is complete. | ||
448 | # This forces hard deletes on the data downstream if fewer records are sent. | ||
449 | # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 | ||
450 | last_integer = int(get_bookmark(state, sheet_title, 0)) | ||
451 | activate_version = int(time.time() * 1000) | ||
452 | activate_version_message = singer.ActivateVersionMessage( | ||
453 | stream=sheet_title, | ||
454 | version=activate_version) | ||
455 | if last_integer == 0: | ||
456 | # initial load, send activate_version before AND after data sync | ||
457 | singer.write_message(activate_version_message) | ||
458 | LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) | ||
459 | |||
403 | # Determine max range of columns and rows for "paging" through the data | 460 | # Determine max range of columns and rows for "paging" through the data |
404 | sheet_last_col_index = 1 | 461 | sheet_last_col_index = 1 |
405 | sheet_last_col_letter = 'A' | 462 | sheet_last_col_letter = 'A' |
@@ -438,6 +495,7 @@ def sync(client, config, catalog, state): | |||
438 | sheet_data_tf, row_num = transform_sheet_data( | 495 | sheet_data_tf, row_num = transform_sheet_data( |
439 | spreadsheet_id=spreadsheet_id, | 496 | spreadsheet_id=spreadsheet_id, |
440 | sheet_id=sheet_id, | 497 | sheet_id=sheet_id, |
498 | sheet_title=sheet_title, | ||
441 | from_row=from_row, | 499 | from_row=from_row, |
442 | columns=columns, | 500 | columns=columns, |
443 | sheet_data_rows=sheet_data_rows) | 501 | sheet_data_rows=sheet_data_rows) |
@@ -449,10 +507,11 @@ def sync(client, config, catalog, state): | |||
449 | catalog=catalog, | 507 | catalog=catalog, |
450 | stream_name=sheet_title, | 508 | stream_name=sheet_title, |
451 | records=sheet_data_tf, | 509 | records=sheet_data_tf, |
452 | time_extracted=ss_time_extracted) | 510 | time_extracted=ss_time_extracted, |
511 | version=activate_version) | ||
453 | LOGGER.info('Sheet: {}, records processed: {}'.format( | 512 | LOGGER.info('Sheet: {}, records processed: {}'.format( |
454 | sheet_title, record_count)) | 513 | sheet_title, record_count)) |
455 | 514 | ||
456 | # Update paging from/to_row for next batch | 515 | # Update paging from/to_row for next batch |
457 | from_row = to_row + 1 | 516 | from_row = to_row + 1 |
458 | if to_row + batch_rows > sheet_max_row: | 517 | if to_row + batch_rows > sheet_max_row: |
@@ -460,6 +519,14 @@ def sync(client, config, catalog, state): | |||
460 | else: | 519 | else: |
461 | to_row = to_row + batch_rows | 520 | to_row = to_row + batch_rows |
462 | 521 | ||
522 | # End of Stream: Send Activate Version and update State | ||
523 | singer.write_message(activate_version_message) | ||
524 | write_bookmark(state, sheet_title, activate_version) | ||
525 | LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) | ||
526 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | ||
527 | sheet_title, row_num - 2)) # subtract 1 for header row | ||
528 | update_currently_syncing(state, None) | ||
529 | |||
463 | # SHEETS_LOADED | 530 | # SHEETS_LOADED |
464 | # Add sheet to sheets_loaded | 531 | # Add sheet to sheets_loaded |
465 | sheet_loaded = {} | 532 | sheet_loaded = {} |
@@ -470,17 +537,6 @@ def sync(client, config, catalog, state): | |||
470 | sheet_loaded['lastRowNumber'] = row_num | 537 | sheet_loaded['lastRowNumber'] = row_num |
471 | sheets_loaded.append(sheet_loaded) | 538 | sheets_loaded.append(sheet_loaded) |
472 | 539 | ||
473 | # Emit a Singer ACTIVATE_VERSION message after each sheet is complete. | ||
474 | # This forces hard deletes on the data downstream if fewer records are sent. | ||
475 | # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 | ||
476 | activate_version_message = singer.ActivateVersionMessage( | ||
477 | stream=sheet_title, | ||
478 | version=int(time.time() * 1000)) | ||
479 | singer.write_message(activate_version_message) | ||
480 | |||
481 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | ||
482 | sheet_title, row_num - 2)) # subtract 1 for header row | ||
483 | |||
484 | stream_name = 'sheet_metadata' | 540 | stream_name = 'sheet_metadata' |
485 | # Sync sheet_metadata if selected | 541 | # Sync sheet_metadata if selected |
486 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) | 542 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) |