diff options
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r-- | tap_google_sheets/sync.py | 94 |
1 files changed, 75 insertions, 19 deletions
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py index 76b2e59..311281c 100644 --- a/tap_google_sheets/sync.py +++ b/tap_google_sheets/sync.py | |||
@@ -6,6 +6,7 @@ import pytz | |||
6 | import singer | 6 | import singer |
7 | from singer import metrics, metadata, Transformer, utils | 7 | from singer import metrics, metadata, Transformer, utils |
8 | from singer.utils import strptime_to_utc, strftime | 8 | from singer.utils import strptime_to_utc, strftime |
9 | from singer.messages import RecordMessage | ||
9 | from tap_google_sheets.streams import STREAMS | 10 | from tap_google_sheets.streams import STREAMS |
10 | from tap_google_sheets.schema import get_sheet_metadata | 11 | from tap_google_sheets.schema import get_sheet_metadata |
11 | 12 | ||
@@ -17,14 +18,26 @@ def write_schema(catalog, stream_name): | |||
17 | schema = stream.schema.to_dict() | 18 | schema = stream.schema.to_dict() |
18 | try: | 19 | try: |
19 | singer.write_schema(stream_name, schema, stream.key_properties) | 20 | singer.write_schema(stream_name, schema, stream.key_properties) |
21 | LOGGER.info('Writing schema for: {}'.format(stream_name)) | ||
20 | except OSError as err: | 22 | except OSError as err: |
21 | LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) | 23 | LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) |
22 | raise err | 24 | raise err |
23 | 25 | ||
24 | 26 | ||
25 | def write_record(stream_name, record, time_extracted): | 27 | def write_record(stream_name, record, time_extracted, version=None): |
26 | try: | 28 | try: |
27 | singer.messages.write_record(stream_name, record, time_extracted=time_extracted) | 29 | if version: |
30 | singer.messages.write_message( | ||
31 | RecordMessage( | ||
32 | stream=stream_name, | ||
33 | record=record, | ||
34 | version=version, | ||
35 | time_extracted=time_extracted)) | ||
36 | else: | ||
37 | singer.messages.write_record( | ||
38 | stream_name=stream_name, | ||
39 | record=record, | ||
40 | time_extracted=time_extracted) | ||
28 | except OSError as err: | 41 | except OSError as err: |
29 | LOGGER.info('OS Error writing record for: {}'.format(stream_name)) | 42 | LOGGER.info('OS Error writing record for: {}'.format(stream_name)) |
30 | LOGGER.info('record: {}'.format(record)) | 43 | LOGGER.info('record: {}'.format(record)) |
@@ -53,7 +66,8 @@ def write_bookmark(state, stream, value): | |||
53 | def process_records(catalog, | 66 | def process_records(catalog, |
54 | stream_name, | 67 | stream_name, |
55 | records, | 68 | records, |
56 | time_extracted): | 69 | time_extracted, |
70 | version=None): | ||
57 | stream = catalog.get_stream(stream_name) | 71 | stream = catalog.get_stream(stream_name) |
58 | schema = stream.schema.to_dict() | 72 | schema = stream.schema.to_dict() |
59 | stream_metadata = metadata.to_map(stream.metadata) | 73 | stream_metadata = metadata.to_map(stream.metadata) |
@@ -65,7 +79,11 @@ def process_records(catalog, | |||
65 | record, | 79 | record, |
66 | schema, | 80 | schema, |
67 | stream_metadata) | 81 | stream_metadata) |
68 | write_record(stream_name, transformed_record, time_extracted=time_extracted) | 82 | write_record( |
83 | stream_name=stream_name, | ||
84 | record=transformed_record, | ||
85 | time_extracted=time_extracted, | ||
86 | version=version) | ||
69 | counter.increment() | 87 | counter.increment() |
70 | return counter.value | 88 | return counter.value |
71 | 89 | ||
@@ -206,7 +224,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None): | |||
206 | 224 | ||
207 | # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times | 225 | # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times |
208 | # Convert from array of values to JSON with column names as keys | 226 | # Convert from array of values to JSON with column names as keys |
209 | def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): | 227 | def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows): |
210 | sheet_data_tf = [] | 228 | sheet_data_tf = [] |
211 | row_num = from_row | 229 | row_num = from_row |
212 | # Create sorted list of columns based on columnIndex | 230 | # Create sorted list of columns based on columnIndex |
@@ -229,21 +247,32 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
229 | col = cols[col_num - 1] | 247 | col = cols[col_num - 1] |
230 | col_skipped = col.get('columnSkipped') | 248 | col_skipped = col.get('columnSkipped') |
231 | if not col_skipped: | 249 | if not col_skipped: |
250 | # Get column metadata | ||
232 | col_name = col.get('columnName') | 251 | col_name = col.get('columnName') |
233 | col_type = col.get('columnType') | 252 | col_type = col.get('columnType') |
253 | col_letter = col.get('columnLetter') | ||
254 | |||
255 | # NULL values | ||
256 | if value is None or value == '': | ||
257 | col_val = None | ||
258 | |||
234 | # Convert dates/times from Lotus Notes Serial Numbers | 259 | # Convert dates/times from Lotus Notes Serial Numbers |
235 | # DATE-TIME | 260 | # DATE-TIME |
236 | if col_type == 'numberType.DATE_TIME': | 261 | elif col_type == 'numberType.DATE_TIME': |
237 | if isinstance(value, (int, float)): | 262 | if isinstance(value, (int, float)): |
238 | col_val = excel_to_dttm_str(value) | 263 | col_val = excel_to_dttm_str(value) |
239 | else: | 264 | else: |
240 | col_val = str(value) | 265 | col_val = str(value) |
266 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
267 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
241 | # DATE | 268 | # DATE |
242 | elif col_type == 'numberType.DATE': | 269 | elif col_type == 'numberType.DATE': |
243 | if isinstance(value, (int, float)): | 270 | if isinstance(value, (int, float)): |
244 | col_val = excel_to_dttm_str(value)[:10] | 271 | col_val = excel_to_dttm_str(value)[:10] |
245 | else: | 272 | else: |
246 | col_val = str(value) | 273 | col_val = str(value) |
274 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
275 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
247 | # TIME ONLY (NO DATE) | 276 | # TIME ONLY (NO DATE) |
248 | elif col_type == 'numberType.TIME': | 277 | elif col_type == 'numberType.TIME': |
249 | if isinstance(value, (int, float)): | 278 | if isinstance(value, (int, float)): |
@@ -253,6 +282,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
253 | col_val = str(timedelta(seconds=total_secs)) | 282 | col_val = str(timedelta(seconds=total_secs)) |
254 | except ValueError: | 283 | except ValueError: |
255 | col_val = str(value) | 284 | col_val = str(value) |
285 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
286 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
256 | else: | 287 | else: |
257 | col_val = str(value) | 288 | col_val = str(value) |
258 | # NUMBER (INTEGER AND FLOAT) | 289 | # NUMBER (INTEGER AND FLOAT) |
@@ -268,13 +299,19 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
268 | col_val = float(round(value, 15)) | 299 | col_val = float(round(value, 15)) |
269 | except ValueError: | 300 | except ValueError: |
270 | col_val = str(value) | 301 | col_val = str(value) |
302 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
303 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
271 | else: # decimal_digits <= 15, no rounding | 304 | else: # decimal_digits <= 15, no rounding |
272 | try: | 305 | try: |
273 | col_val = float(value) | 306 | col_val = float(value) |
274 | except ValueError: | 307 | except ValueError: |
275 | col_val = str(value) | 308 | col_val = str(value) |
309 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
310 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
276 | else: | 311 | else: |
277 | col_val = str(value) | 312 | col_val = str(value) |
313 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
314 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
278 | # STRING | 315 | # STRING |
279 | elif col_type == 'stringValue': | 316 | elif col_type == 'stringValue': |
280 | col_val = str(value) | 317 | col_val = str(value) |
@@ -289,6 +326,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
289 | col_val = False | 326 | col_val = False |
290 | else: | 327 | else: |
291 | col_val = str(value) | 328 | col_val = str(value) |
329 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
330 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
292 | elif isinstance(value, int): | 331 | elif isinstance(value, int): |
293 | if value in (1, -1): | 332 | if value in (1, -1): |
294 | col_val = True | 333 | col_val = True |
@@ -296,9 +335,13 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
296 | col_val = False | 335 | col_val = False |
297 | else: | 336 | else: |
298 | col_val = str(value) | 337 | col_val = str(value) |
338 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
339 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
299 | # OTHER: Convert everything else to a string | 340 | # OTHER: Convert everything else to a string |
300 | else: | 341 | else: |
301 | col_val = str(value) | 342 | col_val = str(value) |
343 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
344 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
302 | sheet_data_row_tf[col_name] = col_val | 345 | sheet_data_row_tf[col_name] = col_val |
303 | col_num = col_num + 1 | 346 | col_num = col_num + 1 |
304 | # APPEND non-empty row | 347 | # APPEND non-empty row |
@@ -400,6 +443,20 @@ def sync(client, config, catalog, state): | |||
400 | LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) | 443 | LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) |
401 | write_schema(catalog, sheet_title) | 444 | write_schema(catalog, sheet_title) |
402 | 445 | ||
446 | # Emit a Singer ACTIVATE_VERSION message before the data on the initial sync only, | ||
447 | # and again every time after each sheet sync is complete. | ||
448 | # This forces hard deletes on the data downstream if fewer records are sent. | ||
449 | # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 | ||
450 | last_integer = int(get_bookmark(state, sheet_title, 0)) | ||
451 | activate_version = int(time.time() * 1000) | ||
452 | activate_version_message = singer.ActivateVersionMessage( | ||
453 | stream=sheet_title, | ||
454 | version=activate_version) | ||
455 | if last_integer == 0: | ||
456 | # initial load, send activate_version before AND after data sync | ||
457 | singer.write_message(activate_version_message) | ||
458 | LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) | ||
459 | |||
403 | # Determine max range of columns and rows for "paging" through the data | 460 | # Determine max range of columns and rows for "paging" through the data |
404 | sheet_last_col_index = 1 | 461 | sheet_last_col_index = 1 |
405 | sheet_last_col_letter = 'A' | 462 | sheet_last_col_letter = 'A' |
@@ -438,6 +495,7 @@ def sync(client, config, catalog, state): | |||
438 | sheet_data_tf, row_num = transform_sheet_data( | 495 | sheet_data_tf, row_num = transform_sheet_data( |
439 | spreadsheet_id=spreadsheet_id, | 496 | spreadsheet_id=spreadsheet_id, |
440 | sheet_id=sheet_id, | 497 | sheet_id=sheet_id, |
498 | sheet_title=sheet_title, | ||
441 | from_row=from_row, | 499 | from_row=from_row, |
442 | columns=columns, | 500 | columns=columns, |
443 | sheet_data_rows=sheet_data_rows) | 501 | sheet_data_rows=sheet_data_rows) |
@@ -449,10 +507,11 @@ def sync(client, config, catalog, state): | |||
449 | catalog=catalog, | 507 | catalog=catalog, |
450 | stream_name=sheet_title, | 508 | stream_name=sheet_title, |
451 | records=sheet_data_tf, | 509 | records=sheet_data_tf, |
452 | time_extracted=ss_time_extracted) | 510 | time_extracted=ss_time_extracted, |
511 | version=activate_version) | ||
453 | LOGGER.info('Sheet: {}, records processed: {}'.format( | 512 | LOGGER.info('Sheet: {}, records processed: {}'.format( |
454 | sheet_title, record_count)) | 513 | sheet_title, record_count)) |
455 | 514 | ||
456 | # Update paging from/to_row for next batch | 515 | # Update paging from/to_row for next batch |
457 | from_row = to_row + 1 | 516 | from_row = to_row + 1 |
458 | if to_row + batch_rows > sheet_max_row: | 517 | if to_row + batch_rows > sheet_max_row: |
@@ -460,6 +519,14 @@ def sync(client, config, catalog, state): | |||
460 | else: | 519 | else: |
461 | to_row = to_row + batch_rows | 520 | to_row = to_row + batch_rows |
462 | 521 | ||
522 | # End of Stream: Send Activate Version and update State | ||
523 | singer.write_message(activate_version_message) | ||
524 | write_bookmark(state, sheet_title, activate_version) | ||
525 | LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) | ||
526 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | ||
527 | sheet_title, row_num - 2)) # subtract 2: header row, and row_num is one past the last row | ||
528 | update_currently_syncing(state, None) | ||
529 | |||
463 | # SHEETS_LOADED | 530 | # SHEETS_LOADED |
464 | # Add sheet to sheets_loaded | 531 | # Add sheet to sheets_loaded |
465 | sheet_loaded = {} | 532 | sheet_loaded = {} |
@@ -470,17 +537,6 @@ def sync(client, config, catalog, state): | |||
470 | sheet_loaded['lastRowNumber'] = row_num | 537 | sheet_loaded['lastRowNumber'] = row_num |
471 | sheets_loaded.append(sheet_loaded) | 538 | sheets_loaded.append(sheet_loaded) |
472 | 539 | ||
473 | # Emit a Singer ACTIVATE_VERSION message after each sheet is complete. | ||
474 | # This forces hard deletes on the data downstream if fewer records are sent. | ||
475 | # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 | ||
476 | activate_version_message = singer.ActivateVersionMessage( | ||
477 | stream=sheet_title, | ||
478 | version=int(time.time() * 1000)) | ||
479 | singer.write_message(activate_version_message) | ||
480 | |||
481 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | ||
482 | sheet_title, row_num - 2)) # subtract 1 for header row | ||
483 | |||
484 | stream_name = 'sheet_metadata' | 540 | stream_name = 'sheet_metadata' |
485 | # Sync sheet_metadata if selected | 541 | # Sync sheet_metadata if selected |
486 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) | 542 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) |