tap_google_sheets/sync.py
import time
import math
import json
from datetime import datetime, timedelta
import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()

def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err

def write_record(stream_name, record, time_extracted, version=None):
    try:
        if version:
            singer.messages.write_message(
                RecordMessage(
                    stream=stream_name,
                    record=record,
                    version=version,
                    time_extracted=time_extracted))
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err

def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )

def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)

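# Illustrative round-trip with the two helpers above (example values assumed,
# not from the original module):
#
#     state = {}
#     write_bookmark(state, 'file_metadata', '2019-01-01T00:00:00Z')
#     # state == {'bookmarks': {'file_metadata': '2019-01-01T00:00:00Z'}}
#     get_bookmark(state, 'file_metadata', None)  # -> '2019-01-01T00:00:00Z'
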
# Transform/validate batch of records w/ schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                write_record(
                    stream_name=stream_name,
                    record=transformed_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value

def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)

# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)

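# Illustrative state shape while a stream is mid-sync (example values assumed):
#   {'currently_syncing': 'Sheet1',
#    'bookmarks': {'file_metadata': '2019-01-01T00:00:00Z'}}
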
# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields

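# A field-level entry in mdata_list looks like this (illustrative):
#   {'breadcrumb': ['properties', 'Order ID'], 'metadata': {'selected': True}}
# so entry['breadcrumb'][1] is the field name; the stream-level entry has an
# empty breadcrumb, raises IndexError, and is skipped.
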
def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    # Replace {placeholder} variables in path
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # Build the querystring by hand and replace {placeholder} variables;
    # this keeps parameter values un-encoded (URL-encoding them causes API errors)
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    data = {}
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted

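# Illustrative endpoint_config (shape assumed; the real values live in STREAMS
# in tap_google_sheets/streams.py):
#   {'api': 'sheets',
#    'path': 'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}',
#    'params': {'dateTimeRenderOption': 'SERIAL_NUMBER'}}
# With spreadsheet_id='abc123', stream_name='Sheet1', range_rows='A2:D200',
# this would yield path 'spreadsheets/abc123/values/Sheet1!A2:D200' and
# querystring 'dateTimeRenderOption=SERIAL_NUMBER'.
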
# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Deep copy to a plain dict via JSON round-trip
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr

# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Deep copy to a plain dict via JSON round-trip
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr

# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Deep copy the sheet properties to a plain dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf

# Convert an Excel date serial number (excel_date_sn) to a datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # Excel serial day for 1970-01-01T00:00:00Z (the Unix epoch)
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str

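# Worked example: serial 43831.5 -> (43831.5 - 25569) * 86400 = 1577880000
# seconds after the Unix epoch, i.e. '2020-01-01T12:00:00.000000Z'
# (singer's strftime emits microsecond precision).
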
# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, SKIP
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
                # Guard against rows wider than the known column metadata
                if col_num > len(cols):
                    break
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')

                    # NULL values
                    if value is None or value == '':
                        col_val = None

                    # Convert dates/times from Excel date serial numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # DATE
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # TIME ONLY (NO DATE)
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400  # seconds in a day
                                # Create string formatted like HH:MM:SS
                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
                                    # ROUND to multipleOf: 1e-15
                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                            else:  # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)
                    # BOOLEAN
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                            sheet_title, col_name, col_letter, row_num, col_type, value))
                    sheet_data_row_tf[col_name] = col_val
                col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num

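# Illustrative input/output for the transform above (column metadata assumed):
# with columns A='Order ID' (stringValue) and B='Paid' (boolValue), from_row=2
# and sheet_data_rows=[['a-123', 'TRUE']], the result is:
#   ([{'__sdc_spreadsheet_id': <id>, '__sdc_sheet_id': <gid>, '__sdc_row': 2,
#      'Order ID': 'a-123', 'Paid': True}], 3)
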

def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = the stream that was being synced when a previous run was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if the file has changed; if not, exit (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        # Update file_metadata bookmark
        write_bookmark(state, 'file_metadata', strftime(this_datetime))
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf,
                ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # SKIP empty sheets (where sheet_schema and columns are None)
            if not sheet_schema or not columns:
                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
            else:
                # Transform sheet_metadata
                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
                sheet_metadata.append(sheet_metadata_tf)

                # SHEET_DATA
                # Should this worksheet tab be synced?
                if sheet_title in selected_streams:
                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                    update_currently_syncing(state, sheet_title)
                    selected_fields = get_selected_fields(catalog, sheet_title)
                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                    write_schema(catalog, sheet_title)

                    # Emit a Singer ACTIVATE_VERSION message before the initial sync (but
                    # not on subsequent syncs) and again after each sheet sync completes.
                    # This forces hard deletes on the data downstream if fewer records are sent.
                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
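                    # Illustrative message sequence (Singer full-table semantics):
                    #   initial sync:    ACTIVATE_VERSION(v), RECORD(v)..., ACTIVATE_VERSION(v)
                    #   subsequent sync: RECORD(v')..., ACTIVATE_VERSION(v')
                    # Once the closing ACTIVATE_VERSION arrives, targets may hard-delete
                    # rows still tagged with an older version.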
                    last_integer = int(get_bookmark(state, sheet_title, 0))
                    activate_version = int(time.time() * 1000)
                    activate_version_message = singer.ActivateVersionMessage(
                        stream=sheet_title,
                        version=activate_version)
                    if last_integer == 0:
                        # Initial load: send activate_version before AND after data sync
                        singer.write_message(activate_version_message)
                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))

                    # Determine max range of columns and rows for "paging" through the data
                    sheet_last_col_index = 1
                    sheet_last_col_letter = 'A'
                    for col in columns:
                        col_index = col.get('columnIndex')
                        col_letter = col.get('columnLetter')
                        if col_index > sheet_last_col_index:
                            sheet_last_col_index = col_index
                            sheet_last_col_letter = col_letter
                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                    # Initialize paging for 1st batch
                    is_last_row = False
                    batch_rows = 200
                    from_row = 2
                    if sheet_max_row < batch_rows:
                        to_row = sheet_max_row
                    else:
                        to_row = batch_rows
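
                    # Illustrative paging for a 4-column sheet with sheet_max_row = 450:
                    #   batch 1 -> A2:D200, batch 2 -> A201:D400, batch 3 -> A401:D450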

                    # Loop thru batches (each having 200 rows of data)
                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                        # GET sheet_data for a worksheet tab
                        sheet_data, time_extracted = get_data(
                            stream_name=sheet_title,
                            endpoint_config=sheets_loaded_config,
                            client=client,
                            spreadsheet_id=spreadsheet_id,
                            range_rows=range_rows)
                        # Data is returned as a list of arrays, an array of values for each row
                        sheet_data_rows = sheet_data.get('values')

                        # Transform batch of rows to JSON with keys for each column
                        sheet_data_tf, row_num = transform_sheet_data(
                            spreadsheet_id=spreadsheet_id,
                            sheet_id=sheet_id,
                            sheet_title=sheet_title,
                            from_row=from_row,
                            columns=columns,
                            sheet_data_rows=sheet_data_rows)
                        if row_num < to_row:
                            is_last_row = True

                        # Process records, send batch of records to target
                        record_count = process_records(
                            catalog=catalog,
                            stream_name=sheet_title,
                            records=sheet_data_tf,
                            time_extracted=ss_time_extracted,
                            version=activate_version)
                        LOGGER.info('Sheet: {}, records processed: {}'.format(
                            sheet_title, record_count))

                        # Update paging from/to_row for next batch
                        from_row = to_row + 1
                        if to_row + batch_rows > sheet_max_row:
                            to_row = sheet_max_row
                        else:
                            to_row = to_row + batch_rows

                    # End of Stream: Send Activate Version and update State
                    singer.write_message(activate_version_message)
                    write_bookmark(state, sheet_title, activate_version)
                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                        sheet_title, row_num - 2))  # row_num is 1 past the last row synced; also subtract the header row
                    update_currently_syncing(state, None)

                    # SHEETS_LOADED
                    # Add sheet to sheets_loaded
                    sheet_loaded = {}
                    sheet_loaded['spreadsheetId'] = spreadsheet_id
                    sheet_loaded['sheetId'] = sheet_id
                    sheet_loaded['title'] = sheet_title
                    sheet_loaded['loadDate'] = strftime(utils.now())
                    sheet_loaded['lastRowNumber'] = row_num
                    sheets_loaded.append(sheet_loaded)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark
    write_bookmark(state, 'file_metadata', strftime(this_datetime))

    return