-rw-r--r--  CHANGELOG.md                |   3
-rw-r--r--  setup.py                    |   2
-rw-r--r--  tap_google_sheets/schema.py | 365
-rw-r--r--  tap_google_sheets/sync.py   | 214
4 files changed, 298 insertions(+), 286 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3c47260..81057ee 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog
 
+## 0.0.4
+* Add logic to skip empty worksheets in Discovery and Sync mode.
+
 ## 0.0.3
 * Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages.
 
diff --git a/setup.py b/setup.py
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup, find_packages
 
 setup(name='tap-google-sheets',
-      version='0.0.3',
+      version='0.0.4',
       description='Singer.io tap for extracting data from the Google Sheets v4 API',
       author='jeff.huth@bytecode.io',
       classifiers=['Programming Language :: Python :: 3 :: Only'],
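Note: the schema.py hunk below calls colnum_string(column_index) to turn a 1-based column index into its A1-notation letter. That helper is defined outside this diff; as a reference, here is a minimal sketch of the standard mapping it performs (the name and behavior are taken from the call sites below, the body is illustrative, not the tap's actual code):

def colnum_string(num):
    # Map a 1-based column index to a spreadsheet column letter:
    # 1 -> 'A', 26 -> 'Z', 27 -> 'AA', 703 -> 'AAA'
    string = ''
    while num > 0:
        num, remainder = divmod(num - 1, 26)
        string = chr(65 + remainder) + string
    return string

assert colnum_string(1) == 'A'
assert colnum_string(27) == 'AA'
assert colnum_string(703) == 'AAA'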
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index e319c03..c229d72 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -25,184 +25,188 @@ def get_sheet_schema_columns(sheet):
     sheet_json_schema = OrderedDict()
     data = next(iter(sheet.get('data', [])), {})
     row_data = data.get('rowData', [])
-    # spreadsheet is an OrderedDict, with orderd sheets and rows in the repsonse
-
-    headers = row_data[0].get('values', [])
-    first_values = row_data[1].get('values', [])
-    # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
-
-    sheet_json_schema = {
-        'type': 'object',
-        'additionalProperties': False,
-        'properties': {
-            '__sdc_spreadsheet_id': {
-                'type': ['null', 'string']
-            },
-            '__sdc_sheet_id': {
-                'type': ['null', 'integer']
-            },
-            '__sdc_row': {
-                'type': ['null', 'integer']
-            }
-        }
-    }
-
-    header_list = [] # used for checking uniqueness
-    columns = []
-    prior_header = None
-    i = 0
-    skipped = 0
-    # Read column headers until end or 2 consecutive skipped headers
-    for header in headers:
-        # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True)))
-        column_index = i + 1
-        column_letter = colnum_string(column_index)
-        header_value = header.get('formattedValue')
-        if header_value: # NOT skipped
-            column_is_skipped = False
-            skipped = 0
-            column_name = '{}'.format(header_value)
-            if column_name in header_list:
-                raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
-                    sheet_title, column_name, column_letter))
-            header_list.append(column_name)
-
-            first_value = None
-            try:
-                first_value = first_values[i]
-            except IndexError as err:
-                raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
-                    sheet_title, column_name, column_letter, err))
-
-            column_effective_value = first_value.get('effectiveValue', {})
-
-            col_val = None
-            if column_effective_value == {}:
-                column_effective_value_type = 'stringValue'
-                LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
-                    sheet_title, column_name, column_letter))
-                LOGGER.info(' Setting column datatype to STRING')
-            else:
-                for key, val in column_effective_value.items():
-                    if key in ('numberValue', 'stringValue', 'boolValue'):
-                        column_effective_value_type = key
-                        col_val = str(val)
-                    elif key in ('errorType', 'formulaType'):
-                        col_val = str(val)
-                        raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
-                            sheet_title, column_name, column_letter, key, col_val))
-
-            column_number_format = first_values[i].get('effectiveFormat', {}).get(
-                'numberFormat', {})
-            column_number_format_type = column_number_format.get('type')
-
-            # Determine datatype for sheet_json_schema
-            #
-            # column_effective_value_type = numberValue, stringValue, boolValue;
-            #  INVALID: errorType, formulaType
-            #  https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
-            #
-            # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
-            #   TIME, DATE_TIME, SCIENTIFIC
-            #  https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
-            #
-            column_format = None # Default
-            if column_effective_value == {}:
-                col_properties = {'type': ['null', 'string']}
-                column_gs_type = 'stringValue'
-                LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
-                    sheet_title, column_name, column_letter))
-                LOGGER.info(' Setting column datatype to STRING')
-            elif column_effective_value_type == 'stringValue':
-                col_properties = {'type': ['null', 'string']}
-                column_gs_type = 'stringValue'
-            elif column_effective_value_type == 'boolValue':
-                col_properties = {'type': ['null', 'boolean', 'string']}
-                column_gs_type = 'boolValue'
-            elif column_effective_value_type == 'numberValue':
-                if column_number_format_type == 'DATE_TIME':
-                    col_properties = {
-                        'type': ['null', 'string'],
-                        'format': 'date-time'
-                    }
-                    column_gs_type = 'numberType.DATE_TIME'
-                elif column_number_format_type == 'DATE':
-                    col_properties = {
-                        'type': ['null', 'string'],
-                        'format': 'date'
-                    }
-                    column_gs_type = 'numberType.DATE'
-                elif column_number_format_type == 'TIME':
-                    col_properties = {
-                        'type': ['null', 'string'],
-                        'format': 'time'
-                    }
-                    column_gs_type = 'numberType.TIME'
-                elif column_number_format_type == 'TEXT':
-                    col_properties = {'type': ['null', 'string']}
-                    column_gs_type = 'stringValue'
-                else:
-                    # Interesting - order in the anyOf makes a difference.
-                    # Number w/ multipleOf must be listed last, otherwise errors occur.
-                    col_properties = {
-                        'anyOf': [
-                            {
-                                'type': 'string'
-                            },
-                            {
-                                'type': 'null'
-                            },
-                            {
-                                'type': 'number',
-                                'multipleOf': 1e-15
-                            }
-                        ]
-                    }
-                    column_gs_type = 'numberType'
-            # Catch-all to deal with other types and set to string
-            # column_effective_value_type: formulaValue, errorValue, or other
-            else:
-                col_properties = {'type': ['null', 'string']}
-                column_gs_type = 'unsupportedValue'
-                LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
-                    sheet_title, column_name, column_letter, column_effective_value_type, col_val))
-                LOGGER.info('Converting to string.')
-        else: # skipped
-            column_is_skipped = True
-            skipped = skipped + 1
-            column_index_str = str(column_index).zfill(2)
-            column_name = '__sdc_skip_col_{}'.format(column_index_str)
-            col_properties = {'type': ['null', 'string']}
-            column_gs_type = 'stringValue'
-            LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
-                sheet_title, column_name, column_letter))
-            LOGGER.info(' This column will be skipped during data loading.')
-
-        if skipped >= 2:
-            # skipped = 2 consecutive skipped headers
-            # Remove prior_header column_name
-            sheet_json_schema['properties'].pop(prior_header, None)
-            LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
-                sheet_title, column_name, column_letter))
-            break
-
-        else:
-            column = {}
-            column = {
-                'columnIndex': column_index,
-                'columnLetter': column_letter,
-                'columnName': column_name,
-                'columnType': column_gs_type,
-                'columnSkipped': column_is_skipped
-            }
-            columns.append(column)
-
-            sheet_json_schema['properties'][column_name] = col_properties
-
-            prior_header = column_name
-            i = i + 1
-
-    return sheet_json_schema, columns
+    if row_data == []:
+        # Empty sheet, SKIP
+        LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+        return None, None
+    else:
+        # spreadsheet is an OrderedDict, with orderd sheets and rows in the repsonse
+        headers = row_data[0].get('values', [])
+        first_values = row_data[1].get('values', [])
+        # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
+
+        sheet_json_schema = {
+            'type': 'object',
+            'additionalProperties': False,
+            'properties': {
+                '__sdc_spreadsheet_id': {
+                    'type': ['null', 'string']
+                },
+                '__sdc_sheet_id': {
+                    'type': ['null', 'integer']
+                },
+                '__sdc_row': {
+                    'type': ['null', 'integer']
+                }
+            }
+        }
+
+        header_list = [] # used for checking uniqueness
+        columns = []
+        prior_header = None
+        i = 0
+        skipped = 0
+        # Read column headers until end or 2 consecutive skipped headers
+        for header in headers:
+            # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True)))
+            column_index = i + 1
+            column_letter = colnum_string(column_index)
+            header_value = header.get('formattedValue')
+            if header_value: # NOT skipped
+                column_is_skipped = False
+                skipped = 0
+                column_name = '{}'.format(header_value)
+                if column_name in header_list:
+                    raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
+                        sheet_title, column_name, column_letter))
+                header_list.append(column_name)
+
+                first_value = None
+                try:
+                    first_value = first_values[i]
+                except IndexError as err:
+                    raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
+                        sheet_title, column_name, column_letter, err))
+
+                column_effective_value = first_value.get('effectiveValue', {})
+
+                col_val = None
+                if column_effective_value == {}:
+                    column_effective_value_type = 'stringValue'
+                    LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
+                        sheet_title, column_name, column_letter))
+                    LOGGER.info(' Setting column datatype to STRING')
+                else:
+                    for key, val in column_effective_value.items():
+                        if key in ('numberValue', 'stringValue', 'boolValue'):
+                            column_effective_value_type = key
+                            col_val = str(val)
+                        elif key in ('errorType', 'formulaType'):
+                            col_val = str(val)
+                            raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, column_name, column_letter, key, col_val))
+
+                column_number_format = first_values[i].get('effectiveFormat', {}).get(
+                    'numberFormat', {})
+                column_number_format_type = column_number_format.get('type')
+
+                # Determine datatype for sheet_json_schema
+                #
+                # column_effective_value_type = numberValue, stringValue, boolValue;
+                #  INVALID: errorType, formulaType
+                #  https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
+                #
+                # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
+                #   TIME, DATE_TIME, SCIENTIFIC
+                #  https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
+                #
+                column_format = None # Default
+                if column_effective_value == {}:
+                    col_properties = {'type': ['null', 'string']}
+                    column_gs_type = 'stringValue'
+                    LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
+                        sheet_title, column_name, column_letter))
+                    LOGGER.info(' Setting column datatype to STRING')
+                elif column_effective_value_type == 'stringValue':
+                    col_properties = {'type': ['null', 'string']}
+                    column_gs_type = 'stringValue'
+                elif column_effective_value_type == 'boolValue':
+                    col_properties = {'type': ['null', 'boolean', 'string']}
+                    column_gs_type = 'boolValue'
+                elif column_effective_value_type == 'numberValue':
+                    if column_number_format_type == 'DATE_TIME':
+                        col_properties = {
+                            'type': ['null', 'string'],
+                            'format': 'date-time'
+                        }
+                        column_gs_type = 'numberType.DATE_TIME'
+                    elif column_number_format_type == 'DATE':
+                        col_properties = {
+                            'type': ['null', 'string'],
+                            'format': 'date'
+                        }
+                        column_gs_type = 'numberType.DATE'
+                    elif column_number_format_type == 'TIME':
+                        col_properties = {
+                            'type': ['null', 'string'],
+                            'format': 'time'
+                        }
+                        column_gs_type = 'numberType.TIME'
+                    elif column_number_format_type == 'TEXT':
+                        col_properties = {'type': ['null', 'string']}
+                        column_gs_type = 'stringValue'
+                    else:
+                        # Interesting - order in the anyOf makes a difference.
+                        # Number w/ multipleOf must be listed last, otherwise errors occur.
+                        col_properties = {
+                            'anyOf': [
+                                {
+                                    'type': 'null'
+                                },
+                                {
+                                    'type': 'number',
+                                    'multipleOf': 1e-15
+                                },
+                                {
+                                    'type': 'string'
+                                }
+                            ]
+                        }
+                        column_gs_type = 'numberType'
+                # Catch-all to deal with other types and set to string
+                # column_effective_value_type: formulaValue, errorValue, or other
+                else:
+                    col_properties = {'type': ['null', 'string']}
+                    column_gs_type = 'unsupportedValue'
+                    LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+                        sheet_title, column_name, column_letter, column_effective_value_type, col_val))
+                    LOGGER.info('Converting to string.')
+            else: # skipped
+                column_is_skipped = True
+                skipped = skipped + 1
+                column_index_str = str(column_index).zfill(2)
+                column_name = '__sdc_skip_col_{}'.format(column_index_str)
+                col_properties = {'type': ['null', 'string']}
+                column_gs_type = 'stringValue'
+                LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
+                    sheet_title, column_name, column_letter))
+                LOGGER.info(' This column will be skipped during data loading.')
+
+            if skipped >= 2:
+                # skipped = 2 consecutive skipped headers
+                # Remove prior_header column_name
+                sheet_json_schema['properties'].pop(prior_header, None)
+                LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
+                    sheet_title, column_name, column_letter))
+                break
+
+            else:
+                column = {}
+                column = {
+                    'columnIndex': column_index,
+                    'columnLetter': column_letter,
+                    'columnName': column_name,
+                    'columnType': column_gs_type,
+                    'columnSkipped': column_is_skipped
+                }
+                columns.append(column)
+
+                sheet_json_schema['properties'][column_name] = col_properties
+
+                prior_header = column_name
+                i = i + 1
+
+        return sheet_json_schema, columns
 
 
 # Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query
@@ -276,17 +280,18 @@ def get_schemas(client, spreadsheet_id):
     for sheet in sheets:
         # GET sheet_json_schema for each worksheet (from function above)
         sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-        # LOGGER.info('columns = {}'.format(columns))
-
-        sheet_title = sheet.get('properties', {}).get('title')
-        schemas[sheet_title] = sheet_json_schema
-        sheet_mdata = metadata.new()
-        sheet_mdata = metadata.get_standard_metadata(
-            schema=sheet_json_schema,
-            key_properties=['__sdc_row'],
-            valid_replication_keys=None,
-            replication_method='FULL_TABLE'
-        )
-        field_metadata[sheet_title] = sheet_mdata
+
+        # SKIP empty sheets (where sheet_json_schema and columns are None)
+        if sheet_json_schema and columns:
+            sheet_title = sheet.get('properties', {}).get('title')
+            schemas[sheet_title] = sheet_json_schema
+            sheet_mdata = metadata.new()
+            sheet_mdata = metadata.get_standard_metadata(
+                schema=sheet_json_schema,
+                key_properties=['__sdc_row'],
+                valid_replication_keys=None,
+                replication_method='FULL_TABLE'
+            )
+            field_metadata[sheet_title] = sheet_mdata
 
     return schemas, field_metadata
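Note: the schema.py change above establishes a small contract: get_sheet_schema_columns returns (None, None) for a worksheet with no rowData, and get_schemas only registers a stream when both the schema and columns are truthy. A self-contained sketch of that contract (build_sheet_schema is a simplified stand-in for the tap's helper, not its actual code):

from collections import OrderedDict

def build_sheet_schema(sheet):
    # Simplified stand-in for get_sheet_schema_columns
    data = next(iter(sheet.get('data', [])), {})
    row_data = data.get('rowData', [])
    if row_data == []:
        return None, None  # empty worksheet: signal SKIP to callers
    headers = [h.get('formattedValue') for h in row_data[0].get('values', [])]
    schema = OrderedDict([('type', 'object'), ('properties', {})])
    return schema, headers

# Callers guard on the (None, None) sentinel, as get_schemas now does
for sheet in [{'data': []},
              {'data': [{'rowData': [{'values': [{'formattedValue': 'id'}]}]}]}]:
    schema, columns = build_sheet_schema(sheet)
    if schema and columns:
        print('discovered columns:', columns)
    else:
        print('skipping empty sheet')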
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 311281c..b77eab3 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -429,113 +429,117 @@ def sync(client, config, catalog, state):
         sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
         # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
-        # Transform sheet_metadata
-        sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
-        # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
-        sheet_metadata.append(sheet_metadata_tf)
-
-        # SHEET_DATA
-        # Should this worksheet tab be synced?
-        if sheet_title in selected_streams:
-            LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
-            update_currently_syncing(state, sheet_title)
-            selected_fields = get_selected_fields(catalog, sheet_title)
-            LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
-            write_schema(catalog, sheet_title)
-
-            # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
-            # everytime after each sheet sync is complete.
-            # This forces hard deletes on the data downstream if fewer records are sent.
-            # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
-            last_integer = int(get_bookmark(state, sheet_title, 0))
-            activate_version = int(time.time() * 1000)
-            activate_version_message = singer.ActivateVersionMessage(
-                stream=sheet_title,
-                version=activate_version)
-            if last_integer == 0:
-                # initial load, send activate_version before AND after data sync
-                singer.write_message(activate_version_message)
-                LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-
-            # Determine max range of columns and rows for "paging" through the data
-            sheet_last_col_index = 1
-            sheet_last_col_letter = 'A'
-            for col in columns:
-                col_index = col.get('columnIndex')
-                col_letter = col.get('columnLetter')
-                if col_index > sheet_last_col_index:
-                    sheet_last_col_index = col_index
-                    sheet_last_col_letter = col_letter
-            sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
-
-            # Initialize paging for 1st batch
-            is_last_row = False
-            batch_rows = 200
-            from_row = 2
-            if sheet_max_row < batch_rows:
-                to_row = sheet_max_row
-            else:
-                to_row = batch_rows
-
-            # Loop thru batches (each having 200 rows of data)
-            while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
-                range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
-                # GET sheet_data for a worksheet tab
-                sheet_data, time_extracted = get_data(
-                    stream_name=sheet_title,
-                    endpoint_config=sheets_loaded_config,
-                    client=client,
-                    spreadsheet_id=spreadsheet_id,
-                    range_rows=range_rows)
-                # Data is returned as a list of arrays, an array of values for each row
-                sheet_data_rows = sheet_data.get('values')
-
-                # Transform batch of rows to JSON with keys for each column
-                sheet_data_tf, row_num = transform_sheet_data(
-                    spreadsheet_id=spreadsheet_id,
-                    sheet_id=sheet_id,
-                    sheet_title=sheet_title,
-                    from_row=from_row,
-                    columns=columns,
-                    sheet_data_rows=sheet_data_rows)
-                if row_num < to_row:
-                    is_last_row = True
-
-                # Process records, send batch of records to target
-                record_count = process_records(
-                    catalog=catalog,
-                    stream_name=sheet_title,
-                    records=sheet_data_tf,
-                    time_extracted=ss_time_extracted,
-                    version=activate_version)
-                LOGGER.info('Sheet: {}, records processed: {}'.format(
-                    sheet_title, record_count))
-
-                # Update paging from/to_row for next batch
-                from_row = to_row + 1
-                if to_row + batch_rows > sheet_max_row:
-                    to_row = sheet_max_row
-                else:
-                    to_row = to_row + batch_rows
-
-            # End of Stream: Send Activate Version and update State
-            singer.write_message(activate_version_message)
-            write_bookmark(state, sheet_title, activate_version)
-            LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
-            LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
-                sheet_title, row_num - 2)) # subtract 1 for header row
-            update_currently_syncing(state, None)
-
-            # SHEETS_LOADED
-            # Add sheet to sheets_loaded
-            sheet_loaded = {}
-            sheet_loaded['spreadsheetId'] = spreadsheet_id
-            sheet_loaded['sheetId'] = sheet_id
-            sheet_loaded['title'] = sheet_title
-            sheet_loaded['loadDate'] = strftime(utils.now())
-            sheet_loaded['lastRowNumber'] = row_num
-            sheets_loaded.append(sheet_loaded)
+        # SKIP empty sheets (where sheet_schema and columns are None)
+        if not sheet_schema or not columns:
+            LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+        else:
+            # Transform sheet_metadata
+            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+            sheet_metadata.append(sheet_metadata_tf)
+
+            # SHEET_DATA
+            # Should this worksheet tab be synced?
+            if sheet_title in selected_streams:
+                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                update_currently_syncing(state, sheet_title)
+                selected_fields = get_selected_fields(catalog, sheet_title)
+                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+                write_schema(catalog, sheet_title)
+
+                # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
+                # everytime after each sheet sync is complete.
+                # This forces hard deletes on the data downstream if fewer records are sent.
+                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+                last_integer = int(get_bookmark(state, sheet_title, 0))
+                activate_version = int(time.time() * 1000)
+                activate_version_message = singer.ActivateVersionMessage(
+                    stream=sheet_title,
+                    version=activate_version)
+                if last_integer == 0:
+                    # initial load, send activate_version before AND after data sync
+                    singer.write_message(activate_version_message)
+                    LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+                # Determine max range of columns and rows for "paging" through the data
+                sheet_last_col_index = 1
+                sheet_last_col_letter = 'A'
+                for col in columns:
+                    col_index = col.get('columnIndex')
+                    col_letter = col.get('columnLetter')
+                    if col_index > sheet_last_col_index:
+                        sheet_last_col_index = col_index
+                        sheet_last_col_letter = col_letter
+                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+                # Initialize paging for 1st batch
+                is_last_row = False
+                batch_rows = 200
+                from_row = 2
+                if sheet_max_row < batch_rows:
+                    to_row = sheet_max_row
+                else:
+                    to_row = batch_rows
+
+                # Loop thru batches (each having 200 rows of data)
+                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+                    # GET sheet_data for a worksheet tab
+                    sheet_data, time_extracted = get_data(
+                        stream_name=sheet_title,
+                        endpoint_config=sheets_loaded_config,
+                        client=client,
+                        spreadsheet_id=spreadsheet_id,
+                        range_rows=range_rows)
+                    # Data is returned as a list of arrays, an array of values for each row
+                    sheet_data_rows = sheet_data.get('values')
+
+                    # Transform batch of rows to JSON with keys for each column
+                    sheet_data_tf, row_num = transform_sheet_data(
+                        spreadsheet_id=spreadsheet_id,
+                        sheet_id=sheet_id,
+                        sheet_title=sheet_title,
+                        from_row=from_row,
+                        columns=columns,
+                        sheet_data_rows=sheet_data_rows)
+                    if row_num < to_row:
+                        is_last_row = True
+
+                    # Process records, send batch of records to target
+                    record_count = process_records(
+                        catalog=catalog,
+                        stream_name=sheet_title,
+                        records=sheet_data_tf,
+                        time_extracted=ss_time_extracted,
+                        version=activate_version)
+                    LOGGER.info('Sheet: {}, records processed: {}'.format(
+                        sheet_title, record_count))
+
+                    # Update paging from/to_row for next batch
+                    from_row = to_row + 1
+                    if to_row + batch_rows > sheet_max_row:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = to_row + batch_rows
+
+                # End of Stream: Send Activate Version and update State
+                singer.write_message(activate_version_message)
+                write_bookmark(state, sheet_title, activate_version)
+                LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                    sheet_title, row_num - 2)) # subtract 1 for header row
+                update_currently_syncing(state, None)
+
+                # SHEETS_LOADED
+                # Add sheet to sheets_loaded
+                sheet_loaded = {}
+                sheet_loaded['spreadsheetId'] = spreadsheet_id
+                sheet_loaded['sheetId'] = sheet_id
+                sheet_loaded['title'] = sheet_title
+                sheet_loaded['loadDate'] = strftime(utils.now())
+                sheet_loaded['lastRowNumber'] = row_num
+                sheets_loaded.append(sheet_loaded)
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
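Note: the paging logic carried over (and re-indented) in the sync.py hunk above reads rows in 200-row batches, starting at row 2 because row 1 holds the headers. The from_row/to_row arithmetic is easy to get off by one, so here it is in isolation as a sketch (it omits the is_last_row early exit that fires when a batch comes back short):

def page_ranges(sheet_max_row, last_col_letter='Z', batch_rows=200):
    # Row 1 is the header row, so data paging starts at row 2
    from_row = 2
    to_row = min(batch_rows, sheet_max_row)
    while from_row < sheet_max_row and to_row <= sheet_max_row:
        yield 'A{}:{}{}'.format(from_row, last_col_letter, to_row)
        from_row = to_row + 1
        to_row = min(to_row + batch_rows, sheet_max_row)

# A 450-row sheet pages as: A2:Z200, A201:Z400, A401:Z450
print(list(page_ranges(450)))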