import time
import math
import json
import pytz
import singer
from datetime import datetime, timedelta
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted, version=None):
    try:
        if version:
            singer.messages.write_message(
                RecordMessage(
                    stream=stream_name,
                    record=record,
                    version=version,
                    time_extracted=time_extracted))
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return state.get('bookmarks', {}).get(stream, default)


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)
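
# Illustrative state shape (author-added example; values are hypothetical):
#     {'currently_syncing': 'Sheet1',
#      'bookmarks': {'file_metadata': '2019-09-27T22:34:39.000000Z',
#                    'Sheet1': 1569623679000}}
# file_metadata is bookmarked by its modified datetime string; each sheet is
# bookmarked by its integer activate_version (epoch milliseconds), as written
# by sync() below.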


# Transform/validate batch of records with schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                write_record(
                    stream_name=stream_name,
                    record=transformed_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            # Stream-level metadata has an empty breadcrumb; skip it
            pass
    return selected_fields
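
# Illustrative mdata_list entry (author-added example; field name hypothetical):
#     {'breadcrumb': ('properties', 'row_count'),
#      'metadata': {'inclusion': 'available', 'selected': True}}
# Stream-level entries have an empty breadcrumb, hence the IndexError guard above.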


def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    # Replace {placeholder} variables in path
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # Add in querystring parameters and replace {placeholder} variables.
    # The querystring is built by hand so parameters are added without
    # URL-encoding, which would otherwise cause API errors.
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted
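
# Author-added example: for hypothetical params {'fields': 'id,name,modifiedTime'}
# the join yields 'fields=id,name,modifiedTime' verbatim; URL-encoding the commas
# is exactly what the comment above says causes API errors.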


# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Convert to plain dict (deep copy via JSON round-trip)
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Convert to plain dict (deep copy via JSON round-trip)
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


# Convert an Excel Date Serial Number (excel_date_sn) to a datetime string.
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str
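
# Worked example (author-added): serial number 43831 is 43831 - 25569 = 18262 days
# after the Unix epoch, i.e. 18262 * 86400 seconds, so excel_to_dttm_str(43831)
# returns '2020-01-01T00:00:00.000000Z'.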


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, SKIP
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')
                    # NULL values
                    if value is None or value == '':
                        col_val = None
                    # Convert dates/times from Lotus Notes Serial Numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # DATE
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # TIME ONLY (NO DATE)
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400  # seconds in day
                                # Create string formatted like HH:MM:SS
                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
                                    # ROUND to multipleOf: 1e-15
                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                            else:  # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)
                    # BOOLEAN
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                            sheet_title, col_name, col_letter, row_num, col_type, value))
                    sheet_data_row_tf[col_name] = col_val
                col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num
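
# Illustrative transformation (author-added example; column names hypothetical):
# with columns A='name' (stringValue) and B='active' (boolValue), the input row
# ['Widget', 'TRUE'] at row_num 2 is emitted as
#     {'__sdc_spreadsheet_id': <spreadsheet_id>, '__sdc_sheet_id': <sheet_id>,
#      '__sdc_row': 2, 'name': 'Widget', 'active': True}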


def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=file_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if the file has changed; if not, return (back to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return

    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf,
                ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # Transform sheet_metadata
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # SHEET_DATA
            # Should this worksheet tab be synced?
            if sheet_title in selected_streams:
                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                update_currently_syncing(state, sheet_title)
                selected_fields = get_selected_fields(catalog, sheet_title)
                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                write_schema(catalog, sheet_title)

                # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not
                # subsequent syncs) and again after each sheet sync is complete.
                # This forces hard deletes on the data downstream if fewer records are sent.
                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                last_integer = int(get_bookmark(state, sheet_title, 0))
                activate_version = int(time.time() * 1000)
                activate_version_message = singer.ActivateVersionMessage(
                    stream=sheet_title,
                    version=activate_version)
                if last_integer == 0:
                    # Initial load: send activate_version before AND after the data sync
                    singer.write_message(activate_version_message)
                    LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(
                        sheet_title, activate_version))
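
                # Author-added note: on an initial load the target therefore sees
                # ACTIVATE_VERSION -> versioned RECORDs -> ACTIVATE_VERSION, while
                # subsequent syncs send only the trailing ACTIVATE_VERSION message.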

                # Determine max range of columns and rows for "paging" through the data
                sheet_last_col_index = 1
                sheet_last_col_letter = 'A'
                for col in columns:
                    col_index = col.get('columnIndex')
                    col_letter = col.get('columnLetter')
                    if col_index > sheet_last_col_index:
                        sheet_last_col_index = col_index
                        sheet_last_col_letter = col_letter
                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                # Initialize paging for the 1st batch
                is_last_row = False
                batch_rows = 200
                from_row = 2  # row 1 is the header row
                if sheet_max_row < batch_rows:
                    to_row = sheet_max_row
                else:
                    to_row = batch_rows

                # Loop thru batches (each having up to 200 rows of data)
                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
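
                    # Author-added example: with 10 columns (last column letter 'J')
                    # the first batch requests range_rows 'A2:J200', the next
                    # 'A201:J400', and so on until sheet_max_row is reached.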

                    # GET sheet_data for a worksheet tab
                    sheet_data, time_extracted = get_data(
                        stream_name=sheet_title,
                        endpoint_config=sheets_loaded_config,
                        client=client,
                        spreadsheet_id=spreadsheet_id,
                        range_rows=range_rows)
                    # Data is returned as a list of arrays, an array of values for each row
                    sheet_data_rows = sheet_data.get('values')

                    # Transform batch of rows to JSON with keys for each column
                    sheet_data_tf, row_num = transform_sheet_data(
                        spreadsheet_id=spreadsheet_id,
                        sheet_id=sheet_id,
                        sheet_title=sheet_title,
                        from_row=from_row,
                        columns=columns,
                        sheet_data_rows=sheet_data_rows)
                    if row_num < to_row:
                        # Fewer rows returned than requested: this is the last batch
                        is_last_row = True

                    # Process records, send batch of records to target
                    record_count = process_records(
                        catalog=catalog,
                        stream_name=sheet_title,
                        records=sheet_data_tf,
                        time_extracted=ss_time_extracted,
                        version=activate_version)
                    LOGGER.info('Sheet: {}, records processed: {}'.format(
                        sheet_title, record_count))

                    # Update paging from/to_row for the next batch
                    from_row = to_row + 1
                    if to_row + batch_rows > sheet_max_row:
                        to_row = sheet_max_row
                    else:
                        to_row = to_row + batch_rows

                # End of Stream: send Activate Version and update State
                singer.write_message(activate_version_message)
                write_bookmark(state, sheet_title, activate_version)
                LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(
                    sheet_title, activate_version))
                # Subtract 2: 1 for the header row, 1 because row_num is one past the last data row
                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                    sheet_title, row_num - 2))
                update_currently_syncing(state, None)

                # Add sheet to sheets_loaded
                sheet_loaded = {}
                sheet_loaded['spreadsheetId'] = spreadsheet_id
                sheet_loaded['sheetId'] = sheet_id
                sheet_loaded['title'] = sheet_title
                sheet_loaded['loadDate'] = strftime(utils.now())
                sheet_loaded['lastRowNumber'] = row_num
                sheets_loaded.append(sheet_loaded)

        stream_name = 'sheet_metadata'
        # Sync sheet_metadata if selected
        sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

        stream_name = 'sheets_loaded'
        # Sync sheets_loaded if selected
        sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark
    write_bookmark(state, 'file_metadata', strftime(this_datetime))