Diffstat (limited to 'tap_google_sheets')
-rw-r--r--  tap_google_sheets/__init__.py                          57
-rw-r--r--  tap_google_sheets/client.py                           247
-rw-r--r--  tap_google_sheets/discover.py                          26
-rw-r--r--  tap_google_sheets/schema.py                           228
-rw-r--r--  tap_google_sheets/schemas/file_metadata.json           44
-rw-r--r--  tap_google_sheets/schemas/sheet_metadata.json          89
-rw-r--r--  tap_google_sheets/schemas/sheets_loaded.json           22
-rw-r--r--  tap_google_sheets/schemas/spreadsheet_metadata.json    30
-rw-r--r--  tap_google_sheets/streams.py                           66
-rw-r--r--  tap_google_sheets/sync.py                             281
10 files changed, 1090 insertions, 0 deletions
diff --git a/tap_google_sheets/__init__.py b/tap_google_sheets/__init__.py
new file mode 100644
index 0000000..f97d4b8
--- /dev/null
+++ b/tap_google_sheets/__init__.py
@@ -0,0 +1,57 @@
#!/usr/bin/env python3

import sys
import json
import argparse
import singer
from singer import metadata, utils
from tap_google_sheets.client import GoogleClient
from tap_google_sheets.discover import discover
from tap_google_sheets.sync import sync

LOGGER = singer.get_logger()

REQUIRED_CONFIG_KEYS = [
    'client_id',
    'client_secret',
    'refresh_token',
    'spreadsheet_id',
    'start_date',
    'user_agent'
]

def do_discover(client, spreadsheet_id):

    LOGGER.info('Starting discover')
    catalog = discover(client, spreadsheet_id)
    json.dump(catalog.to_dict(), sys.stdout, indent=2)
    LOGGER.info('Finished discover')


@singer.utils.handle_top_exception(LOGGER)
def main():

    parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)

    with GoogleClient(parsed_args.config['client_id'],
                      parsed_args.config['client_secret'],
                      parsed_args.config['refresh_token'],
                      parsed_args.config['user_agent']) as client:

        state = {}
        if parsed_args.state:
            state = parsed_args.state

        config = parsed_args.config
        spreadsheet_id = config.get('spreadsheet_id')

        if parsed_args.discover:
            do_discover(client, spreadsheet_id)
        elif parsed_args.catalog:
            sync(client=client,
                 config=config,
                 catalog=parsed_args.catalog,
                 state=state)

if __name__ == '__main__':
    main()
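Note: singer.utils.parse_args() reads the keys above from the file passed via --config. A minimal config.json sketch (every value below is a placeholder, not taken from this commit):

    {
      "client_id": "<oauth2-client-id>",
      "client_secret": "<oauth2-client-secret>",
      "refresh_token": "<oauth2-refresh-token>",
      "spreadsheet_id": "<google-spreadsheet-id>",
      "start_date": "2019-01-01T00:00:00Z",
      "user_agent": "tap-google-sheets <api_user_email@example.com>"
    }

Discovery mode is then invoked with --config plus --discover; sync mode with --config plus --catalog (and optionally --state).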
diff --git a/tap_google_sheets/client.py b/tap_google_sheets/client.py
new file mode 100644
index 0000000..12f0811
--- /dev/null
+++ b/tap_google_sheets/client.py
@@ -0,0 +1,247 @@
from datetime import datetime, timedelta
import backoff
import requests
from collections import OrderedDict

import singer
from singer import metrics
from singer import utils

BASE_URL = 'https://www.googleapis.com'
GOOGLE_TOKEN_URI = 'https://oauth2.googleapis.com/token'
LOGGER = singer.get_logger()


class Server5xxError(Exception):
    pass


class Server429Error(Exception):
    pass


class GoogleError(Exception):
    pass


class GoogleBadRequestError(GoogleError):
    pass


class GoogleUnauthorizedError(GoogleError):
    pass


class GooglePaymentRequiredError(GoogleError):
    pass


class GoogleNotFoundError(GoogleError):
    pass


class GoogleMethodNotAllowedError(GoogleError):
    pass


class GoogleConflictError(GoogleError):
    pass


class GoogleGoneError(GoogleError):
    pass


class GooglePreconditionFailedError(GoogleError):
    pass


class GoogleRequestEntityTooLargeError(GoogleError):
    pass


class GoogleRequestedRangeNotSatisfiableError(GoogleError):
    pass


class GoogleExpectationFailedError(GoogleError):
    pass


class GoogleForbiddenError(GoogleError):
    pass


class GoogleUnprocessableEntityError(GoogleError):
    pass


class GooglePreconditionRequiredError(GoogleError):
    pass


class GoogleInternalServiceError(GoogleError):
    pass


# Error Codes: https://developers.google.com/webmaster-tools/search-console-api-original/v3/errors
ERROR_CODE_EXCEPTION_MAPPING = {
    400: GoogleBadRequestError,
    401: GoogleUnauthorizedError,
    402: GooglePaymentRequiredError,
    403: GoogleForbiddenError,
    404: GoogleNotFoundError,
    405: GoogleMethodNotAllowedError,
    409: GoogleConflictError,
    410: GoogleGoneError,
    412: GooglePreconditionFailedError,
    413: GoogleRequestEntityTooLargeError,
    416: GoogleRequestedRangeNotSatisfiableError,
    417: GoogleExpectationFailedError,
    422: GoogleUnprocessableEntityError,
    428: GooglePreconditionRequiredError,
    500: GoogleInternalServiceError}

def get_exception_for_error_code(error_code):
    return ERROR_CODE_EXCEPTION_MAPPING.get(error_code, GoogleError)

def raise_for_error(response):
    try:
        response.raise_for_status()
    except (requests.HTTPError, requests.ConnectionError) as error:
        try:
            content_length = len(response.content)
            if content_length == 0:
                # There is nothing we can do here since Google has neither sent
                # us a 2xx response nor any response content.
                return
            response = response.json()
            if ('error' in response) or ('errorCode' in response):
                message = '%s: %s' % (response.get('error', str(error)),
                                      response.get('message', 'Unknown Error'))
                # 'error' may be a dict (carrying a 'code') or a plain string
                error_code = response.get('error', {}).get('code') \
                    if isinstance(response.get('error'), dict) else None
                ex = get_exception_for_error_code(error_code)
                raise ex(message)
            else:
                raise GoogleError(error)
        except (ValueError, TypeError):
            raise GoogleError(error)

class GoogleClient: # pylint: disable=too-many-instance-attributes
    def __init__(self,
                 client_id,
                 client_secret,
                 refresh_token,
                 user_agent=None):
        self.__client_id = client_id
        self.__client_secret = client_secret
        self.__refresh_token = refresh_token
        self.__user_agent = user_agent
        self.__access_token = None
        self.__expires = None
        self.__session = requests.Session()
        self.base_url = None


    def __enter__(self):
        self.get_access_token()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.__session.close()
    @backoff.on_exception(backoff.expo,
                          Server5xxError,
                          max_tries=5,
                          factor=2)
    def get_access_token(self):
        # The refresh_token does not expire and may be reused to generate many
        # access_tokens; because it never expires, it is not returned in the
        # access_token response.
        if self.__access_token is not None and self.__expires > datetime.utcnow():
            return

        headers = {}
        if self.__user_agent:
            headers['User-Agent'] = self.__user_agent

        response = self.__session.post(
            url=GOOGLE_TOKEN_URI,
            headers=headers,
            data={
                'grant_type': 'refresh_token',
                'client_id': self.__client_id,
                'client_secret': self.__client_secret,
                'refresh_token': self.__refresh_token,
            })

        if response.status_code >= 500:
            raise Server5xxError()

        if response.status_code != 200:
            raise_for_error(response)

        data = response.json()
        self.__access_token = data['access_token']
        self.__expires = datetime.utcnow() + timedelta(seconds=data['expires_in'])
        LOGGER.info('Authorized, token expires = {}'.format(self.__expires))


    @backoff.on_exception(backoff.expo,
                          (Server5xxError, ConnectionError, Server429Error),
                          max_tries=7,
                          factor=3)
    # Rate Limit:
    # https://developers.google.com/webmaster-tools/search-console-api-original/v3/limits
    @utils.ratelimit(1200, 60)
    def request(self, method, path=None, url=None, api=None, **kwargs):

        self.get_access_token()

        self.base_url = 'https://sheets.googleapis.com/v4'
        if api == 'files':
            self.base_url = 'https://www.googleapis.com/drive/v3'

        if not url and path:
            url = '{}/{}'.format(self.base_url, path)

        # endpoint = stream_name (from sync.py API call)
        if 'endpoint' in kwargs:
            endpoint = kwargs['endpoint']
            del kwargs['endpoint']
        else:
            endpoint = None

        if 'headers' not in kwargs:
            kwargs['headers'] = {}
        kwargs['headers']['Authorization'] = 'Bearer {}'.format(self.__access_token)

        if self.__user_agent:
            kwargs['headers']['User-Agent'] = self.__user_agent

        if method == 'POST':
            kwargs['headers']['Content-Type'] = 'application/json'

        with metrics.http_request_timer(endpoint) as timer:
            response = self.__session.request(method, url, **kwargs)
            timer.tags[metrics.Tag.http_status_code] = response.status_code

        if response.status_code >= 500:
            raise Server5xxError()

        # Use the retry functionality in backoff to wait and retry when the
        # response code equals 429 (rate limit has been exceeded)
        if response.status_code == 429:
            raise Server429Error()

        if response.status_code != 200:
            raise_for_error(response)

        # Ensure keys and rows are ordered as received from API
        return response.json(object_pairs_hook=OrderedDict)

    def get(self, path, api, **kwargs):
        return self.request(method='GET', path=path, api=api, **kwargs)

    def post(self, path, api, **kwargs):
        return self.request(method='POST', path=path, api=api, **kwargs)
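Note: a minimal usage sketch for the client above (credentials are placeholders). request() resolves api='sheets' against the Sheets v4 base URL and api='files' against Drive v3, and the endpoint kwarg only names the metrics timer:

    from tap_google_sheets.client import GoogleClient

    with GoogleClient('<client_id>', '<client_secret>', '<refresh_token>',
                      user_agent='tap-google-sheets <user@example.com>') as client:
        # GET https://sheets.googleapis.com/v4/spreadsheets/<spreadsheet_id>
        spreadsheet = client.get(path='spreadsheets/<spreadsheet_id>',
                                 api='sheets',
                                 endpoint='spreadsheet_metadata')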
diff --git a/tap_google_sheets/discover.py b/tap_google_sheets/discover.py
new file mode 100644
index 0000000..6477a5f
--- /dev/null
+++ b/tap_google_sheets/discover.py
@@ -0,0 +1,26 @@
from singer.catalog import Catalog, CatalogEntry, Schema
from tap_google_sheets.schema import get_schemas, STREAMS


def discover(client, spreadsheet_id):
    schemas, field_metadata = get_schemas(client, spreadsheet_id)
    catalog = Catalog([])

    for stream_name, schema_dict in schemas.items():
        schema = Schema.from_dict(schema_dict)
        mdata = field_metadata[stream_name]
        key_properties = None
        for md in mdata:
            table_key_properties = md.get('metadata', {}).get('table-key-properties')
            if table_key_properties:
                key_properties = table_key_properties

        catalog.streams.append(CatalogEntry(
            stream=stream_name,
            tap_stream_id=stream_name,
            key_properties=STREAMS.get(stream_name, {}).get('key_properties', key_properties),
            schema=schema,
            metadata=mdata
        ))

    return catalog
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
new file mode 100644
index 0000000..237ab06
--- /dev/null
+++ b/tap_google_sheets/schema.py
@@ -0,0 +1,228 @@
import os
import json
from collections import OrderedDict
import singer
from singer import metadata
from tap_google_sheets.streams import STREAMS

LOGGER = singer.get_logger()

# Reference:
# https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata

# Convert column index to column letter
def colnum_string(n):
    string = ""
    while n > 0:
        n, remainder = divmod(n - 1, 26)
        string = chr(65 + remainder) + string
    return string

# Create sheet_metadata_json with columns from sheet
def get_sheet_schema_columns(sheet, spreadsheet_id, client):
    sheet_json_schema = OrderedDict()
    data = next(iter(sheet.get('data', [])), {})
    row_data = data.get('rowData', [])
    # spreadsheet is an OrderedDict, with ordered sheets and rows in the response

    headers = row_data[0].get('values', [])
    first_values = row_data[1].get('values', [])
    # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))

    sheet_json_schema = {
        'type': 'object',
        'additionalProperties': False,
        'properties': {
            '__sdc_spreadsheet_id': {
                'type': ['null', 'string']
            },
            '__sdc_sheet_id': {
                'type': ['null', 'integer']
            },
            '__sdc_row': {
                'type': ['null', 'integer']
            }
        }
    }

    header_list = [] # used for checking uniqueness
    columns = []
    prior_header = None
    i = 0
    skipped = 0
    # Read column headers until end or 2 consecutive skipped headers
    for header in headers:
        # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True)))
        column_index = i + 1
        column_letter = colnum_string(column_index)
        header_value = header.get('formattedValue')
        if header_value: # NOT skipped
            column_is_skipped = False
            skipped = 0
            column_name = '{}'.format(header_value)
            if column_name in header_list:
                raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name))
            else:
                header_list.append(column_name)

            # Guard against a header row that is longer than the first data row
            first_value = first_values[i] if i < len(first_values) else {}
            # LOGGER.info('first_value[{}] = {}'.format(i, json.dumps(first_value, indent=2, sort_keys=True)))

            column_effective_value = first_value.get('effectiveValue', {})
            column_effective_value_type = None
            for key in column_effective_value.keys():
                if key in ('numberValue', 'stringValue', 'boolValue', 'errorValue', 'formulaValue'):
                    column_effective_value_type = key

            column_number_format = first_value.get('effectiveFormat', {}).get('numberFormat', {})
            column_number_format_type = column_number_format.get('type')

            # Determine datatype for sheet_json_schema
            #
            # column_effective_value_type = numberValue, stringValue, boolValue;
            #   INVALID: errorValue, formulaValue
            # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
            #
            # column_number_format_type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY,
            #   DATE, TIME, DATE_TIME, SCIENTIFIC
            # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
            #
            column_format = None # Default
            if column_effective_value_type in ('formulaValue', 'errorValue'):
                raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(
                    column_name, column_effective_value))
            elif column_effective_value_type == 'stringValue':
                column_type = ['null', 'string']
                column_gs_type = 'stringValue'
            elif column_effective_value_type == 'boolValue':
                column_type = ['null', 'boolean', 'string']
                column_gs_type = 'boolValue'
            elif column_effective_value_type == 'numberValue':
                if column_number_format_type == 'DATE_TIME':
                    column_type = ['null', 'string']
                    column_format = 'date-time'
                    column_gs_type = 'numberType.DATE_TIME'
                elif column_number_format_type == 'DATE':
                    column_type = ['null', 'string']
                    column_format = 'date'
                    column_gs_type = 'numberType.DATE'
                elif column_number_format_type == 'TIME':
                    column_type = ['null', 'string']
                    column_format = 'time'
                    column_gs_type = 'numberType.TIME'
                elif column_number_format_type == 'TEXT':
                    column_type = ['null', 'string']
                    column_gs_type = 'stringValue'
                else:
                    column_type = ['null', 'number', 'string']
                    column_gs_type = 'numberType'

        else: # skipped
            column_is_skipped = True
            skipped = skipped + 1
            column_index_str = str(column_index).zfill(2)
            column_name = '__sdc_skip_col_{}'.format(column_index_str)
            column_type = ['null', 'string']
            column_format = None
            column_gs_type = 'stringValue'

        if skipped >= 2:
            # skipped = 2 consecutive skipped headers
            # Remove prior_header column_name
            sheet_json_schema['properties'].pop(prior_header, None)
            break

        else:
            column = {
                'columnIndex': column_index,
                'columnLetter': column_letter,
                'columnName': column_name,
                'columnType': column_gs_type,
                'columnSkipped': column_is_skipped
            }
            columns.append(column)

            sheet_json_schema['properties'][column_name] = column
            sheet_json_schema['properties'][column_name]['type'] = column_type
            if column_format:
                sheet_json_schema['properties'][column_name]['format'] = column_format

        prior_header = column_name
        i = i + 1

    return sheet_json_schema, columns


def get_sheet_metadata(sheet, spreadsheet_id, client):
    sheet_id = sheet.get('properties', {}).get('sheetId')
    sheet_title = sheet.get('properties', {}).get('title')
    LOGGER.info('sheet_id = {}, sheet_title = {}'.format(sheet_id, sheet_title))

    stream_name = 'sheet_metadata'
    stream_metadata = STREAMS.get(stream_name)
    api = stream_metadata.get('api', 'sheets')
    params = stream_metadata.get('params', {})
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', sheet_title)
    path = '{}?{}'.format(stream_metadata.get('path').replace(
        '{spreadsheet_id}', spreadsheet_id), querystring)

    sheet_md_results = client.get(path=path, api=api, endpoint=stream_name)
    sheet_cols = sheet_md_results.get('sheets')[0]
    sheet_schema, columns = get_sheet_schema_columns(sheet_cols, spreadsheet_id, client)

    return sheet_schema, columns


def get_abs_path(path):
    return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)

def get_schemas(client, spreadsheet_id):
    schemas = {}
    field_metadata = {}

    for stream_name, stream_metadata in STREAMS.items():
        schema_path = get_abs_path('schemas/{}.json'.format(stream_name))
        with open(schema_path) as file:
            schema = json.load(file)
        schemas[stream_name] = schema
        mdata = metadata.new()

        # Documentation:
        # https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#singer-python-helper-functions
        # Reference:
        # https://github.com/singer-io/singer-python/blob/master/singer/metadata.py#L25-L44
        mdata = metadata.get_standard_metadata(
            schema=schema,
            key_properties=stream_metadata.get('key_properties', None),
            valid_replication_keys=stream_metadata.get('replication_keys', None),
            replication_method=stream_metadata.get('replication_method', None)
        )
        field_metadata[stream_name] = mdata

        if stream_name == 'spreadsheet_metadata':
            api = stream_metadata.get('api', 'sheets')
            params = stream_metadata.get('params', {})
            querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
            path = '{}?{}'.format(stream_metadata.get('path').replace(
                '{spreadsheet_id}', spreadsheet_id), querystring)

            # The querystring is already embedded in path; do not pass it again as params
            spreadsheet_md_results = client.get(path=path, api=api, endpoint=stream_name)

            sheets = spreadsheet_md_results.get('sheets')
            if sheets:
                for sheet in sheets:
                    sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
                    # LOGGER.info('sheet_schema = {}'.format(json.dumps(sheet_schema, indent=2, sort_keys=True)))

                    sheet_title = sheet.get('properties', {}).get('title')
                    schemas[sheet_title] = sheet_schema
                    sheet_mdata = metadata.new()
                    sheet_mdata = metadata.get_standard_metadata(
                        schema=sheet_schema,
                        key_properties=['__sdc_row'],
                        valid_replication_keys=None,
                        replication_method='FULL_TABLE'
                    )
                    field_metadata[sheet_title] = sheet_mdata

    return schemas, field_metadata
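Note: a short worked example of the discovery helpers above. colnum_string() converts a 1-based column index to its A1-notation letter, and a headerless column is emitted as __sdc_skip_col_NN until two consecutive blank headers stop the scan:

    colnum_string(1)    # 'A'
    colnum_string(26)   # 'Z'
    colnum_string(27)   # 'AA'
    # a blank header in column 3 produces column_name '__sdc_skip_col_03'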
diff --git a/tap_google_sheets/schemas/file_metadata.json b/tap_google_sheets/schemas/file_metadata.json
new file mode 100644
index 0000000..25c19c4
--- /dev/null
+++ b/tap_google_sheets/schemas/file_metadata.json
@@ -0,0 +1,44 @@
{
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "id": {
      "type": ["null", "string"]
    },
    "name": {
      "type": ["null", "string"]
    },
    "version": {
      "type": ["null", "integer"]
    },
    "createdTime": {
      "type": ["null", "string"],
      "format": "date-time"
    },
    "modifiedTime": {
      "type": ["null", "string"],
      "format": "date-time"
    },
    "teamDriveId": {
      "type": ["null", "string"]
    },
    "driveId": {
      "type": ["null", "string"]
    },
    "lastModifyingUser": {
      "type": ["null", "object"],
      "additionalProperties": false,
      "properties": {
        "kind": {
          "type": ["null", "string"]
        },
        "displayName": {
          "type": ["null", "string"]
        },
        "emailAddress": {
          "type": ["null", "string"]
        }
      }
    }
  }
}
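Note: a record conforming to this schema would look roughly like the sketch below; all values are illustrative, not taken from a real Drive API response:

    {
      "id": "1aBcD...",
      "name": "Example Spreadsheet",
      "version": 42,
      "createdTime": "2019-01-01T00:00:00.000Z",
      "modifiedTime": "2019-11-01T12:34:56.000Z",
      "lastModifyingUser": {
        "kind": "drive#user",
        "displayName": "Jane Doe",
        "emailAddress": "jane.doe@example.com"
      }
    }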
diff --git a/tap_google_sheets/schemas/sheet_metadata.json b/tap_google_sheets/schemas/sheet_metadata.json
new file mode 100644
index 0000000..c3f2ac2
--- /dev/null
+++ b/tap_google_sheets/schemas/sheet_metadata.json
@@ -0,0 +1,89 @@
{
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "spreadsheetId": {
      "type": ["null", "string"]
    },
    "sheetId": {
      "type": ["null", "integer"]
    },
    "title": {
      "type": ["null", "string"]
    },
    "index": {
      "type": ["null", "integer"]
    },
    "sheetType": {
      "type": ["null", "string"]
    },
    "sheetUrl": {
      "type": ["null", "string"]
    },
    "gridProperties": {
      "type": ["null", "object"],
      "additionalProperties": false,
      "properties": {
        "rowCount": {
          "type": ["null", "integer"]
        },
        "columnCount": {
          "type": ["null", "integer"]
        },
        "frozenRowCount": {
          "type": ["null", "integer"]
        },
        "frozenColumnCount": {
          "type": ["null", "integer"]
        }
      }
    },
    "columns": {
      "anyOf": [
        {
          "type": "array",
          "items": {
            "type": ["null", "object"],
            "additionalProperties": false,
            "properties": {
              "columnIndex": {
                "type": ["null", "integer"]
              },
              "columnLetter": {
                "type": ["null", "string"]
              },
              "columnName": {
                "type": ["null", "string"]
              },
              "columnType": {
                "type": ["null", "string"]
              },
              "columnSkipped": {
                "type": ["null", "boolean"]
              },
              "type": {
                "anyOf": [
                  {
                    "type": "array",
                    "items": {
                      "type": "string"
                    }
                  },
                  {
                    "type": "null"
                  }
                ]
              },
              "format": {
                "type": ["null", "string"]
              }
            }
          }
        },
        {
          "type": "null"
        }
      ]
    }
  }
}
diff --git a/tap_google_sheets/schemas/sheets_loaded.json b/tap_google_sheets/schemas/sheets_loaded.json
new file mode 100644
index 0000000..12f967a
--- /dev/null
+++ b/tap_google_sheets/schemas/sheets_loaded.json
@@ -0,0 +1,22 @@
{
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "spreadsheetId": {
      "type": ["null", "string"]
    },
    "sheetId": {
      "type": ["null", "integer"]
    },
    "sheetTitle": {
      "type": ["null", "string"]
    },
    "loadDate": {
      "type": ["null", "string"],
      "format": "date-time"
    },
    "lastRowNumber": {
      "type": ["null", "integer"]
    }
  }
}
diff --git a/tap_google_sheets/schemas/spreadsheet_metadata.json b/tap_google_sheets/schemas/spreadsheet_metadata.json
new file mode 100644
index 0000000..852cb76
--- /dev/null
+++ b/tap_google_sheets/schemas/spreadsheet_metadata.json
@@ -0,0 +1,30 @@
{
  "type": "object",
  "additionalProperties": false,
  "properties": {
    "spreadsheetId": {
      "type": ["null", "string"]
    },
    "properties": {
      "type": ["null", "object"],
      "additionalProperties": false,
      "properties": {
        "title": {
          "type": ["null", "string"]
        },
        "locale": {
          "type": ["null", "string"]
        },
        "autoRecalc": {
          "type": ["null", "string"]
        },
        "timeZone": {
          "type": ["null", "string"]
        }
      }
    },
    "spreadsheetUrl": {
      "type": ["null", "string"]
    }
  }
}
diff --git a/tap_google_sheets/streams.py b/tap_google_sheets/streams.py
new file mode 100644
index 0000000..299326a
--- /dev/null
+++ b/tap_google_sheets/streams.py
@@ -0,0 +1,66 @@
from collections import OrderedDict

# streams: API URL endpoints to be called
# properties:
#   <root node>: Plural stream name for the endpoint
#   path: API endpoint relative path; when added to the base URL, creates the full path,
#       default = stream_name
#   key_properties: Primary key fields for identifying an endpoint record.
#   replication_method: INCREMENTAL or FULL_TABLE
#   replication_keys: bookmark_field(s), typically a date-time, used for filtering the results
#       and setting the state
#   params: Query, sort, and other endpoint-specific parameters; default = {}
#   data_key: JSON element containing the results list for the endpoint; default = root (no data_key)
#   bookmark_query_field: From date-time field used for filtering the query
#   bookmark_type: Data type for bookmark, integer or datetime

FILE_METADATA = {
    "api": "files",
    "path": "files/{spreadsheet_id}",
    "key_properties": ["id"],
    "replication_method": "FULL_TABLE",
    "params": {
        "fields": "id,name,createdTime,modifiedTime,version,teamDriveId,driveId,lastModifyingUser"
    }
}

SPREADSHEET_METADATA = {
    "api": "sheets",
    "path": "spreadsheets/{spreadsheet_id}",
    "key_properties": ["spreadsheetId"],
    "replication_method": "FULL_TABLE",
    "params": {
        "includeGridData": "false"
    }
}

SHEET_METADATA = {
    "api": "sheets",
    "path": "spreadsheets/{spreadsheet_id}",
    "key_properties": ["sheetId"],
    "replication_method": "FULL_TABLE",
    "params": {
        "includeGridData": "true",
        "ranges": "'{sheet_title}'!1:2"
    }
}

SHEETS_LOADED = {
    "api": "sheets",
    "path": "spreadsheets/{spreadsheet_id}/values/'{sheet_title}'!{range_rows}",
    "data_key": "values",
    "key_properties": ["spreadsheetId", "sheetId", "loadDate"],
    "replication_method": "FULL_TABLE",
    "params": {
        "dateTimeRenderOption": "SERIAL_NUMBER",
        "valueRenderOption": "UNFORMATTED_VALUE",
        "majorDimension": "ROWS"
    }
}

# Ensure streams are ordered logically
STREAMS = OrderedDict()
STREAMS['file_metadata'] = FILE_METADATA
STREAMS['spreadsheet_metadata'] = SPREADSHEET_METADATA
STREAMS['sheet_metadata'] = SHEET_METADATA
STREAMS['sheets_loaded'] = SHEETS_LOADED
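Note: tracing one of these entries through client.py and schema.py, the SHEET_METADATA config resolves to roughly the following request once {spreadsheet_id} and {sheet_title} are substituted and params are serialized into the querystring (identifiers are placeholders):

    GET https://sheets.googleapis.com/v4/spreadsheets/<spreadsheet_id>?includeGridData=true&ranges='Sheet1'!1:2

Only rows 1:2 (the header row plus one data row) are fetched, which is all discovery needs to infer each sheet's column schema.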
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
new file mode 100644
index 0000000..a8b02d0
--- /dev/null
+++ b/tap_google_sheets/sync.py
@@ -0,0 +1,281 @@
import time
import math
import json
from collections import OrderedDict
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from tap_google_sheets.transform import transform_json
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted):
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


def transform_datetime(this_dttm):
    with Transformer() as transformer:
        new_dttm = transformer._transform_datetime(this_dttm)
    return new_dttm


def process_records(catalog, #pylint: disable=too-many-branches
                    stream_name,
                    records,
                    time_extracted,
                    bookmark_field=None,
                    bookmark_type=None,
                    max_bookmark_value=None,
                    last_datetime=None,
                    last_integer=None,
                    parent=None,
                    parent_id=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)

    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # If child object, add parent_id to record
            if parent_id and parent:
                record[parent + '_id'] = parent_id

            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                # Reset max_bookmark_value to new value if higher
                if transformed_record.get(bookmark_field):
                    if max_bookmark_value is None or \
                        transformed_record[bookmark_field] > transform_datetime(max_bookmark_value):
                        max_bookmark_value = transformed_record[bookmark_field]

                if bookmark_field and (bookmark_field in transformed_record):
                    if bookmark_type == 'integer':
                        # Keep only records whose bookmark is after the last_integer
                        if transformed_record[bookmark_field] >= last_integer:
                            write_record(stream_name, transformed_record, \
                                time_extracted=time_extracted)
                            counter.increment()
                    elif bookmark_type == 'datetime':
                        last_dttm = transform_datetime(last_datetime)
                        bookmark_dttm = transform_datetime(transformed_record[bookmark_field])
                        # Keep only records whose bookmark is after the last_datetime
                        if bookmark_dttm >= last_dttm:
                            write_record(stream_name, transformed_record, \
                                time_extracted=time_extracted)
                            counter.increment()
                else:
                    write_record(stream_name, transformed_record, time_extracted=time_extracted)
                    counter.increment()

        return max_bookmark_value, counter.value


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
# the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields


def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data


def transform_file_metadata(file_metadata):
    # Convert to dict
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Convert to dict
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    # last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # Get file_metadata
    file_metadata_config = STREAMS.get('file_metadata')
    file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id)
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
    last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return 0

    # Get spreadsheet_metadata
    spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata')
    spreadsheet_metadata = get_data(
        'spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id)
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
    # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))

    # Get sheet_metadata
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # Determine range of rows and columns for "paging" through batch rows of data
            sheet_last_col_index = 1
            sheet_last_col_letter = 'A'
            for col in columns:
                col_index = col.get('columnIndex')
                col_letter = col.get('columnLetter')
                if col_index > sheet_last_col_index:
                    sheet_last_col_index = col_index
                    sheet_last_col_letter = col_letter
            # gridProperties is nested under the sheet's properties in the API response
            sheet_max_row = sheet.get('properties', {}).get('gridProperties', {}).get('rowCount')
            is_empty_row = False
            batch_rows = 200
            from_row = 2
            if sheet_max_row < batch_rows:
                to_row = sheet_max_row
            else:
                to_row = batch_rows

            while not is_empty_row and to_row <= sheet_max_row:
                range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row)

                sheet_data = get_data(
                    stream_name=sheet_title,
                    endpoint_config=sheets_loaded_config,
                    client=client,
                    spreadsheet_id=spreadsheet_id,
                    range_rows=range_rows)
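Note: a worked example of the batching setup above, assuming a sheet with four columns (sheet_last_col_letter 'D') and a rowCount of 1,000: batch_rows=200 caps the first to_row at 200, so the initial sheets_loaded request covers range_rows 'A2:D200', i.e. the header row is skipped and at most 199 data rows are read per batch.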