1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
|
import time
import math
import singer
import json
from collections import OrderedDict
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata
LOGGER = singer.get_logger()
def write_schema(catalog, stream_name):
stream = catalog.get_stream(stream_name)
schema = stream.schema.to_dict()
try:
singer.write_schema(stream_name, schema, stream.key_properties)
except OSError as err:
LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
raise err
def write_record(stream_name, record, time_extracted):
try:
singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
except OSError as err:
LOGGER.info('OS Error writing record for: {}'.format(stream_name))
LOGGER.info('record: {}'.format(record))
raise err
def get_bookmark(state, stream, default):
if (state is None) or ('bookmarks' not in state):
return default
return (
state
.get('bookmarks', {})
.get(stream, default)
)
def write_bookmark(state, stream, value):
if 'bookmarks' not in state:
state['bookmarks'] = {}
state['bookmarks'][stream] = value
LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
singer.write_state(state)
# def transform_datetime(this_dttm):
def transform_datetime(this_dttm):
with Transformer() as transformer:
new_dttm = transformer._transform_datetime(this_dttm)
return new_dttm
def process_records(catalog, #pylint: disable=too-many-branches
stream_name,
records,
time_extracted,
bookmark_field=None,
bookmark_type=None,
max_bookmark_value=None,
last_datetime=None,
last_integer=None,
parent=None,
parent_id=None):
stream = catalog.get_stream(stream_name)
schema = stream.schema.to_dict()
stream_metadata = metadata.to_map(stream.metadata)
with metrics.record_counter(stream_name) as counter:
for record in records:
# If child object, add parent_id to record
if parent_id and parent:
record[parent + '_id'] = parent_id
# Transform record for Singer.io
with Transformer() as transformer:
transformed_record = transformer.transform(
record,
schema,
stream_metadata)
# Reset max_bookmark_value to new value if higher
if transformed_record.get(bookmark_field):
if max_bookmark_value is None or \
transformed_record[bookmark_field] > transform_datetime(max_bookmark_value):
max_bookmark_value = transformed_record[bookmark_field]
if bookmark_field and (bookmark_field in transformed_record):
if bookmark_type == 'integer':
# Keep only records whose bookmark is after the last_integer
if transformed_record[bookmark_field] >= last_integer:
write_record(stream_name, transformed_record, \
time_extracted=time_extracted)
counter.increment()
elif bookmark_type == 'datetime':
last_dttm = transform_datetime(last_datetime)
bookmark_dttm = transform_datetime(transformed_record[bookmark_field])
# Keep only records whose bookmark is after the last_datetime
if bookmark_dttm >= last_dttm:
write_record(stream_name, transformed_record, \
time_extracted=time_extracted)
counter.increment()
else:
write_record(stream_name, transformed_record, time_extracted=time_extracted)
counter.increment()
return max_bookmark_value, counter.value
# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
if (stream_name is None) and ('currently_syncing' in state):
del state['currently_syncing']
else:
singer.set_currently_syncing(state, stream_name)
singer.write_state(state)
# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
stream = catalog.get_stream(stream_name)
mdata = metadata.to_map(stream.metadata)
mdata_list = singer.metadata.to_list(mdata)
selected_fields = []
for entry in mdata_list:
field = None
try:
field = entry['breadcrumb'][1]
if entry.get('metadata', {}).get('selected', False):
selected_fields.append(field)
except IndexError:
pass
return selected_fields
def get_data(stream_name,
endpoint_config,
client,
spreadsheet_id,
range_rows=None):
if not range_rows:
range_rows = ''
path = endpoint_config.get('path', stream_name).replace(
'{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
'{range_rows}', range_rows)
params = endpoint_config.get('params', {})
api = endpoint_config.get('api', 'sheets')
querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
'{sheet_title}', stream_name)
data = {}
data = client.get(
path=path,
api=api,
params=querystring,
endpoint=stream_name)
return data
def transform_file_metadata(file_metadata):
# Convert to dict
file_metadata_tf = json.loads(json.dumps(file_metadata))
# Remove keys
if file_metadata_tf.get('lastModifyingUser'):
file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
file_metadata_tf['lastModifyingUser'].pop('me', None)
file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
# Add record to an array of 1
file_metadata_arr = []
file_metadata_arr.append(file_metadata_tf)
return file_metadata_arr
def transform_spreadsheet_metadata(spreadsheet_metadata):
# Convert to dict
spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
# Remove keys
if spreadsheet_metadata_tf.get('properties'):
spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
spreadsheet_metadata_tf.pop('sheets', None)
# Add record to an array of 1
spreadsheet_metadata_arr = []
spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
return spreadsheet_metadata_arr
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
# Convert to properties to dict
sheet_metadata = sheet.get('properties')
sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
sheet_id = sheet_metadata_tf.get('sheetId')
sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
spreadsheet_id, sheet_id)
sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
sheet_metadata_tf['sheetUrl'] = sheet_url
sheet_metadata_tf['columns'] = columns
return sheet_metadata_tf
def sync(client, config, catalog, state):
start_date = config.get('start_date')
spreadsheet_id = config.get('spreadsheet_id')
# Get selected_streams from catalog, based on state last_stream
# last_stream = Previous currently synced stream, if the load was interrupted
last_stream = singer.get_currently_syncing(state)
LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
selected_streams = []
for stream in catalog.get_selected_streams(state):
selected_streams.append(stream.stream)
LOGGER.info('selected_streams: {}'.format(selected_streams))
if not selected_streams:
return
# Get file_metadata
file_metadata = {}
file_metadata_config = STREAMS.get('file_metadata')
file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id)
file_metadata_tf = transform_file_metadata(file_metadata)
# LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date))
this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
if this_datetime <= last_datetime:
LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
return 0
# Get spreadsheet_metadata
spreadsheet_metadata = {}
spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata')
spreadsheet_metadata = get_data('spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id)
spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
# LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))
# Get sheet_metadata
sheets = spreadsheet_metadata.get('sheets')
sheet_metadata = []
sheets_loaded = []
sheets_loaded_config = STREAMS['sheets_loaded']
if sheets:
for sheet in sheets:
sheet_title = sheet.get('properties', {}).get('title')
sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
# LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
sheet_metadata.append(sheet_metadata_tf)
# Determine range of rows and columns for "paging" through batch rows of data
sheet_last_col_index = 1
sheet_last_col_letter = 'A'
for col in columns:
col_index = col.get('columnIndex')
col_letter = col.get('columnLetter')
if col_index > sheet_last_col_index:
sheet_last_col_index = col_index
sheet_last_col_letter = col_letter
sheet_max_row = sheet.get('gridProperties', {}).get('rowCount')
is_empty_row = False
batch_rows = 200
from_row = 2
if sheet_max_row < batch_rows:
to_row = sheet_max_row
else:
to_row = batch_rows
while not is_empty_row and to_row <= sheet_max_row:
range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row)
sheet_data = get_data(
stream_name=sheet_title,
endpoint_config=sheets_loaded_config,
client=client,
spreadsheet_id=spreadsheet_id,
range_rows=range_rows)
|