aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'tap_google_sheets/client.py')
-rw-r--r--tap_google_sheets/client.py239
1 files changed, 117 insertions, 122 deletions
diff --git a/tap_google_sheets/client.py b/tap_google_sheets/client.py
index 4f38352..0cbad98 100644
--- a/tap_google_sheets/client.py
+++ b/tap_google_sheets/client.py
@@ -1,16 +1,21 @@
1from datetime import datetime, timedelta 1from datetime import datetime, timedelta
2from collections import OrderedDict 2from collections import OrderedDict
3import backoff 3import backoff
4import requests
5import singer 4import singer
5import logging
6import pickle
7import json
8import os
6from singer import metrics 9from singer import metrics
7from singer import utils 10from singer import utils
11from google.oauth2 import service_account
12from google_auth_oauthlib.flow import InstalledAppFlow
13from google.auth.transport.requests import Request
14from googleapiclient.errors import HttpError
15import googleapiclient.discovery
8 16
9BASE_URL = 'https://www.googleapis.com'
10GOOGLE_TOKEN_URI = 'https://oauth2.googleapis.com/token'
11LOGGER = singer.get_logger() 17LOGGER = singer.get_logger()
12 18
13
14class Server5xxError(Exception): 19class Server5xxError(Exception):
15 pass 20 pass
16 21
@@ -101,90 +106,85 @@ ERROR_CODE_EXCEPTION_MAPPING = {
101 428: GooglePreconditionRequiredError, 106 428: GooglePreconditionRequiredError,
102 500: GoogleInternalServiceError} 107 500: GoogleInternalServiceError}
103 108
104
105def get_exception_for_error_code(error_code):
106 return ERROR_CODE_EXCEPTION_MAPPING.get(error_code, GoogleError)
107
108def raise_for_error(response):
109 try:
110 response.raise_for_status()
111 except (requests.HTTPError, requests.ConnectionError) as error:
112 try:
113 content_length = len(response.content)
114 if content_length == 0:
115 # There is nothing we can do here since Google has neither sent
116 # us a 2xx response nor a response content.
117 return
118 response = response.json()
119 if ('error' in response) or ('errorCode' in response):
120 message = '%s: %s' % (response.get('error', str(error)),
121 response.get('message', 'Unknown Error'))
122 error_code = response.get('error', {}).get('code')
123 ex = get_exception_for_error_code(error_code)
124 raise ex(message)
125 raise GoogleError(error)
126 except (ValueError, TypeError):
127 raise GoogleError(error)
128
129class GoogleClient: # pylint: disable=too-many-instance-attributes 109class GoogleClient: # pylint: disable=too-many-instance-attributes
130 def __init__(self, 110 SCOPES = [
131 client_id, 111 "https://www.googleapis.com/auth/drive.metadata.readonly",
132 client_secret, 112 "https://www.googleapis.com/auth/spreadsheets.readonly"
133 refresh_token, 113 ]
134 user_agent=None): 114
135 self.__client_id = client_id 115 def __init__(self, credentials_file):
136 self.__client_secret = client_secret 116 self.__credentials = self.fetchCredentials(credentials_file)
137 self.__refresh_token = refresh_token 117 self.__sheets_service = googleapiclient.discovery.build(
138 self.__user_agent = user_agent 118 'sheets',
139 self.__access_token = None 119 'v4',
140 self.__expires = None 120 credentials=self.__credentials,
141 self.__session = requests.Session() 121 cache_discovery=False
142 self.base_url = None 122 )
143 123 self.__drive_service = googleapiclient.discovery.build(
124 'drive',
125 'v3',
126 credentials=self.__credentials,
127 cache_discovery=False
128 )
129
130 def fetchCredentials(self, credentials_file):
131 LOGGER.debug('authenticate with google')
132 data = None
133
134 # Check a credentials file exist
135 if not os.path.exists(credentials_file):
136 raise Exception("The configured Google credentials file {} doesn't exist".format(credentials_file))
137
138 # Load credentials json file
139 with open(credentials_file) as json_file:
140 data = json.load(json_file)
141
142 if data.get('type', '') == 'service_account':
143 return self.fetchServiceAccountCredentials(credentials_file)
144 elif data.get('installed'):
145 return self.fetchInstalledOAuthCredentials(credentials_file)
146 else:
147 raise Exception("""This Google credentials file is not yet recognize.
148
149 Please use either:
150 - a Service Account (https://github.com/googleapis/google-api-python-client/blob/d0110cf4f7aaa93d6f56fc028cd6a1e3d8dd300a/docs/oauth-server.md)
151 - an installed OAuth client (https://github.com/googleapis/google-api-python-client/blob/d0110cf4f7aaa93d6f56fc028cd6a1e3d8dd300a/docs/oauth-installed.md)"""
152 )
153
154 def fetchServiceAccountCredentials(self, credentials_file):
155 # The service account credentials file can be used for server-to-server applications
156 return service_account.Credentials.from_service_account_file(
157 credentials_file, scopes=GoogleClient.SCOPES)
158
159 def fetchInstalledOAuthCredentials(self, credentials_file):
160 creds = None
161
162 # The file token.pickle stores the user's access and refresh tokens, and is
163 # created automatically when the authorization flow completes for the first
164 # time.
165 if os.path.exists('token.pickle'):
166 with open('token.pickle', 'rb') as token:
167 creds = pickle.load(token)
168
169 # If there are no (valid) credentials available, let the user log in.
170 if not creds or not creds.valid:
171 if creds and creds.expired and creds.refresh_token:
172 creds.refresh(Request())
173 else:
174 flow = InstalledAppFlow.from_client_secrets_file(
175 credentials_file, GoogleClient.SCOPES)
176 creds = flow.run_local_server(port=0)
177 # Save the credentials for the next run
178 with open('token.pickle', 'wb') as token:
179 pickle.dump(creds, token)
180
181 return creds
144 182
145 def __enter__(self): 183 def __enter__(self):
146 self.get_access_token()
147 return self 184 return self
148 185
149 def __exit__(self, exception_type, exception_value, traceback): 186 def __exit__(self, exception_type, exception_value, traceback):
150 self.__session.close() 187 LOGGER.debug('exiting google client')
151
152
153 @backoff.on_exception(backoff.expo,
154 Server5xxError,
155 max_tries=5,
156 factor=2)
157 def get_access_token(self):
158 # The refresh_token never expires and may be used many times to generate each access_token
159 # Since the refresh_token does not expire, it is not included in get access_token response
160 if self.__access_token is not None and self.__expires > datetime.utcnow():
161 return
162
163 headers = {}
164 if self.__user_agent:
165 headers['User-Agent'] = self.__user_agent
166
167 response = self.__session.post(
168 url=GOOGLE_TOKEN_URI,
169 headers=headers,
170 data={
171 'grant_type': 'refresh_token',
172 'client_id': self.__client_id,
173 'client_secret': self.__client_secret,
174 'refresh_token': self.__refresh_token,
175 })
176
177 if response.status_code >= 500:
178 raise Server5xxError()
179
180 if response.status_code != 200:
181 raise_for_error(response)
182
183 data = response.json()
184 self.__access_token = data['access_token']
185 self.__expires = datetime.utcnow() + timedelta(seconds=data['expires_in'])
186 LOGGER.info('Authorized, token expires = {}'.format(self.__expires))
187
188 188
189 # Rate Limit: https://developers.google.com/sheets/api/limits 189 # Rate Limit: https://developers.google.com/sheets/api/limits
190 # 100 request per 100 seconds per User 190 # 100 request per 100 seconds per User
@@ -193,53 +193,48 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
193 max_tries=7, 193 max_tries=7,
194 factor=3) 194 factor=3)
195 @utils.ratelimit(100, 100) 195 @utils.ratelimit(100, 100)
196 def request(self, method, path=None, url=None, api=None, **kwargs): 196 def request(self, endpoint=None, params={}, **kwargs):
197 self.get_access_token() 197 formatted_params = {}
198 self.base_url = 'https://sheets.googleapis.com/v4' 198 for (key, value) in params.items():
199 if api == 'files': 199 # API parameters interpolation
200 self.base_url = 'https://www.googleapis.com/drive/v3' 200 # will raise a KeyError in case a necessary argument is missing
201 201 formatted_params[key] = value.format(**kwargs)
202 if not url and path: 202
203 url = '{}/{}'.format(self.base_url, path) 203 # Call the correct Google API depending on the stream name
204 204 if endpoint == 'spreadsheet_metadata' or endpoint == 'sheet_metadata':
205 # endpoint = stream_name (from sync.py API call) 205 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/get
206 if 'endpoint' in kwargs: 206 request = self.__sheets_service.spreadsheets().get(**formatted_params)
207 endpoint = kwargs['endpoint'] 207 elif endpoint == 'sheets_loaded':
208 del kwargs['endpoint'] 208 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get
209 request = self.__sheets_service.spreadsheets().values().get(**formatted_params)
210 elif endpoint == 'file_metadata':
211 # https://developers.google.com/drive/api/v3/reference/files/get
212 request = self.__drive_service.files().get(**formatted_params)
209 else: 213 else:
210 endpoint = None 214 raise Exception('{} not implemented yet!'.format(endpoint))
211 LOGGER.info('{} URL = {}'.format(endpoint, url))
212
213 if 'headers' not in kwargs:
214 kwargs['headers'] = {}
215 kwargs['headers']['Authorization'] = 'Bearer {}'.format(self.__access_token)
216 215
217 if self.__user_agent: 216 with metrics.http_request_timer(endpoint) as timer:
218 kwargs['headers']['User-Agent'] = self.__user_agent 217 error = None
218 status_code = 400
219 219
220 if method == 'POST': 220 try:
221 kwargs['headers']['Content-Type'] = 'application/json' 221 response = request.execute()
222 status_code = 200
223 except HttpError as e:
224 status_code = e.resp.status or status_code
225 error = e
222 226
223 with metrics.http_request_timer(endpoint) as timer: 227 timer.tags[metrics.Tag.http_status_code] = status_code
224 response = self.__session.request(method, url, **kwargs)
225 timer.tags[metrics.Tag.http_status_code] = response.status_code
226 228
227 if response.status_code >= 500: 229 if status_code >= 500:
228 raise Server5xxError() 230 raise Server5xxError()
229 231
230 #Use retry functionality in backoff to wait and retry if 232 # Use retry functionality in backoff to wait and retry if
231 #response code equals 429 because rate limit has been exceeded 233 # response code equals 429 because rate limit has been exceeded
232 if response.status_code == 429: 234 if status_code == 429:
233 raise Server429Error() 235 raise Server429Error()
234 236
235 if response.status_code != 200: 237 if status_code != 200:
236 raise_for_error(response) 238 raise error
237
238 # Ensure keys and rows are ordered as received from API
239 return response.json(object_pairs_hook=OrderedDict)
240
241 def get(self, path, api, **kwargs):
242 return self.request(method='GET', path=path, api=api, **kwargs)
243 239
244 def post(self, path, api, **kwargs): 240 return response
245 return self.request(method='POST', path=path, api=api, **kwargs)