import time
import math
import json
from datetime import datetime, timedelta
import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted):
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)
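
# Illustrative state shape read by get_bookmark and written by write_bookmark
# (stream names and values here are hypothetical examples):
#   {
#       'currently_syncing': 'sheet_metadata',
#       'bookmarks': {'file_metadata': '2021-01-01T00:00:00.000000Z'}
#   }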


# Transform/validate batch of records w/ schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                write_record(stream_name, transformed_record, time_extracted=time_extracted)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields
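
# Illustrative metadata entries walked above (field name is hypothetical):
# field-level entries carry a ('properties', '<field_name>') breadcrumb, e.g.
#   {'breadcrumb': ('properties', '__sdc_row'), 'metadata': {'selected': True}}
# while the stream-level entry has an empty breadcrumb, which raises the
# IndexError that is intentionally ignored.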


def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    data = {}
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted


# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Convert to dict
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Convert to dict
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr
172 | ||
173 | ||
da690bda | 174 | # Tranform spreadsheet_metadata: add spreadsheetId, sheetUrl, and columns metadata |
89643ba6 JH |
175 | def transform_sheet_metadata(spreadsheet_id, sheet, columns): |
176 | # Convert to properties to dict | |
177 | sheet_metadata = sheet.get('properties') | |
99424fee | 178 | sheet_metadata_tf = json.loads(json.dumps(sheet_metadata)) |
89643ba6 JH |
179 | sheet_id = sheet_metadata_tf.get('sheetId') |
180 | sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format( | |
181 | spreadsheet_id, sheet_id) | |
182 | sheet_metadata_tf['spreadsheetId'] = spreadsheet_id | |
183 | sheet_metadata_tf['sheetUrl'] = sheet_url | |
184 | sheet_metadata_tf['columns'] = columns | |
185 | return sheet_metadata_tf | |
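
# Illustrative output (hypothetical IDs): for spreadsheet_id='abc123' and a
# sheet whose properties contain sheetId=0, sheetUrl becomes
# 'https://docs.google.com/spreadsheets/d/abc123/edit#gid=0'.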


# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str
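
# Worked example (illustrative): serial number 44197.5 is 18628.5 days past
# serial 25569 (the 1970-01-01 epoch), i.e. 2021-01-01 12:00 UTC, so
# excel_to_dttm_str(44197.5) returns roughly '2021-01-01T12:00:00.000000Z'
# (the exact string depends on singer.utils.strftime formatting).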


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    is_last_row = False
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
        if row == []:
            is_last_row = True
            return sheet_data_tf, row_num - 1, is_last_row
        sheet_data_row_tf = {}
        # Add spreadsheet_id, sheet_id, and row
        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
        sheet_data_row_tf['__sdc_row'] = row_num
        col_num = 1
        for value in row:
            # Select column metadata based on column index
            col = cols[col_num - 1]
            col_skipped = col.get('columnSkipped')
            if not col_skipped:
                col_name = col.get('columnName')
                col_type = col.get('columnType')
                # Convert dates/times from serial number format (days since the
                # 1899-12-30 epoch, as in Lotus 1-2-3 and Excel)
                if col_type == 'numberType.DATE_TIME':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.DATE':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)[:10]
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.TIME':
                    if isinstance(value, (int, float)):
                        try:
                            total_secs = value * 86400  # seconds in day
                            col_val = str(timedelta(seconds=total_secs))
                        except ValueError:
                            col_val = str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType':
                    if isinstance(value, int):
                        col_val = int(value)
                    else:
                        try:
                            col_val = float(value)
                        except ValueError:
                            col_val = str(value)
                elif col_type == 'stringValue':
                    col_val = str(value)
                elif col_type == 'boolValue':
                    if isinstance(value, bool):
                        col_val = value
                    elif isinstance(value, str):
                        if value.lower() in ('true', 't', 'yes', 'y'):
                            col_val = True
                        elif value.lower() in ('false', 'f', 'no', 'n'):
                            col_val = False
                        else:
                            col_val = str(value)
                    elif isinstance(value, int):
                        if value in (1, -1):
                            col_val = True
                        elif value == 0:
                            col_val = False
                        else:
                            col_val = str(value)
                else:
                    col_val = value
                sheet_data_row_tf[col_name] = col_val
            col_num = col_num + 1
        sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num, is_last_row
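
# Illustrative input/output (hypothetical column metadata): given
#   columns = [{'columnIndex': 1, 'columnLetter': 'A', 'columnName': 'id',
#               'columnType': 'numberType'},
#              {'columnIndex': 2, 'columnLetter': 'B', 'columnName': 'updated',
#               'columnType': 'numberType.DATE_TIME'}]
# the row [3, 44197.5] at row_num=2 is transformed to:
#   {'__sdc_spreadsheet_id': 'abc123', '__sdc_sheet_id': 0, '__sdc_row': 2,
#    'id': 3, 'updated': '2021-01-01T12:00:00.000000Z'}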


def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed; if not, return (back to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    write_bookmark(state, stream_name, strftime(this_datetime))

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf,
                ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # Transform sheet_metadata
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # SHEET_DATA
            # Should this worksheet tab be synced?
            if sheet_title in selected_streams:
                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                update_currently_syncing(state, sheet_title)
                selected_fields = get_selected_fields(catalog, sheet_title)
                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                write_schema(catalog, sheet_title)

                # Determine max range of columns and rows for "paging" through the data
                sheet_last_col_index = 1
                sheet_last_col_letter = 'A'
                for col in columns:
                    col_index = col.get('columnIndex')
                    col_letter = col.get('columnLetter')
                    if col_index > sheet_last_col_index:
                        sheet_last_col_index = col_index
                        sheet_last_col_letter = col_letter
                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                # Initialize paging for 1st batch
                is_last_row = False
                batch_rows = 200
                from_row = 2
                if sheet_max_row < batch_rows:
                    to_row = sheet_max_row
                else:
                    to_row = batch_rows
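
                # Batching example (illustrative): with batch_rows=200 and a last
                # column letter of 'D', the first range computed below is 'A2:D200',
                # the next 'A201:D400', and so on until an empty row or
                # sheet_max_row is reached.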

                # Loop thru batches (each having 200 rows of data)
                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                    # GET sheet_data for a worksheet tab
                    sheet_data, time_extracted = get_data(
                        stream_name=sheet_title,
                        endpoint_config=sheets_loaded_config,
                        client=client,
                        spreadsheet_id=spreadsheet_id,
                        range_rows=range_rows)
                    # Data is returned as a list of arrays, an array of values for each row
                    sheet_data_rows = sheet_data.get('values')

                    # Transform batch of rows to JSON with keys for each column
                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
                        spreadsheet_id=spreadsheet_id,
                        sheet_id=sheet_id,
                        from_row=from_row,
                        columns=columns,
                        sheet_data_rows=sheet_data_rows)
                    if row_num < to_row:
                        is_last_row = True

                    # Process records, send batch of records to target
                    record_count = process_records(
                        catalog=catalog,
                        stream_name=sheet_title,
                        records=sheet_data_tf,
                        time_extracted=ss_time_extracted)
                    LOGGER.info('Sheet: {}, records processed: {}'.format(
                        sheet_title, record_count))

                    # Update paging from/to_row for next batch
                    from_row = to_row + 1
                    if to_row + batch_rows > sheet_max_row:
                        to_row = sheet_max_row
                    else:
                        to_row = to_row + batch_rows

                # SHEETS_LOADED
                # Add sheet to sheets_loaded
                sheet_loaded = {}
                sheet_loaded['spreadsheetId'] = spreadsheet_id
                sheet_loaded['sheetId'] = sheet_id
                sheet_loaded['title'] = sheet_title
                sheet_loaded['loadDate'] = strftime(utils.now())
                sheet_loaded['lastRowNumber'] = row_num
                sheets_loaded.append(sheet_loaded)

                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
                # This forces hard deletes on the data downstream if fewer records are sent.
                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                activate_version_message = singer.ActivateVersionMessage(
                    stream=sheet_title,
                    version=int(time.time() * 1000))
                singer.write_message(activate_version_message)

                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                    sheet_title, row_num - 1))

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    return