import time
import math
import json
from collections import OrderedDict
from datetime import datetime, timedelta

import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime

from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted):
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


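# Illustrative state shape for the bookmark helpers above (values are
# examples, not from the source):
#     {
#         "currently_syncing": "file_metadata",
#         "bookmarks": {"file_metadata": "2019-01-01T00:00:00.000000Z"}
#     }

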
# Transform/validate batch of records with schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                write_record(stream_name, transformed_record, time_extracted=time_extracted)
                counter.increment()
        return counter.value


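# Note: singer's Transformer filters out fields that are not selected in the
# catalog metadata and coerces values to the stream's JSON schema, so the
# records emitted above already conform to the schema from write_schema().

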
def sync_stream(stream_name, selected_streams, catalog, state, records):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        write_schema(catalog, stream_name)
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=utils.now())
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields


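# Illustrative metadata entry (standard Singer catalog shape): field-level
# entries carry a two-part breadcrumb, e.g.
#     {"breadcrumb": ["properties", "modifiedTime"], "metadata": {"selected": true}}
# The stream-level entry has an empty breadcrumb, which raises the IndexError
# caught above and is skipped.

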
def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted


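# Illustrative example (the template string is an assumption; the actual
# templates live in STREAMS): a path of
#     'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}'
# called with spreadsheet_id='abc123', stream_name='Sheet1', and
# range_rows='A2:E201' resolves to
#     'spreadsheets/abc123/values/Sheet1!A2:E201'

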
# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Deep copy object via JSON round-trip
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Wrap record in an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Deep copy object via JSON round-trip
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Wrap record in an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


# Convert Excel date serial number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tz = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str


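# Worked example: Excel serial 44197.5 is 44197.5 - 25569 = 18628.5 days
# after 1970-01-01, so
#     excel_to_dttm_str(44197.5)  # -> '2021-01-01T12:00:00.000000Z'
# (output format follows singer.utils.strftime)

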
# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    is_last_row = False
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, return sheet_data_tf with is_last_row and row_num - 1
        if row == []:
            is_last_row = True
            return sheet_data_tf, row_num - 1, is_last_row
        sheet_data_row_tf = {}
        # Add spreadsheet_id, sheet_id, and row
        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
        sheet_data_row_tf['__sdc_row'] = row_num
        col_num = 1
        for value in row:
            # Select column metadata based on column index
            col = cols[col_num - 1]
            col_skipped = col.get('columnSkipped')
            if not col_skipped:
                col_name = col.get('columnName')
                col_type = col.get('columnType')
                # Convert dates/times from Excel date serial numbers
                if col_type == 'numberType.DATE_TIME':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.DATE':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)[:10]
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.TIME':
                    # Time-only serials are a fraction of a 24-hour day
                    if isinstance(value, (int, float)):
                        try:
                            total_secs = value * 86400  # seconds in day
                            col_val = str(timedelta(seconds=total_secs))
                        except ValueError:
                            col_val = str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType':
                    if isinstance(value, int):
                        col_val = int(value)
                    else:
                        try:
                            col_val = float(value)
                        except ValueError:
                            col_val = str(value)
                elif col_type == 'stringValue':
                    col_val = str(value)
                elif col_type == 'boolValue':
                    if isinstance(value, bool):
                        col_val = value
                    elif isinstance(value, str):
                        if value.lower() in ('true', 't', 'yes', 'y'):
                            col_val = True
                        elif value.lower() in ('false', 'f', 'no', 'n'):
                            col_val = False
                        else:
                            col_val = str(value)
                    elif isinstance(value, int):
                        if value in (1, -1):
                            col_val = True
                        elif value == 0:
                            col_val = False
                        else:
                            col_val = str(value)
                else:
                    col_val = value
                sheet_data_row_tf[col_name] = col_val
            col_num = col_num + 1
        sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num, is_last_row


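# Illustrative input/output (column metadata values are assumptions for the
# example): with
#     columns = [{'columnIndex': 1, 'columnName': 'signed_up',
#                 'columnType': 'numberType.DATE'},
#                {'columnIndex': 2, 'columnName': 'name',
#                 'columnType': 'stringValue'}]
# the row [44197, 'Jane'] at from_row=2 becomes:
#     {'__sdc_spreadsheet_id': spreadsheet_id, '__sdc_sheet_id': sheet_id,
#      '__sdc_row': 2, 'signed_up': '2021-01-01', 'name': 'Jane'}

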
def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = previous currently syncing stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed; if not, exit (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return 0
    else:
        # Sync file_metadata if selected
        sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
        write_bookmark(state, stream_name, strftime(this_datetime))

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop through sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)

            # Transform sheet_metadata
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # SHEET_DATA
            # Should this worksheet tab be synced?
            if sheet_title in selected_streams:
                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                update_currently_syncing(state, sheet_title)
                write_schema(catalog, sheet_title)

                # Determine max range of columns and rows for "paging" through the data
                sheet_last_col_index = 1
                sheet_last_col_letter = 'A'
                for col in columns:
                    col_index = col.get('columnIndex')
                    col_letter = col.get('columnLetter')
                    if col_index > sheet_last_col_index:
                        sheet_last_col_index = col_index
                        sheet_last_col_letter = col_letter
                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                # Initialize paging for 1st batch
                is_last_row = False
                batch_rows = 200
                from_row = 2
                if sheet_max_row < batch_rows:
                    to_row = sheet_max_row
                else:
                    to_row = batch_rows

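                # Worked example (numbers are illustrative): with
                # sheet_max_row = 450 and batch_rows = 200, the loop below
                # fetches rows 2-200, 201-400, then 401-450 (row 1 holds the
                # column headers, so data starts at from_row = 2).
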
                # Loop through batches (each having up to 200 rows of data)
                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                    # GET sheet_data for a worksheet tab
                    sheet_data, time_extracted = get_data(
                        stream_name=sheet_title,
                        endpoint_config=sheets_loaded_config,
                        client=client,
                        spreadsheet_id=spreadsheet_id,
                        range_rows=range_rows)
                    # Data is returned as a list of arrays, an array of values for each row
                    sheet_data_rows = sheet_data.get('values')

                    # Transform batch of rows to JSON with keys for each column
                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
                        spreadsheet_id=spreadsheet_id,
                        sheet_id=sheet_id,
                        from_row=from_row,
                        columns=columns,
                        sheet_data_rows=sheet_data_rows)
                    if row_num < to_row:
                        is_last_row = True

                    # Process records; send batch of records to target
                    record_count = process_records(
                        catalog=catalog,
                        stream_name=sheet_title,
                        records=sheet_data_tf,
                        time_extracted=ss_time_extracted)

                    # Update paging from_row/to_row for next batch
                    from_row = to_row + 1
                    if to_row + batch_rows > sheet_max_row:
                        to_row = sheet_max_row
                    else:
                        to_row = to_row + batch_rows

                # SHEETS_LOADED
                # Add sheet to sheets_loaded
                sheet_loaded = {}
                sheet_loaded['spreadsheetId'] = spreadsheet_id
                sheet_loaded['sheetId'] = sheet_id
                sheet_loaded['title'] = sheet_title
                sheet_loaded['loadDate'] = strftime(utils.now())
                sheet_loaded['lastRowNumber'] = row_num
                sheets_loaded.append(sheet_loaded)

                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
                # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
                # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
                activate_version_message = singer.ActivateVersionMessage(
                    stream=sheet_title,
                    version=int(time.time() * 1000))
                singer.write_message(activate_version_message)

                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                    sheet_title, row_num - 1))
                update_currently_syncing(state, None)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)