aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets
diff options
context:
space:
mode:
authorJeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com>2020-01-09 07:30:53 -0800
committerKyle Allan <KAllan357@gmail.com>2020-01-09 10:30:53 -0500
commit43a24cbab1dbc35b893c35b86e34adc0f2fb84e7 (patch)
treebcbaae860aad0a94bcc4d27f4804504691401438 /tap_google_sheets
parent5890b89c1aa7c554235b3cef156b5a5a2c594bec (diff)
downloadtap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.tar.gz
tap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.tar.zst
tap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.zip
v.0.0.3 Sync error handling, activate version, documentation (#2)v0.0.3
* v.0.0.2 schema and sync changes Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py * v.0.0.3 Sync activate version and error handling Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages.
Diffstat (limited to 'tap_google_sheets')
-rw-r--r--tap_google_sheets/schema.py51
-rw-r--r--tap_google_sheets/sync.py94
2 files changed, 116 insertions, 29 deletions
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index 243467b..e319c03 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -21,6 +21,7 @@ def colnum_string(num):
21 21
22# Create sheet_metadata_json with columns from sheet 22# Create sheet_metadata_json with columns from sheet
23def get_sheet_schema_columns(sheet): 23def get_sheet_schema_columns(sheet):
24 sheet_title = sheet.get('properties', {}).get('title')
24 sheet_json_schema = OrderedDict() 25 sheet_json_schema = OrderedDict()
25 data = next(iter(sheet.get('data', [])), {}) 26 data = next(iter(sheet.get('data', [])), {})
26 row_data = data.get('rowData', []) 27 row_data = data.get('rowData', [])
@@ -62,15 +63,34 @@ def get_sheet_schema_columns(sheet):
62 skipped = 0 63 skipped = 0
63 column_name = '{}'.format(header_value) 64 column_name = '{}'.format(header_value)
64 if column_name in header_list: 65 if column_name in header_list:
65 raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name)) 66 raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
67 sheet_title, column_name, column_letter))
66 header_list.append(column_name) 68 header_list.append(column_name)
67 69
68 first_value = first_values[i] 70 first_value = None
69 71 try:
72 first_value = first_values[i]
73 except IndexError as err:
74 raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
75 sheet_title, column_name, column_letter, err))
76
70 column_effective_value = first_value.get('effectiveValue', {}) 77 column_effective_value = first_value.get('effectiveValue', {})
71 for key in column_effective_value.keys(): 78
72 if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'): 79 col_val = None
73 column_effective_value_type = key 80 if column_effective_value == {}:
81 column_effective_value_type = 'stringValue'
82 LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
83 sheet_title, column_name, column_letter))
84 LOGGER.info(' Setting column datatype to STRING')
85 else:
86 for key, val in column_effective_value.items():
87 if key in ('numberValue', 'stringValue', 'boolValue'):
88 column_effective_value_type = key
89 col_val = str(val)
90 elif key in ('errorType', 'formulaType'):
91 col_val = str(val)
92 raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
93 sheet_title, column_name, column_letter, key, col_val))
74 94
75 column_number_format = first_values[i].get('effectiveFormat', {}).get( 95 column_number_format = first_values[i].get('effectiveFormat', {}).get(
76 'numberFormat', {}) 96 'numberFormat', {})
@@ -87,7 +107,13 @@ def get_sheet_schema_columns(sheet):
87 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType 107 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
88 # 108 #
89 column_format = None # Default 109 column_format = None # Default
90 if column_effective_value_type == 'stringValue': 110 if column_effective_value == {}:
111 col_properties = {'type': ['null', 'string']}
112 column_gs_type = 'stringValue'
113 LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
114 sheet_title, column_name, column_letter))
115 LOGGER.info(' Setting column datatype to STRING')
116 elif column_effective_value_type == 'stringValue':
91 col_properties = {'type': ['null', 'string']} 117 col_properties = {'type': ['null', 'string']}
92 column_gs_type = 'stringValue' 118 column_gs_type = 'stringValue'
93 elif column_effective_value_type == 'boolValue': 119 elif column_effective_value_type == 'boolValue':
@@ -138,8 +164,8 @@ def get_sheet_schema_columns(sheet):
138 else: 164 else:
139 col_properties = {'type': ['null', 'string']} 165 col_properties = {'type': ['null', 'string']}
140 column_gs_type = 'unsupportedValue' 166 column_gs_type = 'unsupportedValue'
141 LOGGER.info('Unsupported data type: {}, value: {}'.format(column_name, \ 167 LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
142 column_effective_value_type)) 168 sheet_title, column_name, column_letter, column_effective_value_type, col_val))
143 LOGGER.info('Converting to string.') 169 LOGGER.info('Converting to string.')
144 else: # skipped 170 else: # skipped
145 column_is_skipped = True 171 column_is_skipped = True
@@ -148,11 +174,16 @@ def get_sheet_schema_columns(sheet):
148 column_name = '__sdc_skip_col_{}'.format(column_index_str) 174 column_name = '__sdc_skip_col_{}'.format(column_index_str)
149 col_properties = {'type': ['null', 'string']} 175 col_properties = {'type': ['null', 'string']}
150 column_gs_type = 'stringValue' 176 column_gs_type = 'stringValue'
177 LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
178 sheet_title, column_name, column_letter))
179 LOGGER.info(' This column will be skipped during data loading.')
151 180
152 if skipped >= 2: 181 if skipped >= 2:
153 # skipped = 2 consecutive skipped headers 182 # skipped = 2 consecutive skipped headers
154 # Remove prior_header column_name 183 # Remove prior_header column_name
155 sheet_json_schema['properties'].pop(prior_header, None) 184 sheet_json_schema['properties'].pop(prior_header, None)
185 LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
186 sheet_title, column_name, column_letter))
156 break 187 break
157 188
158 else: 189 else:
@@ -245,7 +276,7 @@ def get_schemas(client, spreadsheet_id):
245 for sheet in sheets: 276 for sheet in sheets:
246 # GET sheet_json_schema for each worksheet (from function above) 277 # GET sheet_json_schema for each worksheet (from function above)
247 sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) 278 sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
248 LOGGER.info('columns = {}'.format(columns)) 279 # LOGGER.info('columns = {}'.format(columns))
249 280
250 sheet_title = sheet.get('properties', {}).get('title') 281 sheet_title = sheet.get('properties', {}).get('title')
251 schemas[sheet_title] = sheet_json_schema 282 schemas[sheet_title] = sheet_json_schema
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 76b2e59..311281c 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -6,6 +6,7 @@ import pytz
6import singer 6import singer
7from singer import metrics, metadata, Transformer, utils 7from singer import metrics, metadata, Transformer, utils
8from singer.utils import strptime_to_utc, strftime 8from singer.utils import strptime_to_utc, strftime
9from singer.messages import RecordMessage
9from tap_google_sheets.streams import STREAMS 10from tap_google_sheets.streams import STREAMS
10from tap_google_sheets.schema import get_sheet_metadata 11from tap_google_sheets.schema import get_sheet_metadata
11 12
@@ -17,14 +18,26 @@ def write_schema(catalog, stream_name):
17 schema = stream.schema.to_dict() 18 schema = stream.schema.to_dict()
18 try: 19 try:
19 singer.write_schema(stream_name, schema, stream.key_properties) 20 singer.write_schema(stream_name, schema, stream.key_properties)
21 LOGGER.info('Writing schema for: {}'.format(stream_name))
20 except OSError as err: 22 except OSError as err:
21 LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) 23 LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
22 raise err 24 raise err
23 25
24 26
25def write_record(stream_name, record, time_extracted): 27def write_record(stream_name, record, time_extracted, version=None):
26 try: 28 try:
27 singer.messages.write_record(stream_name, record, time_extracted=time_extracted) 29 if version:
30 singer.messages.write_message(
31 RecordMessage(
32 stream=stream_name,
33 record=record,
34 version=version,
35 time_extracted=time_extracted))
36 else:
37 singer.messages.write_record(
38 stream_name=stream_name,
39 record=record,
40 time_extracted=time_extracted)
28 except OSError as err: 41 except OSError as err:
29 LOGGER.info('OS Error writing record for: {}'.format(stream_name)) 42 LOGGER.info('OS Error writing record for: {}'.format(stream_name))
30 LOGGER.info('record: {}'.format(record)) 43 LOGGER.info('record: {}'.format(record))
@@ -53,7 +66,8 @@ def write_bookmark(state, stream, value):
53def process_records(catalog, 66def process_records(catalog,
54 stream_name, 67 stream_name,
55 records, 68 records,
56 time_extracted): 69 time_extracted,
70 version=None):
57 stream = catalog.get_stream(stream_name) 71 stream = catalog.get_stream(stream_name)
58 schema = stream.schema.to_dict() 72 schema = stream.schema.to_dict()
59 stream_metadata = metadata.to_map(stream.metadata) 73 stream_metadata = metadata.to_map(stream.metadata)
@@ -65,7 +79,11 @@ def process_records(catalog,
65 record, 79 record,
66 schema, 80 schema,
67 stream_metadata) 81 stream_metadata)
68 write_record(stream_name, transformed_record, time_extracted=time_extracted) 82 write_record(
83 stream_name=stream_name,
84 record=transformed_record,
85 time_extracted=time_extracted,
86 version=version)
69 counter.increment() 87 counter.increment()
70 return counter.value 88 return counter.value
71 89
@@ -206,7 +224,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None):
206 224
207# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times 225# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
208# Convert from array of values to JSON with column names as keys 226# Convert from array of values to JSON with column names as keys
209def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): 227def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
210 sheet_data_tf = [] 228 sheet_data_tf = []
211 row_num = from_row 229 row_num = from_row
212 # Create sorted list of columns based on columnIndex 230 # Create sorted list of columns based on columnIndex
@@ -229,21 +247,32 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
229 col = cols[col_num - 1] 247 col = cols[col_num - 1]
230 col_skipped = col.get('columnSkipped') 248 col_skipped = col.get('columnSkipped')
231 if not col_skipped: 249 if not col_skipped:
250 # Get column metadata
232 col_name = col.get('columnName') 251 col_name = col.get('columnName')
233 col_type = col.get('columnType') 252 col_type = col.get('columnType')
253 col_letter = col.get('columnLetter')
254
255 # NULL values
256 if value is None or value == '':
257 col_val = None
258
234 # Convert dates/times from Lotus Notes Serial Numbers 259 # Convert dates/times from Lotus Notes Serial Numbers
235 # DATE-TIME 260 # DATE-TIME
236 if col_type == 'numberType.DATE_TIME': 261 elif col_type == 'numberType.DATE_TIME':
237 if isinstance(value, (int, float)): 262 if isinstance(value, (int, float)):
238 col_val = excel_to_dttm_str(value) 263 col_val = excel_to_dttm_str(value)
239 else: 264 else:
240 col_val = str(value) 265 col_val = str(value)
266 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
267 sheet_title, col_name, col_letter, row_num, col_type, value))
241 # DATE 268 # DATE
242 elif col_type == 'numberType.DATE': 269 elif col_type == 'numberType.DATE':
243 if isinstance(value, (int, float)): 270 if isinstance(value, (int, float)):
244 col_val = excel_to_dttm_str(value)[:10] 271 col_val = excel_to_dttm_str(value)[:10]
245 else: 272 else:
246 col_val = str(value) 273 col_val = str(value)
274 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
275 sheet_title, col_name, col_letter, row_num, col_type, value))
247 # TIME ONLY (NO DATE) 276 # TIME ONLY (NO DATE)
248 elif col_type == 'numberType.TIME': 277 elif col_type == 'numberType.TIME':
249 if isinstance(value, (int, float)): 278 if isinstance(value, (int, float)):
@@ -253,6 +282,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
253 col_val = str(timedelta(seconds=total_secs)) 282 col_val = str(timedelta(seconds=total_secs))
254 except ValueError: 283 except ValueError:
255 col_val = str(value) 284 col_val = str(value)
285 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
286 sheet_title, col_name, col_letter, row_num, col_type, value))
256 else: 287 else:
257 col_val = str(value) 288 col_val = str(value)
258 # NUMBER (INTEGER AND FLOAT) 289 # NUMBER (INTEGER AND FLOAT)
@@ -268,13 +299,19 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
268 col_val = float(round(value, 15)) 299 col_val = float(round(value, 15))
269 except ValueError: 300 except ValueError:
270 col_val = str(value) 301 col_val = str(value)
302 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
303 sheet_title, col_name, col_letter, row_num, col_type, value))
271 else: # decimal_digits <= 15, no rounding 304 else: # decimal_digits <= 15, no rounding
272 try: 305 try:
273 col_val = float(value) 306 col_val = float(value)
274 except ValueError: 307 except ValueError:
275 col_val = str(value) 308 col_val = str(value)
309 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
310 sheet_title, col_name, col_letter, row_num, col_type, value))
276 else: 311 else:
277 col_val = str(value) 312 col_val = str(value)
313 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
314 sheet_title, col_name, col_letter, row_num, col_type, value))
278 # STRING 315 # STRING
279 elif col_type == 'stringValue': 316 elif col_type == 'stringValue':
280 col_val = str(value) 317 col_val = str(value)
@@ -289,6 +326,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
289 col_val = False 326 col_val = False
290 else: 327 else:
291 col_val = str(value) 328 col_val = str(value)
329 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
330 sheet_title, col_name, col_letter, row, col_type, value))
292 elif isinstance(value, int): 331 elif isinstance(value, int):
293 if value in (1, -1): 332 if value in (1, -1):
294 col_val = True 333 col_val = True
@@ -296,9 +335,13 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
296 col_val = False 335 col_val = False
297 else: 336 else:
298 col_val = str(value) 337 col_val = str(value)
338 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
339 sheet_title, col_name, col_letter, row, col_type, value))
299 # OTHER: Convert everything else to a string 340 # OTHER: Convert everything else to a string
300 else: 341 else:
301 col_val = str(value) 342 col_val = str(value)
343 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
344 sheet_title, col_name, col_letter, row, col_type, value))
302 sheet_data_row_tf[col_name] = col_val 345 sheet_data_row_tf[col_name] = col_val
303 col_num = col_num + 1 346 col_num = col_num + 1
304 # APPEND non-empty row 347 # APPEND non-empty row
@@ -400,6 +443,20 @@ def sync(client, config, catalog, state):
400 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) 443 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
401 write_schema(catalog, sheet_title) 444 write_schema(catalog, sheet_title)
402 445
446 # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
447 # everytime after each sheet sync is complete.
448 # This forces hard deletes on the data downstream if fewer records are sent.
449 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
450 last_integer = int(get_bookmark(state, sheet_title, 0))
451 activate_version = int(time.time() * 1000)
452 activate_version_message = singer.ActivateVersionMessage(
453 stream=sheet_title,
454 version=activate_version)
455 if last_integer == 0:
456 # initial load, send activate_version before AND after data sync
457 singer.write_message(activate_version_message)
458 LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
459
403 # Determine max range of columns and rows for "paging" through the data 460 # Determine max range of columns and rows for "paging" through the data
404 sheet_last_col_index = 1 461 sheet_last_col_index = 1
405 sheet_last_col_letter = 'A' 462 sheet_last_col_letter = 'A'
@@ -438,6 +495,7 @@ def sync(client, config, catalog, state):
438 sheet_data_tf, row_num = transform_sheet_data( 495 sheet_data_tf, row_num = transform_sheet_data(
439 spreadsheet_id=spreadsheet_id, 496 spreadsheet_id=spreadsheet_id,
440 sheet_id=sheet_id, 497 sheet_id=sheet_id,
498 sheet_title=sheet_title,
441 from_row=from_row, 499 from_row=from_row,
442 columns=columns, 500 columns=columns,
443 sheet_data_rows=sheet_data_rows) 501 sheet_data_rows=sheet_data_rows)
@@ -449,10 +507,11 @@ def sync(client, config, catalog, state):
449 catalog=catalog, 507 catalog=catalog,
450 stream_name=sheet_title, 508 stream_name=sheet_title,
451 records=sheet_data_tf, 509 records=sheet_data_tf,
452 time_extracted=ss_time_extracted) 510 time_extracted=ss_time_extracted,
511 version=activate_version)
453 LOGGER.info('Sheet: {}, records processed: {}'.format( 512 LOGGER.info('Sheet: {}, records processed: {}'.format(
454 sheet_title, record_count)) 513 sheet_title, record_count))
455 514
456 # Update paging from/to_row for next batch 515 # Update paging from/to_row for next batch
457 from_row = to_row + 1 516 from_row = to_row + 1
458 if to_row + batch_rows > sheet_max_row: 517 if to_row + batch_rows > sheet_max_row:
@@ -460,6 +519,14 @@ def sync(client, config, catalog, state):
460 else: 519 else:
461 to_row = to_row + batch_rows 520 to_row = to_row + batch_rows
462 521
522 # End of Stream: Send Activate Version and update State
523 singer.write_message(activate_version_message)
524 write_bookmark(state, sheet_title, activate_version)
525 LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
526 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
527 sheet_title, row_num - 2)) # subtract 1 for header row
528 update_currently_syncing(state, None)
529
463 # SHEETS_LOADED 530 # SHEETS_LOADED
464 # Add sheet to sheets_loaded 531 # Add sheet to sheets_loaded
465 sheet_loaded = {} 532 sheet_loaded = {}
@@ -470,17 +537,6 @@ def sync(client, config, catalog, state):
470 sheet_loaded['lastRowNumber'] = row_num 537 sheet_loaded['lastRowNumber'] = row_num
471 sheets_loaded.append(sheet_loaded) 538 sheets_loaded.append(sheet_loaded)
472 539
473 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
474 # This forces hard deletes on the data downstream if fewer records are sent.
475 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
476 activate_version_message = singer.ActivateVersionMessage(
477 stream=sheet_title,
478 version=int(time.time() * 1000))
479 singer.write_message(activate_version_message)
480
481 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
482 sheet_title, row_num - 2)) # subtract 1 for header row
483
484 stream_name = 'sheet_metadata' 540 stream_name = 'sheet_metadata'
485 # Sync sheet_metadata if selected 541 # Sync sheet_metadata if selected
486 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) 542 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)