author     Jeff Huth <jeff.huth@bytecode.io>    2019-11-15 00:49:39 -0800
committer  Jeff Huth <jeff.huth@bytecode.io>    2019-11-15 00:49:39 -0800
commit     da690bda91ea6a14964a2378e5dbb5d4de91a7e2 (patch)
tree       5674346c5615e6d5dce5982c282af6580563bcc8
parent     3eed42f0063de695f0a9199bf32bf38652e5b7ed (diff)
download   tap-google-sheets-da690bda91ea6a14964a2378e5dbb5d4de91a7e2.tar.gz
           tap-google-sheets-da690bda91ea6a14964a2378e5dbb5d4de91a7e2.tar.zst
           tap-google-sheets-da690bda91ea6a14964a2378e5dbb5d4de91a7e2.zip
client.py rate limit, sync.py changes
client.py rate limit, fix json schemas, sync.py many changes
-rw-r--r--  tap_google_sheets/client.py                    8
-rw-r--r--  tap_google_sheets/schemas/file_metadata.json   2
-rw-r--r--  tap_google_sheets/schemas/sheets_loaded.json   2
-rw-r--r--  tap_google_sheets/sync.py                    354
4 files changed, 275 insertions, 91 deletions
diff --git a/tap_google_sheets/client.py b/tap_google_sheets/client.py
index 12f0811..0a0ce5a 100644
--- a/tap_google_sheets/client.py
+++ b/tap_google_sheets/client.py
@@ -151,6 +151,7 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
     def __exit__(self, exception_type, exception_value, traceback):
         self.__session.close()
 
+
     @backoff.on_exception(backoff.expo,
                           Server5xxError,
                           max_tries=5,
@@ -187,13 +188,13 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
         LOGGER.info('Authorized, token expires = {}'.format(self.__expires))
 
 
+    # Rate Limit: https://developers.google.com/sheets/api/limits
+    # 100 request per 100 seconds per User
     @backoff.on_exception(backoff.expo,
                           (Server5xxError, ConnectionError, Server429Error),
                           max_tries=7,
                           factor=3)
-    # Rate Limit:
-    # https://developers.google.com/webmaster-tools/search-console-api-original/v3/limits
-    @utils.ratelimit(1200, 60)
+    @utils.ratelimit(100, 100)
     def request(self, method, path=None, url=None, api=None, **kwargs):
 
         self.get_access_token()
@@ -211,6 +212,7 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
             del kwargs['endpoint']
         else:
             endpoint = None
+        LOGGER.info('{} URL = {}'.format(endpoint, url))
 
         if 'headers' not in kwargs:
             kwargs['headers'] = {}
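
The ordering of the decorators above matters: backoff.on_exception wraps the throttled request, so every retry attempt is itself rate limited. For reference, a minimal, self-contained sketch of the 100-requests-per-100-seconds behavior that the new @utils.ratelimit(100, 100) call is meant to enforce (illustrative only; the real limiter ships with singer-python and its internals may differ):

# Illustrative only -- not part of the commit. A sliding-window limiter
# with the same "100 requests per 100 seconds" semantics as
# @utils.ratelimit(100, 100) in singer-python.
import time
from collections import deque
from functools import wraps

def ratelimit(limit, every):
    def decorator(func):
        calls = deque()  # timestamps of recent calls
        @wraps(func)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            # Drop timestamps that have aged out of the window
            while calls and now - calls[0] >= every:
                calls.popleft()
            if len(calls) >= limit:
                # Sleep until the oldest call leaves the window
                time.sleep(max(0.0, every - (now - calls[0])))
                calls.popleft()
            calls.append(time.monotonic())
            return func(*args, **kwargs)
        return wrapper
    return decorator

@ratelimit(100, 100)
def make_request():
    pass  # placeholder for client.request(...)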
diff --git a/tap_google_sheets/schemas/file_metadata.json b/tap_google_sheets/schemas/file_metadata.json
index 25c19c4..03fefc6 100644
--- a/tap_google_sheets/schemas/file_metadata.json
+++ b/tap_google_sheets/schemas/file_metadata.json
@@ -30,7 +30,7 @@
         "additionalProperties": false,
         "properties": {
           "kind": {
-            "type": ["null", "integer"]
+            "type": ["null", "string"]
           },
           "displayName": {
             "type": ["null", "string"]
diff --git a/tap_google_sheets/schemas/sheets_loaded.json b/tap_google_sheets/schemas/sheets_loaded.json
index 12f967a..f7a323d 100644
--- a/tap_google_sheets/schemas/sheets_loaded.json
+++ b/tap_google_sheets/schemas/sheets_loaded.json
@@ -8,7 +8,7 @@
     "sheetId": {
       "type": ["null", "integer"]
     },
-    "sheetTitle": {
+    "title": {
       "type": ["null", "string"]
     },
     "loadDate": {
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 5b57e77..79e05f9 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -2,6 +2,8 @@ import time
 import math
 import singer
 import json
+import pytz
+from datetime import datetime, timedelta
 from collections import OrderedDict
 from singer import metrics, metadata, Transformer, utils
 from singer.utils import strptime_to_utc, strftime
@@ -48,66 +50,40 @@ def write_bookmark(state, stream, value):
     singer.write_state(state)
 
 
-# def transform_datetime(this_dttm):
-def transform_datetime(this_dttm):
-    with Transformer() as transformer:
-        new_dttm = transformer._transform_datetime(this_dttm)
-    return new_dttm
-
-
-def process_records(catalog, #pylint: disable=too-many-branches
+# Transform/validate batch of records w/ schema and sent to target
+def process_records(catalog,
                     stream_name,
                     records,
-                    time_extracted,
-                    bookmark_field=None,
-                    bookmark_type=None,
-                    max_bookmark_value=None,
-                    last_datetime=None,
-                    last_integer=None,
-                    parent=None,
-                    parent_id=None):
+                    time_extracted):
     stream = catalog.get_stream(stream_name)
     schema = stream.schema.to_dict()
     stream_metadata = metadata.to_map(stream.metadata)
-
     with metrics.record_counter(stream_name) as counter:
         for record in records:
-            # If child object, add parent_id to record
-            if parent_id and parent:
-                record[parent + '_id'] = parent_id
-
             # Transform record for Singer.io
             with Transformer() as transformer:
                 transformed_record = transformer.transform(
                     record,
                     schema,
                     stream_metadata)
-                # Reset max_bookmark_value to new value if higher
-                if transformed_record.get(bookmark_field):
-                    if max_bookmark_value is None or \
-                        transformed_record[bookmark_field] > transform_datetime(max_bookmark_value):
-                        max_bookmark_value = transformed_record[bookmark_field]
-
-                if bookmark_field and (bookmark_field in transformed_record):
-                    if bookmark_type == 'integer':
-                        # Keep only records whose bookmark is after the last_integer
-                        if transformed_record[bookmark_field] >= last_integer:
-                            write_record(stream_name, transformed_record, \
-                                time_extracted=time_extracted)
-                            counter.increment()
-                    elif bookmark_type == 'datetime':
-                        last_dttm = transform_datetime(last_datetime)
-                        bookmark_dttm = transform_datetime(transformed_record[bookmark_field])
-                        # Keep only records whose bookmark is after the last_datetime
-                        if bookmark_dttm >= last_dttm:
-                            write_record(stream_name, transformed_record, \
-                                time_extracted=time_extracted)
-                            counter.increment()
-                else:
-                    write_record(stream_name, transformed_record, time_extracted=time_extracted)
-                    counter.increment()
-
-        return max_bookmark_value, counter.value
+                write_record(stream_name, transformed_record, time_extracted=time_extracted)
+                counter.increment()
+        return counter.value
+
+
+def sync_stream(stream_name, selected_streams, catalog, state, records):
+    # Should sheets_loaded be synced?
+    if stream_name in selected_streams:
+        LOGGER.info('STARTED Syncing {}'.format(stream_name))
+        update_currently_syncing(state, stream_name)
+        write_schema(catalog, stream_name)
+        record_count = process_records(
+            catalog=catalog,
+            stream_name=stream_name,
+            records=records,
+            time_extracted=utils.now())
+        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
+        update_currently_syncing(state, None)
 
 
 # Currently syncing sets the stream currently being delivered in the state.
@@ -154,14 +130,16 @@ def get_data(stream_name,
     querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
         '{sheet_title}', stream_name)
     data = {}
+    time_extracted = utils.now()
     data = client.get(
         path=path,
        api=api,
         params=querystring,
         endpoint=stream_name)
-    return data
+    return data, time_extracted
 
 
+# Tranform file_metadata: remove nodes from lastModifyingUser, format as array
 def transform_file_metadata(file_metadata):
     # Convert to dict
     file_metadata_tf = json.loads(json.dumps(file_metadata))
@@ -176,10 +154,11 @@ def transform_file_metadata(file_metadata):
     return file_metadata_arr
 
 
+# Tranform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
 def transform_spreadsheet_metadata(spreadsheet_metadata):
     # Convert to dict
     spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
-    # Remove keys
+    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
     if spreadsheet_metadata_tf.get('properties'):
         spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
     spreadsheet_metadata_tf.pop('sheets', None)
@@ -189,6 +168,7 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
     return spreadsheet_metadata_arr
 
 
+# Tranform spreadsheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
 def transform_sheet_metadata(spreadsheet_id, sheet, columns):
     # Convert to properties to dict
     sheet_metadata = sheet.get('properties')
@@ -202,6 +182,107 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns):
     return sheet_metadata_tf
 
 
+# Convert Excel Date Serial Number (excel_date_sn) to datetime string
+# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
+def excel_to_dttm_str(excel_date_sn, timezone_str=None):
+    if not timezone_str:
+        timezone_str = 'UTC'
+    tz = pytz.timezone(timezone_str)
+    sec_per_day = 86400
+    excel_epoch = 25569  # 1970-01-01T00:00:00Z
+    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
+    epoch_dttm = datetime(1970, 1, 1)
+    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
+    utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
+    utc_dttm_str = strftime(utc_dttm)
+    return utc_dttm_str
+
+
+# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
+# Convert from array of values to JSON with column names as keys
+def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
+    sheet_data_tf = []
+    is_last_row = False
+    row_num = from_row
+    # Create sorted list of columns based on columnIndex
+    cols = sorted(columns, key = lambda i: i['columnIndex'])
+
+    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
+    for row in sheet_data_rows:
+        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
+        if row == []:
+            is_last_row = True
+            return sheet_data_tf, row_num - 1, is_last_row
+        sheet_data_row_tf = {}
+        # Add spreadsheet_id, sheet_id, and row
+        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
+        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
+        sheet_data_row_tf['__sdc_row'] = row_num
+        col_num = 1
+        for value in row:
+            # Select column metadata based on column index
+            col = cols[col_num - 1]
+            col_skipped = col.get('columnSkipped')
+            if not col_skipped:
+                col_name = col.get('columnName')
+                col_type = col.get('columnType')
+                # Convert dates/times from Lotus Notes Serial Numbers
+                if col_type == 'numberType.DATE_TIME':
+                    if isinstance(value, int) or isinstance(value, float):
+                        col_val = excel_to_dttm_str(value)
+                    else:
+                        col_val = str(value)
+                elif col_type == 'numberType.DATE':
+                    if isinstance(value, int) or isinstance(value, float):
+                        col_val = excel_to_dttm_str(value)[:10]
+                    else:
+                        col_val = str(value)
+                elif col_type == 'numberType.TIME':
+                    if isinstance(value, int) or isinstance(value, float):
+                        try:
+                            total_secs = value * 86400  # seconds in day
+                            col_val = str(timedelta(seconds=total_secs))
+                        except ValueError:
+                            col_val = str(value)
+                    else:
+                        col_val = str(value)
+                elif col_type == 'numberType':
+                    if isinstance(value, int):
+                        col_val = int(value)
+                    else:
+                        try:
+                            col_val = float(value)
+                        except ValueError:
+                            col_val = str(value)
+                elif col_type == 'stringValue':
+                    col_val = str(value)
+                elif col_type == 'boolValue':
+                    if isinstance(value, bool):
+                        col_val = value
+                    elif isinstance(value, str):
+                        if value.lower() in ('true', 't', 'yes', 'y'):
+                            col_val = True
+                        elif value.lower() in ('false', 'f', 'no', 'n'):
+                            col_val = False
+                        else:
+                            col_val = str(value)
+                    elif isinstance(value, int):
+                        if value == 1 or value == -1:
+                            col_val = True
+                        elif value == 0:
+                            col_val = False
+                        else:
+                            col_val = str(value)
+
+                else:
+                    col_val = value
+                sheet_data_row_tf[col_name] = col_val
+            col_num = col_num + 1
+        sheet_data_tf.append(sheet_data_row_tf)
+        row_num = row_num + 1
+    return sheet_data_tf, row_num, is_last_row
+
+
 def sync(client, config, catalog, state):
     start_date = config.get('start_date')
     spreadsheet_id = config.get('spreadsheet_id')
@@ -218,63 +299,164 @@ def sync(client, config, catalog, state):
     if not selected_streams:
         return
 
-    # Get file_metadata
+    # FILE_METADATA
     file_metadata = {}
-    file_metadata_config = STREAMS.get('file_metadata')
-    file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id)
+    stream_name = 'file_metadata'
+    file_metadata_config = STREAMS.get(stream_name)
+
+    # GET file_metadata
+    LOGGER.info('GET file_meatadata')
+    file_metadata, time_extracted = get_data(stream_name=stream_name,
+                                             endpoint_config=file_metadata_config,
+                                             client=client,
+                                             spreadsheet_id=spreadsheet_id)
+    # Transform file_metadata
+    LOGGER.info('Transform file_meatadata')
     file_metadata_tf = transform_file_metadata(file_metadata)
     # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
-    last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date))
+
+    # Check if file has changed, if not break (return to __init__)
+    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
     this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
     LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
     if this_datetime <= last_datetime:
         LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
         return 0
-
-    # Get spreadsheet_metadata
+    else:
+        # Sync file_metadata if selected
+        sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
+        write_bookmark(state, stream_name, strftime(this_datetime))
+
+    # SPREADSHEET_METADATA
     spreadsheet_metadata = {}
-    spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata')
-    spreadsheet_metadata = get_data('spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id)
+    stream_name = 'spreadsheet_metadata'
+    spreadsheet_metadata_config = STREAMS.get(stream_name)
+
+    # GET spreadsheet_metadata
+    LOGGER.info('GET spreadsheet_meatadata')
+    spreadsheet_metadata, ss_time_extracted = get_data(
+        stream_name=stream_name,
+        endpoint_config=spreadsheet_metadata_config,
+        client=client,
+        spreadsheet_id=spreadsheet_id)
+
+    # Transform spreadsheet_metadata
+    LOGGER.info('Transform spreadsheet_meatadata')
     spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
-    # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))
 
-    # Get sheet_metadata
+    # Sync spreadsheet_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)
+
+    # SHEET_METADATA and SHEET_DATA
     sheets = spreadsheet_metadata.get('sheets')
     sheet_metadata = []
     sheets_loaded = []
     sheets_loaded_config = STREAMS['sheets_loaded']
     if sheets:
+        # Loop thru sheets (worksheet tabs) in spreadsheet
         for sheet in sheets:
             sheet_title = sheet.get('properties', {}).get('title')
+            sheet_id = sheet.get('properties', {}).get('sheetId')
+
+            # GET sheet_metadata and columns
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+
+            # Transform sheet_metadata
             sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
             # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
             sheet_metadata.append(sheet_metadata_tf)
 
-            # Determine range of rows and columns for "paging" through batch rows of data
-            sheet_last_col_index = 1
-            sheet_last_col_letter = 'A'
-            for col in columns:
-                col_index = col.get('columnIndex')
-                col_letter = col.get('columnLetter')
-                if col_index > sheet_last_col_index:
-                    sheet_last_col_index = col_index
-                    sheet_last_col_letter = col_letter
-            sheet_max_row = sheet.get('gridProperties', {}).get('rowCount')
-            is_empty_row = False
-            batch_rows = 200
-            from_row = 2
-            if sheet_max_row < batch_rows:
-                to_row = sheet_max_row
-            else:
-                to_row = batch_rows
+            # SHEET_DATA
+            # Should this worksheet tab be synced?
+            if sheet_title in selected_streams:
+                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                update_currently_syncing(state, sheet_title)
+                write_schema(catalog, sheet_title)
+
+                # Determine max range of columns and rows for "paging" through the data
+                sheet_last_col_index = 1
+                sheet_last_col_letter = 'A'
+                for col in columns:
+                    col_index = col.get('columnIndex')
+                    col_letter = col.get('columnLetter')
+                    if col_index > sheet_last_col_index:
+                        sheet_last_col_index = col_index
+                        sheet_last_col_letter = col_letter
+                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
 
-            while not is_empty_row and to_row <= sheet_max_row:
-                range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row)
-
-                sheet_data = get_data(
-                    stream_name=sheet_title,
-                    endpoint_config=sheets_loaded_config,
-                    client=client,
-                    spreadsheet_id=spreadsheet_id,
-                    range_rows=range_rows)
+                # Initialize paging for 1st batch
+                is_last_row = False
+                batch_rows = 200
+                from_row = 2
+                if sheet_max_row < batch_rows:
+                    to_row = sheet_max_row
+                else:
+                    to_row = batch_rows
+
+                # Loop thru batches (each having 200 rows of data)
+                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+                    # GET sheet_data for a worksheet tab
+                    sheet_data, time_extracted = get_data(
+                        stream_name=sheet_title,
+                        endpoint_config=sheets_loaded_config,
+                        client=client,
+                        spreadsheet_id=spreadsheet_id,
+                        range_rows=range_rows)
+                    # Data is returned as a list of arrays, an array of values for each row
+                    sheet_data_rows = sheet_data.get('values')
+
+                    # Transform batch of rows to JSON with keys for each column
+                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
+                        spreadsheet_id=spreadsheet_id,
+                        sheet_id=sheet_id,
+                        from_row=from_row,
+                        columns=columns,
+                        sheet_data_rows=sheet_data_rows)
+                    if row_num < to_row:
+                        is_last_row = True
+
+                    # Process records, send batch of records to target
+                    record_count = process_records(
+                        catalog=catalog,
+                        stream_name=sheet_title,
+                        records=sheet_data_tf,
+                        time_extracted=ss_time_extracted)
+
+                    # Update paging from/to_row for next batch
+                    from_row = to_row + 1
+                    if to_row + batch_rows > sheet_max_row:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = to_row + batch_rows
+
+                # SHEETS_LOADED
+                # Add sheet to sheets_loaded
+                sheet_loaded = {}
+                sheet_loaded['spreadsheetId'] = spreadsheet_id
+                sheet_loaded['sheetId'] = sheet_id
+                sheet_loaded['title'] = sheet_title
+                sheet_loaded['loadDate'] = strftime(utils.now())
+                sheet_loaded['lastRowNumber'] = row_num
+                sheets_loaded.append(sheet_loaded)
+
+                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
+                # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
+                # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
+                activate_version_message = singer.ActivateVersionMessage(
+                    stream=sheet_title,
+                    version=int(time.time() * 1000))
+                singer.write_message(activate_version_message)
+
+                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                    sheet_title, row_num - 1))
+                update_currently_syncing(state, None)
+
+    stream_name = 'sheet_metadata'
+    # Sync sheet_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
+
+    stream_name = 'sheets_loaded'
+    # Sync sheet_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
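
The new excel_to_dttm_str helper treats values typed numberType.DATE_TIME as spreadsheet date serials: days since the 1900 epoch, where serial 25569 is 1970-01-01T00:00:00Z and the fractional part carries the time of day. A standalone sketch (not part of the commit) that mirrors this arithmetic without the pytz and singer dependencies, and without the math.floor truncation used above:

# Illustrative only -- mirrors the serial-number arithmetic in excel_to_dttm_str.
from datetime import datetime, timedelta, timezone

def serial_to_utc(excel_date_sn):
    excel_epoch = 25569   # serial number of 1970-01-01T00:00:00Z
    sec_per_day = 86400
    epoch_sec = (excel_date_sn - excel_epoch) * sec_per_day
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=epoch_sec)

# Serial 43784.5 is midday (UTC) on 2019-11-15, the date of this commit.
print(serial_to_utc(43784.5))   # 2019-11-15 12:00:00+00:00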