aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r--tap_google_sheets/sync.py94
1 files changed, 75 insertions, 19 deletions
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 76b2e59..311281c 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -6,6 +6,7 @@ import pytz
6import singer 6import singer
7from singer import metrics, metadata, Transformer, utils 7from singer import metrics, metadata, Transformer, utils
8from singer.utils import strptime_to_utc, strftime 8from singer.utils import strptime_to_utc, strftime
9from singer.messages import RecordMessage
9from tap_google_sheets.streams import STREAMS 10from tap_google_sheets.streams import STREAMS
10from tap_google_sheets.schema import get_sheet_metadata 11from tap_google_sheets.schema import get_sheet_metadata
11 12
@@ -17,14 +18,26 @@ def write_schema(catalog, stream_name):
17 schema = stream.schema.to_dict() 18 schema = stream.schema.to_dict()
18 try: 19 try:
19 singer.write_schema(stream_name, schema, stream.key_properties) 20 singer.write_schema(stream_name, schema, stream.key_properties)
21 LOGGER.info('Writing schema for: {}'.format(stream_name))
20 except OSError as err: 22 except OSError as err:
21 LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) 23 LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
22 raise err 24 raise err
23 25
24 26
25def write_record(stream_name, record, time_extracted): 27def write_record(stream_name, record, time_extracted, version=None):
26 try: 28 try:
27 singer.messages.write_record(stream_name, record, time_extracted=time_extracted) 29 if version:
30 singer.messages.write_message(
31 RecordMessage(
32 stream=stream_name,
33 record=record,
34 version=version,
35 time_extracted=time_extracted))
36 else:
37 singer.messages.write_record(
38 stream_name=stream_name,
39 record=record,
40 time_extracted=time_extracted)
28 except OSError as err: 41 except OSError as err:
29 LOGGER.info('OS Error writing record for: {}'.format(stream_name)) 42 LOGGER.info('OS Error writing record for: {}'.format(stream_name))
30 LOGGER.info('record: {}'.format(record)) 43 LOGGER.info('record: {}'.format(record))
@@ -53,7 +66,8 @@ def write_bookmark(state, stream, value):
53def process_records(catalog, 66def process_records(catalog,
54 stream_name, 67 stream_name,
55 records, 68 records,
56 time_extracted): 69 time_extracted,
70 version=None):
57 stream = catalog.get_stream(stream_name) 71 stream = catalog.get_stream(stream_name)
58 schema = stream.schema.to_dict() 72 schema = stream.schema.to_dict()
59 stream_metadata = metadata.to_map(stream.metadata) 73 stream_metadata = metadata.to_map(stream.metadata)
@@ -65,7 +79,11 @@ def process_records(catalog,
65 record, 79 record,
66 schema, 80 schema,
67 stream_metadata) 81 stream_metadata)
68 write_record(stream_name, transformed_record, time_extracted=time_extracted) 82 write_record(
83 stream_name=stream_name,
84 record=transformed_record,
85 time_extracted=time_extracted,
86 version=version)
69 counter.increment() 87 counter.increment()
70 return counter.value 88 return counter.value
71 89
@@ -206,7 +224,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None):
206 224
207# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times 225# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
208# Convert from array of values to JSON with column names as keys 226# Convert from array of values to JSON with column names as keys
209def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): 227def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
210 sheet_data_tf = [] 228 sheet_data_tf = []
211 row_num = from_row 229 row_num = from_row
212 # Create sorted list of columns based on columnIndex 230 # Create sorted list of columns based on columnIndex
@@ -229,21 +247,32 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
229 col = cols[col_num - 1] 247 col = cols[col_num - 1]
230 col_skipped = col.get('columnSkipped') 248 col_skipped = col.get('columnSkipped')
231 if not col_skipped: 249 if not col_skipped:
250 # Get column metadata
232 col_name = col.get('columnName') 251 col_name = col.get('columnName')
233 col_type = col.get('columnType') 252 col_type = col.get('columnType')
253 col_letter = col.get('columnLetter')
254
255 # NULL values
256 if value is None or value == '':
257 col_val = None
258
234 # Convert dates/times from Lotus Notes Serial Numbers 259 # Convert dates/times from Lotus Notes Serial Numbers
235 # DATE-TIME 260 # DATE-TIME
236 if col_type == 'numberType.DATE_TIME': 261 elif col_type == 'numberType.DATE_TIME':
237 if isinstance(value, (int, float)): 262 if isinstance(value, (int, float)):
238 col_val = excel_to_dttm_str(value) 263 col_val = excel_to_dttm_str(value)
239 else: 264 else:
240 col_val = str(value) 265 col_val = str(value)
266 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
267 sheet_title, col_name, col_letter, row_num, col_type, value))
241 # DATE 268 # DATE
242 elif col_type == 'numberType.DATE': 269 elif col_type == 'numberType.DATE':
243 if isinstance(value, (int, float)): 270 if isinstance(value, (int, float)):
244 col_val = excel_to_dttm_str(value)[:10] 271 col_val = excel_to_dttm_str(value)[:10]
245 else: 272 else:
246 col_val = str(value) 273 col_val = str(value)
274 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
275 sheet_title, col_name, col_letter, row_num, col_type, value))
247 # TIME ONLY (NO DATE) 276 # TIME ONLY (NO DATE)
248 elif col_type == 'numberType.TIME': 277 elif col_type == 'numberType.TIME':
249 if isinstance(value, (int, float)): 278 if isinstance(value, (int, float)):
@@ -253,6 +282,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
253 col_val = str(timedelta(seconds=total_secs)) 282 col_val = str(timedelta(seconds=total_secs))
254 except ValueError: 283 except ValueError:
255 col_val = str(value) 284 col_val = str(value)
285 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
286 sheet_title, col_name, col_letter, row_num, col_type, value))
256 else: 287 else:
257 col_val = str(value) 288 col_val = str(value)
258 # NUMBER (INTEGER AND FLOAT) 289 # NUMBER (INTEGER AND FLOAT)
@@ -268,13 +299,19 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
268 col_val = float(round(value, 15)) 299 col_val = float(round(value, 15))
269 except ValueError: 300 except ValueError:
270 col_val = str(value) 301 col_val = str(value)
302 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
303 sheet_title, col_name, col_letter, row_num, col_type, value))
271 else: # decimal_digits <= 15, no rounding 304 else: # decimal_digits <= 15, no rounding
272 try: 305 try:
273 col_val = float(value) 306 col_val = float(value)
274 except ValueError: 307 except ValueError:
275 col_val = str(value) 308 col_val = str(value)
309 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
310 sheet_title, col_name, col_letter, row_num, col_type, value))
276 else: 311 else:
277 col_val = str(value) 312 col_val = str(value)
313 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
314 sheet_title, col_name, col_letter, row_num, col_type, value))
278 # STRING 315 # STRING
279 elif col_type == 'stringValue': 316 elif col_type == 'stringValue':
280 col_val = str(value) 317 col_val = str(value)
@@ -289,6 +326,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
289 col_val = False 326 col_val = False
290 else: 327 else:
291 col_val = str(value) 328 col_val = str(value)
329 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
330 sheet_title, col_name, col_letter, row, col_type, value))
292 elif isinstance(value, int): 331 elif isinstance(value, int):
293 if value in (1, -1): 332 if value in (1, -1):
294 col_val = True 333 col_val = True
@@ -296,9 +335,13 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
296 col_val = False 335 col_val = False
297 else: 336 else:
298 col_val = str(value) 337 col_val = str(value)
338 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
339 sheet_title, col_name, col_letter, row, col_type, value))
299 # OTHER: Convert everything else to a string 340 # OTHER: Convert everything else to a string
300 else: 341 else:
301 col_val = str(value) 342 col_val = str(value)
343 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
344 sheet_title, col_name, col_letter, row, col_type, value))
302 sheet_data_row_tf[col_name] = col_val 345 sheet_data_row_tf[col_name] = col_val
303 col_num = col_num + 1 346 col_num = col_num + 1
304 # APPEND non-empty row 347 # APPEND non-empty row
@@ -400,6 +443,20 @@ def sync(client, config, catalog, state):
400 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) 443 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
401 write_schema(catalog, sheet_title) 444 write_schema(catalog, sheet_title)
402 445
446 # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not before subsequent syncs)
447 # and every time after each sheet sync is complete.
448 # This forces hard deletes on the data downstream if fewer records are sent.
449 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
450 last_integer = int(get_bookmark(state, sheet_title, 0))
451 activate_version = int(time.time() * 1000)
452 activate_version_message = singer.ActivateVersionMessage(
453 stream=sheet_title,
454 version=activate_version)
455 if last_integer == 0:
456 # initial load, send activate_version before AND after data sync
457 singer.write_message(activate_version_message)
458 LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
459
403 # Determine max range of columns and rows for "paging" through the data 460 # Determine max range of columns and rows for "paging" through the data
404 sheet_last_col_index = 1 461 sheet_last_col_index = 1
405 sheet_last_col_letter = 'A' 462 sheet_last_col_letter = 'A'
@@ -438,6 +495,7 @@ def sync(client, config, catalog, state):
438 sheet_data_tf, row_num = transform_sheet_data( 495 sheet_data_tf, row_num = transform_sheet_data(
439 spreadsheet_id=spreadsheet_id, 496 spreadsheet_id=spreadsheet_id,
440 sheet_id=sheet_id, 497 sheet_id=sheet_id,
498 sheet_title=sheet_title,
441 from_row=from_row, 499 from_row=from_row,
442 columns=columns, 500 columns=columns,
443 sheet_data_rows=sheet_data_rows) 501 sheet_data_rows=sheet_data_rows)
@@ -449,10 +507,11 @@ def sync(client, config, catalog, state):
449 catalog=catalog, 507 catalog=catalog,
450 stream_name=sheet_title, 508 stream_name=sheet_title,
451 records=sheet_data_tf, 509 records=sheet_data_tf,
452 time_extracted=ss_time_extracted) 510 time_extracted=ss_time_extracted,
511 version=activate_version)
453 LOGGER.info('Sheet: {}, records processed: {}'.format( 512 LOGGER.info('Sheet: {}, records processed: {}'.format(
454 sheet_title, record_count)) 513 sheet_title, record_count))
455 514
456 # Update paging from/to_row for next batch 515 # Update paging from/to_row for next batch
457 from_row = to_row + 1 516 from_row = to_row + 1
458 if to_row + batch_rows > sheet_max_row: 517 if to_row + batch_rows > sheet_max_row:
@@ -460,6 +519,14 @@ def sync(client, config, catalog, state):
460 else: 519 else:
461 to_row = to_row + batch_rows 520 to_row = to_row + batch_rows
462 521
522 # End of Stream: Send Activate Version and update State
523 singer.write_message(activate_version_message)
524 write_bookmark(state, sheet_title, activate_version)
525 LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
526 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
527 sheet_title, row_num - 2)) # subtract 2: 1 for the header row, plus presumably 1 because row_num is one past the last row read (TODO confirm)
528 update_currently_syncing(state, None)
529
463 # SHEETS_LOADED 530 # SHEETS_LOADED
464 # Add sheet to sheets_loaded 531 # Add sheet to sheets_loaded
465 sheet_loaded = {} 532 sheet_loaded = {}
@@ -470,17 +537,6 @@ def sync(client, config, catalog, state):
470 sheet_loaded['lastRowNumber'] = row_num 537 sheet_loaded['lastRowNumber'] = row_num
471 sheets_loaded.append(sheet_loaded) 538 sheets_loaded.append(sheet_loaded)
472 539
473 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
474 # This forces hard deletes on the data downstream if fewer records are sent.
475 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
476 activate_version_message = singer.ActivateVersionMessage(
477 stream=sheet_title,
478 version=int(time.time() * 1000))
479 singer.write_message(activate_version_message)
480
481 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
482 sheet_title, row_num - 2)) # subtract 1 for header row
483
484 stream_name = 'sheet_metadata' 540 stream_name = 'sheet_metadata'
485 # Sync sheet_metadata if selected 541 # Sync sheet_metadata if selected
486 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) 542 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)