import time
import math
import json
import re
import urllib.parse
from datetime import datetime, timedelta
import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted, version=None):
    try:
        if version:
            singer.messages.write_message(
                RecordMessage(
                    stream=stream_name,
                    record=record,
                    version=version,
                    time_extracted=time_extracted))
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


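# For reference, a RECORD message emitted above serializes roughly as follows
# (stream name, version, and field values are illustrative only):
# {"type": "RECORD", "stream": "Sheet1", "version": 1570118868000,
#  "record": {"__sdc_row": 2, "Name": "Alice"}, "time_extracted": "2019-09-27T22:34:39.000000Z"}
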
def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


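# For reference, the state object that get_bookmark/write_bookmark (and
# update_currently_syncing below) operate on looks roughly like this; the stream
# names and values are illustrative only:
# {
#     "currently_syncing": "Sheet1",
#     "bookmarks": {
#         "file_metadata": "2019-09-27T22:34:39.000000Z",
#         "Sheet1": 1570118868000
#     }
# }
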
# Transform/validate batch of records w/ schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                try:
                    transformed_record = transformer.transform(
                        record,
                        schema,
                        stream_metadata)
                except Exception as err:
                    LOGGER.error('{}'.format(err))
                    raise RuntimeError(err)
                write_record(
                    stream_name=stream_name,
                    record=transformed_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields


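# For reference, singer.metadata.to_list() yields entries shaped roughly like the ones
# below (the field name is illustrative only); the stream-level entry has an empty
# breadcrumb, which is why the IndexError above is ignored:
# {'breadcrumb': (), 'metadata': {'inclusion': 'available'}}
# {'breadcrumb': ('properties', 'Order Total'), 'metadata': {'selected': True}}
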
def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    # Replace {placeholder} variables in path
    # Encode stream_name: fixes issue w/ special characters in sheet name
    stream_name_escaped = re.escape(stream_name)
    stream_name_encoded = urllib.parse.quote_plus(stream_name)
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # Add in querystring parameters and replace {placeholder} variables
    # Build the querystring manually so parameters are added but not URL-encoded
    # (encoding them causes API errors)
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name_encoded)
    LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))
    data = {}
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name_escaped)
    return data, time_extracted


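# A rough sketch of what get_data() builds for one batch of sheet rows; the real path and
# params templates come from STREAMS in streams.py, so the exact strings below are
# illustrative only:
#   stream_name = 'Sheet One'   ->  quote_plus() encodes it as 'Sheet+One'
#   range_rows  = 'A2:F200'
#   path        = 'spreadsheets/<spreadsheet_id>/values/Sheet+One!A2:F200'
# The request is logged as '{base_url}/{path}?{querystring}'.
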
# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Convert to dict
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Convert to dict
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str


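# Worked example: serial number 44197.5 is 18628.5 days after serial 25569
# (1970-01-01T00:00:00Z), so excel_to_dttm_str(44197.5) returns the UTC string for
# 2021-01-01T12:00:00Z. For numberType.DATE columns the caller keeps only the first
# 10 characters, i.e. '2021-01-01'.
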
# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, SKIP
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')

                    # NULL values
                    if value is None or value == '':
                        col_val = None

                    # Convert dates/times from Lotus Notes Serial Numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # DATE
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # TIME ONLY (NO DATE)
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400  # seconds in day
                                # Create string formatted like HH:MM:SS
                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
                                    # ROUND to multipleOf: 1e-15
                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                            else:  # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)
                    # BOOLEAN
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                            sheet_title, col_name, col_letter, row_num, col_type, value))
                    sheet_data_row_tf[col_name] = col_val
                col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num


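# Illustrative input/output for transform_sheet_data() (column metadata and values are
# made up for this example). Given
#   columns = [{'columnIndex': 1, 'columnLetter': 'A', 'columnName': 'Name',
#               'columnType': 'stringValue'},
#              {'columnIndex': 2, 'columnLetter': 'B', 'columnName': 'Signed Up',
#               'columnType': 'numberType.DATE'}]
#   sheet_data_rows = [['Alice', 44197]], from_row = 2
# one transformed record looks like:
#   {'__sdc_spreadsheet_id': '<spreadsheet_id>', '__sdc_sheet_id': 123456, '__sdc_row': 2,
#    'Name': 'Alice', 'Signed Up': '2021-01-01'}
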
def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed; if not, return (back to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        # Update file_metadata bookmark
        write_bookmark(state, 'file_metadata', strftime(this_datetime))
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf,
                ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # SKIP empty sheets (where sheet_schema and columns are None)
            if not sheet_schema or not columns:
                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
            else:
                # Transform sheet_metadata
                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
                sheet_metadata.append(sheet_metadata_tf)

                # SHEET_DATA
                # Should this worksheet tab be synced?
                if sheet_title in selected_streams:
                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                    update_currently_syncing(state, sheet_title)
                    selected_fields = get_selected_fields(catalog, sheet_title)
                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                    write_schema(catalog, sheet_title)

                    # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not
                    # before subsequent syncs) and again after each sheet sync completes.
                    # This forces hard deletes on the data downstream if fewer records are sent.
                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                    last_integer = int(get_bookmark(state, sheet_title, 0))
                    activate_version = int(time.time() * 1000)
                    activate_version_message = singer.ActivateVersionMessage(
                        stream=sheet_title,
                        version=activate_version)
                    if last_integer == 0:
                        # initial load, send activate_version before AND after data sync
                        singer.write_message(activate_version_message)
                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))

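                    # The serialized ACTIVATE_VERSION message (stream name and version are
                    # illustrative) looks like:
                    # {"type": "ACTIVATE_VERSION", "stream": "Sheet1", "version": 1570118868000}
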
                    # Determine max range of columns and rows for "paging" through the data
                    sheet_last_col_index = 1
                    sheet_last_col_letter = 'A'
                    for col in columns:
                        col_index = col.get('columnIndex')
                        col_letter = col.get('columnLetter')
                        if col_index > sheet_last_col_index:
                            sheet_last_col_index = col_index
                            sheet_last_col_letter = col_letter
                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                    # Initialize paging for 1st batch
                    is_last_row = False
                    batch_rows = 200
                    from_row = 2
                    if sheet_max_row < batch_rows:
                        to_row = sheet_max_row
                    else:
                        to_row = batch_rows

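                    # Illustrative paging (example numbers): with sheet_max_row = 450,
                    # batch_rows = 200, and F as the last populated column, the loop below
                    # requests the ranges 'A2:F200', 'A201:F400', then 'A401:F450' and stops.
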
                    # Loop thru batches (each having 200 rows of data)
                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                        # GET sheet_data for a worksheet tab
                        sheet_data, time_extracted = get_data(
                            stream_name=sheet_title,
                            endpoint_config=sheets_loaded_config,
                            client=client,
                            spreadsheet_id=spreadsheet_id,
                            range_rows=range_rows)
                        # Data is returned as a list of arrays, an array of values for each row
                        sheet_data_rows = sheet_data.get('values')

                        # Transform batch of rows to JSON with keys for each column
                        sheet_data_tf, row_num = transform_sheet_data(
                            spreadsheet_id=spreadsheet_id,
                            sheet_id=sheet_id,
                            sheet_title=sheet_title,
                            from_row=from_row,
                            columns=columns,
                            sheet_data_rows=sheet_data_rows)
                        if row_num < to_row:
                            is_last_row = True

                        # Process records, send batch of records to target
                        record_count = process_records(
                            catalog=catalog,
                            stream_name=sheet_title,
                            records=sheet_data_tf,
                            time_extracted=ss_time_extracted,
                            version=activate_version)
                        LOGGER.info('Sheet: {}, records processed: {}'.format(
                            sheet_title, record_count))

                        # Update paging from/to_row for next batch
                        from_row = to_row + 1
                        if to_row + batch_rows > sheet_max_row:
                            to_row = sheet_max_row
                        else:
                            to_row = to_row + batch_rows

                    # End of Stream: Send Activate Version and update State
                    singer.write_message(activate_version_message)
                    write_bookmark(state, sheet_title, activate_version)
                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                        sheet_title, row_num - 2))  # row_num starts at 2 (header row excluded), so this is the data row count
                    update_currently_syncing(state, None)

                    # SHEETS_LOADED
                    # Add sheet to sheets_loaded
                    sheet_loaded = {}
                    sheet_loaded['spreadsheetId'] = spreadsheet_id
                    sheet_loaded['sheetId'] = sheet_id
                    sheet_loaded['title'] = sheet_title
                    sheet_loaded['loadDate'] = strftime(utils.now())
                    sheet_loaded['lastRowNumber'] = row_num
                    sheets_loaded.append(sheet_loaded)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark
    write_bookmark(state, 'file_metadata', strftime(this_datetime))

    return