import time
import math
import json
from collections import OrderedDict

import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted):
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


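# Illustrative shape of the Singer state read and written by
# get_bookmark/write_bookmark (values are examples, not real data):
# {
#     "currently_syncing": "sheet_1",
#     "bookmarks": {"file_metadata": "2019-09-27T22:34:39.000Z"}
# }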
def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


def transform_datetime(this_dttm):
    # Uses singer-python's private _transform_datetime helper to normalize
    # datetime strings to the standard Singer UTC format
    with Transformer() as transformer:
        new_dttm = transformer._transform_datetime(this_dttm)
    return new_dttm


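# process_records transforms each record against the stream schema, tracks
# the max bookmark value seen, and emits only records at or after the saved
# bookmark. For example (timestamps illustrative): with a saved datetime
# bookmark of 2019-09-27T22:34:39Z, a record whose bookmark field is
# 2019-09-28T00:00:00Z is written and becomes the new max_bookmark_value.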
def process_records(catalog, #pylint: disable=too-many-branches
                    stream_name,
                    records,
                    time_extracted,
                    bookmark_field=None,
                    bookmark_type=None,
                    max_bookmark_value=None,
                    last_datetime=None,
                    last_integer=None,
                    parent=None,
                    parent_id=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)

    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # If child object, add parent_id to record
            if parent_id and parent:
                record[parent + '_id'] = parent_id

            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                # Reset max_bookmark_value to new value if higher
                if transformed_record.get(bookmark_field):
                    if max_bookmark_value is None or \
                        transformed_record[bookmark_field] > transform_datetime(max_bookmark_value):
                        max_bookmark_value = transformed_record[bookmark_field]

                if bookmark_field and (bookmark_field in transformed_record):
                    if bookmark_type == 'integer':
                        # Keep only records whose bookmark is after the last_integer
                        if transformed_record[bookmark_field] >= last_integer:
                            write_record(stream_name, transformed_record,
                                         time_extracted=time_extracted)
                            counter.increment()
                    elif bookmark_type == 'datetime':
                        last_dttm = transform_datetime(last_datetime)
                        bookmark_dttm = transform_datetime(transformed_record[bookmark_field])
                        # Keep only records whose bookmark is after the last_datetime
                        if bookmark_dttm >= last_dttm:
                            write_record(stream_name, transformed_record,
                                         time_extracted=time_extracted)
                            counter.increment()
                else:
                    write_record(stream_name, transformed_record, time_extracted=time_extracted)
                    counter.increment()

        return max_bookmark_value, counter.value


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from the stream catalog. Each metadata entry carries a
# breadcrumb such as ('properties', '<field_name>'); entries whose metadata
# has selected=True are collected.
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields


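# get_data fills the endpoint's path template from STREAMS. For illustration
# only (the real templates live in streams.py), a template like
# 'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}' with
# spreadsheet_id='abc123', stream_name='Sheet1' and range_rows='A2:D200'
# resolves to 'spreadsheets/abc123/values/Sheet1!A2:D200'.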
def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data


def transform_file_metadata(file_metadata):
    # json round-trip converts the API response to a plain, serializable dict
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys not needed downstream
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Wrap the record in a single-element array
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


def transform_spreadsheet_metadata(spreadsheet_metadata):
    # json round-trip converts the API response to a plain, serializable dict
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys not needed downstream
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Wrap the record in a single-element array
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert the sheet properties to a plain dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


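# Overall sync flow, as implemented below:
#   1. Fetch file_metadata and compare modifiedTime to the saved bookmark;
#      exit early if the spreadsheet has not changed since the last run.
#   2. Fetch spreadsheet_metadata and derive each sheet's schema and columns.
#   3. For each sheet, page through rows in batches and emit records.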
def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # Get file_metadata
    file_metadata_config = STREAMS.get('file_metadata')
    file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id)
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
    last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return

    # Get spreadsheet_metadata
    spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata')
    spreadsheet_metadata = get_data(
        'spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id)
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
    # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))

    # Get sheet_metadata
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # Determine range of rows and columns for "paging" through batch rows of data
            sheet_last_col_index = 1
            sheet_last_col_letter = 'A'
            for col in columns:
                col_index = col.get('columnIndex')
                col_letter = col.get('columnLetter')
                if col_index > sheet_last_col_index:
                    sheet_last_col_index = col_index
                    sheet_last_col_letter = col_letter
            # gridProperties lives under the sheet's properties in the Sheets API
            sheet_max_row = sheet.get('properties', {}).get('gridProperties', {}).get('rowCount')
            is_empty_row = False
            batch_rows = 200
            from_row = 2
            if sheet_max_row < batch_rows:
                to_row = sheet_max_row
            else:
                to_row = batch_rows

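            # Illustrative paging math: with a last column letter of 'D' and
            # batch_rows = 200, the first request below covers 'A2:D200';
            # later batches advance from_row/to_row through sheet_max_row.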
            while not is_empty_row and to_row <= sheet_max_row:
                # Start each batch at from_row (not a fixed 'A2'), so every
                # batch covers a fresh slice of rows
                range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                sheet_data = get_data(
                    stream_name=sheet_title,
                    endpoint_config=sheets_loaded_config,
                    client=client,
                    spreadsheet_id=spreadsheet_id,
                    range_rows=range_rows)