diff options
author | Jeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com> | 2020-01-09 07:30:53 -0800 |
---|---|---|
committer | Kyle Allan <KAllan357@gmail.com> | 2020-01-09 10:30:53 -0500 |
commit | 43a24cbab1dbc35b893c35b86e34adc0f2fb84e7 (patch) | |
tree | bcbaae860aad0a94bcc4d27f4804504691401438 /tap_google_sheets | |
parent | 5890b89c1aa7c554235b3cef156b5a5a2c594bec (diff) | |
download | tap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.tar.gz tap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.tar.zst tap-google-sheets-43a24cbab1dbc35b893c35b86e34adc0f2fb84e7.zip |
v.0.0.3 Sync error handling, activate version, documentation (#2) (tag: v0.0.3)
* v.0.0.2 schema and sync changes
Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py
* v.0.0.3 Sync activate version and error handling
Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages.
Diffstat (limited to 'tap_google_sheets')
-rw-r--r-- | tap_google_sheets/schema.py | 51 | ||||
-rw-r--r-- | tap_google_sheets/sync.py | 94 |
2 files changed, 116 insertions, 29 deletions
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py index 243467b..e319c03 100644 --- a/tap_google_sheets/schema.py +++ b/tap_google_sheets/schema.py | |||
@@ -21,6 +21,7 @@ def colnum_string(num): | |||
21 | 21 | ||
22 | # Create sheet_metadata_json with columns from sheet | 22 | # Create sheet_metadata_json with columns from sheet |
23 | def get_sheet_schema_columns(sheet): | 23 | def get_sheet_schema_columns(sheet): |
24 | sheet_title = sheet.get('properties', {}).get('title') | ||
24 | sheet_json_schema = OrderedDict() | 25 | sheet_json_schema = OrderedDict() |
25 | data = next(iter(sheet.get('data', [])), {}) | 26 | data = next(iter(sheet.get('data', [])), {}) |
26 | row_data = data.get('rowData', []) | 27 | row_data = data.get('rowData', []) |
@@ -62,15 +63,34 @@ def get_sheet_schema_columns(sheet): | |||
62 | skipped = 0 | 63 | skipped = 0 |
63 | column_name = '{}'.format(header_value) | 64 | column_name = '{}'.format(header_value) |
64 | if column_name in header_list: | 65 | if column_name in header_list: |
65 | raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name)) | 66 | raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format( |
67 | sheet_title, column_name, column_letter)) | ||
66 | header_list.append(column_name) | 68 | header_list.append(column_name) |
67 | 69 | ||
68 | first_value = first_values[i] | 70 | first_value = None |
69 | 71 | try: | |
72 | first_value = first_values[i] | ||
73 | except IndexError as err: | ||
74 | raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format( | ||
75 | sheet_title, column_name, column_letter, err)) | ||
76 | |||
70 | column_effective_value = first_value.get('effectiveValue', {}) | 77 | column_effective_value = first_value.get('effectiveValue', {}) |
71 | for key in column_effective_value.keys(): | 78 | |
72 | if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'): | 79 | col_val = None |
73 | column_effective_value_type = key | 80 | if column_effective_value == {}: |
81 | column_effective_value_type = 'stringValue' | ||
82 | LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format( | ||
83 | sheet_title, column_name, column_letter)) | ||
84 | LOGGER.info(' Setting column datatype to STRING') | ||
85 | else: | ||
86 | for key, val in column_effective_value.items(): | ||
87 | if key in ('numberValue', 'stringValue', 'boolValue'): | ||
88 | column_effective_value_type = key | ||
89 | col_val = str(val) | ||
90 | elif key in ('errorType', 'formulaType'): | ||
91 | col_val = str(val) | ||
92 | raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format( | ||
93 | sheet_title, column_name, column_letter, key, col_val)) | ||
74 | 94 | ||
75 | column_number_format = first_values[i].get('effectiveFormat', {}).get( | 95 | column_number_format = first_values[i].get('effectiveFormat', {}).get( |
76 | 'numberFormat', {}) | 96 | 'numberFormat', {}) |
@@ -87,7 +107,13 @@ def get_sheet_schema_columns(sheet): | |||
87 | # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType | 107 | # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType |
88 | # | 108 | # |
89 | column_format = None # Default | 109 | column_format = None # Default |
90 | if column_effective_value_type == 'stringValue': | 110 | if column_effective_value == {}: |
111 | col_properties = {'type': ['null', 'string']} | ||
112 | column_gs_type = 'stringValue' | ||
113 | LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format( | ||
114 | sheet_title, column_name, column_letter)) | ||
115 | LOGGER.info(' Setting column datatype to STRING') | ||
116 | elif column_effective_value_type == 'stringValue': | ||
91 | col_properties = {'type': ['null', 'string']} | 117 | col_properties = {'type': ['null', 'string']} |
92 | column_gs_type = 'stringValue' | 118 | column_gs_type = 'stringValue' |
93 | elif column_effective_value_type == 'boolValue': | 119 | elif column_effective_value_type == 'boolValue': |
@@ -138,8 +164,8 @@ def get_sheet_schema_columns(sheet): | |||
138 | else: | 164 | else: |
139 | col_properties = {'type': ['null', 'string']} | 165 | col_properties = {'type': ['null', 'string']} |
140 | column_gs_type = 'unsupportedValue' | 166 | column_gs_type = 'unsupportedValue' |
141 | LOGGER.info('Unsupported data type: {}, value: {}'.format(column_name, \ | 167 | LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format( |
142 | column_effective_value_type)) | 168 | sheet_title, column_name, column_letter, column_effective_value_type, col_val)) |
143 | LOGGER.info('Converting to string.') | 169 | LOGGER.info('Converting to string.') |
144 | else: # skipped | 170 | else: # skipped |
145 | column_is_skipped = True | 171 | column_is_skipped = True |
@@ -148,11 +174,16 @@ def get_sheet_schema_columns(sheet): | |||
148 | column_name = '__sdc_skip_col_{}'.format(column_index_str) | 174 | column_name = '__sdc_skip_col_{}'.format(column_index_str) |
149 | col_properties = {'type': ['null', 'string']} | 175 | col_properties = {'type': ['null', 'string']} |
150 | column_gs_type = 'stringValue' | 176 | column_gs_type = 'stringValue' |
177 | LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format( | ||
178 | sheet_title, column_name, column_letter)) | ||
179 | LOGGER.info(' This column will be skipped during data loading.') | ||
151 | 180 | ||
152 | if skipped >= 2: | 181 | if skipped >= 2: |
153 | # skipped = 2 consecutive skipped headers | 182 | # skipped = 2 consecutive skipped headers |
154 | # Remove prior_header column_name | 183 | # Remove prior_header column_name |
155 | sheet_json_schema['properties'].pop(prior_header, None) | 184 | sheet_json_schema['properties'].pop(prior_header, None) |
185 | LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format( | ||
186 | sheet_title, column_name, column_letter)) | ||
156 | break | 187 | break |
157 | 188 | ||
158 | else: | 189 | else: |
@@ -245,7 +276,7 @@ def get_schemas(client, spreadsheet_id): | |||
245 | for sheet in sheets: | 276 | for sheet in sheets: |
246 | # GET sheet_json_schema for each worksheet (from function above) | 277 | # GET sheet_json_schema for each worksheet (from function above) |
247 | sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) | 278 | sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) |
248 | LOGGER.info('columns = {}'.format(columns)) | 279 | # LOGGER.info('columns = {}'.format(columns)) |
249 | 280 | ||
250 | sheet_title = sheet.get('properties', {}).get('title') | 281 | sheet_title = sheet.get('properties', {}).get('title') |
251 | schemas[sheet_title] = sheet_json_schema | 282 | schemas[sheet_title] = sheet_json_schema |
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py index 76b2e59..311281c 100644 --- a/tap_google_sheets/sync.py +++ b/tap_google_sheets/sync.py | |||
@@ -6,6 +6,7 @@ import pytz | |||
6 | import singer | 6 | import singer |
7 | from singer import metrics, metadata, Transformer, utils | 7 | from singer import metrics, metadata, Transformer, utils |
8 | from singer.utils import strptime_to_utc, strftime | 8 | from singer.utils import strptime_to_utc, strftime |
9 | from singer.messages import RecordMessage | ||
9 | from tap_google_sheets.streams import STREAMS | 10 | from tap_google_sheets.streams import STREAMS |
10 | from tap_google_sheets.schema import get_sheet_metadata | 11 | from tap_google_sheets.schema import get_sheet_metadata |
11 | 12 | ||
@@ -17,14 +18,26 @@ def write_schema(catalog, stream_name): | |||
17 | schema = stream.schema.to_dict() | 18 | schema = stream.schema.to_dict() |
18 | try: | 19 | try: |
19 | singer.write_schema(stream_name, schema, stream.key_properties) | 20 | singer.write_schema(stream_name, schema, stream.key_properties) |
21 | LOGGER.info('Writing schema for: {}'.format(stream_name)) | ||
20 | except OSError as err: | 22 | except OSError as err: |
21 | LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) | 23 | LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) |
22 | raise err | 24 | raise err |
23 | 25 | ||
24 | 26 | ||
25 | def write_record(stream_name, record, time_extracted): | 27 | def write_record(stream_name, record, time_extracted, version=None): |
26 | try: | 28 | try: |
27 | singer.messages.write_record(stream_name, record, time_extracted=time_extracted) | 29 | if version: |
30 | singer.messages.write_message( | ||
31 | RecordMessage( | ||
32 | stream=stream_name, | ||
33 | record=record, | ||
34 | version=version, | ||
35 | time_extracted=time_extracted)) | ||
36 | else: | ||
37 | singer.messages.write_record( | ||
38 | stream_name=stream_name, | ||
39 | record=record, | ||
40 | time_extracted=time_extracted) | ||
28 | except OSError as err: | 41 | except OSError as err: |
29 | LOGGER.info('OS Error writing record for: {}'.format(stream_name)) | 42 | LOGGER.info('OS Error writing record for: {}'.format(stream_name)) |
30 | LOGGER.info('record: {}'.format(record)) | 43 | LOGGER.info('record: {}'.format(record)) |
@@ -53,7 +66,8 @@ def write_bookmark(state, stream, value): | |||
53 | def process_records(catalog, | 66 | def process_records(catalog, |
54 | stream_name, | 67 | stream_name, |
55 | records, | 68 | records, |
56 | time_extracted): | 69 | time_extracted, |
70 | version=None): | ||
57 | stream = catalog.get_stream(stream_name) | 71 | stream = catalog.get_stream(stream_name) |
58 | schema = stream.schema.to_dict() | 72 | schema = stream.schema.to_dict() |
59 | stream_metadata = metadata.to_map(stream.metadata) | 73 | stream_metadata = metadata.to_map(stream.metadata) |
@@ -65,7 +79,11 @@ def process_records(catalog, | |||
65 | record, | 79 | record, |
66 | schema, | 80 | schema, |
67 | stream_metadata) | 81 | stream_metadata) |
68 | write_record(stream_name, transformed_record, time_extracted=time_extracted) | 82 | write_record( |
83 | stream_name=stream_name, | ||
84 | record=transformed_record, | ||
85 | time_extracted=time_extracted, | ||
86 | version=version) | ||
69 | counter.increment() | 87 | counter.increment() |
70 | return counter.value | 88 | return counter.value |
71 | 89 | ||
@@ -206,7 +224,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None): | |||
206 | 224 | ||
207 | # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times | 225 | # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times |
208 | # Convert from array of values to JSON with column names as keys | 226 | # Convert from array of values to JSON with column names as keys |
209 | def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): | 227 | def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows): |
210 | sheet_data_tf = [] | 228 | sheet_data_tf = [] |
211 | row_num = from_row | 229 | row_num = from_row |
212 | # Create sorted list of columns based on columnIndex | 230 | # Create sorted list of columns based on columnIndex |
@@ -229,21 +247,32 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
229 | col = cols[col_num - 1] | 247 | col = cols[col_num - 1] |
230 | col_skipped = col.get('columnSkipped') | 248 | col_skipped = col.get('columnSkipped') |
231 | if not col_skipped: | 249 | if not col_skipped: |
250 | # Get column metadata | ||
232 | col_name = col.get('columnName') | 251 | col_name = col.get('columnName') |
233 | col_type = col.get('columnType') | 252 | col_type = col.get('columnType') |
253 | col_letter = col.get('columnLetter') | ||
254 | |||
255 | # NULL values | ||
256 | if value is None or value == '': | ||
257 | col_val = None | ||
258 | |||
234 | # Convert dates/times from Lotus Notes Serial Numbers | 259 | # Convert dates/times from Lotus Notes Serial Numbers |
235 | # DATE-TIME | 260 | # DATE-TIME |
236 | if col_type == 'numberType.DATE_TIME': | 261 | elif col_type == 'numberType.DATE_TIME': |
237 | if isinstance(value, (int, float)): | 262 | if isinstance(value, (int, float)): |
238 | col_val = excel_to_dttm_str(value) | 263 | col_val = excel_to_dttm_str(value) |
239 | else: | 264 | else: |
240 | col_val = str(value) | 265 | col_val = str(value) |
266 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
267 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
241 | # DATE | 268 | # DATE |
242 | elif col_type == 'numberType.DATE': | 269 | elif col_type == 'numberType.DATE': |
243 | if isinstance(value, (int, float)): | 270 | if isinstance(value, (int, float)): |
244 | col_val = excel_to_dttm_str(value)[:10] | 271 | col_val = excel_to_dttm_str(value)[:10] |
245 | else: | 272 | else: |
246 | col_val = str(value) | 273 | col_val = str(value) |
274 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
275 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
247 | # TIME ONLY (NO DATE) | 276 | # TIME ONLY (NO DATE) |
248 | elif col_type == 'numberType.TIME': | 277 | elif col_type == 'numberType.TIME': |
249 | if isinstance(value, (int, float)): | 278 | if isinstance(value, (int, float)): |
@@ -253,6 +282,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
253 | col_val = str(timedelta(seconds=total_secs)) | 282 | col_val = str(timedelta(seconds=total_secs)) |
254 | except ValueError: | 283 | except ValueError: |
255 | col_val = str(value) | 284 | col_val = str(value) |
285 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
286 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
256 | else: | 287 | else: |
257 | col_val = str(value) | 288 | col_val = str(value) |
258 | # NUMBER (INTEGER AND FLOAT) | 289 | # NUMBER (INTEGER AND FLOAT) |
@@ -268,13 +299,19 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
268 | col_val = float(round(value, 15)) | 299 | col_val = float(round(value, 15)) |
269 | except ValueError: | 300 | except ValueError: |
270 | col_val = str(value) | 301 | col_val = str(value) |
302 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
303 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
271 | else: # decimal_digits <= 15, no rounding | 304 | else: # decimal_digits <= 15, no rounding |
272 | try: | 305 | try: |
273 | col_val = float(value) | 306 | col_val = float(value) |
274 | except ValueError: | 307 | except ValueError: |
275 | col_val = str(value) | 308 | col_val = str(value) |
309 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
310 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
276 | else: | 311 | else: |
277 | col_val = str(value) | 312 | col_val = str(value) |
313 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
314 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
278 | # STRING | 315 | # STRING |
279 | elif col_type == 'stringValue': | 316 | elif col_type == 'stringValue': |
280 | col_val = str(value) | 317 | col_val = str(value) |
@@ -289,6 +326,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
289 | col_val = False | 326 | col_val = False |
290 | else: | 327 | else: |
291 | col_val = str(value) | 328 | col_val = str(value) |
329 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
330 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
292 | elif isinstance(value, int): | 331 | elif isinstance(value, int): |
293 | if value in (1, -1): | 332 | if value in (1, -1): |
294 | col_val = True | 333 | col_val = True |
@@ -296,9 +335,13 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
296 | col_val = False | 335 | col_val = False |
297 | else: | 336 | else: |
298 | col_val = str(value) | 337 | col_val = str(value) |
338 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
339 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
299 | # OTHER: Convert everything else to a string | 340 | # OTHER: Convert everything else to a string |
300 | else: | 341 | else: |
301 | col_val = str(value) | 342 | col_val = str(value) |
343 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
344 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
302 | sheet_data_row_tf[col_name] = col_val | 345 | sheet_data_row_tf[col_name] = col_val |
303 | col_num = col_num + 1 | 346 | col_num = col_num + 1 |
304 | # APPEND non-empty row | 347 | # APPEND non-empty row |
@@ -400,6 +443,20 @@ def sync(client, config, catalog, state): | |||
400 | LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) | 443 | LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) |
401 | write_schema(catalog, sheet_title) | 444 | write_schema(catalog, sheet_title) |
402 | 445 | ||
446 | # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs) | ||
447 | # everytime after each sheet sync is complete. | ||
448 | # This forces hard deletes on the data downstream if fewer records are sent. | ||
449 | # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 | ||
450 | last_integer = int(get_bookmark(state, sheet_title, 0)) | ||
451 | activate_version = int(time.time() * 1000) | ||
452 | activate_version_message = singer.ActivateVersionMessage( | ||
453 | stream=sheet_title, | ||
454 | version=activate_version) | ||
455 | if last_integer == 0: | ||
456 | # initial load, send activate_version before AND after data sync | ||
457 | singer.write_message(activate_version_message) | ||
458 | LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) | ||
459 | |||
403 | # Determine max range of columns and rows for "paging" through the data | 460 | # Determine max range of columns and rows for "paging" through the data |
404 | sheet_last_col_index = 1 | 461 | sheet_last_col_index = 1 |
405 | sheet_last_col_letter = 'A' | 462 | sheet_last_col_letter = 'A' |
@@ -438,6 +495,7 @@ def sync(client, config, catalog, state): | |||
438 | sheet_data_tf, row_num = transform_sheet_data( | 495 | sheet_data_tf, row_num = transform_sheet_data( |
439 | spreadsheet_id=spreadsheet_id, | 496 | spreadsheet_id=spreadsheet_id, |
440 | sheet_id=sheet_id, | 497 | sheet_id=sheet_id, |
498 | sheet_title=sheet_title, | ||
441 | from_row=from_row, | 499 | from_row=from_row, |
442 | columns=columns, | 500 | columns=columns, |
443 | sheet_data_rows=sheet_data_rows) | 501 | sheet_data_rows=sheet_data_rows) |
@@ -449,10 +507,11 @@ def sync(client, config, catalog, state): | |||
449 | catalog=catalog, | 507 | catalog=catalog, |
450 | stream_name=sheet_title, | 508 | stream_name=sheet_title, |
451 | records=sheet_data_tf, | 509 | records=sheet_data_tf, |
452 | time_extracted=ss_time_extracted) | 510 | time_extracted=ss_time_extracted, |
511 | version=activate_version) | ||
453 | LOGGER.info('Sheet: {}, records processed: {}'.format( | 512 | LOGGER.info('Sheet: {}, records processed: {}'.format( |
454 | sheet_title, record_count)) | 513 | sheet_title, record_count)) |
455 | 514 | ||
456 | # Update paging from/to_row for next batch | 515 | # Update paging from/to_row for next batch |
457 | from_row = to_row + 1 | 516 | from_row = to_row + 1 |
458 | if to_row + batch_rows > sheet_max_row: | 517 | if to_row + batch_rows > sheet_max_row: |
@@ -460,6 +519,14 @@ def sync(client, config, catalog, state): | |||
460 | else: | 519 | else: |
461 | to_row = to_row + batch_rows | 520 | to_row = to_row + batch_rows |
462 | 521 | ||
522 | # End of Stream: Send Activate Version and update State | ||
523 | singer.write_message(activate_version_message) | ||
524 | write_bookmark(state, sheet_title, activate_version) | ||
525 | LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) | ||
526 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | ||
527 | sheet_title, row_num - 2)) # subtract 1 for header row | ||
528 | update_currently_syncing(state, None) | ||
529 | |||
463 | # SHEETS_LOADED | 530 | # SHEETS_LOADED |
464 | # Add sheet to sheets_loaded | 531 | # Add sheet to sheets_loaded |
465 | sheet_loaded = {} | 532 | sheet_loaded = {} |
@@ -470,17 +537,6 @@ def sync(client, config, catalog, state): | |||
470 | sheet_loaded['lastRowNumber'] = row_num | 537 | sheet_loaded['lastRowNumber'] = row_num |
471 | sheets_loaded.append(sheet_loaded) | 538 | sheets_loaded.append(sheet_loaded) |
472 | 539 | ||
473 | # Emit a Singer ACTIVATE_VERSION message after each sheet is complete. | ||
474 | # This forces hard deletes on the data downstream if fewer records are sent. | ||
475 | # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 | ||
476 | activate_version_message = singer.ActivateVersionMessage( | ||
477 | stream=sheet_title, | ||
478 | version=int(time.time() * 1000)) | ||
479 | singer.write_message(activate_version_message) | ||
480 | |||
481 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | ||
482 | sheet_title, row_num - 2)) # subtract 1 for header row | ||
483 | |||
484 | stream_name = 'sheet_metadata' | 540 | stream_name = 'sheet_metadata' |
485 | # Sync sheet_metadata if selected | 541 | # Sync sheet_metadata if selected |
486 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) | 542 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) |