import json
import math
import time
from datetime import datetime, timedelta
from collections import OrderedDict

import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime

from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata
# Module-level Singer logger shared by every function in this sync path.
LOGGER = singer.get_logger()
def write_schema(catalog, stream_name):
    """Emit a Singer SCHEMA message for stream_name using the catalog entry.

    OSError (e.g. a closed target pipe) is logged and swallowed so a
    best-effort write does not kill the whole sync.
    """
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError:
        # Unused `as err` binding removed; message text kept unchanged.
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
def write_record(stream_name, record, time_extracted):
    """Emit a Singer RECORD message; log (not raise) on OSError.

    The record itself is logged on failure to aid debugging.
    """
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError:
        # Unused `as err` binding removed; message text kept unchanged.
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
def get_bookmark(state, stream, default):
    """Return the bookmark value for `stream` from the Singer state dict.

    Falls back to `default` when state is None, has no 'bookmarks' node,
    or has no entry for this stream.

    NOTE(review): the mangled source was missing the return statements;
    they are reconstructed here from the standard singer-tap pattern.
    """
    if (state is None) or ('bookmarks' not in state):
        return default
    return state.get('bookmarks', {}).get(stream, default)
def write_bookmark(state, stream, value):
    """Store `value` as the bookmark for `stream` and emit a STATE message."""
    bookmarks = state.setdefault('bookmarks', {})
    bookmarks[stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)
53 # Transform/validate batch of records w/ schema and sent to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted=None):
    """Transform/validate a batch of records against the stream's schema and
    metadata, write each to the target, and return the count written.

    NOTE(review): the mangled source was missing the parameter list tail,
    the transform() arguments, and the counter/return lines; reconstructed
    from the visible call sites (sync_stream and the sheet batch loop).
    """
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                write_record(stream_name, transformed_record, time_extracted=time_extracted)
                counter.increment()
        return counter.value
def sync_stream(stream_name, selected_streams, catalog, state, records):
    """Sync one metadata-style stream end-to-end if it is selected:
    mark currently_syncing, write schema, process all records, then clear
    currently_syncing.
    """
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        write_schema(catalog, stream_name)
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=utils.now())
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)
89 # Currently syncing sets the stream currently being delivered in the state.
90 # If the integration is interrupted, this state property is used to identify
91 # the starting point to continue from.
92 # Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    """Set or clear the 'currently_syncing' marker in state and emit STATE.

    Passing stream_name=None clears the marker (sync of that stream is done).
    NOTE(review): the `else:` joining the two branches was missing from the
    mangled source and is reconstructed.
    """
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)
101 # List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    """Return the field names marked selected in the stream's metadata.

    NOTE(review): the accumulator init and the try/except around the
    breadcrumb access were missing from the mangled source; reconstructed —
    table-level metadata entries have a short breadcrumb and are skipped.
    """
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        try:
            # breadcrumb is ('properties', <field_name>) for field entries
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            # Table-level entry (empty breadcrumb); not a field.
            pass
    return selected_fields
def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    """Build the endpoint URL from the stream's endpoint_config templates,
    call the API client, and return (data, time_extracted).

    NOTE(review): the parameter tail and the client.get(...) call were
    missing from the mangled source; reconstructed from the visible call
    sites in sync() (all pass stream_name/endpoint_config/spreadsheet_id
    and optionally range_rows as keywords).
    """
    if not range_rows:
        range_rows = ''
    # Substitute the template placeholders in the configured path.
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    # Capture extraction time before the request is made.
    time_extracted = utils.now()
    data = client.get(
        path='{}?{}'.format(path, querystring),
        api=api,
        endpoint=stream_name)
    return data, time_extracted
142 # Tranform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    """Deep-copy file_metadata (via a JSON round-trip), strip the
    user-identifying fields from lastModifyingUser, and return the record
    wrapped in a single-element list.
    """
    record = json.loads(json.dumps(file_metadata))
    modifying_user = record.get('lastModifyingUser')
    if modifying_user:
        for key in ('photoLink', 'me', 'permissionId'):
            modifying_user.pop(key, None)
    # Record is delivered as an array of 1
    return [record]
157 # Tranform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    """Deep-copy spreadsheet_metadata (via a JSON round-trip), drop the
    bulky properties.defaultFormat and the top-level sheets node (sheets
    are emitted separately as sheet_metadata), and return a 1-element list.
    """
    record = json.loads(json.dumps(spreadsheet_metadata))
    properties = record.get('properties')
    if properties:
        properties.pop('defaultFormat', None)
    record.pop('sheets', None)
    # Record is delivered as an array of 1
    return [record]
171 # Tranform spreadsheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    """Return a copy of the sheet's properties augmented with
    spreadsheetId, a direct sheetUrl, and the column metadata list.
    """
    # JSON round-trip makes an independent copy of the properties dict.
    props = json.loads(json.dumps(sheet.get('properties')))
    gid = props.get('sheetId')
    props['spreadsheetId'] = spreadsheet_id
    props['sheetUrl'] = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, gid)
    props['columns'] = columns
    return props
185 # Convert Excel Date Serial Number (excel_date_sn) to datetime string
186 # timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    """Convert an Excel date serial number to a UTC datetime string.

    timezone_str defaults to UTC (which we assume is the timezone for ALL
    datetimes in the sheet).

    NOTE(review): the timezone default, sec_per_day constant, and return
    line were missing from the mangled source and are reconstructed.
    Sub-second precision is intentionally dropped by math.floor, matching
    the visible original.
    """
    if not timezone_str:
        timezone_str = 'UTC'
    tz = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z expressed as an Excel serial
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str
201 # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
202 # Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
    """Transform a batch of sheet rows (arrays of cell values) into JSON
    records keyed by column name.

    Adds __sdc_spreadsheet_id, __sdc_sheet_id and __sdc_row to every record
    and converts cell values based on the column's columnType metadata.
    Returns (transformed_rows, next_row_number, is_last_row); an empty row
    terminates the batch early with row_num - 1.

    NOTE(review): several statements (loop headers, else-branches of the
    type-conversion chain) were missing from the mangled source and are
    reconstructed; a final fallback ensures col_val is always bound before
    assignment. Verify the conversion branches against the upstream tap.
    """
    sheet_data_tf = []
    row_num = from_row
    is_last_row = False
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    for row in sheet_data_rows:
        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
        if row == []:
            is_last_row = True
            return sheet_data_tf, row_num - 1, is_last_row
        sheet_data_row_tf = {}
        # Add spreadsheet_id, sheet_id, and row
        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
        sheet_data_row_tf['__sdc_row'] = row_num
        col_num = 1
        for value in row:
            # Select column metadata based on column index
            col = cols[col_num - 1]
            col_skipped = col.get('columnSkipped')
            if not col_skipped:
                col_name = col.get('columnName')
                col_type = col.get('columnType')
                # Convert dates/times from Lotus Notes Serial Numbers
                if col_type == 'numberType.DATE_TIME':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.DATE':
                    if isinstance(value, (int, float)):
                        # Keep only the date portion of the ISO timestamp
                        col_val = excel_to_dttm_str(value)[:10]
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.TIME':
                    if isinstance(value, (int, float)):
                        try:
                            total_secs = value * 86400  # seconds in day
                            col_val = str(timedelta(seconds=total_secs))
                        except ValueError:
                            col_val = str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType':
                    if isinstance(value, int):
                        col_val = int(value)
                    else:
                        try:
                            col_val = float(value)
                        except ValueError:
                            col_val = str(value)
                elif col_type == 'stringValue':
                    col_val = str(value)
                elif col_type == 'boolValue':
                    if isinstance(value, bool):
                        col_val = value
                    elif isinstance(value, str):
                        if value.lower() in ('true', 't', 'yes', 'y'):
                            col_val = True
                        elif value.lower() in ('false', 'f', 'no', 'n'):
                            col_val = False
                        else:
                            col_val = str(value)
                    elif isinstance(value, int):
                        if value == 1 or value == -1:
                            col_val = True
                        elif value == 0:
                            col_val = False
                        else:
                            col_val = str(value)
                    else:
                        col_val = str(value)
                else:
                    # Unknown column type: pass the raw value through so
                    # col_val is always defined for the assignment below.
                    col_val = value
                sheet_data_row_tf[col_name] = col_val
            col_num = col_num + 1
        sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num, is_last_row
def sync(client, config, catalog, state):
    """Main sync orchestration.

    Order: file_metadata (with a modifiedTime bookmark short-circuit),
    spreadsheet_metadata, then each selected worksheet tab in 200-row
    batches, and finally the sheet_metadata and sheets_loaded streams.

    NOTE(review): the source was line-mangled with missing statements
    (loop headers, batch-size initializers, keyword-argument lines).
    They are reconstructed from the visible callee signatures and usage;
    verify batching and bookmark behavior against the upstream tap.
    """
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_meatadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_meatadata')
    file_metadata_tf = transform_file_metadata(file_metadata)

    # Check if file has changed, if not break (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return

    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
    write_bookmark(state, stream_name, strftime(this_datetime))

    # SPREADSHEET_METADATA
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_meatadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_meatadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)

            # Transform sheet_metadata
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            sheet_metadata.append(sheet_metadata_tf)

            # SHEET_DATA
            # Should this worksheet tab be synced?
            if sheet_title in selected_streams:
                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                update_currently_syncing(state, sheet_title)
                write_schema(catalog, sheet_title)

                # Determine max range of columns and rows for "paging" through the data
                sheet_last_col_index = 1
                sheet_last_col_letter = 'A'
                for col in columns:
                    col_index = col.get('columnIndex')
                    col_letter = col.get('columnLetter')
                    if col_index > sheet_last_col_index:
                        sheet_last_col_index = col_index
                        sheet_last_col_letter = col_letter
                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                # Initialize paging for 1st batch (row 1 is the header row)
                is_last_row = False
                batch_rows = 200
                from_row = 2
                if sheet_max_row < batch_rows:
                    to_row = sheet_max_row
                else:
                    to_row = batch_rows

                # Loop thru batches (each having 200 rows of data)
                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                    # GET sheet_data for a worksheet tab
                    sheet_data, time_extracted = get_data(
                        stream_name=sheet_title,
                        endpoint_config=sheets_loaded_config,
                        client=client,
                        spreadsheet_id=spreadsheet_id,
                        range_rows=range_rows)
                    # Data is returned as a list of arrays, an array of values for each row
                    sheet_data_rows = sheet_data.get('values')

                    # Transform batch of rows to JSON with keys for each column
                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
                        spreadsheet_id=spreadsheet_id,
                        sheet_id=sheet_id,
                        from_row=from_row,
                        columns=columns,
                        sheet_data_rows=sheet_data_rows)
                    # A short batch means the sheet's data is exhausted.
                    if row_num < to_row:
                        is_last_row = True

                    # Process records, send batch of records to target
                    record_count = process_records(
                        catalog=catalog,
                        stream_name=sheet_title,
                        records=sheet_data_tf,
                        time_extracted=ss_time_extracted)

                    # Update paging from/to_row for next batch
                    from_row = to_row + 1
                    if to_row + batch_rows > sheet_max_row:
                        to_row = sheet_max_row
                    else:
                        to_row = to_row + batch_rows

                # Add sheet to sheets_loaded
                # NOTE(review): row_num is only bound if at least one batch
                # ran; an empty sheet would raise here — confirm upstream.
                sheet_loaded = {}
                sheet_loaded['spreadsheetId'] = spreadsheet_id
                sheet_loaded['sheetId'] = sheet_id
                sheet_loaded['title'] = sheet_title
                sheet_loaded['loadDate'] = strftime(utils.now())
                sheet_loaded['lastRowNumber'] = row_num
                sheets_loaded.append(sheet_loaded)

                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
                # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
                # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
                activate_version_message = singer.ActivateVersionMessage(
                    stream=sheet_title,
                    version=int(time.time() * 1000))
                singer.write_message(activate_version_message)

                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                    sheet_title, row_num - 1))
                update_currently_syncing(state, None)

    # Sync sheet_metadata if selected
    stream_name = 'sheet_metadata'
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    # Sync sheets_loaded if selected
    stream_name = 'sheets_loaded'
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)