diff options
Diffstat (limited to 'tap_google_sheets')
-rw-r--r-- | tap_google_sheets/schema.py | 51 | ||||
-rw-r--r-- | tap_google_sheets/sync.py | 94 |
2 files changed, 116 insertions, 29 deletions
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py index 243467b..e319c03 100644 --- a/tap_google_sheets/schema.py +++ b/tap_google_sheets/schema.py | |||
@@ -21,6 +21,7 @@ def colnum_string(num): | |||
21 | 21 | ||
22 | # Create sheet_metadata_json with columns from sheet | 22 | # Create sheet_metadata_json with columns from sheet |
23 | def get_sheet_schema_columns(sheet): | 23 | def get_sheet_schema_columns(sheet): |
24 | sheet_title = sheet.get('properties', {}).get('title') | ||
24 | sheet_json_schema = OrderedDict() | 25 | sheet_json_schema = OrderedDict() |
25 | data = next(iter(sheet.get('data', [])), {}) | 26 | data = next(iter(sheet.get('data', [])), {}) |
26 | row_data = data.get('rowData', []) | 27 | row_data = data.get('rowData', []) |
@@ -62,15 +63,34 @@ def get_sheet_schema_columns(sheet): | |||
62 | skipped = 0 | 63 | skipped = 0 |
63 | column_name = '{}'.format(header_value) | 64 | column_name = '{}'.format(header_value) |
64 | if column_name in header_list: | 65 | if column_name in header_list: |
65 | raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name)) | 66 | raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format( |
67 | sheet_title, column_name, column_letter)) | ||
66 | header_list.append(column_name) | 68 | header_list.append(column_name) |
67 | 69 | ||
68 | first_value = first_values[i] | 70 | first_value = None |
69 | 71 | try: | |
72 | first_value = first_values[i] | ||
73 | except IndexError as err: | ||
74 | raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format( | ||
75 | sheet_title, column_name, column_letter, err)) | ||
76 | |||
70 | column_effective_value = first_value.get('effectiveValue', {}) | 77 | column_effective_value = first_value.get('effectiveValue', {}) |
71 | for key in column_effective_value.keys(): | 78 | |
72 | if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'): | 79 | col_val = None |
73 | column_effective_value_type = key | 80 | if column_effective_value == {}: |
81 | column_effective_value_type = 'stringValue' | ||
82 | LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format( | ||
83 | sheet_title, column_name, column_letter)) | ||
84 | LOGGER.info(' Setting column datatype to STRING') | ||
85 | else: | ||
86 | for key, val in column_effective_value.items(): | ||
87 | if key in ('numberValue', 'stringValue', 'boolValue'): | ||
88 | column_effective_value_type = key | ||
89 | col_val = str(val) | ||
90 | elif key in ('errorType', 'formulaType'): | ||
91 | col_val = str(val) | ||
92 | raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format( | ||
93 | sheet_title, column_name, column_letter, key, col_val)) | ||
74 | 94 | ||
75 | column_number_format = first_values[i].get('effectiveFormat', {}).get( | 95 | column_number_format = first_values[i].get('effectiveFormat', {}).get( |
76 | 'numberFormat', {}) | 96 | 'numberFormat', {}) |
@@ -87,7 +107,13 @@ def get_sheet_schema_columns(sheet): | |||
87 | # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType | 107 | # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType |
88 | # | 108 | # |
89 | column_format = None # Default | 109 | column_format = None # Default |
90 | if column_effective_value_type == 'stringValue': | 110 | if column_effective_value == {}: |
111 | col_properties = {'type': ['null', 'string']} | ||
112 | column_gs_type = 'stringValue' | ||
113 | LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format( | ||
114 | sheet_title, column_name, column_letter)) | ||
115 | LOGGER.info(' Setting column datatype to STRING') | ||
116 | elif column_effective_value_type == 'stringValue': | ||
91 | col_properties = {'type': ['null', 'string']} | 117 | col_properties = {'type': ['null', 'string']} |
92 | column_gs_type = 'stringValue' | 118 | column_gs_type = 'stringValue' |
93 | elif column_effective_value_type == 'boolValue': | 119 | elif column_effective_value_type == 'boolValue': |
@@ -138,8 +164,8 @@ def get_sheet_schema_columns(sheet): | |||
138 | else: | 164 | else: |
139 | col_properties = {'type': ['null', 'string']} | 165 | col_properties = {'type': ['null', 'string']} |
140 | column_gs_type = 'unsupportedValue' | 166 | column_gs_type = 'unsupportedValue' |
141 | LOGGER.info('Unsupported data type: {}, value: {}'.format(column_name, \ | 167 | LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format( |
142 | column_effective_value_type)) | 168 | sheet_title, column_name, column_letter, column_effective_value_type, col_val)) |
143 | LOGGER.info('Converting to string.') | 169 | LOGGER.info('Converting to string.') |
144 | else: # skipped | 170 | else: # skipped |
145 | column_is_skipped = True | 171 | column_is_skipped = True |
@@ -148,11 +174,16 @@ def get_sheet_schema_columns(sheet): | |||
148 | column_name = '__sdc_skip_col_{}'.format(column_index_str) | 174 | column_name = '__sdc_skip_col_{}'.format(column_index_str) |
149 | col_properties = {'type': ['null', 'string']} | 175 | col_properties = {'type': ['null', 'string']} |
150 | column_gs_type = 'stringValue' | 176 | column_gs_type = 'stringValue' |
177 | LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format( | ||
178 | sheet_title, column_name, column_letter)) | ||
179 | LOGGER.info(' This column will be skipped during data loading.') | ||
151 | 180 | ||
152 | if skipped >= 2: | 181 | if skipped >= 2: |
153 | # skipped = 2 consecutive skipped headers | 182 | # skipped = 2 consecutive skipped headers |
154 | # Remove prior_header column_name | 183 | # Remove prior_header column_name |
155 | sheet_json_schema['properties'].pop(prior_header, None) | 184 | sheet_json_schema['properties'].pop(prior_header, None) |
185 | LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format( | ||
186 | sheet_title, column_name, column_letter)) | ||
156 | break | 187 | break |
157 | 188 | ||
158 | else: | 189 | else: |
@@ -245,7 +276,7 @@ def get_schemas(client, spreadsheet_id): | |||
245 | for sheet in sheets: | 276 | for sheet in sheets: |
246 | # GET sheet_json_schema for each worksheet (from function above) | 277 | # GET sheet_json_schema for each worksheet (from function above) |
247 | sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) | 278 | sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) |
248 | LOGGER.info('columns = {}'.format(columns)) | 279 | # LOGGER.info('columns = {}'.format(columns)) |
249 | 280 | ||
250 | sheet_title = sheet.get('properties', {}).get('title') | 281 | sheet_title = sheet.get('properties', {}).get('title') |
251 | schemas[sheet_title] = sheet_json_schema | 282 | schemas[sheet_title] = sheet_json_schema |
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py index 76b2e59..311281c 100644 --- a/tap_google_sheets/sync.py +++ b/tap_google_sheets/sync.py | |||
@@ -6,6 +6,7 @@ import pytz | |||
6 | import singer | 6 | import singer |
7 | from singer import metrics, metadata, Transformer, utils | 7 | from singer import metrics, metadata, Transformer, utils |
8 | from singer.utils import strptime_to_utc, strftime | 8 | from singer.utils import strptime_to_utc, strftime |
9 | from singer.messages import RecordMessage | ||
9 | from tap_google_sheets.streams import STREAMS | 10 | from tap_google_sheets.streams import STREAMS |
10 | from tap_google_sheets.schema import get_sheet_metadata | 11 | from tap_google_sheets.schema import get_sheet_metadata |
11 | 12 | ||
@@ -17,14 +18,26 @@ def write_schema(catalog, stream_name): | |||
17 | schema = stream.schema.to_dict() | 18 | schema = stream.schema.to_dict() |
18 | try: | 19 | try: |
19 | singer.write_schema(stream_name, schema, stream.key_properties) | 20 | singer.write_schema(stream_name, schema, stream.key_properties) |
21 | LOGGER.info('Writing schema for: {}'.format(stream_name)) | ||
20 | except OSError as err: | 22 | except OSError as err: |
21 | LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) | 23 | LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) |
22 | raise err | 24 | raise err |
23 | 25 | ||
24 | 26 | ||
25 | def write_record(stream_name, record, time_extracted): | 27 | def write_record(stream_name, record, time_extracted, version=None): |
26 | try: | 28 | try: |
27 | singer.messages.write_record(stream_name, record, time_extracted=time_extracted) | 29 | if version: |
30 | singer.messages.write_message( | ||
31 | RecordMessage( | ||
32 | stream=stream_name, | ||
33 | record=record, | ||
34 | version=version, | ||
35 | time_extracted=time_extracted)) | ||
36 | else: | ||
37 | singer.messages.write_record( | ||
38 | stream_name=stream_name, | ||
39 | record=record, | ||
40 | time_extracted=time_extracted) | ||
28 | except OSError as err: | 41 | except OSError as err: |
29 | LOGGER.info('OS Error writing record for: {}'.format(stream_name)) | 42 | LOGGER.info('OS Error writing record for: {}'.format(stream_name)) |
30 | LOGGER.info('record: {}'.format(record)) | 43 | LOGGER.info('record: {}'.format(record)) |
@@ -53,7 +66,8 @@ def write_bookmark(state, stream, value): | |||
53 | def process_records(catalog, | 66 | def process_records(catalog, |
54 | stream_name, | 67 | stream_name, |
55 | records, | 68 | records, |
56 | time_extracted): | 69 | time_extracted, |
70 | version=None): | ||
57 | stream = catalog.get_stream(stream_name) | 71 | stream = catalog.get_stream(stream_name) |
58 | schema = stream.schema.to_dict() | 72 | schema = stream.schema.to_dict() |
59 | stream_metadata = metadata.to_map(stream.metadata) | 73 | stream_metadata = metadata.to_map(stream.metadata) |
@@ -65,7 +79,11 @@ def process_records(catalog, | |||
65 | record, | 79 | record, |
66 | schema, | 80 | schema, |
67 | stream_metadata) | 81 | stream_metadata) |
68 | write_record(stream_name, transformed_record, time_extracted=time_extracted) | 82 | write_record( |
83 | stream_name=stream_name, | ||
84 | record=transformed_record, | ||
85 | time_extracted=time_extracted, | ||
86 | version=version) | ||
69 | counter.increment() | 87 | counter.increment() |
70 | return counter.value | 88 | return counter.value |
71 | 89 | ||
@@ -206,7 +224,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None): | |||
206 | 224 | ||
207 | # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times | 225 | # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times |
208 | # Convert from array of values to JSON with column names as keys | 226 | # Convert from array of values to JSON with column names as keys |
209 | def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): | 227 | def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows): |
210 | sheet_data_tf = [] | 228 | sheet_data_tf = [] |
211 | row_num = from_row | 229 | row_num = from_row |
212 | # Create sorted list of columns based on columnIndex | 230 | # Create sorted list of columns based on columnIndex |
@@ -229,21 +247,32 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
229 | col = cols[col_num - 1] | 247 | col = cols[col_num - 1] |
230 | col_skipped = col.get('columnSkipped') | 248 | col_skipped = col.get('columnSkipped') |
231 | if not col_skipped: | 249 | if not col_skipped: |
250 | # Get column metadata | ||
232 | col_name = col.get('columnName') | 251 | col_name = col.get('columnName') |
233 | col_type = col.get('columnType') | 252 | col_type = col.get('columnType') |
253 | col_letter = col.get('columnLetter') | ||
254 | |||
255 | # NULL values | ||
256 | if value is None or value == '': | ||
257 | col_val = None | ||
258 | |||
234 | # Convert dates/times from Lotus Notes Serial Numbers | 259 | # Convert dates/times from Lotus Notes Serial Numbers |
235 | # DATE-TIME | 260 | # DATE-TIME |
236 | if col_type == 'numberType.DATE_TIME': | 261 | elif col_type == 'numberType.DATE_TIME': |
237 | if isinstance(value, (int, float)): | 262 | if isinstance(value, (int, float)): |
238 | col_val = excel_to_dttm_str(value) | 263 | col_val = excel_to_dttm_str(value) |
239 | else: | 264 | else: |
240 | col_val = str(value) | 265 | col_val = str(value) |
266 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
267 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
241 | # DATE | 268 | # DATE |
242 | elif col_type == 'numberType.DATE': | 269 | elif col_type == 'numberType.DATE': |
243 | if isinstance(value, (int, float)): | 270 | if isinstance(value, (int, float)): |
244 | col_val = excel_to_dttm_str(value)[:10] | 271 | col_val = excel_to_dttm_str(value)[:10] |
245 | else: | 272 | else: |
246 | col_val = str(value) | 273 | col_val = str(value) |
274 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
275 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
247 | # TIME ONLY (NO DATE) | 276 | # TIME ONLY (NO DATE) |
248 | elif col_type == 'numberType.TIME': | 277 | elif col_type == 'numberType.TIME': |
249 | if isinstance(value, (int, float)): | 278 | if isinstance(value, (int, float)): |
@@ -253,6 +282,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
253 | col_val = str(timedelta(seconds=total_secs)) | 282 | col_val = str(timedelta(seconds=total_secs)) |
254 | except ValueError: | 283 | except ValueError: |
255 | col_val = str(value) | 284 | col_val = str(value) |
285 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
286 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
256 | else: | 287 | else: |
257 | col_val = str(value) | 288 | col_val = str(value) |
258 | # NUMBER (INTEGER AND FLOAT) | 289 | # NUMBER (INTEGER AND FLOAT) |
@@ -268,13 +299,19 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
268 | col_val = float(round(value, 15)) | 299 | col_val = float(round(value, 15)) |
269 | except ValueError: | 300 | except ValueError: |
270 | col_val = str(value) | 301 | col_val = str(value) |
302 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
303 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
271 | else: # decimal_digits <= 15, no rounding | 304 | else: # decimal_digits <= 15, no rounding |
272 | try: | 305 | try: |
273 | col_val = float(value) | 306 | col_val = float(value) |
274 | except ValueError: | 307 | except ValueError: |
275 | col_val = str(value) | 308 | col_val = str(value) |
309 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
310 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
276 | else: | 311 | else: |
277 | col_val = str(value) | 312 | col_val = str(value) |
313 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
314 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
278 | # STRING | 315 | # STRING |
279 | elif col_type == 'stringValue': | 316 | elif col_type == 'stringValue': |
280 | col_val = str(value) | 317 | col_val = str(value) |
@@ -289,6 +326,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
289 | col_val = False | 326 | col_val = False |
290 | else: | 327 | else: |
291 | col_val = str(value) | 328 | col_val = str(value) |
329 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
330 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
292 | elif isinstance(value, int): | 331 | elif isinstance(value, int): |
293 | if value in (1, -1): | 332 | if value in (1, -1): |
294 | col_val = True | 333 | col_val = True |
@@ -296,9 +335,13 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
296 | col_val = False | 335 | col_val = False |
297 | else: | 336 | else: |
298 | col_val = str(value) | 337 | col_val = str(value) |
338 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
339 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
299 | # OTHER: Convert everything else to a string | 340 | # OTHER: Convert everything else to a string |
300 | else: | 341 | else: |
301 | col_val = str(value) | 342 | col_val = str(value) |
343 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
344 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
302 | sheet_data_row_tf[col_name] = col_val | 345 | sheet_data_row_tf[col_name] = col_val |
303 | col_num = col_num + 1 | 346 | col_num = col_num + 1 |
304 | # APPEND non-empty row | 347 | # APPEND non-empty row |
@@ -400,6 +443,20 @@ def sync(client, config, catalog, state): | |||
400 | LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) | 443 | LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) |
401 | write_schema(catalog, sheet_title) | 444 | write_schema(catalog, sheet_title) |
402 | 445 | ||
446 | # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs) | ||
447 | # every time after each sheet sync is complete. | ||
448 | # This forces hard deletes on the data downstream if fewer records are sent. | ||
449 | # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 | ||
450 | last_integer = int(get_bookmark(state, sheet_title, 0)) | ||
451 | activate_version = int(time.time() * 1000) | ||
452 | activate_version_message = singer.ActivateVersionMessage( | ||
453 | stream=sheet_title, | ||
454 | version=activate_version) | ||
455 | if last_integer == 0: | ||
456 | # initial load, send activate_version before AND after data sync | ||
457 | singer.write_message(activate_version_message) | ||
458 | LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) | ||
459 | |||
403 | # Determine max range of columns and rows for "paging" through the data | 460 | # Determine max range of columns and rows for "paging" through the data |
404 | sheet_last_col_index = 1 | 461 | sheet_last_col_index = 1 |
405 | sheet_last_col_letter = 'A' | 462 | sheet_last_col_letter = 'A' |
@@ -438,6 +495,7 @@ def sync(client, config, catalog, state): | |||
438 | sheet_data_tf, row_num = transform_sheet_data( | 495 | sheet_data_tf, row_num = transform_sheet_data( |
439 | spreadsheet_id=spreadsheet_id, | 496 | spreadsheet_id=spreadsheet_id, |
440 | sheet_id=sheet_id, | 497 | sheet_id=sheet_id, |
498 | sheet_title=sheet_title, | ||
441 | from_row=from_row, | 499 | from_row=from_row, |
442 | columns=columns, | 500 | columns=columns, |
443 | sheet_data_rows=sheet_data_rows) | 501 | sheet_data_rows=sheet_data_rows) |
@@ -449,10 +507,11 @@ def sync(client, config, catalog, state): | |||
449 | catalog=catalog, | 507 | catalog=catalog, |
450 | stream_name=sheet_title, | 508 | stream_name=sheet_title, |
451 | records=sheet_data_tf, | 509 | records=sheet_data_tf, |
452 | time_extracted=ss_time_extracted) | 510 | time_extracted=ss_time_extracted, |
511 | version=activate_version) | ||
453 | LOGGER.info('Sheet: {}, records processed: {}'.format( | 512 | LOGGER.info('Sheet: {}, records processed: {}'.format( |
454 | sheet_title, record_count)) | 513 | sheet_title, record_count)) |
455 | 514 | ||
456 | # Update paging from/to_row for next batch | 515 | # Update paging from/to_row for next batch |
457 | from_row = to_row + 1 | 516 | from_row = to_row + 1 |
458 | if to_row + batch_rows > sheet_max_row: | 517 | if to_row + batch_rows > sheet_max_row: |
@@ -460,6 +519,14 @@ def sync(client, config, catalog, state): | |||
460 | else: | 519 | else: |
461 | to_row = to_row + batch_rows | 520 | to_row = to_row + batch_rows |
462 | 521 | ||
522 | # End of Stream: Send Activate Version and update State | ||
523 | singer.write_message(activate_version_message) | ||
524 | write_bookmark(state, sheet_title, activate_version) | ||
525 | LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) | ||
526 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | ||
527 | sheet_title, row_num - 2)) # subtract 1 for header row | ||
528 | update_currently_syncing(state, None) | ||
529 | |||
463 | # SHEETS_LOADED | 530 | # SHEETS_LOADED |
464 | # Add sheet to sheets_loaded | 531 | # Add sheet to sheets_loaded |
465 | sheet_loaded = {} | 532 | sheet_loaded = {} |
@@ -470,17 +537,6 @@ def sync(client, config, catalog, state): | |||
470 | sheet_loaded['lastRowNumber'] = row_num | 537 | sheet_loaded['lastRowNumber'] = row_num |
471 | sheets_loaded.append(sheet_loaded) | 538 | sheets_loaded.append(sheet_loaded) |
472 | 539 | ||
473 | # Emit a Singer ACTIVATE_VERSION message after each sheet is complete. | ||
474 | # This forces hard deletes on the data downstream if fewer records are sent. | ||
475 | # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 | ||
476 | activate_version_message = singer.ActivateVersionMessage( | ||
477 | stream=sheet_title, | ||
478 | version=int(time.time() * 1000)) | ||
479 | singer.write_message(activate_version_message) | ||
480 | |||
481 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | ||
482 | sheet_title, row_num - 2)) # subtract 1 for header row | ||
483 | |||
484 | stream_name = 'sheet_metadata' | 540 | stream_name = 'sheet_metadata' |
485 | # Sync sheet_metadata if selected | 541 | # Sync sheet_metadata if selected |
486 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) | 542 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) |