import time
import math
import json

from datetime import datetime, timedelta
import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted):
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return state.get('bookmarks', {}).get(stream, default)


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)
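
# For example (hypothetical values), after write_bookmark(state, 'file_metadata',
# '2020-01-01T00:00:00Z') the emitted STATE message contains:
#   {'bookmarks': {'file_metadata': '2020-01-01T00:00:00Z'}}
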

# Transform/validate batch of records w/ schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                write_record(stream_name, transformed_record, time_extracted=time_extracted)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields
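
# For example (hypothetical entry), field-level metadata looks like
#   {'breadcrumb': ('properties', 'row_id'), 'metadata': {'selected': True}}
# so entry['breadcrumb'][1] is the field name; stream-level entries have an
# empty breadcrumb, which is why the IndexError above is swallowed.
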

def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted
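
# Illustrative example (hypothetical endpoint_config, not the actual STREAMS entry):
#   {'path': 'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}',
#    'params': {'majorDimension': 'ROWS'}}
# with range_rows 'A2:D200' resolves to path
#   'spreadsheets/<spreadsheet_id>/values/<sheet_title>!A2:D200'
# and querystring 'majorDimension=ROWS'.
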

# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Deep copy via JSON round-trip
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Deep copy via JSON round-trip
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties node to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str
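
# Worked example: serial 25569 is 1970-01-01T00:00:00Z (the Unix epoch), so
#   excel_to_dttm_str(43831.5) == '2020-01-01T12:00:00.000000Z'
# because (43831.5 - 25569) * 86400 = 1,577,880,000 seconds after the epoch.
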

# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    is_last_row = False
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
        if row == []:
            is_last_row = True
            return sheet_data_tf, row_num - 1, is_last_row
        sheet_data_row_tf = {}
        # Add spreadsheet_id, sheet_id, and row
        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
        sheet_data_row_tf['__sdc_row'] = row_num
        col_num = 1
        for value in row:
            # Select column metadata based on column index
            col = cols[col_num - 1]
            col_skipped = col.get('columnSkipped')
            if not col_skipped:
                col_name = col.get('columnName')
                col_type = col.get('columnType')
                # Convert dates/times from Lotus Notes Serial Numbers
                if col_type == 'numberType.DATE_TIME':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.DATE':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)[:10]
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.TIME':
                    if isinstance(value, (int, float)):
                        total_secs = value * 86400  # seconds in day
                        col_val = str(timedelta(seconds=total_secs))
                    else:
                        col_val = str(value)
                elif col_type == 'numberType':
                    if isinstance(value, int):
                        col_val = int(value)
                    else:
                        try:
                            col_val = float(value)
                        except ValueError:
                            col_val = str(value)
                elif col_type == 'stringValue':
                    col_val = str(value)
                elif col_type == 'boolValue':
                    if isinstance(value, bool):
                        col_val = value
                    elif isinstance(value, str):
                        if value.lower() in ('true', 't', 'yes', 'y'):
                            col_val = True
                        elif value.lower() in ('false', 'f', 'no', 'n'):
                            col_val = False
                        else:
                            col_val = str(value)
                    elif isinstance(value, int):
                        if value in (1, -1):
                            col_val = True
                        elif value == 0:
                            col_val = False
                        else:
                            col_val = str(value)
                else:
                    col_val = value
                sheet_data_row_tf[col_name] = col_val
            col_num = col_num + 1
        sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num, is_last_row
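
# Illustrative example (hypothetical columns and rows): given
#   columns = [{'columnIndex': 1, 'columnLetter': 'A', 'columnName': 'name',
#               'columnType': 'stringValue'},
#              {'columnIndex': 2, 'columnLetter': 'B', 'columnName': 'updated',
#               'columnType': 'numberType.DATE_TIME'}]
# and sheet_data_rows = [['alice', 43831.5]] with from_row=2, the output row is:
#   {'__sdc_spreadsheet_id': <id>, '__sdc_sheet_id': <gid>, '__sdc_row': 2,
#    'name': 'alice', 'updated': '2020-01-01T12:00:00.000000Z'}
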

def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    #   last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed; if not, exit (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return

    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    write_bookmark(state, stream_name, strftime(this_datetime))

    # SPREADSHEET_METADATA
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf,
                ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # Transform sheet_metadata
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # Should this worksheet tab be synced?
            if sheet_title in selected_streams:
                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                update_currently_syncing(state, sheet_title)
                selected_fields = get_selected_fields(catalog, sheet_title)
                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                write_schema(catalog, sheet_title)

                # Determine max range of columns and rows for "paging" through the data
                sheet_last_col_index = 1
                sheet_last_col_letter = 'A'
                for col in columns:
                    col_index = col.get('columnIndex')
                    col_letter = col.get('columnLetter')
                    if col_index > sheet_last_col_index:
                        sheet_last_col_index = col_index
                        sheet_last_col_letter = col_letter
                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                # Initialize paging for 1st batch
                is_last_row = False
                batch_rows = 200
                from_row = 2
                if sheet_max_row < batch_rows:
                    to_row = sheet_max_row
                else:
                    to_row = batch_rows

                # Loop thru batches (each having 200 rows of data)
                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                    # GET sheet_data for a worksheet tab
                    sheet_data, time_extracted = get_data(
                        stream_name=sheet_title,
                        endpoint_config=sheets_loaded_config,
                        client=client,
                        spreadsheet_id=spreadsheet_id,
                        range_rows=range_rows)
                    # Data is returned as a list of arrays, an array of values for each row
                    sheet_data_rows = sheet_data.get('values')

                    # Transform batch of rows to JSON with keys for each column
                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
                        spreadsheet_id=spreadsheet_id,
                        sheet_id=sheet_id,
                        from_row=from_row,
                        columns=columns,
                        sheet_data_rows=sheet_data_rows)

                    # Process records, send batch of records to target
                    record_count = process_records(
                        catalog=catalog,
                        stream_name=sheet_title,
                        records=sheet_data_tf,
                        time_extracted=ss_time_extracted)
                    LOGGER.info('Sheet: {}, records processed: {}'.format(
                        sheet_title, record_count))

                    # Update paging from/to_row for next batch
                    from_row = to_row + 1
                    if to_row + batch_rows > sheet_max_row:
                        to_row = sheet_max_row
                    else:
                        to_row = to_row + batch_rows
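
                    # Illustrative walkthrough (hypothetical sizes): with 4 columns (A..D),
                    # sheet_max_row = 450 and batch_rows = 200, the ranges requested are
                    # A2:D200, A201:D400, then A401:D450; from_row then becomes 451 and
                    # the while condition ends the loop.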

                # Add sheet to sheets_loaded
                sheet_loaded = {}
                sheet_loaded['spreadsheetId'] = spreadsheet_id
                sheet_loaded['sheetId'] = sheet_id
                sheet_loaded['title'] = sheet_title
                sheet_loaded['loadDate'] = strftime(utils.now())
                sheet_loaded['lastRowNumber'] = row_num
                sheets_loaded.append(sheet_loaded)

                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
                # This forces hard deletes on the data downstream if fewer records are sent.
                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                activate_version_message = singer.ActivateVersionMessage(
                    stream=sheet_title,
                    version=int(time.time() * 1000))
                singer.write_message(activate_version_message)

                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                    sheet_title, row_num - 1))

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)