diff options
author | Jeff Huth <jeff.huth@bytecode.io> | 2019-11-13 17:03:56 -0800 |
---|---|---|
committer | Jeff Huth <jeff.huth@bytecode.io> | 2019-11-13 17:03:56 -0800 |
commit | 89643ba6fa98db82efd3246805ef801a8bfb5c81 (patch) | |
tree | 739027b4e827def2db81631c9d6ed58ec2b97809 /tap_google_sheets/sync.py | |
parent | 5f8005471d3affaaf23489df93a58ca64c3da3ca (diff) | |
download | tap-google-sheets-89643ba6fa98db82efd3246805ef801a8bfb5c81.tar.gz tap-google-sheets-89643ba6fa98db82efd3246805ef801a8bfb5c81.tar.zst tap-google-sheets-89643ba6fa98db82efd3246805ef801a8bfb5c81.zip |
Initial commit
Discovery mode works. Still working on normal sync.
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r-- | tap_google_sheets/sync.py | 281 |
1 files changed, 281 insertions, 0 deletions
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py new file mode 100644 index 0000000..a8b02d0 --- /dev/null +++ b/tap_google_sheets/sync.py | |||
@@ -0,0 +1,281 @@ | |||
1 | import time | ||
2 | import math | ||
3 | import singer | ||
4 | import json | ||
5 | from collections import OrderedDict | ||
6 | from singer import metrics, metadata, Transformer, utils | ||
7 | from singer.utils import strptime_to_utc, strftime | ||
8 | from tap_google_sheets.transform import transform_json | ||
9 | from tap_google_sheets.streams import STREAMS | ||
10 | from tap_google_sheets.schema import get_sheet_metadata | ||
11 | |||
12 | LOGGER = singer.get_logger() | ||
13 | |||
14 | |||
15 | def write_schema(catalog, stream_name): | ||
16 | stream = catalog.get_stream(stream_name) | ||
17 | schema = stream.schema.to_dict() | ||
18 | try: | ||
19 | singer.write_schema(stream_name, schema, stream.key_properties) | ||
20 | except OSError as err: | ||
21 | LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) | ||
22 | raise err | ||
23 | |||
24 | |||
25 | def write_record(stream_name, record, time_extracted): | ||
26 | try: | ||
27 | singer.messages.write_record(stream_name, record, time_extracted=time_extracted) | ||
28 | except OSError as err: | ||
29 | LOGGER.info('OS Error writing record for: {}'.format(stream_name)) | ||
30 | LOGGER.info('record: {}'.format(record)) | ||
31 | raise err | ||
32 | |||
33 | |||
34 | def get_bookmark(state, stream, default): | ||
35 | if (state is None) or ('bookmarks' not in state): | ||
36 | return default | ||
37 | return ( | ||
38 | state | ||
39 | .get('bookmarks', {}) | ||
40 | .get(stream, default) | ||
41 | ) | ||
42 | |||
43 | |||
44 | def write_bookmark(state, stream, value): | ||
45 | if 'bookmarks' not in state: | ||
46 | state['bookmarks'] = {} | ||
47 | state['bookmarks'][stream] = value | ||
48 | LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value)) | ||
49 | singer.write_state(state) | ||
50 | |||
51 | |||
52 | # def transform_datetime(this_dttm): | ||
53 | def transform_datetime(this_dttm): | ||
54 | with Transformer() as transformer: | ||
55 | new_dttm = transformer._transform_datetime(this_dttm) | ||
56 | return new_dttm | ||
57 | |||
58 | |||
59 | def process_records(catalog, #pylint: disable=too-many-branches | ||
60 | stream_name, | ||
61 | records, | ||
62 | time_extracted, | ||
63 | bookmark_field=None, | ||
64 | bookmark_type=None, | ||
65 | max_bookmark_value=None, | ||
66 | last_datetime=None, | ||
67 | last_integer=None, | ||
68 | parent=None, | ||
69 | parent_id=None): | ||
70 | stream = catalog.get_stream(stream_name) | ||
71 | schema = stream.schema.to_dict() | ||
72 | stream_metadata = metadata.to_map(stream.metadata) | ||
73 | |||
74 | with metrics.record_counter(stream_name) as counter: | ||
75 | for record in records: | ||
76 | # If child object, add parent_id to record | ||
77 | if parent_id and parent: | ||
78 | record[parent + '_id'] = parent_id | ||
79 | |||
80 | # Transform record for Singer.io | ||
81 | with Transformer() as transformer: | ||
82 | transformed_record = transformer.transform( | ||
83 | record, | ||
84 | schema, | ||
85 | stream_metadata) | ||
86 | # Reset max_bookmark_value to new value if higher | ||
87 | if transformed_record.get(bookmark_field): | ||
88 | if max_bookmark_value is None or \ | ||
89 | transformed_record[bookmark_field] > transform_datetime(max_bookmark_value): | ||
90 | max_bookmark_value = transformed_record[bookmark_field] | ||
91 | |||
92 | if bookmark_field and (bookmark_field in transformed_record): | ||
93 | if bookmark_type == 'integer': | ||
94 | # Keep only records whose bookmark is after the last_integer | ||
95 | if transformed_record[bookmark_field] >= last_integer: | ||
96 | write_record(stream_name, transformed_record, \ | ||
97 | time_extracted=time_extracted) | ||
98 | counter.increment() | ||
99 | elif bookmark_type == 'datetime': | ||
100 | last_dttm = transform_datetime(last_datetime) | ||
101 | bookmark_dttm = transform_datetime(transformed_record[bookmark_field]) | ||
102 | # Keep only records whose bookmark is after the last_datetime | ||
103 | if bookmark_dttm >= last_dttm: | ||
104 | write_record(stream_name, transformed_record, \ | ||
105 | time_extracted=time_extracted) | ||
106 | counter.increment() | ||
107 | else: | ||
108 | write_record(stream_name, transformed_record, time_extracted=time_extracted) | ||
109 | counter.increment() | ||
110 | |||
111 | return max_bookmark_value, counter.value | ||
112 | |||
113 | |||
114 | # Currently syncing sets the stream currently being delivered in the state. | ||
115 | # If the integration is interrupted, this state property is used to identify | ||
116 | # the starting point to continue from. | ||
117 | # Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46 | ||
118 | def update_currently_syncing(state, stream_name): | ||
119 | if (stream_name is None) and ('currently_syncing' in state): | ||
120 | del state['currently_syncing'] | ||
121 | else: | ||
122 | singer.set_currently_syncing(state, stream_name) | ||
123 | singer.write_state(state) | ||
124 | |||
125 | |||
126 | # List selected fields from stream catalog | ||
127 | def get_selected_fields(catalog, stream_name): | ||
128 | stream = catalog.get_stream(stream_name) | ||
129 | mdata = metadata.to_map(stream.metadata) | ||
130 | mdata_list = singer.metadata.to_list(mdata) | ||
131 | selected_fields = [] | ||
132 | for entry in mdata_list: | ||
133 | field = None | ||
134 | try: | ||
135 | field = entry['breadcrumb'][1] | ||
136 | if entry.get('metadata', {}).get('selected', False): | ||
137 | selected_fields.append(field) | ||
138 | except IndexError: | ||
139 | pass | ||
140 | return selected_fields | ||
141 | |||
142 | |||
143 | def get_data(stream_name, | ||
144 | endpoint_config, | ||
145 | client, | ||
146 | spreadsheet_id, | ||
147 | range_rows=None): | ||
148 | if not range_rows: | ||
149 | range_rows = '' | ||
150 | path = endpoint_config.get('path', stream_name).replace( | ||
151 | '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace( | ||
152 | '{range_rows}', range_rows) | ||
153 | params = endpoint_config.get('params', {}) | ||
154 | api = endpoint_config.get('api', 'sheets') | ||
155 | querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace( | ||
156 | '{sheet_title}', stream_name) | ||
157 | data = {} | ||
158 | data = client.get( | ||
159 | path=path, | ||
160 | api=api, | ||
161 | params=querystring, | ||
162 | endpoint=stream_name) | ||
163 | return data | ||
164 | |||
165 | |||
166 | def transform_file_metadata(file_metadata): | ||
167 | # Convert to dict | ||
168 | file_metadata_tf = json.loads(json.dumps(file_metadata)) | ||
169 | # Remove keys | ||
170 | if file_metadata_tf.get('lastModifyingUser'): | ||
171 | file_metadata_tf['lastModifyingUser'].pop('photoLink', None) | ||
172 | file_metadata_tf['lastModifyingUser'].pop('me', None) | ||
173 | file_metadata_tf['lastModifyingUser'].pop('permissionId', None) | ||
174 | # Add record to an array of 1 | ||
175 | file_metadata_arr = [] | ||
176 | file_metadata_arr.append(file_metadata_tf) | ||
177 | return file_metadata_arr | ||
178 | |||
179 | |||
180 | def transform_spreadsheet_metadata(spreadsheet_metadata): | ||
181 | # Convert to dict | ||
182 | spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata)) | ||
183 | # Remove keys | ||
184 | if spreadsheet_metadata_tf.get('properties'): | ||
185 | spreadsheet_metadata_tf['properties'].pop('defaultFormat', None) | ||
186 | spreadsheet_metadata_tf.pop('sheets', None) | ||
187 | # Add record to an array of 1 | ||
188 | spreadsheet_metadata_arr = [] | ||
189 | spreadsheet_metadata_arr.append(spreadsheet_metadata_tf) | ||
190 | return spreadsheet_metadata_arr | ||
191 | |||
192 | |||
193 | def transform_sheet_metadata(spreadsheet_id, sheet, columns): | ||
194 | # Convert to properties to dict | ||
195 | sheet_metadata = sheet.get('properties') | ||
196 | sheet_metadata_tf = json.loads(json.dumps(sheet_metadata)) | ||
197 | sheet_id = sheet_metadata_tf.get('sheetId') | ||
198 | sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format( | ||
199 | spreadsheet_id, sheet_id) | ||
200 | sheet_metadata_tf['spreadsheetId'] = spreadsheet_id | ||
201 | sheet_metadata_tf['sheetUrl'] = sheet_url | ||
202 | sheet_metadata_tf['columns'] = columns | ||
203 | return sheet_metadata_tf | ||
204 | |||
205 | |||
206 | def sync(client, config, catalog, state): | ||
207 | start_date = config.get('start_date') | ||
208 | spreadsheet_id = config.get('spreadsheet_id') | ||
209 | |||
210 | # Get selected_streams from catalog, based on state last_stream | ||
211 | # last_stream = Previous currently synced stream, if the load was interrupted | ||
212 | last_stream = singer.get_currently_syncing(state) | ||
213 | LOGGER.info('last/currently syncing stream: {}'.format(last_stream)) | ||
214 | selected_streams = [] | ||
215 | for stream in catalog.get_selected_streams(state): | ||
216 | selected_streams.append(stream.stream) | ||
217 | LOGGER.info('selected_streams: {}'.format(selected_streams)) | ||
218 | |||
219 | if not selected_streams: | ||
220 | return | ||
221 | |||
222 | # Get file_metadata | ||
223 | file_metadata = {} | ||
224 | file_metadata_config = STREAMS.get('file_metadata') | ||
225 | file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id) | ||
226 | file_metadata_tf = transform_file_metadata(file_metadata) | ||
227 | # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf)) | ||
228 | last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date)) | ||
229 | this_datetime = strptime_to_utc(file_metadata.get('modifiedTime')) | ||
230 | LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime)) | ||
231 | if this_datetime <= last_datetime: | ||
232 | LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.') | ||
233 | return 0 | ||
234 | |||
235 | # Get spreadsheet_metadata | ||
236 | spreadsheet_metadata = {} | ||
237 | spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata') | ||
238 | spreadsheet_metadata = get_data('spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id) | ||
239 | spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata) | ||
240 | # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf)) | ||
241 | |||
242 | # Get sheet_metadata | ||
243 | sheets = spreadsheet_metadata.get('sheets') | ||
244 | sheet_metadata = [] | ||
245 | sheets_loaded = [] | ||
246 | sheets_loaded_config = STREAMS['sheets_loaded'] | ||
247 | if sheets: | ||
248 | for sheet in sheets: | ||
249 | sheet_title = sheet.get('properties', {}).get('title') | ||
250 | sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) | ||
251 | sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) | ||
252 | # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf)) | ||
253 | sheet_metadata.append(sheet_metadata_tf) | ||
254 | |||
255 | # Determine range of rows and columns for "paging" through batch rows of data | ||
256 | sheet_last_col_index = 1 | ||
257 | sheet_last_col_letter = 'A' | ||
258 | for col in columns: | ||
259 | col_index = col.get('columnIndex') | ||
260 | col_letter = col.get('columnLetter') | ||
261 | if col_index > sheet_last_col_index: | ||
262 | sheet_last_col_index = col_index | ||
263 | sheet_last_col_letter = col_letter | ||
264 | sheet_max_row = sheet.get('gridProperties', {}).get('rowCount') | ||
265 | is_empty_row = False | ||
266 | batch_rows = 200 | ||
267 | from_row = 2 | ||
268 | if sheet_max_row < batch_rows: | ||
269 | to_row = sheet_max_row | ||
270 | else: | ||
271 | to_row = batch_rows | ||
272 | |||
273 | while not is_empty_row and to_row <= sheet_max_row: | ||
274 | range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row) | ||
275 | |||
276 | sheet_data = get_data( | ||
277 | stream_name=sheet_title, | ||
278 | endpoint_config=sheets_loaded_config, | ||
279 | client=client, | ||
280 | spreadsheet_id=spreadsheet_id, | ||
281 | range_rows=range_rows) | ||