]>
Commit | Line | Data |
---|---|---|
89643ba6 JH |
1 | import time |
2 | import math | |
3 | import singer | |
4 | import json | |
5 | from collections import OrderedDict | |
6 | from singer import metrics, metadata, Transformer, utils | |
7 | from singer.utils import strptime_to_utc, strftime | |
89643ba6 JH |
8 | from tap_google_sheets.streams import STREAMS |
9 | from tap_google_sheets.schema import get_sheet_metadata | |
10 | ||
11 | LOGGER = singer.get_logger() | |
12 | ||
13 | ||
def write_schema(catalog, stream_name):
    """Emit a Singer SCHEMA message for the given stream from the catalog."""
    stream = catalog.get_stream(stream_name)
    try:
        singer.write_schema(stream_name, stream.schema.to_dict(), stream.key_properties)
    except OSError as err:
        # Surface broken-pipe style failures with context, then propagate.
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err
22 | ||
23 | ||
def write_record(stream_name, record, time_extracted):
    """Emit a Singer RECORD message; log context and re-raise on OS errors."""
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        # Log both the stream and the offending record before propagating.
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err
31 | ||
32 | ||
def get_bookmark(state, stream, default):
    """Return the saved bookmark for `stream` from `state`, else `default`."""
    if state is None or 'bookmarks' not in state:
        return default
    return state['bookmarks'].get(stream, default)
41 | ||
42 | ||
def write_bookmark(state, stream, value):
    """Store `value` as the bookmark for `stream` and emit a STATE message."""
    state.setdefault('bookmarks', {})[stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)
49 | ||
50 | ||
def transform_datetime(this_dttm):
    """Normalize a datetime string via singer's Transformer.

    Used so bookmark values are compared in one canonical string format.

    :param this_dttm: datetime string (or None) to normalize
    :return: the Transformer-normalized datetime string
    """
    # NOTE(review): relies on Transformer's private _transform_datetime;
    # there is no public equivalent in singer-python for this operation.
    with Transformer() as transformer:
        return transformer._transform_datetime(this_dttm)  # pylint: disable=protected-access
56 | ||
57 | ||
def process_records(catalog, #pylint: disable=too-many-branches
                    stream_name,
                    records,
                    time_extracted,
                    bookmark_field=None,
                    bookmark_type=None,
                    max_bookmark_value=None,
                    last_datetime=None,
                    last_integer=None,
                    parent=None,
                    parent_id=None):
    """Transform and emit records for a stream, tracking the max bookmark.

    :param catalog: singer Catalog; supplies schema/metadata for the stream
    :param stream_name: name of the stream being emitted
    :param records: iterable of raw record dicts
    :param time_extracted: extraction timestamp attached to each RECORD message
    :param bookmark_field: record field used as the replication bookmark
    :param bookmark_type: 'integer' or 'datetime'; anything else emits all records
    :param max_bookmark_value: running maximum bookmark value to carry forward
    :param last_datetime: previous datetime bookmark (records before it are skipped)
    :param last_integer: previous integer bookmark (records below it are skipped)
    :param parent: parent stream name, used to label the injected foreign key
    :param parent_id: parent record id injected into each child record
    :return: (max_bookmark_value, number_of_records_emitted)
    """
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)

    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # If child object, add parent_id to record
            if parent_id and parent:
                record[parent + '_id'] = parent_id

            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                # Reset max_bookmark_value to new value if higher
                # NOTE(review): this compares the raw transformed field against a
                # transform_datetime()-normalized max — a string comparison that
                # assumes both sides are in the same canonical format; confirm.
                if transformed_record.get(bookmark_field):
                    if max_bookmark_value is None or \
                        transformed_record[bookmark_field] > transform_datetime(max_bookmark_value):
                        max_bookmark_value = transformed_record[bookmark_field]

                if bookmark_field and (bookmark_field in transformed_record):
                    if bookmark_type == 'integer':
                        # Keep only records whose bookmark is after the last_integer
                        if transformed_record[bookmark_field] >= last_integer:
                            write_record(stream_name, transformed_record, \
                                time_extracted=time_extracted)
                            counter.increment()
                    elif bookmark_type == 'datetime':
                        # Normalize both sides before comparing (string compare of
                        # canonical datetime strings).
                        last_dttm = transform_datetime(last_datetime)
                        bookmark_dttm = transform_datetime(transformed_record[bookmark_field])
                        # Keep only records whose bookmark is after the last_datetime
                        if bookmark_dttm >= last_dttm:
                            write_record(stream_name, transformed_record, \
                                time_extracted=time_extracted)
                            counter.increment()
                    else:
                        # Unknown bookmark_type: emit unconditionally.
                        write_record(stream_name, transformed_record, time_extracted=time_extracted)
                        counter.increment()

    return max_bookmark_value, counter.value
111 | ||
112 | ||
def update_currently_syncing(state, stream_name):
    """Record (or clear) the stream currently being delivered in state.

    If the integration is interrupted, this state property identifies the
    starting point to continue from. Reference:
    https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
    """
    if stream_name is None and 'currently_syncing' in state:
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)
123 | ||
124 | ||
# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    """Return the field names marked `selected` in the stream's metadata."""
    stream = catalog.get_stream(stream_name)
    entries = singer.metadata.to_list(metadata.to_map(stream.metadata))
    selected_fields = []
    for entry in entries:
        try:
            # Field-level breadcrumbs look like ('properties', field_name);
            # the stream-level entry has an empty breadcrumb and is skipped.
            field = entry['breadcrumb'][1]
        except IndexError:
            continue
        if entry.get('metadata', {}).get('selected', False):
            selected_fields.append(field)
    return selected_fields
140 | ||
141 | ||
def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    """Fetch one page of data for a stream from the Google Sheets/Drive API.

    :param stream_name: stream name; also substituted for '{sheet_title}'
        placeholders in the path and query params
    :param endpoint_config: dict with optional 'path', 'params', 'api' keys
    :param client: API client exposing get(path, api, params, endpoint)
    :param spreadsheet_id: spreadsheet to substitute into the path template
    :param range_rows: A1-notation row range for paging (e.g. 'A2:J200');
        empty string when not provided
    :return: the client's decoded response
    """
    if not range_rows:
        range_rows = ''
    # Fill the endpoint path template with the spreadsheet id, sheet title
    # (== stream name) and row range.
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # NOTE(review): values are not URL-encoded here; templates assume the
    # substituted values are URL-safe — confirm for sheet titles with spaces.
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    # Removed a dead `data = {}` assignment that was immediately overwritten.
    return client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
163 | ||
164 | ||
def transform_file_metadata(file_metadata):
    """Deep-copy Drive file metadata, strip user detail keys, wrap in a list.

    The json round-trip produces an independent copy so the caller's dict is
    never mutated when keys are removed.
    """
    record = json.loads(json.dumps(file_metadata))
    user = record.get('lastModifyingUser')
    if user:
        for key in ('photoLink', 'me', 'permissionId'):
            user.pop(key, None)
    # Return a single-record batch for the records-processing pipeline.
    return [record]
177 | ||
178 | ||
def transform_spreadsheet_metadata(spreadsheet_metadata):
    """Deep-copy spreadsheet metadata, drop bulky keys, wrap in a list.

    The json round-trip produces an independent copy so the caller's dict is
    never mutated when keys are removed.
    """
    record = json.loads(json.dumps(spreadsheet_metadata))
    properties = record.get('properties')
    if properties:
        properties.pop('defaultFormat', None)
    # The per-sheet detail is synced separately; drop it from this record.
    record.pop('sheets', None)
    # Return a single-record batch for the records-processing pipeline.
    return [record]
190 | ||
191 | ||
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    """Build a sheet_metadata record from sheet properties plus context.

    Copies the sheet's `properties` dict (json round-trip, so the input is
    not mutated) and annotates it with the spreadsheet id, a direct edit
    URL, and the discovered column metadata.
    """
    record = json.loads(json.dumps(sheet.get('properties')))
    record['spreadsheetId'] = spreadsheet_id
    record['sheetUrl'] = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, record.get('sheetId'))
    record['columns'] = columns
    return record
203 | ||
204 | ||
205 | def sync(client, config, catalog, state): | |
206 | start_date = config.get('start_date') | |
207 | spreadsheet_id = config.get('spreadsheet_id') | |
208 | ||
209 | # Get selected_streams from catalog, based on state last_stream | |
210 | # last_stream = Previous currently synced stream, if the load was interrupted | |
211 | last_stream = singer.get_currently_syncing(state) | |
212 | LOGGER.info('last/currently syncing stream: {}'.format(last_stream)) | |
213 | selected_streams = [] | |
214 | for stream in catalog.get_selected_streams(state): | |
215 | selected_streams.append(stream.stream) | |
216 | LOGGER.info('selected_streams: {}'.format(selected_streams)) | |
217 | ||
218 | if not selected_streams: | |
219 | return | |
220 | ||
221 | # Get file_metadata | |
222 | file_metadata = {} | |
223 | file_metadata_config = STREAMS.get('file_metadata') | |
224 | file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id) | |
225 | file_metadata_tf = transform_file_metadata(file_metadata) | |
226 | # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf)) | |
227 | last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date)) | |
228 | this_datetime = strptime_to_utc(file_metadata.get('modifiedTime')) | |
229 | LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime)) | |
230 | if this_datetime <= last_datetime: | |
231 | LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.') | |
232 | return 0 | |
233 | ||
234 | # Get spreadsheet_metadata | |
235 | spreadsheet_metadata = {} | |
236 | spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata') | |
237 | spreadsheet_metadata = get_data('spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id) | |
238 | spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata) | |
239 | # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf)) | |
240 | ||
241 | # Get sheet_metadata | |
242 | sheets = spreadsheet_metadata.get('sheets') | |
243 | sheet_metadata = [] | |
244 | sheets_loaded = [] | |
245 | sheets_loaded_config = STREAMS['sheets_loaded'] | |
246 | if sheets: | |
247 | for sheet in sheets: | |
248 | sheet_title = sheet.get('properties', {}).get('title') | |
249 | sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) | |
250 | sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) | |
251 | # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf)) | |
252 | sheet_metadata.append(sheet_metadata_tf) | |
253 | ||
254 | # Determine range of rows and columns for "paging" through batch rows of data | |
255 | sheet_last_col_index = 1 | |
256 | sheet_last_col_letter = 'A' | |
257 | for col in columns: | |
258 | col_index = col.get('columnIndex') | |
259 | col_letter = col.get('columnLetter') | |
260 | if col_index > sheet_last_col_index: | |
261 | sheet_last_col_index = col_index | |
262 | sheet_last_col_letter = col_letter | |
263 | sheet_max_row = sheet.get('gridProperties', {}).get('rowCount') | |
264 | is_empty_row = False | |
265 | batch_rows = 200 | |
266 | from_row = 2 | |
267 | if sheet_max_row < batch_rows: | |
268 | to_row = sheet_max_row | |
269 | else: | |
270 | to_row = batch_rows | |
271 | ||
272 | while not is_empty_row and to_row <= sheet_max_row: | |
273 | range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row) | |
274 | ||
275 | sheet_data = get_data( | |
276 | stream_name=sheet_title, | |
277 | endpoint_config=sheets_loaded_config, | |
278 | client=client, | |
279 | spreadsheet_id=spreadsheet_id, | |
280 | range_rows=range_rows) |