import time
import math
import singer
import json
import pytz
from datetime import datetime, timedelta
from collections import OrderedDict
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted):
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)

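
# Illustrative state shape handled by get_bookmark/write_bookmark and by
# update_currently_syncing below (example values only, not from the source):
#   {
#       "currently_syncing": "file_metadata",
#       "bookmarks": {"file_metadata": "2019-11-01T00:00:00.000000Z"}
#   }
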

# Transform/validate batch of records w/ schema and send to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                write_record(stream_name, transformed_record, time_extracted=time_extracted)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records):
    # Should this stream be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        write_schema(catalog, stream_name)
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=utils.now())
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields

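
# Illustrative catalog metadata entry consumed by get_selected_fields above
# (example values only): field-level entries carry a breadcrumb like
# ['properties', '<field>'], while the stream-level entry has an empty
# breadcrumb and is skipped via the IndexError handler, e.g.
#   {'breadcrumb': ['properties', 'order_id'], 'metadata': {'selected': True}}
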

def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    data = {}
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted

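
# Illustrative call to get_data above (the actual 'path' templates live in
# tap_google_sheets/streams.py; this shape is an assumption): a path such as
#   'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}'
# has its placeholders replaced before client.get() is called, e.g.
#   get_data('Sheet1', sheets_loaded_config, client, spreadsheet_id, 'A2:D201')
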

# Transform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Convert to dict
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Convert to dict
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf

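
# Illustrative output of transform_sheet_metadata (example values only): for
# sheet_id 123456 the added sheetUrl is
#   'https://docs.google.com/spreadsheets/d/<spreadsheet_id>/edit#gid=123456'
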

# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tz = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569  # 1970-01-01T00:00:00Z
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str

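
# Worked example for excel_to_dttm_str (illustrative): serial 43831.5 is
# 18262.5 days past the Unix epoch (43831.5 - 25569), so with the default UTC
# timezone it converts to '2020-01-01T12:00:00.000000Z' (singer strftime format).
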

# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
# Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    is_last_row = False
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
        if row == []:
            is_last_row = True
            return sheet_data_tf, row_num - 1, is_last_row
        sheet_data_row_tf = {}
        # Add spreadsheet_id, sheet_id, and row
        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
        sheet_data_row_tf['__sdc_row'] = row_num
        col_num = 1
        for value in row:
            # Select column metadata based on column index
            col = cols[col_num - 1]
            col_skipped = col.get('columnSkipped')
            if not col_skipped:
                col_name = col.get('columnName')
                col_type = col.get('columnType')
                # Convert dates/times from Excel-style date serial numbers
                if col_type == 'numberType.DATE_TIME':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.DATE':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)[:10]
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.TIME':
                    if isinstance(value, (int, float)):
                        try:
                            total_secs = value * 86400  # seconds in day
                            col_val = str(timedelta(seconds=total_secs))
                        except ValueError:
                            col_val = str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType':
                    if isinstance(value, int):
                        col_val = int(value)
                    else:
                        try:
                            col_val = float(value)
                        except ValueError:
                            col_val = str(value)
                elif col_type == 'stringValue':
                    col_val = str(value)
                elif col_type == 'boolValue':
                    if isinstance(value, bool):
                        col_val = value
                    elif isinstance(value, str):
                        if value.lower() in ('true', 't', 'yes', 'y'):
                            col_val = True
                        elif value.lower() in ('false', 'f', 'no', 'n'):
                            col_val = False
                        else:
                            col_val = str(value)
                    elif isinstance(value, int):
                        if value in (1, -1):
                            col_val = True
                        elif value == 0:
                            col_val = False
                        else:
                            col_val = str(value)

                else:
                    col_val = value
                sheet_data_row_tf[col_name] = col_val
            col_num = col_num + 1
        sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num, is_last_row

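
# Illustrative input/output for transform_sheet_data (example values only):
#   columns = [{'columnIndex': 1, 'columnLetter': 'A', 'columnName': 'name',
#               'columnType': 'stringValue'},
#              {'columnIndex': 2, 'columnLetter': 'B', 'columnName': 'signed_up',
#               'columnType': 'numberType.DATE'}]
#   row ['Alice', 43831] at from_row=2 becomes
#   {'__sdc_spreadsheet_id': spreadsheet_id, '__sdc_sheet_id': sheet_id,
#    '__sdc_row': 2, 'name': 'Alice', 'signed_up': '2020-01-01'}
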

def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_metadata')
    file_metadata, time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=file_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_metadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed; if not, return (back to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return 0
    else:
        # Sync file_metadata if selected
        sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
        write_bookmark(state, stream_name, strftime(this_datetime))

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_metadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_metadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)

            # Transform sheet_metadata
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # SHEET_DATA
            # Should this worksheet tab be synced?
            if sheet_title in selected_streams:
                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                update_currently_syncing(state, sheet_title)
                write_schema(catalog, sheet_title)

                # Determine max range of columns and rows for "paging" through the data
                sheet_last_col_index = 1
                sheet_last_col_letter = 'A'
                for col in columns:
                    col_index = col.get('columnIndex')
                    col_letter = col.get('columnLetter')
                    if col_index > sheet_last_col_index:
                        sheet_last_col_index = col_index
                        sheet_last_col_letter = col_letter
                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                # Initialize paging for 1st batch
                is_last_row = False
                batch_rows = 200
                from_row = 2
                if sheet_max_row < batch_rows:
                    to_row = sheet_max_row
                else:
                    to_row = batch_rows

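                # Illustrative paging sequence (example only, assuming the last
                # column is D): with batch_rows=200 the requested ranges are
                # A2:D200, A201:D400, A401:D600, ... until is_last_row is hit
                # or sheet_max_row is reached.
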
                # Loop thru batches (each having 200 rows of data)
                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                    # GET sheet_data for a worksheet tab
                    sheet_data, time_extracted = get_data(
                        stream_name=sheet_title,
                        endpoint_config=sheets_loaded_config,
                        client=client,
                        spreadsheet_id=spreadsheet_id,
                        range_rows=range_rows)
                    # Data is returned as a list of arrays, an array of values for each row
                    sheet_data_rows = sheet_data.get('values')

                    # Transform batch of rows to JSON with keys for each column
                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
                        spreadsheet_id=spreadsheet_id,
                        sheet_id=sheet_id,
                        from_row=from_row,
                        columns=columns,
                        sheet_data_rows=sheet_data_rows)
                    if row_num < to_row:
                        is_last_row = True

                    # Process records, send batch of records to target
                    record_count = process_records(
                        catalog=catalog,
                        stream_name=sheet_title,
                        records=sheet_data_tf,
                        time_extracted=ss_time_extracted)

                    # Update paging from/to_row for next batch
                    from_row = to_row + 1
                    if to_row + batch_rows > sheet_max_row:
                        to_row = sheet_max_row
                    else:
                        to_row = to_row + batch_rows

                # SHEETS_LOADED
                # Add sheet to sheets_loaded
                sheet_loaded = {}
                sheet_loaded['spreadsheetId'] = spreadsheet_id
                sheet_loaded['sheetId'] = sheet_id
                sheet_loaded['title'] = sheet_title
                sheet_loaded['loadDate'] = strftime(utils.now())
                sheet_loaded['lastRowNumber'] = row_num
                sheets_loaded.append(sheet_loaded)

                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
                # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
                # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
                activate_version_message = singer.ActivateVersionMessage(
                    stream=sheet_title,
                    version=int(time.time() * 1000))
                singer.write_message(activate_version_message)

                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                    sheet_title, row_num - 1))
                update_currently_syncing(state, None)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheets_loaded if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)