# tap_google_sheets/sync.py
import json
import math
import time
from datetime import datetime, timedelta

import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime

from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError as err:
        LOGGER.error('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted):
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        LOGGER.error('OS Error writing record for: {}'.format(stream_name))
        LOGGER.error('record: {}'.format(record))
        raise err


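# State shape assumed by the bookmark helpers below (illustrative example only):
# {
#     'currently_syncing': 'sheet_metadata',
#     'bookmarks': {'file_metadata': '2019-09-27T22:34:39.000000Z'}
# }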
def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


# Transform/validate batch of records w/ schema and send to target
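# (singer-python's Transformer also coerces each record to the stream's JSON
# schema and drops fields that are deselected in the stream metadata.)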
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                write_record(stream_name, transformed_record, time_extracted=time_extracted)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        write_schema(catalog, stream_name)
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=utils.now())
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
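# A catalog metadata entry looks roughly like (illustrative):
#   {'breadcrumb': ['properties', 'Column_A'], 'metadata': {'selected': True}}
# breadcrumb[1] is the field name; stream-level entries have an empty
# breadcrumb, which the IndexError handler below skips.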
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields


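# Call the API for one endpoint. The path/params templates in STREAMS may
# contain {spreadsheet_id}, {sheet_title}, and {range_rows} placeholders;
# e.g. (illustrative, assuming a template like
# 'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}'),
# range_rows='A2:J201' resolves to 'spreadsheets/<id>/values/Sheet1!A2:J201'.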
def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted


# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Deep copy to dict via JSON round-trip
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Wrap record in an array of 1
    file_metadata_arr = [file_metadata_tf]
    return file_metadata_arr


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Deep copy to dict via JSON round-trip
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Wrap record in an array of 1
    spreadsheet_metadata_arr = [spreadsheet_metadata_tf]
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Deep copy properties to dict via JSON round-trip
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
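# Worked example (assuming singer's default datetime format):
# serial 44197.5 is 18628.5 days past 1970-01-01 (44197.5 - 25569), so
# excel_to_dttm_str(44197.5) -> '2021-01-01T12:00:00.000000Z'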
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tz = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
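# Illustrative example: with columns 'name' (stringValue) and 'signed_up'
# (numberType.DATE), row ['Alice', 44197] at row_num 2 becomes
#   {'__sdc_spreadsheet_id': '<id>', '__sdc_sheet_id': 123456,
#    '__sdc_row': 2, 'name': 'Alice', 'signed_up': '2021-01-01'}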
def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    is_last_row = False
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
        if row == []:
            is_last_row = True
            return sheet_data_tf, row_num - 1, is_last_row
        sheet_data_row_tf = {}
        # Add spreadsheet_id, sheet_id, and row
        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
        sheet_data_row_tf['__sdc_row'] = row_num
        col_num = 1
        for value in row:
            # Select column metadata based on column index
            col = cols[col_num - 1]
            col_skipped = col.get('columnSkipped')
            if not col_skipped:
                col_name = col.get('columnName')
                col_type = col.get('columnType')
                # Convert dates/times from Lotus 1-2-3/Excel-style serial numbers
                if col_type == 'numberType.DATE_TIME':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.DATE':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)[:10]
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.TIME':
                    if isinstance(value, (int, float)):
                        try:
                            total_secs = value * 86400  # seconds in day
                            col_val = str(timedelta(seconds=total_secs))
                        except ValueError:
                            col_val = str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType':
                    if isinstance(value, int):
                        col_val = int(value)
                    else:
                        try:
                            col_val = float(value)
                        except ValueError:
                            col_val = str(value)
                elif col_type == 'stringValue':
                    col_val = str(value)
                elif col_type == 'boolValue':
                    if isinstance(value, bool):
                        col_val = value
                    elif isinstance(value, str):
                        if value.lower() in ('true', 't', 'yes', 'y'):
                            col_val = True
                        elif value.lower() in ('false', 'f', 'no', 'n'):
                            col_val = False
                        else:
                            col_val = str(value)
                    elif isinstance(value, int):
                        if value in (1, -1):
                            col_val = True
                        elif value == 0:
                            col_val = False
                        else:
                            col_val = str(value)
                else:
                    col_val = value
                sheet_data_row_tf[col_name] = col_val
            col_num = col_num + 1
        sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num, is_last_row


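# Top-level sync order: file_metadata (which doubles as the bookmark/change
# check), spreadsheet_metadata, per-sheet schemas and data, then the
# sheet_metadata and sheets_loaded summary streams.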
def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed; if not, exit (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return 0
    else:
        # Sync file_metadata if selected
        sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
        write_bookmark(state, stream_name, strftime(this_datetime))

    # SPREADSHEET_METADATA
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)

            # Transform sheet_metadata
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # SHEET_DATA
            # Should this worksheet tab be synced?
            if sheet_title in selected_streams:
                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                update_currently_syncing(state, sheet_title)
                write_schema(catalog, sheet_title)

                # Determine max range of columns and rows for "paging" through the data
                sheet_last_col_index = 1
                sheet_last_col_letter = 'A'
                for col in columns:
                    col_index = col.get('columnIndex')
                    col_letter = col.get('columnLetter')
                    if col_index > sheet_last_col_index:
                        sheet_last_col_index = col_index
                        sheet_last_col_letter = col_letter
                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                # Initialize paging for 1st batch
                is_last_row = False
                batch_rows = 200
                from_row = 2
                if sheet_max_row < batch_rows:
                    to_row = sheet_max_row
                else:
                    to_row = batch_rows

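                # Worked example (illustrative): with sheet_max_row = 450 and
                # last column J, the ranges requested below are A2:J200,
                # A201:J400, then A401:J450.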
                # Loop thru batches (each having 200 rows of data)
                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                    # GET sheet_data for a worksheet tab
                    sheet_data, time_extracted = get_data(
                        stream_name=sheet_title,
                        endpoint_config=sheets_loaded_config,
                        client=client,
                        spreadsheet_id=spreadsheet_id,
                        range_rows=range_rows)
                    # Data is returned as a list of arrays, an array of values for each row
                    # (default to [] since the API may omit 'values' for an empty range)
                    sheet_data_rows = sheet_data.get('values', [])

                    # Transform batch of rows to JSON with keys for each column
                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
                        spreadsheet_id=spreadsheet_id,
                        sheet_id=sheet_id,
                        from_row=from_row,
                        columns=columns,
                        sheet_data_rows=sheet_data_rows)
                    if row_num < to_row:
                        is_last_row = True

                    # Process records, send batch of records to target
                    record_count = process_records(
                        catalog=catalog,
                        stream_name=sheet_title,
                        records=sheet_data_tf,
                        time_extracted=ss_time_extracted)

                    # Update paging from/to_row for next batch
                    from_row = to_row + 1
                    if to_row + batch_rows > sheet_max_row:
                        to_row = sheet_max_row
                    else:
                        to_row = to_row + batch_rows

                # SHEETS_LOADED
                # Add sheet to sheets_loaded
                sheet_loaded = {
                    'spreadsheetId': spreadsheet_id,
                    'sheetId': sheet_id,
                    'title': sheet_title,
                    'loadDate': strftime(utils.now()),
                    'lastRowNumber': row_num,
                }
                sheets_loaded.append(sheet_loaded)

                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
                # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
                # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
                activate_version_message = singer.ActivateVersionMessage(
                    stream=sheet_title,
                    version=int(time.time() * 1000))
                singer.write_message(activate_version_message)

                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                    sheet_title, row_num - 1))
                update_currently_syncing(state, None)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)