]>
Commit | Line | Data |
---|---|---|
89643ba6 JH |
1 | import time |
2 | import math | |
89643ba6 | 3 | import json |
5fc2ead5 JH |
4 | import re |
5 | import urllib.parse | |
da690bda | 6 | from datetime import datetime, timedelta |
99424fee JH |
7 | import pytz |
8 | import singer | |
89643ba6 JH |
9 | from singer import metrics, metadata, Transformer, utils |
10 | from singer.utils import strptime_to_utc, strftime | |
43a24cba | 11 | from singer.messages import RecordMessage |
89643ba6 JH |
12 | from tap_google_sheets.streams import STREAMS |
13 | from tap_google_sheets.schema import get_sheet_metadata | |
14 | ||
15 | LOGGER = singer.get_logger() | |
16 | ||
17 | ||
def write_schema(catalog, stream_name):
    """Emit a Singer SCHEMA message for *stream_name* from the catalog.

    Logs and re-raises any OSError hit while writing to stdout.
    """
    stream = catalog.get_stream(stream_name)
    try:
        singer.write_schema(
            stream_name,
            stream.schema.to_dict(),
            stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err
27 | ||
28 | ||
def write_record(stream_name, record, time_extracted, version=None):
    """Emit a Singer RECORD message.

    When *version* is given, a versioned RecordMessage is built explicitly
    (used with ACTIVATE_VERSION full-table replication); otherwise the plain
    write_record helper is used. OSErrors are logged and re-raised.
    """
    try:
        if version:
            message = RecordMessage(
                stream=stream_name,
                record=record,
                version=version,
                time_extracted=time_extracted)
            singer.messages.write_message(message)
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err
47 | ||
48 | ||
def get_bookmark(state, stream, default):
    """Return the bookmark stored for *stream* in *state*, or *default*."""
    if state is None or 'bookmarks' not in state:
        return default
    bookmarks = state.get('bookmarks', {})
    return bookmarks.get(stream, default)
57 | ||
58 | ||
def write_bookmark(state, stream, value):
    """Store *value* as the bookmark for *stream* and emit a STATE message."""
    bookmarks = state.setdefault('bookmarks', {})
    bookmarks[stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)
65 | ||
66 | ||
da690bda JH |
# Transform/validate batch of records w/ schema and sent to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    """Transform each raw record against the stream's schema and emit it.

    Returns the number of records written to the target.
    Raises RuntimeError if the Singer transform rejects a record.
    """
    stream = catalog.get_stream(stream_name)
    stream_schema = stream.schema.to_dict()
    mdata_map = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for raw_record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                try:
                    record_tf = transformer.transform(
                        raw_record,
                        stream_schema,
                        mdata_map)
                except Exception as err:
                    LOGGER.error('{}'.format(err))
                    raise RuntimeError(err)
                write_record(
                    stream_name=stream_name,
                    record=record_tf,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
    return counter.value
95 | ||
96 | ||
def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    """Sync one batch of records for *stream_name* if it is selected.

    Writes the schema, processes the records, and maintains the
    currently_syncing marker around the work.
    """
    # Skip streams that were not selected for replication
    if stream_name not in selected_streams:
        return
    LOGGER.info('STARTED Syncing {}'.format(stream_name))
    update_currently_syncing(state, stream_name)
    selected_fields = get_selected_fields(catalog, stream_name)
    LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
    write_schema(catalog, stream_name)
    if not time_extracted:
        time_extracted = utils.now()
    record_count = process_records(
        catalog=catalog,
        stream_name=stream_name,
        records=records,
        time_extracted=time_extracted)
    LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
    update_currently_syncing(state, None)
89643ba6 JH |
114 | |
115 | ||
# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    """Set (or clear, when *stream_name* is None) the in-flight stream marker
    in *state*, then emit a STATE message."""
    clearing = (stream_name is None) and ('currently_syncing' in state)
    if clearing:
        del state['currently_syncing']
    else:
        # NOTE: when stream_name is None but no marker exists, this still
        # calls set_currently_syncing(state, None), matching prior behavior.
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)
126 | ||
127 | ||
# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    """Return the field names marked selected in the stream's metadata."""
    stream = catalog.get_stream(stream_name)
    mdata_map = metadata.to_map(stream.metadata)
    selected_fields = []
    for entry in singer.metadata.to_list(mdata_map):
        try:
            field = entry['breadcrumb'][1]
        except IndexError:
            # Stream-level entry (breadcrumb shorter than 2) - not a field
            continue
        if entry.get('metadata', {}).get('selected', False):
            selected_fields.append(field)
    return selected_fields
143 | ||
144 | ||
def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    """Request data for *stream_name* from the Google API.

    Returns a tuple of (response data, time_extracted).
    """
    range_rows = range_rows or ''
    # Encode stream_name: fixes issue w/ special characters in sheet name
    stream_name_escaped = re.escape(stream_name)
    stream_name_encoded = urllib.parse.quote_plus(stream_name)
    # Replace {placeholder} variables in path
    path = (endpoint_config.get('path', stream_name)
            .replace('{spreadsheet_id}', spreadsheet_id)
            .replace('{sheet_title}', stream_name_encoded)
            .replace('{range_rows}', range_rows))
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # Build the querystring by hand so parameter values are added but NOT
    # URL-encoded - encoding them causes API errors
    pairs = ['%s=%s' % (key, value) for (key, value) in params.items()]
    querystring = '&'.join(pairs).replace('{sheet_title}', stream_name_encoded)
    LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name_escaped)
    return data, time_extracted
89643ba6 JH |
174 | |
175 | ||
# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    """Deep-copy the file metadata, strip extra lastModifyingUser keys,
    and return the record wrapped in a single-element list."""
    # JSON round-trip produces an independent deep copy of the input
    record = json.loads(json.dumps(file_metadata))
    last_user = record.get('lastModifyingUser')
    if last_user:
        # Drop keys that are not part of the emitted schema
        for key in ('photoLink', 'me', 'permissionId'):
            last_user.pop(key, None)
    # A batch of exactly one record
    return [record]
189 | ||
190 | ||
# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    """Deep-copy the spreadsheet metadata, drop bulky nodes, and return the
    record wrapped in a single-element list."""
    # JSON round-trip produces an independent deep copy of the input
    record = json.loads(json.dumps(spreadsheet_metadata))
    props = record.get('properties')
    if props:
        props.pop('defaultFormat', None)
    # sheets node is emitted separately by the sheet_metadata stream
    record.pop('sheets', None)
    return [record]
203 | ||
204 | ||
# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    """Return the sheet's properties node enriched with the spreadsheet id,
    a direct sheet URL, and the column metadata list."""
    # Deep copy the properties node via a JSON round-trip
    record = json.loads(json.dumps(sheet.get('properties')))
    url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, record.get('sheetId'))
    record['spreadsheetId'] = spreadsheet_id
    record['sheetUrl'] = url
    record['columns'] = columns
    return record
217 | ||
218 | ||
da690bda JH |
# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    """Convert an Excel/Lotus serial date number to a UTC datetime string."""
    tzn = pytz.timezone(timezone_str or 'UTC')
    sec_per_day = 86400
    # 25569 is the Lotus serial number for 1970-01-01T00:00:00Z (Unix epoch)
    excel_epoch = 25569
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    # Build a naive datetime, then interpret it in the sheet's timezone
    # and normalize to UTC
    naive_dttm = datetime(1970, 1, 1) + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(naive_dttm).astimezone(pytz.utc)
    return strftime(utc_dttm)
233 | ||
234 | ||
# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    """Convert raw sheet rows (lists of cell values) into record dicts.

    Each non-empty row becomes a dict keyed by column name, with
    __sdc_spreadsheet_id, __sdc_sheet_id and __sdc_row added, and each cell
    value coerced according to its column's columnType. Empty rows are
    skipped (logged) but still advance the row counter.

    Fix: the boolValue/unknown-type warning logs previously passed `row`
    (the whole list) where the 'CELL: {}{}' format expects the row number;
    they now pass row_num like every other warning in this function.

    Returns (transformed_records, next_row_number).
    """
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    for row in sheet_data_rows:
        # If empty row, SKIP (but still count it so __sdc_row stays aligned)
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')

                    # NULL values
                    if value is None or value == '':
                        col_val = None

                    # Convert dates/times from Lotus Notes Serial Numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # DATE
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            # Keep only the date portion (YYYY-MM-DD)
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # TIME ONLY (NO DATE)
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400  # seconds in day
                                # Create string formatted like HH:MM:SS
                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
                                    # ROUND to multipleOf: 1e-15
                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                            else:  # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)
                    # BOOLEAN
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                # Fixed: pass row_num (was `row`, the whole list)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                # Fixed: pass row_num (was `row`, the whole list)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        # Fixed: pass row_num (was `row`, the whole list)
                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                            sheet_title, col_name, col_letter, row_num, col_type, value))
                    sheet_data_row_tf[col_name] = col_val
                col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num
da690bda JH |
361 | |
362 | ||
89643ba6 JH |
def sync(client, config, catalog, state):
    """Top-level sync for one spreadsheet.

    Order of operations:
      1. Fetch file_metadata; if the file's modifiedTime has not advanced
         past the stored bookmark, update the bookmark and exit early.
      2. Sync file_metadata (if selected).
      3. Fetch and sync spreadsheet_metadata (if selected).
      4. For each worksheet tab: sync its rows in 200-row batches with
         ACTIVATE_VERSION handling, and accumulate sheet_metadata and
         sheets_loaded records.
      5. Sync sheet_metadata and sheets_loaded, then write the
         file_metadata bookmark.
    """
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    # Nothing selected: nothing to do
    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    # NOTE(review): 'meatadata' typo in the log strings below is left as-is;
    # they are runtime output, not comments.
    LOGGER.info('GET file_meatadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_meatadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed, if not break (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        # Update file_metadata bookmark
        write_bookmark(state, 'file_metadata', strftime(this_datetime))
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_meatadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_meatadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
        ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # SKIP empty sheets (where sheet_schema and columns are None)
            if not sheet_schema or not columns:
                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
            else:
                # Transform sheet_metadata
                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
                sheet_metadata.append(sheet_metadata_tf)

                # SHEET_DATA
                # Should this worksheet tab be synced?
                if sheet_title in selected_streams:
                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                    update_currently_syncing(state, sheet_title)
                    selected_fields = get_selected_fields(catalog, sheet_title)
                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                    write_schema(catalog, sheet_title)

                    # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
                    # everytime after each sheet sync is complete.
                    # This forces hard deletes on the data downstream if fewer records are sent.
                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                    # The sheet's bookmark stores the last activate_version;
                    # 0 means this sheet has never been synced before.
                    last_integer = int(get_bookmark(state, sheet_title, 0))
                    activate_version = int(time.time() * 1000)
                    activate_version_message = singer.ActivateVersionMessage(
                        stream=sheet_title,
                        version=activate_version)
                    if last_integer == 0:
                        # initial load, send activate_version before AND after data sync
                        singer.write_message(activate_version_message)
                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))

                    # Determine max range of columns and rows for "paging" through the data
                    sheet_last_col_index = 1
                    sheet_last_col_letter = 'A'
                    for col in columns:
                        col_index = col.get('columnIndex')
                        col_letter = col.get('columnLetter')
                        if col_index > sheet_last_col_index:
                            sheet_last_col_index = col_index
                            sheet_last_col_letter = col_letter
                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                    # Initialize paging for 1st batch
                    # from_row starts at 2 because row 1 holds the headers
                    is_last_row = False
                    batch_rows = 200
                    from_row = 2
                    if sheet_max_row < batch_rows:
                        to_row = sheet_max_row
                    else:
                        to_row = batch_rows

                    # Loop thru batches (each having 200 rows of data)
                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                        # A1-notation range for this batch, e.g. 'A2:J201'
                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                        # GET sheet_data for a worksheet tab
                        sheet_data, time_extracted = get_data(
                            stream_name=sheet_title,
                            endpoint_config=sheets_loaded_config,
                            client=client,
                            spreadsheet_id=spreadsheet_id,
                            range_rows=range_rows)
                        # Data is returned as a list of arrays, an array of values for each row
                        sheet_data_rows = sheet_data.get('values', [])

                        # Transform batch of rows to JSON with keys for each column
                        sheet_data_tf, row_num = transform_sheet_data(
                            spreadsheet_id=spreadsheet_id,
                            sheet_id=sheet_id,
                            sheet_title=sheet_title,
                            from_row=from_row,
                            columns=columns,
                            sheet_data_rows=sheet_data_rows)
                        # Fewer rows returned than requested => end of data
                        if row_num < to_row:
                            is_last_row = True

                        # Process records, send batch of records to target
                        # NOTE(review): uses ss_time_extracted (spreadsheet
                        # metadata fetch time), not this batch's
                        # time_extracted - presumably intentional so all
                        # batches share one extraction time; confirm.
                        record_count = process_records(
                            catalog=catalog,
                            stream_name=sheet_title,
                            records=sheet_data_tf,
                            time_extracted=ss_time_extracted,
                            version=activate_version)
                        LOGGER.info('Sheet: {}, records processed: {}'.format(
                            sheet_title, record_count))

                        # Update paging from/to_row for next batch
                        from_row = to_row + 1
                        if to_row + batch_rows > sheet_max_row:
                            to_row = sheet_max_row
                        else:
                            to_row = to_row + batch_rows

                    # End of Stream: Send Activate Version and update State
                    singer.write_message(activate_version_message)
                    write_bookmark(state, sheet_title, activate_version)
                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                        sheet_title, row_num - 2))  # row_num is one past the last row; -2 accounts for the header row and the final increment
                    update_currently_syncing(state, None)

                    # SHEETS_LOADED
                    # Add sheet to sheets_loaded
                    sheet_loaded = {}
                    sheet_loaded['spreadsheetId'] = spreadsheet_id
                    sheet_loaded['sheetId'] = sheet_id
                    sheet_loaded['title'] = sheet_title
                    sheet_loaded['loadDate'] = strftime(utils.now())
                    sheet_loaded['lastRowNumber'] = row_num
                    sheets_loaded.append(sheet_loaded)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark
    write_bookmark(state, 'file_metadata', strftime(this_datetime))

    return