import json
import singer
from collections import OrderedDict
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from tap_google_sheets.transform import transform_json
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted):
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return state.get('bookmarks', {}).get(stream, default)


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)
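
# Illustrative state after write_bookmark(state, 'file_metadata', '2019-01-01T00:00:00Z'):
#   {'bookmarks': {'file_metadata': '2019-01-01T00:00:00Z'}}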


# Standardize a datetime value using the singer Transformer
def transform_datetime(this_dttm):
    with Transformer() as transformer:
        new_dttm = transformer._transform_datetime(this_dttm)
    return new_dttm
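
# Illustrative round-trip (exact output format is determined by the singer Transformer):
#   transform_datetime('2019-01-01T00:00:00Z')  ->  '2019-01-01T00:00:00.000000Z'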


def process_records(catalog, #pylint: disable=too-many-branches
                    stream_name,
                    records,
                    time_extracted,
                    bookmark_field=None,
                    bookmark_type=None,
                    max_bookmark_value=None,
                    last_datetime=None,
                    last_integer=None,
                    parent=None,
                    parent_id=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)

    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # If child object, add parent_id to record
            if parent_id and parent:
                record[parent + '_id'] = parent_id

            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)

                # Reset max_bookmark_value to new value if higher
                if transformed_record.get(bookmark_field):
                    if max_bookmark_value is None or \
                        transformed_record[bookmark_field] > transform_datetime(max_bookmark_value):
                        max_bookmark_value = transformed_record[bookmark_field]

                if bookmark_field and (bookmark_field in transformed_record):
                    if bookmark_type == 'integer':
                        # Keep only records whose bookmark is after the last_integer
                        if transformed_record[bookmark_field] >= last_integer:
                            write_record(stream_name, transformed_record, \
                                time_extracted=time_extracted)
                            counter.increment()
                    elif bookmark_type == 'datetime':
                        last_dttm = transform_datetime(last_datetime)
                        bookmark_dttm = transform_datetime(transformed_record[bookmark_field])
                        # Keep only records whose bookmark is after the last_datetime
                        if bookmark_dttm >= last_dttm:
                            write_record(stream_name, transformed_record, \
                                time_extracted=time_extracted)
                            counter.increment()
                else:
                    write_record(stream_name, transformed_record, time_extracted=time_extracted)
                    counter.increment()

        return max_bookmark_value, counter.value
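
# Illustrative call for a datetime-bookmarked stream (argument values are hypothetical):
#   max_bookmark_value, record_count = process_records(
#       catalog,
#       stream_name='file_metadata',
#       records=file_metadata_tf,
#       time_extracted=utils.now(),
#       bookmark_field='modifiedTime',
#       bookmark_type='datetime',
#       last_datetime=last_datetime)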


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)
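
# Illustrative state shape while a stream is mid-sync:
#   {'currently_syncing': 'sheet_metadata',
#    'bookmarks': {'file_metadata': '2019-01-01T00:00:00Z'}}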


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        # Field-level metadata has a breadcrumb like ('properties', <field_name>);
        # skip the top-level entry, whose breadcrumb is empty
        if len(entry.get('breadcrumb', [])) > 1:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
    return selected_fields
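
# Illustrative field-level metadata entry consumed above ('sheetUrl' is just an example field):
#   {'breadcrumb': ('properties', 'sheetUrl'), 'metadata': {'selected': True}}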


def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    # range_rows only applies to sheet data endpoints; default to empty string
    if not range_rows:
        range_rows = ''
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    # client.get kwargs other than endpoint were elided upstream; path, api, and
    # params here are assumed from the values built above
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data
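
# Illustrative call, mirroring the file_metadata request made in sync() below:
#   file_metadata = get_data('file_metadata', STREAMS.get('file_metadata'),
#                            client, spreadsheet_id)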


# Transform file_metadata: remove unneeded nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Convert to plain dict
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr
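
# Illustrative result (Drive file fields vary by file; lastModifyingUser is pruned):
#   [{'id': '...', 'name': '...', 'modifiedTime': '2019-01-01T00:00:00.000Z',
#     'lastModifyingUser': {'displayName': '...', 'emailAddress': '...'}}]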


# Transform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Convert to plain dict
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Transform sheet_metadata: add spreadsheetId, sheetUrl, and columns to the properties node
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert the properties node to a plain dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf
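
# Illustrative result for a sheet titled 'Sheet1' in spreadsheet 'abc123':
#   {'sheetId': 0, 'title': 'Sheet1', 'spreadsheetId': 'abc123',
#    'sheetUrl': 'https://docs.google.com/spreadsheets/d/abc123/edit#gid=0',
#    'columns': [...]}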


def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    #   last_stream = previously synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # Get file_metadata and compare the file's modifiedTime to the bookmark
    file_metadata_config = STREAMS.get('file_metadata')
    file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id)
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
    last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return

    # Get spreadsheet_metadata
    spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata')
    spreadsheet_metadata = get_data(
        'spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id)
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
    # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))

    # Loop through sheets (worksheet tabs) in the spreadsheet
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # Determine range of rows and columns for "paging" through batches of row data
            sheet_last_col_index = 1
            sheet_last_col_letter = 'A'
            for col in columns:
                col_index = col.get('columnIndex')
                col_letter = col.get('columnLetter')
                if col_index > sheet_last_col_index:
                    sheet_last_col_index = col_index
                    sheet_last_col_letter = col_letter
            # rowCount lives under the sheet's properties.gridProperties node
            sheet_max_row = sheet.get('properties', {}).get('gridProperties', {}).get('rowCount')

            # Initialize paging for the first batch
            # (batch_rows value below is an assumption; the original constant was elided)
            is_empty_row = False
            batch_rows = 200
            if sheet_max_row < batch_rows:
                to_row = sheet_max_row
            else:
                to_row = batch_rows

            # Loop through batches of rows until an empty row or the end of the sheet
            while not is_empty_row and to_row <= sheet_max_row:
                range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row)
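                # e.g. with last column letter 'E' and to_row == 200, range_rows == 'A2:E200'
                # (A1 notation; row 1 is skipped because it holds the column headers)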

                # GET sheet data for the current batch of rows
                sheet_data = get_data(
                    stream_name=sheet_title,
                    endpoint_config=sheets_loaded_config,
                    client=client,
                    spreadsheet_id=spreadsheet_id,
                    range_rows=range_rows)