import time
import math
import json
import re
import urllib.parse
from datetime import datetime, timedelta

import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted, version=None):
    try:
        if version:
            singer.messages.write_message(
                RecordMessage(
                    stream=stream_name,
                    record=record,
                    version=version,
                    time_extracted=time_extracted))
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return state.get('bookmarks', {}).get(stream, default)


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


def drop_date_on_time(schema, record):
    for field, field_schema in schema['properties'].items():
        # `time` fields can come back from Google like `X days, H:M:S`;
        # keep only the `H:M:S` portion
        if field_schema.get('format') == 'time' and 'days,' in str(record.get(field, '')):
            old_time = record[field]
            new_time = old_time.split(',')[1].strip()
            record[field] = new_time
    return record
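

# Example (illustrative): a duration cell rendered by Google as '2 days, 6:30:00'
# is reduced to '6:30:00' above so it validates against the `time` format.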


# Transform/validate batch of records w/ schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                try:
                    edited_record = drop_date_on_time(schema, record)
                    transformed_record = transformer.transform(
                        edited_record,
                        schema,
                        stream_metadata)
                except Exception as err:
                    LOGGER.error('{}'.format(err))
                    raise RuntimeError(err)
                write_record(
                    stream_name=stream_name,
                    record=transformed_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)
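

# Illustrative state shape (assumed, for reference only):
#   {'currently_syncing': 'Sheet1',
#    'bookmarks': {'file_metadata': '2019-09-27T22:34:54.000000Z', 'Sheet1': 1578772551000}}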


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        # Stream-level metadata has an empty breadcrumb; field-level entries
        # look like ('properties', field_name)
        if len(entry.get('breadcrumb', [])) > 1:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
    return selected_fields


def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    # Replace {placeholder} variables in path
    # Encode stream_name: fixes issue w/ special characters in sheet name
    stream_name_escaped = re.escape(stream_name)
    stream_name_encoded = urllib.parse.quote_plus(stream_name)
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # Build the querystring by hand and replace {placeholder} variables;
    # joining the parameters ourselves keeps them unencoded, since encoding
    # them causes API errors
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name_encoded)
    LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))

    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name_escaped)
    return data, time_extracted
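

# Example (illustrative): a sheet titled 'Order Items' is encoded to 'Order+Items'
# before being substituted into a path template such as
# 'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}'.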


# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Deep copy via JSON round-trip
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove personal keys from lastModifyingUser
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Deep copy via JSON round-trip
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert the sheet properties to a dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf
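

# Example (illustrative): the first tab in a spreadsheet commonly has sheetId 0,
# yielding sheet_url 'https://docs.google.com/spreadsheets/d/<spreadsheet_id>/edit#gid=0'.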


# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str
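

# Worked example (UTC default): serial 44197 is 44197 - 25569 = 18628 days after
# 1970-01-01, so excel_to_dttm_str(44197) returns the string for 2021-01-01T00:00:00Z
# (exact formatting per singer.utils.strftime).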


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])
    type_error = ('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, '
                  'COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}')

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # Skip empty rows
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')
                    # NULL values
                    if value is None or value == '':
                        col_val = None
                    # Convert dates/times from Lotus Notes Serial Numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info(type_error.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # DATE ONLY (NO TIME)
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info(type_error.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # TIME ONLY (NO DATE)
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            total_secs = value * 86400  # seconds in day
                            # Create string formatted like HH:MM:SS
                            col_val = str(timedelta(seconds=total_secs))
                        else:
                            col_val = str(value)
                            LOGGER.info(type_error.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                # ROUND to multipleOf: 1e-15
                                col_val = float(round(value, 15))
                                LOGGER.info(type_error.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                            else:  # decimal_digits <= 15, no rounding
                                col_val = float(value)
                        else:
                            col_val = str(value)
                            LOGGER.info(type_error.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)
                    # BOOLEAN
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info(type_error.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info(type_error.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info(type_error.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        LOGGER.info(type_error.format(
                            sheet_title, col_name, col_letter, row_num, col_type, value))
                    sheet_data_row_tf[col_name] = col_val
                col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num
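

# Example (illustrative): given one column
#   {'columnIndex': 1, 'columnLetter': 'A', 'columnName': 'id', 'columnType': 'numberType'}
# and sheet_data_rows == [[7]], calling transform_sheet_data(..., from_row=2, ...)
# returns ([{'__sdc_spreadsheet_id': <id>, '__sdc_sheet_id': <gid>, '__sdc_row': 2, 'id': 7}], 3).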


def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed; if not, exit (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        # Update file_metadata bookmark
        write_bookmark(state, 'file_metadata', strftime(this_datetime))
        return

    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf,
                ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # SKIP empty sheets (where sheet_schema and columns are None)
            if not sheet_schema or not columns:
                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
            else:
                # Transform sheet_metadata
                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
                sheet_metadata.append(sheet_metadata_tf)

                # SHEET_DATA
                # Should this worksheet tab be synced?
                if sheet_title in selected_streams:
                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                    update_currently_syncing(state, sheet_title)
                    selected_fields = get_selected_fields(catalog, sheet_title)
                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                    write_schema(catalog, sheet_title)

                    # Emit a Singer ACTIVATE_VERSION message before the initial sync (but
                    # not on subsequent syncs) and again after each sheet sync completes.
                    # This forces hard deletes on the data downstream if fewer records are sent.
                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                    last_integer = int(get_bookmark(state, sheet_title, 0))
                    activate_version = int(time.time() * 1000)
                    activate_version_message = singer.ActivateVersionMessage(
                        stream=sheet_title,
                        version=activate_version)
                    if last_integer == 0:
                        # Initial load: send activate_version before AND after data sync
                        singer.write_message(activate_version_message)
                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
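
                    # Illustrative message order (assumed): an initial load emits
                    # SCHEMA, ACTIVATE_VERSION(v), RECORD..., ACTIVATE_VERSION(v);
                    # subsequent loads hold ACTIVATE_VERSION(v) until the RECORDs are sent.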

                    # Determine max range of columns and rows for "paging" through the data
                    sheet_last_col_index = 1
                    sheet_last_col_letter = 'A'
                    for col in columns:
                        col_index = col.get('columnIndex')
                        col_letter = col.get('columnLetter')
                        if col_index > sheet_last_col_index:
                            sheet_last_col_index = col_index
                            sheet_last_col_letter = col_letter
                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                    # Initialize paging for 1st batch (data starts at row 2, after the header)
                    is_last_row = False
                    batch_rows = 200
                    from_row = 2
                    if sheet_max_row < batch_rows:
                        to_row = sheet_max_row
                    else:
                        to_row = batch_rows
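
                    # Example (illustrative): a 1,000-row sheet with columns A..F pages
                    # through ranges A2:F200, A201:F400, ..., A801:F1000.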

                    # Loop thru batches (each having 200 rows of data)
                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                        # GET sheet_data for a worksheet tab
                        sheet_data, time_extracted = get_data(
                            stream_name=sheet_title,
                            endpoint_config=sheets_loaded_config,
                            client=client,
                            spreadsheet_id=spreadsheet_id,
                            range_rows=range_rows)
                        # Data is returned as a list of arrays, an array of values for each row
                        sheet_data_rows = sheet_data.get('values', [])

                        # Transform batch of rows to JSON with keys for each column
                        sheet_data_tf, row_num = transform_sheet_data(
                            spreadsheet_id=spreadsheet_id,
                            sheet_id=sheet_id,
                            sheet_title=sheet_title,
                            from_row=from_row,
                            columns=columns,
                            sheet_data_rows=sheet_data_rows)
                        # Fewer rows than requested means the sheet is exhausted
                        if row_num < to_row:
                            is_last_row = True

                        # Process records, send batch of records to target
                        record_count = process_records(
                            catalog=catalog,
                            stream_name=sheet_title,
                            records=sheet_data_tf,
                            time_extracted=ss_time_extracted,
                            version=activate_version)
                        LOGGER.info('Sheet: {}, records processed: {}'.format(
                            sheet_title, record_count))

                        # Update paging from/to_row for next batch
                        from_row = to_row + 1
                        if to_row + batch_rows > sheet_max_row:
                            to_row = sheet_max_row
                        else:
                            to_row = to_row + batch_rows

                    # End of Stream: Send Activate Version and update State
                    singer.write_message(activate_version_message)
                    write_bookmark(state, sheet_title, activate_version)
                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                        sheet_title, row_num - 2))  # row_num is one past the last row; also exclude the header row
                    update_currently_syncing(state, None)

                    # SHEETS_LOADED
                    # Add sheet to sheets_loaded
                    sheet_loaded = {}
                    sheet_loaded['spreadsheetId'] = spreadsheet_id
                    sheet_loaded['sheetId'] = sheet_id
                    sheet_loaded['title'] = sheet_title
                    sheet_loaded['loadDate'] = strftime(utils.now())
                    sheet_loaded['lastRowNumber'] = row_num
                    sheets_loaded.append(sheet_loaded)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark
    write_bookmark(state, 'file_metadata', strftime(this_datetime))