tap_google_sheets/sync.py
1 import time
2 import math
3 import json
4 from datetime import datetime, timedelta
5 import pytz
6 import singer
7 from singer import metrics, metadata, Transformer, utils
8 from singer.utils import strptime_to_utc, strftime
9 from tap_google_sheets.streams import STREAMS
10 from tap_google_sheets.schema import get_sheet_metadata
11
12 LOGGER = singer.get_logger()
13
14
15 def write_schema(catalog, stream_name):
16 stream = catalog.get_stream(stream_name)
17 schema = stream.schema.to_dict()
18 try:
19 singer.write_schema(stream_name, schema, stream.key_properties)
20 except OSError as err:
21 LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
22 raise err
23
24
25 def write_record(stream_name, record, time_extracted):
26 try:
27 singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
28 except OSError as err:
29 LOGGER.info('OS Error writing record for: {}'.format(stream_name))
30 LOGGER.info('record: {}'.format(record))
31 raise err
32
33
34 def get_bookmark(state, stream, default):
35 if (state is None) or ('bookmarks' not in state):
36 return default
37 return (
38 state
39 .get('bookmarks', {})
40 .get(stream, default)
41 )
42
43
44 def write_bookmark(state, stream, value):
45 if 'bookmarks' not in state:
46 state['bookmarks'] = {}
47 state['bookmarks'][stream] = value
48 LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
49 singer.write_state(state)
50
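# Editor's note (illustrative, not part of the original tap): the state dict that
# get_bookmark/write_bookmark operate on is assumed to look roughly like
#   {
#       "currently_syncing": "Sheet1",
#       "bookmarks": {"file_metadata": "2019-09-27T22:34:54.000000Z"}
#   }
# so that, under that assumption:
#   get_bookmark(state, 'file_metadata', start_date)
#       -> '2019-09-27T22:34:54.000000Z'
#   write_bookmark(state, 'file_metadata', '2019-10-01T00:00:00.000000Z')
#       -> updates state['bookmarks'] and emits a STATE message via singer.write_state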
51
52 # Transform/validate batch of records w/ schema and send to target
53 def process_records(catalog,
54 stream_name,
55 records,
56 time_extracted):
57 stream = catalog.get_stream(stream_name)
58 schema = stream.schema.to_dict()
59 stream_metadata = metadata.to_map(stream.metadata)
60 with metrics.record_counter(stream_name) as counter:
61 for record in records:
62 # Transform record for Singer.io
63 with Transformer() as transformer:
64 transformed_record = transformer.transform(
65 record,
66 schema,
67 stream_metadata)
68 write_record(stream_name, transformed_record, time_extracted=time_extracted)
69 counter.increment()
70 return counter.value
71
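# Editor's note (hedged, based on typical singer-python Transformer behaviour, not on
# this file): transformer.transform validates each record against the stream's JSON
# schema, keeps only fields defined in that schema (honouring 'selected' metadata),
# and normalises date-time values. For example (hypothetical record/schema), a row
# dict {'__sdc_row': 2, 'id': 1, 'ignored': 'x'} emitted against a schema that only
# defines '__sdc_row' and 'id' would be written without the 'ignored' key.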
72
73 def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
74 # Should this stream be synced?
75 if stream_name in selected_streams:
76 LOGGER.info('STARTED Syncing {}'.format(stream_name))
77 update_currently_syncing(state, stream_name)
78 selected_fields = get_selected_fields(catalog, stream_name)
79 LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
80 write_schema(catalog, stream_name)
81 if not time_extracted:
82 time_extracted = utils.now()
83 record_count = process_records(
84 catalog=catalog,
85 stream_name=stream_name,
86 records=records,
87 time_extracted=time_extracted)
88 LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
89 update_currently_syncing(state, None)
90
91
92 # Currently syncing sets the stream currently being delivered in the state.
93 # If the integration is interrupted, this state property is used to identify
94 # the starting point to continue from.
95 # Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
96 def update_currently_syncing(state, stream_name):
97 if (stream_name is None) and ('currently_syncing' in state):
98 del state['currently_syncing']
99 else:
100 singer.set_currently_syncing(state, stream_name)
101 singer.write_state(state)
102
103
104 # List selected fields from stream catalog
105 def get_selected_fields(catalog, stream_name):
106 stream = catalog.get_stream(stream_name)
107 mdata = metadata.to_map(stream.metadata)
108 mdata_list = singer.metadata.to_list(mdata)
109 selected_fields = []
110 for entry in mdata_list:
111 field = None
112 try:
113 field = entry['breadcrumb'][1]
114 if entry.get('metadata', {}).get('selected', False):
115 selected_fields.append(field)
116 except IndexError:
117 pass
118 return selected_fields
119
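# Editor's note (illustrative assumption about singer catalog metadata): the entries in
# mdata_list look roughly like
#   {'breadcrumb': (), 'metadata': {'inclusion': 'available'}}               # stream-level
#   {'breadcrumb': ('properties', 'id'), 'metadata': {'selected': True}}     # field-level
# breadcrumb[1] is the field name; the stream-level entry has an empty breadcrumb,
# which is why the IndexError above is swallowed.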
120
121 def get_data(stream_name,
122 endpoint_config,
123 client,
124 spreadsheet_id,
125 range_rows=None):
126 if not range_rows:
127 range_rows = ''
128 path = endpoint_config.get('path', stream_name).replace(
129 '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
130 '{range_rows}', range_rows)
131 params = endpoint_config.get('params', {})
132 api = endpoint_config.get('api', 'sheets')
133 querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
134 '{sheet_title}', stream_name)
135 data = {}
136 time_extracted = utils.now()
137 data = client.get(
138 path=path,
139 api=api,
140 params=querystring,
141 endpoint=stream_name)
142 return data, time_extracted
143
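# Editor's sketch (hypothetical endpoint_config; the real values live in
# tap_google_sheets.streams.STREAMS): given something like
#   endpoint_config = {
#       'api': 'sheets',
#       'path': 'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}',
#       'params': {'dateTimeRenderOption': 'SERIAL_NUMBER'}
#   }
# get_data('Sheet1', endpoint_config, client, 'abc123', 'A2:K200') would build
#   path        = 'spreadsheets/abc123/values/Sheet1!A2:K200'
#   querystring = 'dateTimeRenderOption=SERIAL_NUMBER'
# and return the parsed JSON response plus the extraction timestamp.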
144
145 # Transform file_metadata: remove nodes from lastModifyingUser, format as array
146 def transform_file_metadata(file_metadata):
147 # Convert to dict
148 file_metadata_tf = json.loads(json.dumps(file_metadata))
149 # Remove keys
150 if file_metadata_tf.get('lastModifyingUser'):
151 file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
152 file_metadata_tf['lastModifyingUser'].pop('me', None)
153 file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
154 # Add record to an array of 1
155 file_metadata_arr = []
156 file_metadata_arr.append(file_metadata_tf)
157 return file_metadata_arr
158
159
160 # Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
161 def transform_spreadsheet_metadata(spreadsheet_metadata):
162 # Convert to dict
163 spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
164 # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
165 if spreadsheet_metadata_tf.get('properties'):
166 spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
167 spreadsheet_metadata_tf.pop('sheets', None)
168 # Add record to an array of 1
169 spreadsheet_metadata_arr = []
170 spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
171 return spreadsheet_metadata_arr
172
173
174 # Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
175 def transform_sheet_metadata(spreadsheet_id, sheet, columns):
176 # Convert properties to dict
177 sheet_metadata = sheet.get('properties')
178 sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
179 sheet_id = sheet_metadata_tf.get('sheetId')
180 sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
181 spreadsheet_id, sheet_id)
182 sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
183 sheet_metadata_tf['sheetUrl'] = sheet_url
184 sheet_metadata_tf['columns'] = columns
185 return sheet_metadata_tf
186
187
188 # Convert Excel Date Serial Number (excel_date_sn) to datetime string
189 # timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
190 def excel_to_dttm_str(excel_date_sn, timezone_str=None):
191 if not timezone_str:
192 timezone_str = 'UTC'
193 tzn = pytz.timezone(timezone_str)
194 sec_per_day = 86400
195 excel_epoch = 25569 # 1970-01-01T00:00:00Z
196 epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
197 epoch_dttm = datetime(1970, 1, 1)
198 excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
199 utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
200 utc_dttm_str = strftime(utc_dttm)
201 return utc_dttm_str
202
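# Editor's worked example (values chosen for illustration): serial 25569 is the Unix
# epoch and 2020-01-01 is serial 43831, so
#   excel_to_dttm_str(25569)    -> '1970-01-01T00:00:00.000000Z'
#   excel_to_dttm_str(43831.5)  -> '2020-01-01T12:00:00.000000Z'
# (exact string depends on singer.utils.strftime formatting); fractional days become
# hours/minutes/seconds because the serial is multiplied by 86400 seconds above.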
203
204 # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
205 # Convert from array of values to JSON with column names as keys
206 def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
207 sheet_data_tf = []
208 is_last_row = False
209 row_num = from_row
210 # Create sorted list of columns based on columnIndex
211 cols = sorted(columns, key=lambda i: i['columnIndex'])
212
213 # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
214 for row in sheet_data_rows:
215 # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
216 if row == []:
217 is_last_row = True
218 return sheet_data_tf, row_num - 1, is_last_row
219 sheet_data_row_tf = {}
220 # Add spreadsheet_id, sheet_id, and row
221 sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
222 sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
223 sheet_data_row_tf['__sdc_row'] = row_num
224 col_num = 1
225 for value in row:
226 # Select column metadata based on column index
227 col = cols[col_num - 1]
228 col_skipped = col.get('columnSkipped')
229 if not col_skipped:
230 col_name = col.get('columnName')
231 col_type = col.get('columnType')
232 # Convert dates/times from spreadsheet (Excel/Lotus 1-2-3 style) date serial numbers
233 if col_type == 'numberType.DATE_TIME':
234 if isinstance(value, (int, float)):
235 col_val = excel_to_dttm_str(value)
236 else:
237 col_val = str(value)
238 elif col_type == 'numberType.DATE':
239 if isinstance(value, (int, float)):
240 col_val = excel_to_dttm_str(value)[:10]
241 else:
242 col_val = str(value)
243 elif col_type == 'numberType.TIME':
244 if isinstance(value, (int, float)):
245 try:
246 total_secs = value * 86400 # seconds in day
247 col_val = str(timedelta(seconds=total_secs))
248 except ValueError:
249 col_val = str(value)
250 else:
251 col_val = str(value)
252 elif col_type == 'numberType':
253 if isinstance(value, int):
254 col_val = int(value)
255 else:
256 try:
257 col_val = float(value)
258 except ValueError:
259 col_val = str(value)
260 elif col_type == 'stringValue':
261 col_val = str(value)
262 elif col_type == 'boolValue':
263 if isinstance(value, bool):
264 col_val = value
265 elif isinstance(value, str):
266 if value.lower() in ('true', 't', 'yes', 'y'):
267 col_val = True
268 elif value.lower() in ('false', 'f', 'no', 'n'):
269 col_val = False
270 else:
271 col_val = str(value)
272 elif isinstance(value, int):
273 if value in (1, -1):
274 col_val = True
275 elif value == 0:
276 col_val = False
277 else:
278 col_val = str(value)
279
280 else:
281 col_val = value
282 sheet_data_row_tf[col_name] = col_val
283 col_num = col_num + 1
284 sheet_data_tf.append(sheet_data_row_tf)
285 row_num = row_num + 1
286 return sheet_data_tf, row_num, is_last_row
287
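# Editor's worked example (hypothetical columns/rows): with
#   columns = [
#       {'columnIndex': 1, 'columnLetter': 'A', 'columnName': 'id', 'columnType': 'numberType'},
#       {'columnIndex': 2, 'columnLetter': 'B', 'columnName': 'created', 'columnType': 'numberType.DATE_TIME'}]
#   rows = [[1, 43831.5], []]
# transform_sheet_data('abc123', 12345, 2, columns, rows) would return
#   ([{'__sdc_spreadsheet_id': 'abc123', '__sdc_sheet_id': 12345, '__sdc_row': 2,
#      'id': 1, 'created': '2020-01-01T12:00:00.000000Z'}], 2, True)
# i.e. the empty second row signals is_last_row and the returned row number is the
# last row that actually held data.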
288
289 def sync(client, config, catalog, state):
290 start_date = config.get('start_date')
291 spreadsheet_id = config.get('spreadsheet_id')
292
293 # Get selected_streams from catalog, based on state last_stream
294 # last_stream = Previous currently synced stream, if the load was interrupted
295 last_stream = singer.get_currently_syncing(state)
296 LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
297 selected_streams = []
298 for stream in catalog.get_selected_streams(state):
299 selected_streams.append(stream.stream)
300 LOGGER.info('selected_streams: {}'.format(selected_streams))
301
302 if not selected_streams:
303 return
304
305 # FILE_METADATA
306 file_metadata = {}
307 stream_name = 'file_metadata'
308 file_metadata_config = STREAMS.get(stream_name)
309
310 # GET file_metadata
311 LOGGER.info('GET file_metadata')
312 file_metadata, time_extracted = get_data(stream_name=stream_name,
313 endpoint_config=file_metadata_config,
314 client=client,
315 spreadsheet_id=spreadsheet_id)
316 # Transform file_metadata
317 LOGGER.info('Transform file_metadata')
318 file_metadata_tf = transform_file_metadata(file_metadata)
319 # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
320
321 # Check if file has changed; if not, exit (return to __init__)
322 last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
323 this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
324 LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
325 if this_datetime <= last_datetime:
326 LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
327 return
328 # Sync file_metadata if selected
329 sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
330 write_bookmark(state, stream_name, strftime(this_datetime))
331
332 # SPREADSHEET_METADATA
333 spreadsheet_metadata = {}
334 stream_name = 'spreadsheet_metadata'
335 spreadsheet_metadata_config = STREAMS.get(stream_name)
336
337 # GET spreadsheet_metadata
338 LOGGER.info('GET spreadsheet_metadata')
339 spreadsheet_metadata, ss_time_extracted = get_data(
340 stream_name=stream_name,
341 endpoint_config=spreadsheet_metadata_config,
342 client=client,
343 spreadsheet_id=spreadsheet_id)
344
345 # Transform spreadsheet_metadata
346 LOGGER.info('Transform spreadsheet_metadata')
347 spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
348
349 # Sync spreadsheet_metadata if selected
350 sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
351 ss_time_extracted)
352
353 # SHEET_METADATA and SHEET_DATA
354 sheets = spreadsheet_metadata.get('sheets')
355 sheet_metadata = []
356 sheets_loaded = []
357 sheets_loaded_config = STREAMS['sheets_loaded']
358 if sheets:
359 # Loop thru sheets (worksheet tabs) in spreadsheet
360 for sheet in sheets:
361 sheet_title = sheet.get('properties', {}).get('title')
362 sheet_id = sheet.get('properties', {}).get('sheetId')
363
364 # GET sheet_metadata and columns
365 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
366 LOGGER.info('sheet_schema: {}'.format(sheet_schema))
367
368 # Transform sheet_metadata
369 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
370 # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
371 sheet_metadata.append(sheet_metadata_tf)
372
373 # SHEET_DATA
374 # Should this worksheet tab be synced?
375 if sheet_title in selected_streams:
376 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
377 update_currently_syncing(state, sheet_title)
378 selected_fields = get_selected_fields(catalog, sheet_title)
379 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
380 write_schema(catalog, sheet_title)
381
382 # Determine max range of columns and rows for "paging" through the data
383 sheet_last_col_index = 1
384 sheet_last_col_letter = 'A'
385 for col in columns:
386 col_index = col.get('columnIndex')
387 col_letter = col.get('columnLetter')
388 if col_index > sheet_last_col_index:
389 sheet_last_col_index = col_index
390 sheet_last_col_letter = col_letter
391 sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
392
393 # Initialize paging for 1st batch
394 is_last_row = False
395 batch_rows = 200
396 from_row = 2
397 if sheet_max_row < batch_rows:
398 to_row = sheet_max_row
399 else:
400 to_row = batch_rows
401
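# Editor's note (illustrative numbers): with batch_rows = 200, sheet_max_row = 950 and
# a last populated column of 'K', the loop below pages through the sheet as
#   A2:K200, A201:K400, A401:K600, A601:K800, A801:K950
# stopping early if transform_sheet_data reports an empty (last) row.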
402 # Loop thru batches (each having 200 rows of data)
403 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
404 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
405
406 # GET sheet_data for a worksheet tab
407 sheet_data, time_extracted = get_data(
408 stream_name=sheet_title,
409 endpoint_config=sheets_loaded_config,
410 client=client,
411 spreadsheet_id=spreadsheet_id,
412 range_rows=range_rows)
413 # Data is returned as a list of arrays, an array of values for each row
414 sheet_data_rows = sheet_data.get('values')
415
416 # Transform batch of rows to JSON with keys for each column
417 sheet_data_tf, row_num, is_last_row = transform_sheet_data(
418 spreadsheet_id=spreadsheet_id,
419 sheet_id=sheet_id,
420 from_row=from_row,
421 columns=columns,
422 sheet_data_rows=sheet_data_rows)
423 if row_num < to_row:
424 is_last_row = True
425
426 # Process records, send batch of records to target
427 record_count = process_records(
428 catalog=catalog,
429 stream_name=sheet_title,
430 records=sheet_data_tf,
431 time_extracted=ss_time_extracted)
432 LOGGER.info('Sheet: {}, records processed: {}'.format(
433 sheet_title, record_count))
434
435 # Update paging from/to_row for next batch
436 from_row = to_row + 1
437 if to_row + batch_rows > sheet_max_row:
438 to_row = sheet_max_row
439 else:
440 to_row = to_row + batch_rows
441
442 # SHEETS_LOADED
443 # Add sheet to sheets_loaded
444 sheet_loaded = {}
445 sheet_loaded['spreadsheetId'] = spreadsheet_id
446 sheet_loaded['sheetId'] = sheet_id
447 sheet_loaded['title'] = sheet_title
448 sheet_loaded['loadDate'] = strftime(utils.now())
449 sheet_loaded['lastRowNumber'] = row_num
450 sheets_loaded.append(sheet_loaded)
451
452 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
453 # This forces hard deletes on the data downstream if fewer records are sent.
454 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
455 activate_version_message = singer.ActivateVersionMessage(
456 stream=sheet_title,
457 version=int(time.time() * 1000))
458 singer.write_message(activate_version_message)
459
460 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
461 sheet_title, row_num - 1))
462
463 stream_name = 'sheet_metadata'
464 # Sync sheet_metadata if selected
465 sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
466
467 stream_name = 'sheets_loaded'
468 # Sync sheets_loaded if selected
469 sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
470
471 return