author    Jeff Huth <jeff.huth@bytecode.io>   2019-11-13 17:03:56 -0800
committer Jeff Huth <jeff.huth@bytecode.io>   2019-11-13 17:03:56 -0800
commit    89643ba6fa98db82efd3246805ef801a8bfb5c81 (patch)
tree      739027b4e827def2db81631c9d6ed58ec2b97809 /tap_google_sheets/sync.py
parent    5f8005471d3affaaf23489df93a58ca64c3da3ca (diff)
Initial commit
Discovery mode works. Still working on normal sync.
Diffstat (limited to 'tap_google_sheets/sync.py')
-rw-r--r--  tap_google_sheets/sync.py  281
1 file changed, 281 insertions(+), 0 deletions(-)
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
new file mode 100644
index 0000000..a8b02d0
--- /dev/null
+++ b/tap_google_sheets/sync.py
@@ -0,0 +1,281 @@
import time
import math
import singer
import json
from collections import OrderedDict
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from tap_google_sheets.transform import transform_json
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted):
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)

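# Illustrative only (not part of the original file): the state these helpers read and
# write has the shape
#   {'currently_syncing': 'file_metadata',
#    'bookmarks': {'file_metadata': '2019-11-13T00:00:00Z'}}
# so get_bookmark(state, 'file_metadata', start_date) returns the last saved value,
# falling back to the config start_date on the first run.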

def transform_datetime(this_dttm):
    with Transformer() as transformer:
        new_dttm = transformer._transform_datetime(this_dttm)
    return new_dttm

def process_records(catalog, #pylint: disable=too-many-branches
                    stream_name,
                    records,
                    time_extracted,
                    bookmark_field=None,
                    bookmark_type=None,
                    max_bookmark_value=None,
                    last_datetime=None,
                    last_integer=None,
                    parent=None,
                    parent_id=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)

    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # If child object, add parent_id to record
            if parent_id and parent:
                record[parent + '_id'] = parent_id

            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                # Reset max_bookmark_value to new value if higher
                if transformed_record.get(bookmark_field):
                    if max_bookmark_value is None or \
                            transformed_record[bookmark_field] > transform_datetime(max_bookmark_value):
                        max_bookmark_value = transformed_record[bookmark_field]

                if bookmark_field and (bookmark_field in transformed_record):
                    if bookmark_type == 'integer':
                        # Keep only records whose bookmark is after the last_integer
                        if transformed_record[bookmark_field] >= last_integer:
                            write_record(stream_name, transformed_record,
                                         time_extracted=time_extracted)
                            counter.increment()
                    elif bookmark_type == 'datetime':
                        last_dttm = transform_datetime(last_datetime)
                        bookmark_dttm = transform_datetime(transformed_record[bookmark_field])
                        # Keep only records whose bookmark is after the last_datetime
                        if bookmark_dttm >= last_dttm:
                            write_record(stream_name, transformed_record,
                                         time_extracted=time_extracted)
                            counter.increment()
                else:
                    write_record(stream_name, transformed_record, time_extracted=time_extracted)
                    counter.increment()

        return max_bookmark_value, counter.value


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)

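# Illustrative example (not part of the original file): with
# state = {'bookmarks': {'file_metadata': '2019-11-13T00:00:00Z'}},
# update_currently_syncing(state, 'file_metadata') adds
# 'currently_syncing': 'file_metadata' and emits a STATE message; calling it again
# with stream_name=None removes the key once the sync has finished.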

# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields

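# Illustrative catalog metadata entry (example values, not from a real catalog):
#   {'breadcrumb': ('properties', 'modifiedTime'), 'metadata': {'selected': True}}
# entry['breadcrumb'][1] is the field name; the stream-level entry has an empty
# breadcrumb, so indexing it raises IndexError and it is skipped above.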

def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    data = {}
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data

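# Illustrative only -- the real endpoint_config values live in streams.py and are not
# shown in this commit. A config along the lines of
#   {'api': 'sheets',
#    'path': 'spreadsheets/{spreadsheet_id}/values/{sheet_title}!{range_rows}',
#    'params': {'majorDimension': 'ROWS'}}
# would have its path placeholders filled in above and its params flattened into a
# '&'-joined querystring before client.get() is called.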

def transform_file_metadata(file_metadata):
    # Convert to dict
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Convert to dict
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    #   last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # Get file_metadata
    file_metadata = {}
    file_metadata_config = STREAMS.get('file_metadata')
    file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id)
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
    last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return 0

    # Get spreadsheet_metadata
    spreadsheet_metadata = {}
    spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata')
    spreadsheet_metadata = get_data(
        'spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id)
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
    # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))

    # Get sheet_metadata
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # Determine range of rows and columns for "paging" through batch rows of data
            sheet_last_col_index = 1
            sheet_last_col_letter = 'A'
            for col in columns:
                col_index = col.get('columnIndex')
                col_letter = col.get('columnLetter')
                if col_index > sheet_last_col_index:
                    sheet_last_col_index = col_index
                    sheet_last_col_letter = col_letter
            sheet_max_row = sheet.get('properties', {}).get('gridProperties', {}).get('rowCount')
            is_empty_row = False
            batch_rows = 200
            from_row = 2
            if sheet_max_row < batch_rows:
                to_row = sheet_max_row
            else:
                to_row = batch_rows

            while not is_empty_row and to_row <= sheet_max_row:
                range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row)
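                # Illustrative: with columns A..F and to_row = 200 this yields 'A2:F200'
                # (rows 2-200 of columns A-F, i.e. the first batch below the header row).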

                sheet_data = get_data(
                    stream_name=sheet_title,
                    endpoint_config=sheets_loaded_config,
                    client=client,
                    spreadsheet_id=spreadsheet_id,
                    range_rows=range_rows)