aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets
diff options
context:
space:
mode:
Diffstat (limited to 'tap_google_sheets')
-rw-r--r--tap_google_sheets/schema.py51
-rw-r--r--tap_google_sheets/sync.py94
2 files changed, 116 insertions, 29 deletions
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index 243467b..e319c03 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -21,6 +21,7 @@ def colnum_string(num):
21 21
22# Create sheet_metadata_json with columns from sheet 22# Create sheet_metadata_json with columns from sheet
23def get_sheet_schema_columns(sheet): 23def get_sheet_schema_columns(sheet):
24 sheet_title = sheet.get('properties', {}).get('title')
24 sheet_json_schema = OrderedDict() 25 sheet_json_schema = OrderedDict()
25 data = next(iter(sheet.get('data', [])), {}) 26 data = next(iter(sheet.get('data', [])), {})
26 row_data = data.get('rowData', []) 27 row_data = data.get('rowData', [])
@@ -62,15 +63,34 @@ def get_sheet_schema_columns(sheet):
62 skipped = 0 63 skipped = 0
63 column_name = '{}'.format(header_value) 64 column_name = '{}'.format(header_value)
64 if column_name in header_list: 65 if column_name in header_list:
65 raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name)) 66 raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
67 sheet_title, column_name, column_letter))
66 header_list.append(column_name) 68 header_list.append(column_name)
67 69
68 first_value = first_values[i] 70 first_value = None
69 71 try:
72 first_value = first_values[i]
73 except IndexError as err:
74 raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
75 sheet_title, column_name, column_letter, err))
76
70 column_effective_value = first_value.get('effectiveValue', {}) 77 column_effective_value = first_value.get('effectiveValue', {})
71 for key in column_effective_value.keys(): 78
72 if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'): 79 col_val = None
73 column_effective_value_type = key 80 if column_effective_value == {}:
81 column_effective_value_type = 'stringValue'
82 LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
83 sheet_title, column_name, column_letter))
84 LOGGER.info(' Setting column datatype to STRING')
85 else:
86 for key, val in column_effective_value.items():
87 if key in ('numberValue', 'stringValue', 'boolValue'):
88 column_effective_value_type = key
89 col_val = str(val)
90 elif key in ('errorType', 'formulaType'):
91 col_val = str(val)
92 raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
93 sheet_title, column_name, column_letter, key, col_val))
74 94
75 column_number_format = first_values[i].get('effectiveFormat', {}).get( 95 column_number_format = first_values[i].get('effectiveFormat', {}).get(
76 'numberFormat', {}) 96 'numberFormat', {})
@@ -87,7 +107,13 @@ def get_sheet_schema_columns(sheet):
87 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType 107 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
88 # 108 #
89 column_format = None # Default 109 column_format = None # Default
90 if column_effective_value_type == 'stringValue': 110 if column_effective_value == {}:
111 col_properties = {'type': ['null', 'string']}
112 column_gs_type = 'stringValue'
113 LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
114 sheet_title, column_name, column_letter))
115 LOGGER.info(' Setting column datatype to STRING')
116 elif column_effective_value_type == 'stringValue':
91 col_properties = {'type': ['null', 'string']} 117 col_properties = {'type': ['null', 'string']}
92 column_gs_type = 'stringValue' 118 column_gs_type = 'stringValue'
93 elif column_effective_value_type == 'boolValue': 119 elif column_effective_value_type == 'boolValue':
@@ -138,8 +164,8 @@ def get_sheet_schema_columns(sheet):
138 else: 164 else:
139 col_properties = {'type': ['null', 'string']} 165 col_properties = {'type': ['null', 'string']}
140 column_gs_type = 'unsupportedValue' 166 column_gs_type = 'unsupportedValue'
141 LOGGER.info('Unsupported data type: {}, value: {}'.format(column_name, \ 167 LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
142 column_effective_value_type)) 168 sheet_title, column_name, column_letter, column_effective_value_type, col_val))
143 LOGGER.info('Converting to string.') 169 LOGGER.info('Converting to string.')
144 else: # skipped 170 else: # skipped
145 column_is_skipped = True 171 column_is_skipped = True
@@ -148,11 +174,16 @@ def get_sheet_schema_columns(sheet):
148 column_name = '__sdc_skip_col_{}'.format(column_index_str) 174 column_name = '__sdc_skip_col_{}'.format(column_index_str)
149 col_properties = {'type': ['null', 'string']} 175 col_properties = {'type': ['null', 'string']}
150 column_gs_type = 'stringValue' 176 column_gs_type = 'stringValue'
177 LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
178 sheet_title, column_name, column_letter))
179 LOGGER.info(' This column will be skipped during data loading.')
151 180
152 if skipped >= 2: 181 if skipped >= 2:
153 # skipped = 2 consecutive skipped headers 182 # skipped = 2 consecutive skipped headers
154 # Remove prior_header column_name 183 # Remove prior_header column_name
155 sheet_json_schema['properties'].pop(prior_header, None) 184 sheet_json_schema['properties'].pop(prior_header, None)
185 LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
186 sheet_title, column_name, column_letter))
156 break 187 break
157 188
158 else: 189 else:
@@ -245,7 +276,7 @@ def get_schemas(client, spreadsheet_id):
245 for sheet in sheets: 276 for sheet in sheets:
246 # GET sheet_json_schema for each worksheet (from function above) 277 # GET sheet_json_schema for each worksheet (from function above)
247 sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) 278 sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
248 LOGGER.info('columns = {}'.format(columns)) 279 # LOGGER.info('columns = {}'.format(columns))
249 280
250 sheet_title = sheet.get('properties', {}).get('title') 281 sheet_title = sheet.get('properties', {}).get('title')
251 schemas[sheet_title] = sheet_json_schema 282 schemas[sheet_title] = sheet_json_schema
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 76b2e59..311281c 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -6,6 +6,7 @@ import pytz
6import singer 6import singer
7from singer import metrics, metadata, Transformer, utils 7from singer import metrics, metadata, Transformer, utils
8from singer.utils import strptime_to_utc, strftime 8from singer.utils import strptime_to_utc, strftime
9from singer.messages import RecordMessage
9from tap_google_sheets.streams import STREAMS 10from tap_google_sheets.streams import STREAMS
10from tap_google_sheets.schema import get_sheet_metadata 11from tap_google_sheets.schema import get_sheet_metadata
11 12
@@ -17,14 +18,26 @@ def write_schema(catalog, stream_name):
17 schema = stream.schema.to_dict() 18 schema = stream.schema.to_dict()
18 try: 19 try:
19 singer.write_schema(stream_name, schema, stream.key_properties) 20 singer.write_schema(stream_name, schema, stream.key_properties)
21 LOGGER.info('Writing schema for: {}'.format(stream_name))
20 except OSError as err: 22 except OSError as err:
21 LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) 23 LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
22 raise err 24 raise err
23 25
24 26
25def write_record(stream_name, record, time_extracted): 27def write_record(stream_name, record, time_extracted, version=None):
26 try: 28 try:
27 singer.messages.write_record(stream_name, record, time_extracted=time_extracted) 29 if version:
30 singer.messages.write_message(
31 RecordMessage(
32 stream=stream_name,
33 record=record,
34 version=version,
35 time_extracted=time_extracted))
36 else:
37 singer.messages.write_record(
38 stream_name=stream_name,
39 record=record,
40 time_extracted=time_extracted)
28 except OSError as err: 41 except OSError as err:
29 LOGGER.info('OS Error writing record for: {}'.format(stream_name)) 42 LOGGER.info('OS Error writing record for: {}'.format(stream_name))
30 LOGGER.info('record: {}'.format(record)) 43 LOGGER.info('record: {}'.format(record))
@@ -53,7 +66,8 @@ def write_bookmark(state, stream, value):
53def process_records(catalog, 66def process_records(catalog,
54 stream_name, 67 stream_name,
55 records, 68 records,
56 time_extracted): 69 time_extracted,
70 version=None):
57 stream = catalog.get_stream(stream_name) 71 stream = catalog.get_stream(stream_name)
58 schema = stream.schema.to_dict() 72 schema = stream.schema.to_dict()
59 stream_metadata = metadata.to_map(stream.metadata) 73 stream_metadata = metadata.to_map(stream.metadata)
@@ -65,7 +79,11 @@ def process_records(catalog,
65 record, 79 record,
66 schema, 80 schema,
67 stream_metadata) 81 stream_metadata)
68 write_record(stream_name, transformed_record, time_extracted=time_extracted) 82 write_record(
83 stream_name=stream_name,
84 record=transformed_record,
85 time_extracted=time_extracted,
86 version=version)
69 counter.increment() 87 counter.increment()
70 return counter.value 88 return counter.value
71 89
@@ -206,7 +224,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None):
206 224
207# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times 225# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
208# Convert from array of values to JSON with column names as keys 226# Convert from array of values to JSON with column names as keys
209def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): 227def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
210 sheet_data_tf = [] 228 sheet_data_tf = []
211 row_num = from_row 229 row_num = from_row
212 # Create sorted list of columns based on columnIndex 230 # Create sorted list of columns based on columnIndex
@@ -229,21 +247,32 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
229 col = cols[col_num - 1] 247 col = cols[col_num - 1]
230 col_skipped = col.get('columnSkipped') 248 col_skipped = col.get('columnSkipped')
231 if not col_skipped: 249 if not col_skipped:
250 # Get column metadata
232 col_name = col.get('columnName') 251 col_name = col.get('columnName')
233 col_type = col.get('columnType') 252 col_type = col.get('columnType')
253 col_letter = col.get('columnLetter')
254
255 # NULL values
256 if value is None or value == '':
257 col_val = None
258
234 # Convert dates/times from Lotus Notes Serial Numbers 259 # Convert dates/times from Lotus Notes Serial Numbers
235 # DATE-TIME 260 # DATE-TIME
236 if col_type == 'numberType.DATE_TIME': 261 elif col_type == 'numberType.DATE_TIME':
237 if isinstance(value, (int, float)): 262 if isinstance(value, (int, float)):
238 col_val = excel_to_dttm_str(value) 263 col_val = excel_to_dttm_str(value)
239 else: 264 else:
240 col_val = str(value) 265 col_val = str(value)
266 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
267 sheet_title, col_name, col_letter, row_num, col_type, value))
241 # DATE 268 # DATE
242 elif col_type == 'numberType.DATE': 269 elif col_type == 'numberType.DATE':
243 if isinstance(value, (int, float)): 270 if isinstance(value, (int, float)):
244 col_val = excel_to_dttm_str(value)[:10] 271 col_val = excel_to_dttm_str(value)[:10]
245 else: 272 else:
246 col_val = str(value) 273 col_val = str(value)
274 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
275 sheet_title, col_name, col_letter, row_num, col_type, value))
247 # TIME ONLY (NO DATE) 276 # TIME ONLY (NO DATE)
248 elif col_type == 'numberType.TIME': 277 elif col_type == 'numberType.TIME':
249 if isinstance(value, (int, float)): 278 if isinstance(value, (int, float)):
@@ -253,6 +282,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
253 col_val = str(timedelta(seconds=total_secs)) 282 col_val = str(timedelta(seconds=total_secs))
254 except ValueError: 283 except ValueError:
255 col_val = str(value) 284 col_val = str(value)
285 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
286 sheet_title, col_name, col_letter, row_num, col_type, value))
256 else: 287 else:
257 col_val = str(value) 288 col_val = str(value)
258 # NUMBER (INTEGER AND FLOAT) 289 # NUMBER (INTEGER AND FLOAT)
@@ -268,13 +299,19 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
268 col_val = float(round(value, 15)) 299 col_val = float(round(value, 15))
269 except ValueError: 300 except ValueError:
270 col_val = str(value) 301 col_val = str(value)
302 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
303 sheet_title, col_name, col_letter, row_num, col_type, value))
271 else: # decimal_digits <= 15, no rounding 304 else: # decimal_digits <= 15, no rounding
272 try: 305 try:
273 col_val = float(value) 306 col_val = float(value)
274 except ValueError: 307 except ValueError:
275 col_val = str(value) 308 col_val = str(value)
309 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
310 sheet_title, col_name, col_letter, row_num, col_type, value))
276 else: 311 else:
277 col_val = str(value) 312 col_val = str(value)
313 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
314 sheet_title, col_name, col_letter, row_num, col_type, value))
278 # STRING 315 # STRING
279 elif col_type == 'stringValue': 316 elif col_type == 'stringValue':
280 col_val = str(value) 317 col_val = str(value)
@@ -289,6 +326,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
289 col_val = False 326 col_val = False
290 else: 327 else:
291 col_val = str(value) 328 col_val = str(value)
329 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
330 sheet_title, col_name, col_letter, row, col_type, value))
292 elif isinstance(value, int): 331 elif isinstance(value, int):
293 if value in (1, -1): 332 if value in (1, -1):
294 col_val = True 333 col_val = True
@@ -296,9 +335,13 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
296 col_val = False 335 col_val = False
297 else: 336 else:
298 col_val = str(value) 337 col_val = str(value)
338 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
339 sheet_title, col_name, col_letter, row, col_type, value))
299 # OTHER: Convert everything else to a string 340 # OTHER: Convert everything else to a string
300 else: 341 else:
301 col_val = str(value) 342 col_val = str(value)
343 LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
344 sheet_title, col_name, col_letter, row, col_type, value))
302 sheet_data_row_tf[col_name] = col_val 345 sheet_data_row_tf[col_name] = col_val
303 col_num = col_num + 1 346 col_num = col_num + 1
304 # APPEND non-empty row 347 # APPEND non-empty row
@@ -400,6 +443,20 @@ def sync(client, config, catalog, state):
400 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) 443 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
401 write_schema(catalog, sheet_title) 444 write_schema(catalog, sheet_title)
402 445
446 # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
447 # everytime after each sheet sync is complete.
448 # This forces hard deletes on the data downstream if fewer records are sent.
449 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
450 last_integer = int(get_bookmark(state, sheet_title, 0))
451 activate_version = int(time.time() * 1000)
452 activate_version_message = singer.ActivateVersionMessage(
453 stream=sheet_title,
454 version=activate_version)
455 if last_integer == 0:
456 # initial load, send activate_version before AND after data sync
457 singer.write_message(activate_version_message)
458 LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
459
403 # Determine max range of columns and rows for "paging" through the data 460 # Determine max range of columns and rows for "paging" through the data
404 sheet_last_col_index = 1 461 sheet_last_col_index = 1
405 sheet_last_col_letter = 'A' 462 sheet_last_col_letter = 'A'
@@ -438,6 +495,7 @@ def sync(client, config, catalog, state):
438 sheet_data_tf, row_num = transform_sheet_data( 495 sheet_data_tf, row_num = transform_sheet_data(
439 spreadsheet_id=spreadsheet_id, 496 spreadsheet_id=spreadsheet_id,
440 sheet_id=sheet_id, 497 sheet_id=sheet_id,
498 sheet_title=sheet_title,
441 from_row=from_row, 499 from_row=from_row,
442 columns=columns, 500 columns=columns,
443 sheet_data_rows=sheet_data_rows) 501 sheet_data_rows=sheet_data_rows)
@@ -449,10 +507,11 @@ def sync(client, config, catalog, state):
449 catalog=catalog, 507 catalog=catalog,
450 stream_name=sheet_title, 508 stream_name=sheet_title,
451 records=sheet_data_tf, 509 records=sheet_data_tf,
452 time_extracted=ss_time_extracted) 510 time_extracted=ss_time_extracted,
511 version=activate_version)
453 LOGGER.info('Sheet: {}, records processed: {}'.format( 512 LOGGER.info('Sheet: {}, records processed: {}'.format(
454 sheet_title, record_count)) 513 sheet_title, record_count))
455 514
456 # Update paging from/to_row for next batch 515 # Update paging from/to_row for next batch
457 from_row = to_row + 1 516 from_row = to_row + 1
458 if to_row + batch_rows > sheet_max_row: 517 if to_row + batch_rows > sheet_max_row:
@@ -460,6 +519,14 @@ def sync(client, config, catalog, state):
460 else: 519 else:
461 to_row = to_row + batch_rows 520 to_row = to_row + batch_rows
462 521
522 # End of Stream: Send Activate Version and update State
523 singer.write_message(activate_version_message)
524 write_bookmark(state, sheet_title, activate_version)
525 LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
526 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
527 sheet_title, row_num - 2)) # subtract 1 for header row
528 update_currently_syncing(state, None)
529
463 # SHEETS_LOADED 530 # SHEETS_LOADED
464 # Add sheet to sheets_loaded 531 # Add sheet to sheets_loaded
465 sheet_loaded = {} 532 sheet_loaded = {}
@@ -470,17 +537,6 @@ def sync(client, config, catalog, state):
470 sheet_loaded['lastRowNumber'] = row_num 537 sheet_loaded['lastRowNumber'] = row_num
471 sheets_loaded.append(sheet_loaded) 538 sheets_loaded.append(sheet_loaded)
472 539
473 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
474 # This forces hard deletes on the data downstream if fewer records are sent.
475 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
476 activate_version_message = singer.ActivateVersionMessage(
477 stream=sheet_title,
478 version=int(time.time() * 1000))
479 singer.write_message(activate_version_message)
480
481 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
482 sheet_title, row_num - 2)) # subtract 1 for header row
483
484 stream_name = 'sheet_metadata' 540 stream_name = 'sheet_metadata'
485 # Sync sheet_metadata if selected 541 # Sync sheet_metadata if selected
486 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) 542 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)