import time
import math
import json
from datetime import datetime, timedelta
import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted):
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


# Transform/validate batch of records w/ schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
            write_record(stream_name, transformed_record, time_extracted=time_extracted)
            counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
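        # Field-level metadata entries carry a breadcrumb of
        # ('properties', '<field_name>'); stream-level entries have an empty
        # breadcrumb, which raises the IndexError handled below.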
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields


def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
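    # Fill the {spreadsheet_id}, {sheet_title} and {range_rows} placeholders in
    # the endpoint's path template from STREAMS; for worksheet-data streams the
    # sheet title doubles as the stream name.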
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    data = {}
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted


# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Deep copy to dict via JSON round-trip
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Deep copy to dict via JSON round-trip
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties node to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
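    # Worked example: serial 44197.5 -> (44197.5 - 25569) * 86400
    # = 1,609,502,400 seconds after the Unix epoch = 2021-01-01T12:00:00Z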
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    is_last_row = False
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
        if row == []:
            is_last_row = True
            return sheet_data_tf, row_num - 1, is_last_row
        sheet_data_row_tf = {}
        # Add spreadsheet_id, sheet_id, and row
        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
        sheet_data_row_tf['__sdc_row'] = row_num
        col_num = 1
        for value in row:
            # Select column metadata based on column index
            col = cols[col_num - 1]
            col_skipped = col.get('columnSkipped')
            if not col_skipped:
                col_name = col.get('columnName')
                col_type = col.get('columnType')
                # Convert dates/times from Excel-style date serial numbers
                if col_type == 'numberType.DATE_TIME':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.DATE':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)[:10]
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.TIME':
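                    # Serial time is a fraction of a 24-hour day,
                    # e.g. 0.5 -> '12:00:00'.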
                    if isinstance(value, (int, float)):
                        try:
                            total_secs = value * 86400  # seconds in day
                            col_val = str(timedelta(seconds=total_secs))
                        except ValueError:
                            col_val = str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType':
                    if isinstance(value, int):
                        col_val = int(value)
                    else:
                        try:
                            col_val = float(value)
                        except ValueError:
                            col_val = str(value)
                elif col_type == 'stringValue':
                    col_val = str(value)
                elif col_type == 'boolValue':
                    if isinstance(value, bool):
                        col_val = value
                    elif isinstance(value, str):
                        if value.lower() in ('true', 't', 'yes', 'y'):
                            col_val = True
                        elif value.lower() in ('false', 'f', 'no', 'n'):
                            col_val = False
                        else:
                            col_val = str(value)
                    elif isinstance(value, int):
                        if value in (1, -1):
                            col_val = True
                        elif value == 0:
                            col_val = False
                        else:
                            col_val = str(value)
                    else:
                        # Guard against unexpected types so col_val is always bound
                        col_val = str(value)
                else:
                    col_val = value
                sheet_data_row_tf[col_name] = col_val
            col_num = col_num + 1
        sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num, is_last_row


def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed, if not break (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    write_bookmark(state, stream_name, strftime(this_datetime))

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf,
                ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # Transform sheet_metadata
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # SHEET_DATA
            # Should this worksheet tab be synced?
            if sheet_title in selected_streams:
                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                update_currently_syncing(state, sheet_title)
                selected_fields = get_selected_fields(catalog, sheet_title)
                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                write_schema(catalog, sheet_title)

                # Determine max range of columns and rows for "paging" through the data
                sheet_last_col_index = 1
                sheet_last_col_letter = 'A'
                for col in columns:
                    col_index = col.get('columnIndex')
                    col_letter = col.get('columnLetter')
                    if col_index > sheet_last_col_index:
                        sheet_last_col_index = col_index
                        sheet_last_col_letter = col_letter
                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                # Initialize paging for 1st batch
                is_last_row = False
                batch_rows = 200
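                # Start data paging at row 2: row 1 holds the column headers
                # from which the columns metadata was built.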
                from_row = 2
                if sheet_max_row < batch_rows:
                    to_row = sheet_max_row
                else:
                    to_row = batch_rows

                # Loop thru batches (each having 200 rows of data)
                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
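                    # e.g. 'A2:J200' for the first batch of a sheet whose
                    # last populated column is J.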
                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                    # GET sheet_data for a worksheet tab
                    sheet_data, time_extracted = get_data(
                        stream_name=sheet_title,
                        endpoint_config=sheets_loaded_config,
                        client=client,
                        spreadsheet_id=spreadsheet_id,
                        range_rows=range_rows)
                    # Data is returned as a list of arrays, an array of values for each row
                    sheet_data_rows = sheet_data.get('values')

                    # Transform batch of rows to JSON with keys for each column
                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
                        spreadsheet_id=spreadsheet_id,
                        sheet_id=sheet_id,
                        from_row=from_row,
                        columns=columns,
                        sheet_data_rows=sheet_data_rows)
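                    # Fewer rows than requested came back: this was the
                    # last page of data for the sheet.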
                    if row_num < to_row:
                        is_last_row = True

                    # Process records, send batch of records to target
                    record_count = process_records(
                        catalog=catalog,
                        stream_name=sheet_title,
                        records=sheet_data_tf,
                        time_extracted=ss_time_extracted)
                    LOGGER.info('Sheet: {}, records processed: {}'.format(
                        sheet_title, record_count))

                    # Update paging from/to_row for next batch
                    from_row = to_row + 1
                    if to_row + batch_rows > sheet_max_row:
                        to_row = sheet_max_row
                    else:
                        to_row = to_row + batch_rows

                # SHEETS_LOADED
                # Add sheet to sheets_loaded
                sheet_loaded = {}
                sheet_loaded['spreadsheetId'] = spreadsheet_id
                sheet_loaded['sheetId'] = sheet_id
                sheet_loaded['title'] = sheet_title
                sheet_loaded['loadDate'] = strftime(utils.now())
                sheet_loaded['lastRowNumber'] = row_num
                sheets_loaded.append(sheet_loaded)

                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
                # This forces hard deletes on the data downstream if fewer records are sent.
                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
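                # The version stamp is the current Unix time in milliseconds.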
                activate_version_message = singer.ActivateVersionMessage(
                    stream=sheet_title,
                    version=int(time.time() * 1000))
                singer.write_message(activate_version_message)

                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                    sheet_title, row_num - 1))

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    return