author     Jeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com>   2020-02-24 09:53:26 -0800
committer  GitHub <noreply@github.com>                                       2020-02-24 12:53:26 -0500
commit     376f1145837541d4fff2ad0e499236761f9873c3 (patch)
tree       cc086f18b24bda8a86c16c3ec742b89947f382ae /tap_google_sheets
parent     f1d1d43c6b74a8705e91e908c582e39c68464c0c (diff)
v.0.0.4 Logic to skip empty sheets (#4)  [tag: v0.0.4]
* v.0.0.2 schema and sync changes
Change the number JSON schema to anyOf with multipleOf; skip empty rows; move write_bookmark to the end of sync.py.
* v.0.0.3 Sync activate version and error handling
Update README.md documentation. Improve logging and handling of errors and warnings. Improve null handling in Discovery and Sync. Fix issues with activate version messages.
* v.0.0.4 Skip empty worksheets
Add logic to skip empty worksheets in Discovery and Sync modes.
* schema.py fix number datatype issue
Number datatypes were being created as strings in targets. The JSON schema anyOf order needs to be adjusted so that the order is null, number, string (see the sketch after this list).
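To illustrate why the anyOf ordering matters downstream: a minimal sketch of a naive Singer target that maps a column's JSON schema to a SQL type by taking the first concrete type it finds in anyOf. pick_sql_type and the type names are hypothetical, for illustration only; real targets differ, but a string-first ordering produced exactly this numbers-as-strings effect.

    # Hypothetical, naive target-side type mapping (illustrative only).
    def pick_sql_type(col_properties):
        for subschema in col_properties.get('anyOf', []):
            if subschema.get('type') == 'number':
                return 'NUMERIC'
            if subschema.get('type') == 'string':
                return 'VARCHAR'
        return 'VARCHAR'

    old_order = {'anyOf': [{'type': 'string'}, {'type': 'null'},
                           {'type': 'number', 'multipleOf': 1e-15}]}
    new_order = {'anyOf': [{'type': 'null'},
                           {'type': 'number', 'multipleOf': 1e-15},
                           {'type': 'string'}]}

    print(pick_sql_type(old_order))  # VARCHAR -> numbers load as strings
    print(pick_sql_type(new_order))  # NUMERIC -> numbers stay numeric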
Diffstat (limited to 'tap_google_sheets')
-rw-r--r--  tap_google_sheets/schema.py | 365
-rw-r--r--  tap_google_sheets/sync.py   | 214
2 files changed, 294 insertions, 285 deletions
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index e319c03..c229d72 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -25,184 +25,188 @@ def get_sheet_schema_columns(sheet):
     sheet_json_schema = OrderedDict()
     data = next(iter(sheet.get('data', [])), {})
     row_data = data.get('rowData', [])
-    # spreadsheet is an OrderedDict, with orderd sheets and rows in the repsonse
-
-    headers = row_data[0].get('values', [])
-    first_values = row_data[1].get('values', [])
-    # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
-
-    sheet_json_schema = {
-        'type': 'object',
-        'additionalProperties': False,
-        'properties': {
-            '__sdc_spreadsheet_id': {
-                'type': ['null', 'string']
-            },
-            '__sdc_sheet_id': {
-                'type': ['null', 'integer']
-            },
-            '__sdc_row': {
-                'type': ['null', 'integer']
-            }
-        }
-    }
-
-    header_list = [] # used for checking uniqueness
-    columns = []
-    prior_header = None
-    i = 0
-    skipped = 0
-    # Read column headers until end or 2 consecutive skipped headers
-    for header in headers:
-        # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True)))
-        column_index = i + 1
-        column_letter = colnum_string(column_index)
-        header_value = header.get('formattedValue')
-        if header_value: # NOT skipped
-            column_is_skipped = False
-            skipped = 0
-            column_name = '{}'.format(header_value)
-            if column_name in header_list:
-                raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
-                    sheet_title, column_name, column_letter))
-            header_list.append(column_name)
-
-            first_value = None
-            try:
-                first_value = first_values[i]
-            except IndexError as err:
-                raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
-                    sheet_title, column_name, column_letter, err))
-
-            column_effective_value = first_value.get('effectiveValue', {})
-
-            col_val = None
-            if column_effective_value == {}:
-                column_effective_value_type = 'stringValue'
-                LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
-                    sheet_title, column_name, column_letter))
-                LOGGER.info('  Setting column datatype to STRING')
-            else:
-                for key, val in column_effective_value.items():
-                    if key in ('numberValue', 'stringValue', 'boolValue'):
-                        column_effective_value_type = key
-                        col_val = str(val)
-                    elif key in ('errorType', 'formulaType'):
-                        col_val = str(val)
-                        raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
-                            sheet_title, column_name, column_letter, key, col_val))
-
-            column_number_format = first_values[i].get('effectiveFormat', {}).get(
-                'numberFormat', {})
-            column_number_format_type = column_number_format.get('type')
-
-            # Determine datatype for sheet_json_schema
-            #
-            # column_effective_value_type = numberValue, stringValue, boolValue;
-            #   INVALID: errorType, formulaType
-            #   https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
-            #
-            # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
-            #   TIME, DATE_TIME, SCIENTIFIC
-            #   https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
-            #
-            column_format = None # Default
-            if column_effective_value == {}:
-                col_properties = {'type': ['null', 'string']}
-                column_gs_type = 'stringValue'
-                LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
-                    sheet_title, column_name, column_letter))
-                LOGGER.info('  Setting column datatype to STRING')
-            elif column_effective_value_type == 'stringValue':
-                col_properties = {'type': ['null', 'string']}
-                column_gs_type = 'stringValue'
-            elif column_effective_value_type == 'boolValue':
-                col_properties = {'type': ['null', 'boolean', 'string']}
-                column_gs_type = 'boolValue'
-            elif column_effective_value_type == 'numberValue':
-                if column_number_format_type == 'DATE_TIME':
-                    col_properties = {
-                        'type': ['null', 'string'],
-                        'format': 'date-time'
-                    }
-                    column_gs_type = 'numberType.DATE_TIME'
-                elif column_number_format_type == 'DATE':
-                    col_properties = {
-                        'type': ['null', 'string'],
-                        'format': 'date'
-                    }
-                    column_gs_type = 'numberType.DATE'
-                elif column_number_format_type == 'TIME':
-                    col_properties = {
-                        'type': ['null', 'string'],
-                        'format': 'time'
-                    }
-                    column_gs_type = 'numberType.TIME'
-                elif column_number_format_type == 'TEXT':
-                    col_properties = {'type': ['null', 'string']}
-                    column_gs_type = 'stringValue'
-                else:
-                    # Interesting - order in the anyOf makes a difference.
-                    # Number w/ multipleOf must be listed last, otherwise errors occur.
-                    col_properties = {
-                        'anyOf': [
-                            {
-                                'type': 'string'
-                            },
-                            {
-                                'type': 'null'
-                            },
-                            {
-                                'type': 'number',
-                                'multipleOf': 1e-15
-                            }
-                        ]
-                    }
-                    column_gs_type = 'numberType'
-            # Catch-all to deal with other types and set to string
-            # column_effective_value_type: formulaValue, errorValue, or other
-            else:
-                col_properties = {'type': ['null', 'string']}
-                column_gs_type = 'unsupportedValue'
-                LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
-                    sheet_title, column_name, column_letter, column_effective_value_type, col_val))
-                LOGGER.info('Converting to string.')
-        else: # skipped
-            column_is_skipped = True
-            skipped = skipped + 1
-            column_index_str = str(column_index).zfill(2)
-            column_name = '__sdc_skip_col_{}'.format(column_index_str)
-            col_properties = {'type': ['null', 'string']}
-            column_gs_type = 'stringValue'
-            LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
-                sheet_title, column_name, column_letter))
-            LOGGER.info('  This column will be skipped during data loading.')
-
-        if skipped >= 2:
-            # skipped = 2 consecutive skipped headers
-            # Remove prior_header column_name
-            sheet_json_schema['properties'].pop(prior_header, None)
-            LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
-                sheet_title, column_name, column_letter))
-            break
-
-        else:
-            column = {}
-            column = {
-                'columnIndex': column_index,
-                'columnLetter': column_letter,
-                'columnName': column_name,
-                'columnType': column_gs_type,
-                'columnSkipped': column_is_skipped
-            }
-            columns.append(column)
-
-            sheet_json_schema['properties'][column_name] = col_properties
-
-        prior_header = column_name
-        i = i + 1
-
-    return sheet_json_schema, columns
+    if row_data == []:
+        # Empty sheet, SKIP
+        LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+        return None, None
+    else:
+        # spreadsheet is an OrderedDict, with orderd sheets and rows in the repsonse
+        headers = row_data[0].get('values', [])
+        first_values = row_data[1].get('values', [])
+        # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
+
+        sheet_json_schema = {
+            'type': 'object',
+            'additionalProperties': False,
+            'properties': {
+                '__sdc_spreadsheet_id': {
+                    'type': ['null', 'string']
+                },
+                '__sdc_sheet_id': {
+                    'type': ['null', 'integer']
+                },
+                '__sdc_row': {
+                    'type': ['null', 'integer']
+                }
+            }
+        }
+
+        header_list = [] # used for checking uniqueness
+        columns = []
+        prior_header = None
+        i = 0
+        skipped = 0
+        # Read column headers until end or 2 consecutive skipped headers
+        for header in headers:
+            # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True)))
+            column_index = i + 1
+            column_letter = colnum_string(column_index)
+            header_value = header.get('formattedValue')
+            if header_value: # NOT skipped
+                column_is_skipped = False
+                skipped = 0
+                column_name = '{}'.format(header_value)
+                if column_name in header_list:
+                    raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
+                        sheet_title, column_name, column_letter))
+                header_list.append(column_name)
+
+                first_value = None
+                try:
+                    first_value = first_values[i]
+                except IndexError as err:
+                    raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
+                        sheet_title, column_name, column_letter, err))
+
+                column_effective_value = first_value.get('effectiveValue', {})
+
+                col_val = None
+                if column_effective_value == {}:
+                    column_effective_value_type = 'stringValue'
+                    LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
+                        sheet_title, column_name, column_letter))
+                    LOGGER.info('  Setting column datatype to STRING')
+                else:
+                    for key, val in column_effective_value.items():
+                        if key in ('numberValue', 'stringValue', 'boolValue'):
+                            column_effective_value_type = key
+                            col_val = str(val)
+                        elif key in ('errorType', 'formulaType'):
+                            col_val = str(val)
+                            raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, column_name, column_letter, key, col_val))
+
+                column_number_format = first_values[i].get('effectiveFormat', {}).get(
+                    'numberFormat', {})
+                column_number_format_type = column_number_format.get('type')
+
+                # Determine datatype for sheet_json_schema
+                #
+                # column_effective_value_type = numberValue, stringValue, boolValue;
+                #   INVALID: errorType, formulaType
+                #   https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
+                #
+                # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
+                #   TIME, DATE_TIME, SCIENTIFIC
+                #   https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
+                #
+                column_format = None # Default
+                if column_effective_value == {}:
+                    col_properties = {'type': ['null', 'string']}
+                    column_gs_type = 'stringValue'
+                    LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
+                        sheet_title, column_name, column_letter))
+                    LOGGER.info('  Setting column datatype to STRING')
+                elif column_effective_value_type == 'stringValue':
+                    col_properties = {'type': ['null', 'string']}
+                    column_gs_type = 'stringValue'
+                elif column_effective_value_type == 'boolValue':
+                    col_properties = {'type': ['null', 'boolean', 'string']}
+                    column_gs_type = 'boolValue'
+                elif column_effective_value_type == 'numberValue':
+                    if column_number_format_type == 'DATE_TIME':
+                        col_properties = {
+                            'type': ['null', 'string'],
+                            'format': 'date-time'
+                        }
+                        column_gs_type = 'numberType.DATE_TIME'
+                    elif column_number_format_type == 'DATE':
+                        col_properties = {
+                            'type': ['null', 'string'],
+                            'format': 'date'
+                        }
+                        column_gs_type = 'numberType.DATE'
+                    elif column_number_format_type == 'TIME':
+                        col_properties = {
+                            'type': ['null', 'string'],
+                            'format': 'time'
+                        }
+                        column_gs_type = 'numberType.TIME'
+                    elif column_number_format_type == 'TEXT':
+                        col_properties = {'type': ['null', 'string']}
+                        column_gs_type = 'stringValue'
+                    else:
+                        # Interesting - order in the anyOf makes a difference.
+                        # Number w/ multipleOf must be listed last, otherwise errors occur.
+                        col_properties = {
+                            'anyOf': [
+                                {
+                                    'type': 'null'
+                                },
+                                {
+                                    'type': 'number',
+                                    'multipleOf': 1e-15
+                                },
+                                {
+                                    'type': 'string'
+                                }
+                            ]
+                        }
+                        column_gs_type = 'numberType'
+                # Catch-all to deal with other types and set to string
+                # column_effective_value_type: formulaValue, errorValue, or other
+                else:
+                    col_properties = {'type': ['null', 'string']}
+                    column_gs_type = 'unsupportedValue'
+                    LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+                        sheet_title, column_name, column_letter, column_effective_value_type, col_val))
+                    LOGGER.info('Converting to string.')
+            else: # skipped
+                column_is_skipped = True
+                skipped = skipped + 1
+                column_index_str = str(column_index).zfill(2)
+                column_name = '__sdc_skip_col_{}'.format(column_index_str)
+                col_properties = {'type': ['null', 'string']}
+                column_gs_type = 'stringValue'
+                LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
+                    sheet_title, column_name, column_letter))
+                LOGGER.info('  This column will be skipped during data loading.')
+
+            if skipped >= 2:
+                # skipped = 2 consecutive skipped headers
+                # Remove prior_header column_name
+                sheet_json_schema['properties'].pop(prior_header, None)
+                LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
+                    sheet_title, column_name, column_letter))
+                break
+
+            else:
+                column = {}
+                column = {
+                    'columnIndex': column_index,
+                    'columnLetter': column_letter,
+                    'columnName': column_name,
+                    'columnType': column_gs_type,
+                    'columnSkipped': column_is_skipped
+                }
+                columns.append(column)
+
+                sheet_json_schema['properties'][column_name] = col_properties
+
+            prior_header = column_name
+            i = i + 1
+
+        return sheet_json_schema, columns
 
 
 # Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query
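The new guard at the top of get_sheet_schema_columns keys off the rowData list in the Sheets API grid data. A minimal sketch of the shape involved, using an assumed, heavily abbreviated spreadsheets.get response fragment (real responses carry many more fields):

    # An empty worksheet tab has no 'rowData' in its grid data, so the
    # guard returns (None, None) instead of raising IndexError on row_data[0].
    empty_sheet = {
        'properties': {'sheetId': 123456, 'title': 'Blank Tab'},  # hypothetical values
        'data': [{}]  # no 'rowData' key for an empty tab
    }

    data = next(iter(empty_sheet.get('data', [])), {})
    row_data = data.get('rowData', [])  # -> [], which triggers the skip

Callers must now handle the (None, None) sentinel, which is exactly what the get_schemas hunk below and the sync.py change do.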
@@ -276,17 +280,18 @@ def get_schemas(client, spreadsheet_id):
     for sheet in sheets:
         # GET sheet_json_schema for each worksheet (from function above)
         sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-        # LOGGER.info('columns = {}'.format(columns))
 
-        sheet_title = sheet.get('properties', {}).get('title')
-        schemas[sheet_title] = sheet_json_schema
-        sheet_mdata = metadata.new()
-        sheet_mdata = metadata.get_standard_metadata(
-            schema=sheet_json_schema,
-            key_properties=['__sdc_row'],
-            valid_replication_keys=None,
-            replication_method='FULL_TABLE'
-        )
-        field_metadata[sheet_title] = sheet_mdata
+        # SKIP empty sheets (where sheet_json_schema and columns are None)
+        if sheet_json_schema and columns:
+            sheet_title = sheet.get('properties', {}).get('title')
+            schemas[sheet_title] = sheet_json_schema
+            sheet_mdata = metadata.new()
+            sheet_mdata = metadata.get_standard_metadata(
+                schema=sheet_json_schema,
+                key_properties=['__sdc_row'],
+                valid_replication_keys=None,
+                replication_method='FULL_TABLE'
+            )
+            field_metadata[sheet_title] = sheet_mdata
 
     return schemas, field_metadata
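The discovery code above calls colnum_string to turn a 1-based column index into an A1-notation column letter. The helper is defined elsewhere in schema.py; a functionally equivalent sketch (bijective base-26), for reference:

    def colnum_string(num):
        # 1 -> 'A', 26 -> 'Z', 27 -> 'AA', 703 -> 'AAA'
        string = ''
        while num > 0:
            num, remainder = divmod(num - 1, 26)
            string = chr(65 + remainder) + string
        return string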
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 311281c..b77eab3 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -429,113 +429,117 @@ def sync(client, config, catalog, state):
         sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
         # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
-        # Transform sheet_metadata
-        sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
-        # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
-        sheet_metadata.append(sheet_metadata_tf)
-
-        # SHEET_DATA
-        # Should this worksheet tab be synced?
-        if sheet_title in selected_streams:
-            LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
-            update_currently_syncing(state, sheet_title)
-            selected_fields = get_selected_fields(catalog, sheet_title)
-            LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
-            write_schema(catalog, sheet_title)
-
-            # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
-            # everytime after each sheet sync is complete.
-            # This forces hard deletes on the data downstream if fewer records are sent.
-            # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
-            last_integer = int(get_bookmark(state, sheet_title, 0))
-            activate_version = int(time.time() * 1000)
-            activate_version_message = singer.ActivateVersionMessage(
-                stream=sheet_title,
-                version=activate_version)
-            if last_integer == 0:
-                # initial load, send activate_version before AND after data sync
-                singer.write_message(activate_version_message)
-                LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-
-            # Determine max range of columns and rows for "paging" through the data
-            sheet_last_col_index = 1
-            sheet_last_col_letter = 'A'
-            for col in columns:
-                col_index = col.get('columnIndex')
-                col_letter = col.get('columnLetter')
-                if col_index > sheet_last_col_index:
-                    sheet_last_col_index = col_index
-                    sheet_last_col_letter = col_letter
-            sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
-
-            # Initialize paging for 1st batch
-            is_last_row = False
-            batch_rows = 200
-            from_row = 2
-            if sheet_max_row < batch_rows:
-                to_row = sheet_max_row
-            else:
-                to_row = batch_rows
-
-            # Loop thru batches (each having 200 rows of data)
-            while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
-                range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
-                # GET sheet_data for a worksheet tab
-                sheet_data, time_extracted = get_data(
-                    stream_name=sheet_title,
-                    endpoint_config=sheets_loaded_config,
-                    client=client,
-                    spreadsheet_id=spreadsheet_id,
-                    range_rows=range_rows)
-                # Data is returned as a list of arrays, an array of values for each row
-                sheet_data_rows = sheet_data.get('values')
-
-                # Transform batch of rows to JSON with keys for each column
-                sheet_data_tf, row_num = transform_sheet_data(
-                    spreadsheet_id=spreadsheet_id,
-                    sheet_id=sheet_id,
-                    sheet_title=sheet_title,
-                    from_row=from_row,
-                    columns=columns,
-                    sheet_data_rows=sheet_data_rows)
-                if row_num < to_row:
-                    is_last_row = True
-
-                # Process records, send batch of records to target
-                record_count = process_records(
-                    catalog=catalog,
-                    stream_name=sheet_title,
-                    records=sheet_data_tf,
-                    time_extracted=ss_time_extracted,
-                    version=activate_version)
-                LOGGER.info('Sheet: {}, records processed: {}'.format(
-                    sheet_title, record_count))
-
-                # Update paging from/to_row for next batch
-                from_row = to_row + 1
-                if to_row + batch_rows > sheet_max_row:
-                    to_row = sheet_max_row
-                else:
-                    to_row = to_row + batch_rows
-
-            # End of Stream: Send Activate Version and update State
-            singer.write_message(activate_version_message)
-            write_bookmark(state, sheet_title, activate_version)
-            LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-            LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
-                sheet_title, row_num - 2)) # subtract 1 for header row
-            update_currently_syncing(state, None)
-
-            # SHEETS_LOADED
-            # Add sheet to sheets_loaded
-            sheet_loaded = {}
-            sheet_loaded['spreadsheetId'] = spreadsheet_id
-            sheet_loaded['sheetId'] = sheet_id
-            sheet_loaded['title'] = sheet_title
-            sheet_loaded['loadDate'] = strftime(utils.now())
-            sheet_loaded['lastRowNumber'] = row_num
-            sheets_loaded.append(sheet_loaded)
+        # SKIP empty sheets (where sheet_schema and columns are None)
+        if not sheet_schema or not columns:
+            LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+        else:
+            # Transform sheet_metadata
+            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+            sheet_metadata.append(sheet_metadata_tf)
+
+            # SHEET_DATA
+            # Should this worksheet tab be synced?
+            if sheet_title in selected_streams:
+                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                update_currently_syncing(state, sheet_title)
+                selected_fields = get_selected_fields(catalog, sheet_title)
+                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+                write_schema(catalog, sheet_title)
+
+                # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
+                # everytime after each sheet sync is complete.
+                # This forces hard deletes on the data downstream if fewer records are sent.
+                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+                last_integer = int(get_bookmark(state, sheet_title, 0))
+                activate_version = int(time.time() * 1000)
+                activate_version_message = singer.ActivateVersionMessage(
+                    stream=sheet_title,
+                    version=activate_version)
+                if last_integer == 0:
+                    # initial load, send activate_version before AND after data sync
+                    singer.write_message(activate_version_message)
+                    LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+                # Determine max range of columns and rows for "paging" through the data
+                sheet_last_col_index = 1
+                sheet_last_col_letter = 'A'
+                for col in columns:
+                    col_index = col.get('columnIndex')
+                    col_letter = col.get('columnLetter')
+                    if col_index > sheet_last_col_index:
+                        sheet_last_col_index = col_index
+                        sheet_last_col_letter = col_letter
+                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+                # Initialize paging for 1st batch
+                is_last_row = False
+                batch_rows = 200
+                from_row = 2
+                if sheet_max_row < batch_rows:
+                    to_row = sheet_max_row
+                else:
+                    to_row = batch_rows
+
+                # Loop thru batches (each having 200 rows of data)
+                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+                    # GET sheet_data for a worksheet tab
+                    sheet_data, time_extracted = get_data(
+                        stream_name=sheet_title,
+                        endpoint_config=sheets_loaded_config,
+                        client=client,
+                        spreadsheet_id=spreadsheet_id,
+                        range_rows=range_rows)
+                    # Data is returned as a list of arrays, an array of values for each row
+                    sheet_data_rows = sheet_data.get('values')
+
+                    # Transform batch of rows to JSON with keys for each column
+                    sheet_data_tf, row_num = transform_sheet_data(
+                        spreadsheet_id=spreadsheet_id,
+                        sheet_id=sheet_id,
+                        sheet_title=sheet_title,
+                        from_row=from_row,
+                        columns=columns,
+                        sheet_data_rows=sheet_data_rows)
+                    if row_num < to_row:
+                        is_last_row = True
+
+                    # Process records, send batch of records to target
+                    record_count = process_records(
+                        catalog=catalog,
+                        stream_name=sheet_title,
+                        records=sheet_data_tf,
+                        time_extracted=ss_time_extracted,
+                        version=activate_version)
+                    LOGGER.info('Sheet: {}, records processed: {}'.format(
+                        sheet_title, record_count))
+
+                    # Update paging from/to_row for next batch
+                    from_row = to_row + 1
+                    if to_row + batch_rows > sheet_max_row:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = to_row + batch_rows
+
+                # End of Stream: Send Activate Version and update State
+                singer.write_message(activate_version_message)
+                write_bookmark(state, sheet_title, activate_version)
+                LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                    sheet_title, row_num - 2)) # subtract 1 for header row
+                update_currently_syncing(state, None)
+
+                # SHEETS_LOADED
+                # Add sheet to sheets_loaded
+                sheet_loaded = {}
+                sheet_loaded['spreadsheetId'] = spreadsheet_id
+                sheet_loaded['sheetId'] = sheet_id
+                sheet_loaded['title'] = sheet_title
+                sheet_loaded['loadDate'] = strftime(utils.now())
+                sheet_loaded['lastRowNumber'] = row_num
+                sheets_loaded.append(sheet_loaded)
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
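The sync loop above pages through each selected sheet in fixed 200-row batches, starting at row 2 because row 1 holds the headers, and brackets each stream with ACTIVATE_VERSION messages so targets can hard-delete rows that disappear between syncs. A minimal standalone sketch of just the range arithmetic; batch_ranges is illustrative and not part of the tap, and the real loop also interleaves get_data, transform_sheet_data, and process_records and can stop early via is_last_row:

    def batch_ranges(sheet_max_row, sheet_last_col_letter, batch_rows=200):
        # Yield A1-notation ranges covering rows 2..sheet_max_row in batches.
        from_row = 2
        to_row = min(batch_rows, sheet_max_row)
        while from_row < sheet_max_row:
            yield 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
            from_row = to_row + 1
            to_row = min(to_row + batch_rows, sheet_max_row)

    # A 450-row sheet whose last column is F:
    print(list(batch_ranges(450, 'F')))
    # ['A2:F200', 'A201:F400', 'A401:F450']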