]>
Commit | Line | Data |
---|---|---|
89643ba6 JH |
1 | import time |
2 | import math | |
89643ba6 | 3 | import json |
da690bda | 4 | from datetime import datetime, timedelta |
99424fee JH |
5 | import pytz |
6 | import singer | |
89643ba6 JH |
7 | from singer import metrics, metadata, Transformer, utils |
8 | from singer.utils import strptime_to_utc, strftime | |
43a24cba | 9 | from singer.messages import RecordMessage |
89643ba6 JH |
10 | from tap_google_sheets.streams import STREAMS |
11 | from tap_google_sheets.schema import get_sheet_metadata | |
12 | ||
13 | LOGGER = singer.get_logger() | |
14 | ||
15 | ||
def write_schema(catalog, stream_name):
    """Emit a Singer SCHEMA message for the given stream from the catalog."""
    stream = catalog.get_stream(stream_name)
    stream_schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, stream_schema, stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        # Surface broken-pipe style failures with context, then re-raise
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err
25 | ||
26 | ||
def write_record(stream_name, record, time_extracted, version=None):
    """Emit a Singer RECORD message; a versioned message is used when
    *version* is supplied (for ACTIVATE_VERSION full-table replacement)."""
    try:
        if version:
            message = RecordMessage(
                stream=stream_name,
                record=record,
                version=version,
                time_extracted=time_extracted)
            singer.messages.write_message(message)
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        # Log context (including the failing record) before re-raising
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err
45 | ||
46 | ||
def get_bookmark(state, stream, default):
    """Return the bookmark stored for *stream* in *state*, or *default*."""
    if state is None or 'bookmarks' not in state:
        return default
    return state.get('bookmarks', {}).get(stream, default)
55 | ||
56 | ||
def write_bookmark(state, stream, value):
    """Store *value* as the bookmark for *stream* and emit a STATE message."""
    bookmarks = state.setdefault('bookmarks', {})
    bookmarks[stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)
63 | ||
64 | ||
da690bda JH |
# Transform/validate batch of records w/ schema and sent to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    """Transform each record against the stream's schema/metadata and emit it.

    Returns the number of records written.
    """
    stream = catalog.get_stream(stream_name)
    stream_schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # A fresh Transformer per record so its exit-time reporting
            # happens record-by-record, matching the established behavior
            with Transformer() as transformer:
                transformed = transformer.transform(
                    record,
                    stream_schema,
                    stream_metadata)
                write_record(
                    stream_name=stream_name,
                    record=transformed,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value
89 | ||
90 | ||
def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    """Sync one metadata-style stream if it is selected in the catalog.

    Writes the schema, processes *records*, and maintains currently_syncing
    in *state*. No-op when the stream is not selected.
    """
    # Should this stream be synced?
    if stream_name not in selected_streams:
        return
    LOGGER.info('STARTED Syncing {}'.format(stream_name))
    update_currently_syncing(state, stream_name)
    selected_fields = get_selected_fields(catalog, stream_name)
    LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
    write_schema(catalog, stream_name)
    record_count = process_records(
        catalog=catalog,
        stream_name=stream_name,
        records=records,
        time_extracted=time_extracted or utils.now())
    LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
    update_currently_syncing(state, None)
89643ba6 JH |
108 | |
109 | ||
# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    """Record (or clear, when *stream_name* is None) the in-progress stream."""
    if stream_name is None and 'currently_syncing' in state:
        state.pop('currently_syncing')
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)
120 | ||
121 | ||
# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    """Return the names of fields marked selected in the stream's metadata."""
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    selected_fields = []
    for entry in singer.metadata.to_list(mdata):
        breadcrumb = entry['breadcrumb']
        # Stream-level entries have a short/empty breadcrumb; only field-level
        # entries (('properties', <field_name>)) carry a name at index 1.
        if len(breadcrumb) > 1 and entry.get('metadata', {}).get('selected', False):
            selected_fields.append(breadcrumb[1])
    return selected_fields
137 | ||
138 | ||
def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    """Call the Google API endpoint for *stream_name*.

    Returns a tuple of (response data, extraction timestamp).
    """
    range_rows = range_rows or ''
    # Replace {placeholder} variables in path
    path = (endpoint_config.get('path', stream_name)
            .replace('{spreadsheet_id}', spreadsheet_id)
            .replace('{sheet_title}', stream_name)
            .replace('{range_rows}', range_rows))
    api = endpoint_config.get('api', 'sheets')
    params = endpoint_config.get('params', {})
    # Add in querystring parameters and replace {placeholder} variables.
    # Building the querystring by hand keeps parameters un-encoded, which
    # avoids API errors.
    querystring = '&'.join(
        '{}={}'.format(key, value) for key, value in params.items()
    ).replace('{sheet_title}', stream_name)
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted
89643ba6 JH |
164 | |
165 | ||
# Tranform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    """Strip user-identifying nodes from lastModifyingUser and wrap in a list."""
    # Deep-copy via a JSON round-trip so the caller's dict is not mutated
    transformed = json.loads(json.dumps(file_metadata))
    last_user = transformed.get('lastModifyingUser')
    if last_user:
        for key in ('photoLink', 'me', 'permissionId'):
            last_user.pop(key, None)
    # Return a single-record list
    return [transformed]
179 | ||
180 | ||
# Tranform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    """Drop bulky nodes (defaultFormat, sheets) and wrap the record in a list."""
    # Deep-copy via a JSON round-trip so the caller's dict is not mutated
    transformed = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if transformed.get('properties'):
        transformed['properties'].pop('defaultFormat', None)
    transformed.pop('sheets', None)
    # Return a single-record list
    return [transformed]
193 | ||
194 | ||
# Tranform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    """Augment a sheet's properties with spreadsheet id, URL, and column info."""
    # Deep-copy the properties via a JSON round-trip so *sheet* is untouched
    sheet_metadata_tf = json.loads(json.dumps(sheet.get('properties')))
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_metadata_tf.get('sheetId'))
    sheet_metadata_tf.update({
        'spreadsheetId': spreadsheet_id,
        'sheetUrl': sheet_url,
        'columns': columns,
    })
    return sheet_metadata_tf
207 | ||
208 | ||
da690bda JH |
# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    """Convert a Lotus/Excel serial date number to a UTC datetime string."""
    tzn = pytz.timezone(timezone_str or 'UTC')
    seconds_per_day = 86400
    # 25569 = serial number for 1970-01-01T00:00:00Z (Unix epoch start)
    epoch_serial = 25569
    epoch_sec = math.floor((excel_date_sn - epoch_serial) * seconds_per_day)
    excel_dttm = datetime(1970, 1, 1) + timedelta(seconds=epoch_sec)
    # Localize to the source timezone, then normalize to UTC for output
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    return strftime(utc_dttm)
223 | ||
224 | ||
# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    """Transform a batch of raw sheet rows into record dicts.

    Parameters:
        spreadsheet_id: id of the parent spreadsheet (stamped on each record).
        sheet_id: numeric id of the worksheet tab (stamped on each record).
        sheet_title: worksheet title, used only in data-type warning logs.
        from_row: sheet row number of the first row in this batch.
        columns: column metadata dicts (columnIndex, columnLetter, columnName,
            columnType, optional columnSkipped) from schema discovery.
        sheet_data_rows: list of rows, each a list of raw cell values.

    Returns:
        (transformed_rows, next_row_num) where next_row_num is the sheet row
        number just past the last row processed (used by the caller's paging).
    """
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex so positional cell
    # values line up with their column metadata
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    for row in sheet_data_rows:
        # If empty row, SKIP (row_num still advances to stay aligned)
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')

                    # NULL values
                    if value is None or value == '':
                        col_val = None

                    # Convert dates/times from Lotus Notes Serial Numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # DATE
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            # Keep only the date portion (YYYY-MM-DD)
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # TIME ONLY (NO DATE)
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400  # seconds in day
                                # Create string formatted like HH:MM:SS
                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
                                    # ROUND to multipleOf: 1e-15
                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                            else:  # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)
                    # BOOLEAN
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                # BUG FIX: log the row number (was formatting the
                                # raw row list into the CELL position)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                # BUG FIX: row_num (was the raw row list)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        # BUG FIX: row_num (was the raw row list)
                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                            sheet_title, col_name, col_letter, row_num, col_type, value))
                    sheet_data_row_tf[col_name] = col_val
                # Advance the column cursor even for skipped columns so the
                # remaining values stay aligned with their metadata
                col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num
da690bda JH |
351 | |
352 | ||
89643ba6 JH |
def sync(client, config, catalog, state):
    """Top-level sync entry point.

    Order of operations:
      1. file_metadata — also used as the change-detection gate: if the
         spreadsheet's modifiedTime has not advanced past the stored bookmark,
         exit without syncing anything.
      2. spreadsheet_metadata.
      3. Per worksheet tab: sheet_metadata collection and (if the tab is a
         selected stream) full-table sheet data sync with ACTIVATE_VERSION.
      4. sheet_metadata and sheets_loaded streams, then the file_metadata
         bookmark (written last so an interrupted run re-syncs the file).
    """
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    # Nothing selected: nothing to do
    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_meatadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_meatadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed, if not break (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_meatadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_meatadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
        ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            # NOTE(review): sheet_schema is not used below — verify intentional
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # Transform sheet_metadata (collected for the sheet_metadata stream
            # regardless of whether this tab itself is selected)
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # SHEET_DATA
            # Should this worksheet tab be synced?
            if sheet_title in selected_streams:
                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                update_currently_syncing(state, sheet_title)
                selected_fields = get_selected_fields(catalog, sheet_title)
                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                write_schema(catalog, sheet_title)

                # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
                # everytime after each sheet sync is complete.
                # This forces hard deletes on the data downstream if fewer records are sent.
                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                last_integer = int(get_bookmark(state, sheet_title, 0))
                activate_version = int(time.time() * 1000)
                activate_version_message = singer.ActivateVersionMessage(
                    stream=sheet_title,
                    version=activate_version)
                if last_integer == 0:
                    # initial load, send activate_version before AND after data sync
                    singer.write_message(activate_version_message)
                    LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))

                # Determine max range of columns and rows for "paging" through the data
                sheet_last_col_index = 1
                sheet_last_col_letter = 'A'
                for col in columns:
                    col_index = col.get('columnIndex')
                    col_letter = col.get('columnLetter')
                    if col_index > sheet_last_col_index:
                        sheet_last_col_index = col_index
                        sheet_last_col_letter = col_letter
                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                # Initialize paging for 1st batch; row 1 is the header, so data
                # starts at row 2
                is_last_row = False
                batch_rows = 200
                from_row = 2
                if sheet_max_row < batch_rows:
                    to_row = sheet_max_row
                else:
                    to_row = batch_rows

                # Loop thru batches (each having 200 rows of data)
                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                    # GET sheet_data for a worksheet tab
                    sheet_data, time_extracted = get_data(
                        stream_name=sheet_title,
                        endpoint_config=sheets_loaded_config,
                        client=client,
                        spreadsheet_id=spreadsheet_id,
                        range_rows=range_rows)
                    # Data is returned as a list of arrays, an array of values for each row
                    sheet_data_rows = sheet_data.get('values')

                    # Transform batch of rows to JSON with keys for each column
                    sheet_data_tf, row_num = transform_sheet_data(
                        spreadsheet_id=spreadsheet_id,
                        sheet_id=sheet_id,
                        sheet_title=sheet_title,
                        from_row=from_row,
                        columns=columns,
                        sheet_data_rows=sheet_data_rows)
                    # A short batch means we ran past the end of the data
                    if row_num < to_row:
                        is_last_row = True

                    # Process records, send batch of records to target
                    # NOTE(review): records are emitted with ss_time_extracted
                    # (the spreadsheet_metadata GET time), not this batch's
                    # time_extracted — confirm intended
                    record_count = process_records(
                        catalog=catalog,
                        stream_name=sheet_title,
                        records=sheet_data_tf,
                        time_extracted=ss_time_extracted,
                        version=activate_version)
                    LOGGER.info('Sheet: {}, records processed: {}'.format(
                        sheet_title, record_count))

                    # Update paging from/to_row for next batch
                    from_row = to_row + 1
                    if to_row + batch_rows > sheet_max_row:
                        to_row = sheet_max_row
                    else:
                        to_row = to_row + batch_rows

                # End of Stream: Send Activate Version and update State
                singer.write_message(activate_version_message)
                write_bookmark(state, sheet_title, activate_version)
                LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                    sheet_title, row_num - 2))  # data starts at row 2, so row_num - 2 = data-row count
                update_currently_syncing(state, None)

                # SHEETS_LOADED
                # Add sheet to sheets_loaded
                sheet_loaded = {}
                sheet_loaded['spreadsheetId'] = spreadsheet_id
                sheet_loaded['sheetId'] = sheet_id
                sheet_loaded['title'] = sheet_title
                sheet_loaded['loadDate'] = strftime(utils.now())
                sheet_loaded['lastRowNumber'] = row_num
                sheets_loaded.append(sheet_loaded)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark (done last so an interrupted sync is re-run)
    write_bookmark(state, 'file_metadata', strftime(this_datetime))

    return