import json
import math
import re
import time
import urllib.parse
from datetime import datetime, timedelta

import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from singer.messages import RecordMessage

from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted, version=None):
    try:
        if version:
            # Versioned RECORD message (used with ACTIVATE_VERSION)
            singer.messages.write_message(
                RecordMessage(
                    stream=stream_name,
                    record=record,
                    version=version,
                    time_extracted=time_extracted))
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return state.get('bookmarks', {}).get(stream, default)


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)
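
# For illustration (hypothetical stream and value), calling
#   write_bookmark(state, 'file_metadata', '2021-01-01T00:00:00Z')
# leaves state == {'bookmarks': {'file_metadata': '2021-01-01T00:00:00Z'}}
# and emits it as a Singer STATE message.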


# Transform/validate batch of records w/ schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                try:
                    # Coerce/validate the record against the stream's JSON schema
                    transformed_record = transformer.transform(
                        record,
                        schema,
                        stream_metadata)
                except Exception as err:
                    LOGGER.error('{}'.format(err))
                    raise RuntimeError(err)
                write_record(
                    stream_name=stream_name,
                    record=transformed_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Syncs the metadata streams (file_metadata, spreadsheet_metadata, sheet_metadata,
    # sheets_loaded) if selected; sheet data itself is paged and synced in sync() below
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        # Stream-level metadata has an empty breadcrumb; field names come from
        # field-level entries with breadcrumb ('properties', <field_name>)
        if len(entry.get('breadcrumb', [])) > 1:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
    return selected_fields
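
# For illustration, a catalog metadata entry of the form
#   {'breadcrumb': ('properties', 'Order ID'), 'metadata': {'selected': True}}
# yields 'Order ID' in selected_fields; the stream-level entry (empty
# breadcrumb) is ignored.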


def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    # Replace {placeholder} variables in path
    # Encode stream_name: fixes issue w/ special characters in sheet name
    stream_name_escaped = re.escape(stream_name)
    stream_name_encoded = urllib.parse.quote_plus(stream_name)
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # Add in querystring parameters and replace {placeholder} variables;
    # the querystring is built by hand so parameters are added but not
    # re-encoded, which would cause API errors
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name_encoded)
    LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))

    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name_escaped)
    return data, time_extracted
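
# For illustration (hypothetical endpoint_config), a path template of
#   'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}'
# with spreadsheet_id 'abc123', sheet 'Sheet 1', and range_rows 'A2:D201'
# becomes 'spreadsheets/abc123/values/Sheet+1!A2:D201'.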


# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Deep copy via serialize/deserialize to avoid mutating the raw response
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys: photoLink, me, and permissionId from lastModifyingUser
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Deep copy via serialize/deserialize to avoid mutating the raw response
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf
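
# For illustration, spreadsheet_id 'abc123' and sheetId 0 produce
#   sheetUrl 'https://docs.google.com/spreadsheets/d/abc123/edit#gid=0'
# on the transformed sheet_metadata record.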


# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str
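
# Worked example: serial 44197.5 is (44197.5 - 25569) = 18628.5 days after
# 1970-01-01, i.e. 2021-01-01 12:00 UTC, returned as '2021-01-01T12:00:00.000000Z'.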


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # SKIP empty rows
        if not row:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')

                    # NULL values
                    if value is None or value == '':
                        col_val = None

                    # Convert dates/times from Lotus Notes Serial Numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))

                    # DATE
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            # Keep the date portion (first 10 chars) only
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))

                    # TIME ONLY (NO DATE)
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400  # seconds in day
                                # Create string formatted like HH:MM:SS
                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)

                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
                                    # ROUND to multipleOf: 1e-15
                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                            else:  # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))

                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)

                    # BOOLEAN
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            # Fallback so col_val is always bound
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))

                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                            sheet_title, col_name, col_letter, row_num, col_type, value))

                    sheet_data_row_tf[col_name] = col_val
                col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num
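
# For illustration (hypothetical columns), with column A = 'Order ID'
# (columnType 'numberType') and column B = 'Paid' (columnType 'boolValue'),
# from_row=2, the input row [1001, 'Yes'] is transformed to:
#   {'__sdc_spreadsheet_id': <spreadsheet_id>, '__sdc_sheet_id': <sheet_id>,
#    '__sdc_row': 2, 'Order ID': 1001, 'Paid': True}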


def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    #   last_stream = previously syncing stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed; if not, return (back to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        # Update file_metadata bookmark
        write_bookmark(state, 'file_metadata', strftime(this_datetime))
        return

    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state,
                spreadsheet_metadata_tf, ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # SKIP empty sheets (where sheet_schema and columns are None)
            if not sheet_schema or not columns:
                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
            else:
                # Transform sheet_metadata
                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
                sheet_metadata.append(sheet_metadata_tf)

                # SHEET_DATA
                # Should this worksheet tab be synced?
                if sheet_title in selected_streams:
                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                    update_currently_syncing(state, sheet_title)
                    selected_fields = get_selected_fields(catalog, sheet_title)
                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                    write_schema(catalog, sheet_title)

                    # Emit a Singer ACTIVATE_VERSION message before the initial sync (but
                    # not subsequent syncs) and every time a sheet sync completes.
                    # This forces hard deletes on the data downstream if fewer records are sent.
                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                    last_integer = int(get_bookmark(state, sheet_title, 0))
                    activate_version = int(time.time() * 1000)
                    activate_version_message = singer.ActivateVersionMessage(
                        stream=sheet_title,
                        version=activate_version)
                    if last_integer == 0:
                        # Initial load: send activate_version before AND after data sync
                        singer.write_message(activate_version_message)
                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(
                            sheet_title, activate_version))
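
                    # For illustration: on the first sync of a sheet (bookmark 0), the
                    # ACTIVATE_VERSION message is emitted before and after the data; on
                    # later syncs only after, so the target can hard-delete rows missing
                    # from the new version.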

                    # Determine max range of columns and rows for "paging" through the data
                    sheet_last_col_index = 1
                    sheet_last_col_letter = 'A'
                    for col in columns:
                        col_index = col.get('columnIndex')
                        col_letter = col.get('columnLetter')
                        if col_index > sheet_last_col_index:
                            sheet_last_col_index = col_index
                            sheet_last_col_letter = col_letter
                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                    # Initialize paging for 1st batch
                    is_last_row = False
                    batch_rows = 200
                    from_row = 2
                    if sheet_max_row < batch_rows:
                        to_row = sheet_max_row
                    else:
                        to_row = batch_rows
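
                    # For illustration (hypothetical 4-column sheet with sheet_max_row=450):
                    # batches are requested as A2:D200, then A201:D400, then A401:D450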

                    # Loop thru batches (each having 200 rows of data)
                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                        # GET sheet_data for a worksheet tab
                        sheet_data, time_extracted = get_data(
                            stream_name=sheet_title,
                            endpoint_config=sheets_loaded_config,
                            client=client,
                            spreadsheet_id=spreadsheet_id,
                            range_rows=range_rows)
                        # Data is returned as a list of arrays, an array of values for each row
                        sheet_data_rows = sheet_data.get('values', [])

                        # Transform batch of rows to JSON with keys for each column
                        sheet_data_tf, row_num = transform_sheet_data(
                            spreadsheet_id=spreadsheet_id,
                            sheet_id=sheet_id,
                            sheet_title=sheet_title,
                            from_row=from_row,
                            columns=columns,
                            sheet_data_rows=sheet_data_rows)
                        # Fewer rows returned than requested means this is the last batch
                        if row_num <= to_row:
                            is_last_row = True

                        # Process records, send batch of records to target
                        record_count = process_records(
                            catalog=catalog,
                            stream_name=sheet_title,
                            records=sheet_data_tf,
                            time_extracted=ss_time_extracted,
                            version=activate_version)
                        LOGGER.info('Sheet: {}, records processed: {}'.format(
                            sheet_title, record_count))

                        # Update paging from/to_row for next batch
                        from_row = to_row + 1
                        if to_row + batch_rows > sheet_max_row:
                            to_row = sheet_max_row
                        else:
                            to_row = to_row + batch_rows

                    # End of Stream: Send Activate Version and update State
                    singer.write_message(activate_version_message)
                    write_bookmark(state, sheet_title, activate_version)
                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(
                        sheet_title, activate_version))
                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                        sheet_title, row_num - 2))  # subtract 1 for header row, 1 for final increment
                    update_currently_syncing(state, None)

                    # SHEETS_LOADED
                    # Add sheet to sheets_loaded
                    sheet_loaded = {}
                    sheet_loaded['spreadsheetId'] = spreadsheet_id
                    sheet_loaded['sheetId'] = sheet_id
                    sheet_loaded['title'] = sheet_title
                    sheet_loaded['loadDate'] = strftime(utils.now())
                    sheet_loaded['lastRowNumber'] = row_num
                    sheets_loaded.append(sheet_loaded)

        stream_name = 'sheet_metadata'
        # Sync sheet_metadata if selected
        sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

        stream_name = 'sheets_loaded'
        # Sync sheets_loaded if selected
        sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark
    write_bookmark(state, 'file_metadata', strftime(this_datetime))