diff options
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r-- | tap_google_sheets/sync.py | 166 |
1 files changed, 95 insertions, 71 deletions
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py index d7d7184..76b2e59 100644 --- a/tap_google_sheets/sync.py +++ b/tap_google_sheets/sync.py | |||
@@ -125,11 +125,14 @@ def get_data(stream_name, | |||
125 | range_rows=None): | 125 | range_rows=None): |
126 | if not range_rows: | 126 | if not range_rows: |
127 | range_rows = '' | 127 | range_rows = '' |
128 | # Replace {placeholder} variables in path | ||
128 | path = endpoint_config.get('path', stream_name).replace( | 129 | path = endpoint_config.get('path', stream_name).replace( |
129 | '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace( | 130 | '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace( |
130 | '{range_rows}', range_rows) | 131 | '{range_rows}', range_rows) |
131 | params = endpoint_config.get('params', {}) | 132 | params = endpoint_config.get('params', {}) |
132 | api = endpoint_config.get('api', 'sheets') | 133 | api = endpoint_config.get('api', 'sheets') |
134 | # Add in querystring parameters and replace {placeholder} variables | ||
135 | # querystring function ensures parameters are added but not encoded causing API errors | ||
133 | querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace( | 136 | querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace( |
134 | '{sheet_title}', stream_name) | 137 | '{sheet_title}', stream_name) |
135 | data = {} | 138 | data = {} |
@@ -192,7 +195,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None): | |||
192 | timezone_str = 'UTC' | 195 | timezone_str = 'UTC' |
193 | tzn = pytz.timezone(timezone_str) | 196 | tzn = pytz.timezone(timezone_str) |
194 | sec_per_day = 86400 | 197 | sec_per_day = 86400 |
195 | excel_epoch = 25569 # 1970-01-01T00:00:00Z | 198 | excel_epoch = 25569 # 1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date |
196 | epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day) | 199 | epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day) |
197 | epoch_dttm = datetime(1970, 1, 1) | 200 | epoch_dttm = datetime(1970, 1, 1) |
198 | excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec) | 201 | excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec) |
@@ -205,85 +208,103 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None): | |||
205 | # Convert from array of values to JSON with column names as keys | 208 | # Convert from array of values to JSON with column names as keys |
206 | def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): | 209 | def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): |
207 | sheet_data_tf = [] | 210 | sheet_data_tf = [] |
208 | is_last_row = False | ||
209 | row_num = from_row | 211 | row_num = from_row |
210 | # Create sorted list of columns based on columnIndex | 212 | # Create sorted list of columns based on columnIndex |
211 | cols = sorted(columns, key=lambda i: i['columnIndex']) | 213 | cols = sorted(columns, key=lambda i: i['columnIndex']) |
212 | 214 | ||
213 | # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows)) | 215 | # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows)) |
214 | for row in sheet_data_rows: | 216 | for row in sheet_data_rows: |
215 | # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1 | 217 | # If empty row, SKIP |
216 | if row == []: | 218 | if row == []: |
217 | is_last_row = True | 219 | LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num)) |
218 | return sheet_data_tf, row_num - 1, is_last_row | 220 | else: |
219 | sheet_data_row_tf = {} | 221 | sheet_data_row_tf = {} |
220 | # Add spreadsheet_id, sheet_id, and row | 222 | # Add spreadsheet_id, sheet_id, and row |
221 | sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id | 223 | sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id |
222 | sheet_data_row_tf['__sdc_sheet_id'] = sheet_id | 224 | sheet_data_row_tf['__sdc_sheet_id'] = sheet_id |
223 | sheet_data_row_tf['__sdc_row'] = row_num | 225 | sheet_data_row_tf['__sdc_row'] = row_num |
224 | col_num = 1 | 226 | col_num = 1 |
225 | for value in row: | 227 | for value in row: |
226 | # Select column metadata based on column index | 228 | # Select column metadata based on column index |
227 | col = cols[col_num - 1] | 229 | col = cols[col_num - 1] |
228 | col_skipped = col.get('columnSkipped') | 230 | col_skipped = col.get('columnSkipped') |
229 | if not col_skipped: | 231 | if not col_skipped: |
230 | col_name = col.get('columnName') | 232 | col_name = col.get('columnName') |
231 | col_type = col.get('columnType') | 233 | col_type = col.get('columnType') |
232 | # Convert dates/times from Lotus Notes Serial Numbers | 234 | # Convert dates/times from Lotus Notes Serial Numbers |
233 | if col_type == 'numberType.DATE_TIME': | 235 | # DATE-TIME |
234 | if isinstance(value, (int, float)): | 236 | if col_type == 'numberType.DATE_TIME': |
235 | col_val = excel_to_dttm_str(value) | 237 | if isinstance(value, (int, float)): |
236 | else: | 238 | col_val = excel_to_dttm_str(value) |
237 | col_val = str(value) | 239 | else: |
238 | elif col_type == 'numberType.DATE': | ||
239 | if isinstance(value, (int, float)): | ||
240 | col_val = excel_to_dttm_str(value)[:10] | ||
241 | else: | ||
242 | col_val = str(value) | ||
243 | elif col_type == 'numberType.TIME': | ||
244 | if isinstance(value, (int, float)): | ||
245 | try: | ||
246 | total_secs = value * 86400 # seconds in day | ||
247 | col_val = str(timedelta(seconds=total_secs)) | ||
248 | except ValueError: | ||
249 | col_val = str(value) | 240 | col_val = str(value) |
250 | else: | 241 | # DATE |
251 | col_val = str(value) | 242 | elif col_type == 'numberType.DATE': |
252 | elif col_type == 'numberType': | 243 | if isinstance(value, (int, float)): |
253 | if isinstance(value, int): | 244 | col_val = excel_to_dttm_str(value)[:10] |
254 | col_val = int(value) | 245 | else: |
255 | else: | ||
256 | try: | ||
257 | col_val = float(value) | ||
258 | except ValueError: | ||
259 | col_val = str(value) | 246 | col_val = str(value) |
260 | elif col_type == 'stringValue': | 247 | # TIME ONLY (NO DATE) |
261 | col_val = str(value) | 248 | elif col_type == 'numberType.TIME': |
262 | elif col_type == 'boolValue': | 249 | if isinstance(value, (int, float)): |
263 | if isinstance(value, bool): | 250 | try: |
264 | col_val = value | 251 | total_secs = value * 86400 # seconds in day |
265 | elif isinstance(value, str): | 252 | # Create string formatted like HH:MM:SS |
266 | if value.lower() in ('true', 't', 'yes', 'y'): | 253 | col_val = str(timedelta(seconds=total_secs)) |
267 | col_val = True | 254 | except ValueError: |
268 | elif value.lower() in ('false', 'f', 'no', 'n'): | 255 | col_val = str(value) |
269 | col_val = False | ||
270 | else: | 256 | else: |
271 | col_val = str(value) | 257 | col_val = str(value) |
272 | elif isinstance(value, int): | 258 | # NUMBER (INTEGER AND FLOAT) |
273 | if value in (1, -1): | 259 | elif col_type == 'numberType': |
274 | col_val = True | 260 | if isinstance(value, int): |
275 | elif value == 0: | 261 | col_val = int(value) |
276 | col_val = False | 262 | elif isinstance(value, float): |
263 | # Determine float decimal digits | ||
264 | decimal_digits = str(value)[::-1].find('.') | ||
265 | if decimal_digits > 15: | ||
266 | try: | ||
267 | # ROUND to multipleOf: 1e-15 | ||
268 | col_val = float(round(value, 15)) | ||
269 | except ValueError: | ||
270 | col_val = str(value) | ||
271 | else: # decimal_digits <= 15, no rounding | ||
272 | try: | ||
273 | col_val = float(value) | ||
274 | except ValueError: | ||
275 | col_val = str(value) | ||
277 | else: | 276 | else: |
278 | col_val = str(value) | 277 | col_val = str(value) |
279 | 278 | # STRING | |
280 | else: | 279 | elif col_type == 'stringValue': |
281 | col_val = value | 280 | col_val = str(value) |
282 | sheet_data_row_tf[col_name] = col_val | 281 | # BOOLEAN |
283 | col_num = col_num + 1 | 282 | elif col_type == 'boolValue': |
284 | sheet_data_tf.append(sheet_data_row_tf) | 283 | if isinstance(value, bool): |
284 | col_val = value | ||
285 | elif isinstance(value, str): | ||
286 | if value.lower() in ('true', 't', 'yes', 'y'): | ||
287 | col_val = True | ||
288 | elif value.lower() in ('false', 'f', 'no', 'n'): | ||
289 | col_val = False | ||
290 | else: | ||
291 | col_val = str(value) | ||
292 | elif isinstance(value, int): | ||
293 | if value in (1, -1): | ||
294 | col_val = True | ||
295 | elif value == 0: | ||
296 | col_val = False | ||
297 | else: | ||
298 | col_val = str(value) | ||
299 | # OTHER: Convert everything else to a string | ||
300 | else: | ||
301 | col_val = str(value) | ||
302 | sheet_data_row_tf[col_name] = col_val | ||
303 | col_num = col_num + 1 | ||
304 | # APPEND non-empty row | ||
305 | sheet_data_tf.append(sheet_data_row_tf) | ||
285 | row_num = row_num + 1 | 306 | row_num = row_num + 1 |
286 | return sheet_data_tf, row_num, is_last_row | 307 | return sheet_data_tf, row_num |
287 | 308 | ||
288 | 309 | ||
289 | def sync(client, config, catalog, state): | 310 | def sync(client, config, catalog, state): |
@@ -327,7 +348,7 @@ def sync(client, config, catalog, state): | |||
327 | return | 348 | return |
328 | # Sync file_metadata if selected | 349 | # Sync file_metadata if selected |
329 | sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted) | 350 | sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted) |
330 | write_bookmark(state, stream_name, strftime(this_datetime)) | 351 | # file_metadata bookmark is updated at the end of sync |
331 | 352 | ||
332 | # SPREADSHEET_METADATA | 353 | # SPREADSHEET_METADATA |
333 | spreadsheet_metadata = {} | 354 | spreadsheet_metadata = {} |
@@ -363,7 +384,7 @@ def sync(client, config, catalog, state): | |||
363 | 384 | ||
364 | # GET sheet_metadata and columns | 385 | # GET sheet_metadata and columns |
365 | sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) | 386 | sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) |
366 | LOGGER.info('sheet_schema: {}'.format(sheet_schema)) | 387 | # LOGGER.info('sheet_schema: {}'.format(sheet_schema)) |
367 | 388 | ||
368 | # Transform sheet_metadata | 389 | # Transform sheet_metadata |
369 | sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) | 390 | sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) |
@@ -414,7 +435,7 @@ def sync(client, config, catalog, state): | |||
414 | sheet_data_rows = sheet_data.get('values') | 435 | sheet_data_rows = sheet_data.get('values') |
415 | 436 | ||
416 | # Transform batch of rows to JSON with keys for each column | 437 | # Transform batch of rows to JSON with keys for each column |
417 | sheet_data_tf, row_num, is_last_row = transform_sheet_data( | 438 | sheet_data_tf, row_num = transform_sheet_data( |
418 | spreadsheet_id=spreadsheet_id, | 439 | spreadsheet_id=spreadsheet_id, |
419 | sheet_id=sheet_id, | 440 | sheet_id=sheet_id, |
420 | from_row=from_row, | 441 | from_row=from_row, |
@@ -429,7 +450,7 @@ def sync(client, config, catalog, state): | |||
429 | stream_name=sheet_title, | 450 | stream_name=sheet_title, |
430 | records=sheet_data_tf, | 451 | records=sheet_data_tf, |
431 | time_extracted=ss_time_extracted) | 452 | time_extracted=ss_time_extracted) |
432 | LOGGER.info('Sheet: {}, ecords processed: {}'.format( | 453 | LOGGER.info('Sheet: {}, records processed: {}'.format( |
433 | sheet_title, record_count)) | 454 | sheet_title, record_count)) |
434 | 455 | ||
435 | # Update paging from/to_row for next batch | 456 | # Update paging from/to_row for next batch |
@@ -458,7 +479,7 @@ def sync(client, config, catalog, state): | |||
458 | singer.write_message(activate_version_message) | 479 | singer.write_message(activate_version_message) |
459 | 480 | ||
460 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | 481 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( |
461 | sheet_title, row_num - 1)) | 482 | sheet_title, row_num - 2)) # subtract 1 for header row |
462 | 483 | ||
463 | stream_name = 'sheet_metadata' | 484 | stream_name = 'sheet_metadata' |
464 | # Sync sheet_metadata if selected | 485 | # Sync sheet_metadata if selected |
@@ -468,4 +489,7 @@ def sync(client, config, catalog, state): | |||
468 | # Sync sheet_metadata if selected | 489 | # Sync sheet_metadata if selected |
469 | sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded) | 490 | sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded) |
470 | 491 | ||
492 | # Update file_metadata bookmark | ||
493 | write_bookmark(state, 'file_metadata', strftime(this_datetime)) | ||
494 | |||
471 | return | 495 | return |