from datetime import datetime, timedelta
from collections import OrderedDict
import backoff
import requests
import singer
from singer import metrics
from singer import utils
# Base host for Google APIs.
# NOTE(review): not referenced anywhere in the visible code -- request() below
# hard-codes its own base URLs. Confirm it is unused before removing.
BASE_URL = 'https://www.googleapis.com'
# OAuth2 token endpoint; GoogleClient.get_access_token() POSTs the
# refresh-token grant to this URL.
GOOGLE_TOKEN_URI = 'https://oauth2.googleapis.com/token'
# Module-wide logger obtained from singer.
LOGGER = singer.get_logger()
class Server5xxError(Exception):
    """Raised when the API answers with an HTTP status code >= 500.

    Used as a retry signal: the ``backoff`` decorators on the client's
    request methods catch it and retry the call.
    """


class Server429Error(Exception):
    """Raised when the API answers with HTTP 429.

    Used as a retry signal by the ``backoff`` decorator on
    ``GoogleClient.request``.
    """
class GoogleError(Exception):
    """Base class for every tap-specific error raised for a failed API call.

    Also used directly when no more specific subclass matches the error code.
    """


class GoogleBadRequestError(GoogleError):
    """Error code 400."""


class GoogleUnauthorizedError(GoogleError):
    """Error code 401."""


class GooglePaymentRequiredError(GoogleError):
    """Error code 402."""


class GoogleNotFoundError(GoogleError):
    """Error code 404."""


class GoogleMethodNotAllowedError(GoogleError):
    """Error code 405."""


class GoogleConflictError(GoogleError):
    """Error code 409."""


class GoogleGoneError(GoogleError):
    """Error code 410."""


class GooglePreconditionFailedError(GoogleError):
    """Error code 412."""


class GoogleRequestEntityTooLargeError(GoogleError):
    """Error code 413."""


class GoogleRequestedRangeNotSatisfiableError(GoogleError):
    """Error code 416."""


class GoogleExpectationFailedError(GoogleError):
    """Error code 417."""


class GoogleForbiddenError(GoogleError):
    """Error code 403."""


class GoogleUnprocessableEntityError(GoogleError):
    """Error code 422."""


class GooglePreconditionRequiredError(GoogleError):
    """Error code 428."""


class GoogleInternalServiceError(GoogleError):
    """Error code 500."""
# Error Codes: https://developers.google.com/webmaster-tools/search-console-api-original/v3/errors
# Lookup table from a numeric error code to the tap-specific exception class
# raised for it. Codes that are absent here fall back to GoogleError in
# get_exception_for_error_code().
ERROR_CODE_EXCEPTION_MAPPING = {
    # 4xx codes
    400: GoogleBadRequestError,
    401: GoogleUnauthorizedError,
    402: GooglePaymentRequiredError,
    403: GoogleForbiddenError,
    404: GoogleNotFoundError,
    405: GoogleMethodNotAllowedError,
    409: GoogleConflictError,
    410: GoogleGoneError,
    412: GooglePreconditionFailedError,
    413: GoogleRequestEntityTooLargeError,
    416: GoogleRequestedRangeNotSatisfiableError,
    417: GoogleExpectationFailedError,
    422: GoogleUnprocessableEntityError,
    428: GooglePreconditionRequiredError,
    # 5xx codes
    500: GoogleInternalServiceError,
}
def get_exception_for_error_code(error_code):
    """Return the exception class registered for ``error_code``.

    Args:
        error_code: Key to look up in ``ERROR_CODE_EXCEPTION_MAPPING``
            (the mapping's keys are integers). ``None`` is accepted.

    Returns:
        The mapped ``GoogleError`` subclass, or ``GoogleError`` itself when
        the code has no entry in the mapping.
    """
    return ERROR_CODE_EXCEPTION_MAPPING.get(error_code, GoogleError)
# client.py:raise_for_error() calls the raise_for_status() function from the
# requests library and catches all requests.HTTPError and
# requests.ConnectionError exceptions. Note the name difference.
#
# I believe there are five ways to leave this function. It's worth skimming
# them just to understand the structure. As I'll note below, though, I think
# there are really just two ways to leave this function.
#
# 1. raise_for_status() does not raise requests.HTTPError, and we
#    successfully return to the calling function.
# 2. raise_for_status() raises and we successfully call response.json(); then
#    we attempt to create a specific error message via
#    client.py:get_exception_for_error_code(), which just looks up a code
#    found in response.json().
#    My concern with raise_for_status() is that if raise_for_status() is
#    unsuccessful, then response.json() will also be unsuccessful. So, because
#    we are in the exception handling for raise_for_status(), I think we never
#    make it past `response = response.json()` on client.py:118.
# 3. If response.json() does fail, then that function will raise a
#    simplejson.scanner.JSONDecodeError with an error message like
#    "Expecting value: line 1 column 1 (char 0)".
# 4. If response.json() worked, but the result lacks an `error` key and lacks
#    an `errorCode` key, we re-raise whatever was caught from
#    raise_for_status().
# 5. We also re-raise whatever was caught from raise_for_status() if a
#    ValueError or TypeError occurs while trying to handle the
#    raise_for_status() error.
#
# Try to catch API errors to rethrow as tap specific errors
def raise_for_error(response):
    """Re-raise a failed HTTP response as a tap-specific ``GoogleError``.

    Calls ``response.raise_for_status()`` and, if that raises, inspects the
    JSON body to choose the most specific exception class.

    Ways to leave this function:
      * ``raise_for_status()`` does not raise: returns ``None``.
      * It raises but the response body is empty: returns ``None``
        (best-effort; there is nothing to translate).
      * The body is a JSON object with an ``error`` or ``errorCode`` key:
        raises the class chosen by ``get_exception_for_error_code()``.
      * Anything else (body is not JSON, is not an object, or has neither
        key): raises ``GoogleError`` wrapping the original HTTP error.

    Args:
        response: The ``requests`` response to check.

    Raises:
        GoogleError: Or one of its subclasses, as described above.
    """
    try:
        response.raise_for_status()
    except (requests.HTTPError, requests.ConnectionError) as error:
        try:
            if len(response.content) == 0:
                return
            payload = response.json()
            if isinstance(payload, dict) and ('error' in payload or 'errorCode' in payload):
                error_info = payload.get('error')
                if isinstance(error_info, dict):
                    # API-style body: {"error": {"code": ..., "message": ...}}
                    error_code = error_info.get('code')
                    nested_detail = error_info.get('message', 'Unknown Error')
                else:
                    # OAuth-style body: {"error": "<name>", "error_description": "..."}.
                    # 'error' is a plain string here, so calling .get() on it
                    # would raise AttributeError; there is no numeric code.
                    error_code = None
                    nested_detail = payload.get('error_description', 'Unknown Error')
                message = '%s: %s' % (payload.get('error', str(error)),
                                      payload.get('message', nested_detail))
                raise get_exception_for_error_code(error_code)(message) from error
            raise GoogleError(error) from error
        except (ValueError, TypeError):
            # The body was not valid JSON, or had an unexpected shape.
            raise GoogleError(error) from error
# A successful response is defined as anything that returns an HTTP 200.
#
# On a successful response, we store the access_token returned on a private
# field, GoogleClient.__access_token, and we update GoogleClient.__expires to
# be the time this access_token expires. GoogleClient.__expires is a datetime
# object in UTC.
#
# To handle unsuccessful requests, the tap has the following pattern:
#
#     if response.status_code >= 500:
#         raise Server5xxError()
#     if response.status_code != 200:
#         raise_for_error(response)
#
# The client.py:Server5xxError is caught by backoff, and we exponentially
# back off the request.
#
# This is a class implemented in the tap in client.py. We initialize it once,
# as a context manager, in __init__.py:main().
class GoogleClient:  # pylint: disable=too-many-instance-attributes
    """Client for the Google Sheets and Drive REST APIs using OAuth2.

    To create the GoogleClient object, pass in the three OAuth2 variables
    (client id, client secret and refresh token) and an access token, which
    may be ``None``. Optionally include the ``user_agent``.

    The class is a context manager: entering it makes sure an access token
    is available, and exiting it closes the ``requests.Session``.
    """

    def __init__(self, client_id, client_secret, refresh_token, access_token, user_agent=None):
        """Store the credentials and open the HTTP session.

        Args:
            client_id: OAuth2 client id sent to the token endpoint.
            client_secret: OAuth2 client secret sent to the token endpoint.
            refresh_token: OAuth2 refresh token used to obtain access tokens.
            access_token: An existing access token, or ``None`` to have one
                requested on first use. The expiry of a supplied token is
                unknown, so it is used as-is and never refreshed.
            user_agent: Optional ``User-Agent`` header value.

        Side Effects:
            Creates a ``requests.Session``.
        """
        self.__client_id = client_id
        self.__client_secret = client_secret
        self.__refresh_token = refresh_token
        self.__user_agent = user_agent
        self.__access_token = access_token
        # Naive UTC datetime at which a token obtained by get_access_token()
        # expires; None until this client has requested a token itself.
        self.__expires = None
        self.__session = requests.Session()
        self.base_url = None

    # On enter, get a new access token
    def __enter__(self):
        """Ensure an access token is available and return the client."""
        self.get_access_token()
        return self

    # On exit, close the Requests Session
    def __exit__(self, exception_type, exception_value, traceback):
        """Close the underlying ``requests.Session``."""
        self.__session.close()

    @backoff.on_exception(backoff.expo,
                          Server5xxError,
                          max_tries=5,
                          factor=2)
    def get_access_token(self):
        """Obtain an access token unless a usable one is already held.

        POSTs to ``GOOGLE_TOKEN_URI``, which is just
        https://oauth2.googleapis.com/token. The body of the POST looks like::

            {
                "grant_type": "refresh_token",
                "client_id": my_client_id,
                "client_secret": my_client_secret,
                "refresh_token": my_refresh_token
            }

        A token is considered usable when it is set and either its expiry is
        unknown (it was passed to ``__init__``) or the expiry is still in the
        future. Without the expiry check a token obtained here would be used
        forever, even after it stopped being valid.

        Side Effects:
            On success stores the new token and its expiry time on the
            instance and logs the expiry time.

        Raises:
            Server5xxError: On an HTTP status >= 500 (retried by ``backoff``,
                up to 5 tries).
            GoogleError: Via ``raise_for_error`` for other non-200 statuses.
        """
        token_still_valid = (self.__expires is None
                             or self.__expires > datetime.utcnow())
        if self.__access_token is not None and token_still_valid:
            return

        headers = {}
        if self.__user_agent:
            headers['User-Agent'] = self.__user_agent

        response = self.__session.post(
            url=GOOGLE_TOKEN_URI,
            headers=headers,
            data={
                'grant_type': 'refresh_token',
                'client_id': self.__client_id,
                'client_secret': self.__client_secret,
                'refresh_token': self.__refresh_token,
            })

        if response.status_code >= 500:
            raise Server5xxError()

        if response.status_code != 200:
            raise_for_error(response)

        data = response.json()
        self.__access_token = data['access_token']
        self.__expires = datetime.utcnow() + timedelta(seconds=data['expires_in'])
        LOGGER.info('Authorized, token expires = %s', self.__expires)

    # This function starts with a call to GoogleClient.get_access_token(),
    # which likely returns immediately most of the time.
    #
    # Then we decide what url we are sending the request to. Sometimes it's
    # https://sheets.googleapis.com/v4 and sometimes it's
    # https://www.googleapis.com/drive/v3.
    #
    # Then we set up the request headers. The authorization, user-agent, and
    # content-type keys come into play here. A requests.Session lets you set
    # the headers for the whole session; I'm not sure why we don't do that
    # here.
    #
    # Then we make the request, timing how long it takes with a
    # singer.metrics.http_request_timer.
    #
    # The chunk of code after making the request handles an unsuccessful
    # response. We will retry HTTP 500 and HTTP 429 errors, and call
    # client.py:raise_for_error for everything else.
    #
    # The most unique thing of this tap happens here: we return an OrderedDict
    # of the response with this line
    #
    #     return response.json(object_pairs_hook=OrderedDict)
    #
    # where object_pairs_hook is a kwarg passed to the JSON parser used by
    # requests. This turns every key-value pair in the JSON response into an
    # OrderedDict.
    #
    # Why do we do this? I don't know. See the footnote for code examples.
    #
    # Rate Limit: https://developers.google.com/sheets/api/limits
    # 100 request per 100 seconds per User
    #
    # Both the builtin ConnectionError and requests.ConnectionError are listed
    # for retry: the requests exception does not derive from the builtin one,
    # so listing only the builtin would not retry connection failures raised
    # by requests.
    @backoff.on_exception(backoff.expo,
                          (Server5xxError, ConnectionError,
                           requests.ConnectionError, Server429Error),
                          max_tries=7,
                          factor=3)
    @utils.ratelimit(100, 100)
    def request(self, method, path=None, url=None, api=None, **kwargs):
        """Make a request to the API.

        Inputs:
            method: HTTP method name, e.g. ``'GET'`` or ``'POST'``.
            path: Path appended to the base URL when ``url`` is not given.
            url: Full URL; takes precedence over ``path``.
            api: ``'files'`` selects the Drive v3 base URL; anything else
                selects the Sheets v4 base URL.
            **kwargs: Passed through to ``requests.Session.request``. An
                ``endpoint`` key is removed first and used only as the name
                for logging and the request timer. A ``headers`` mapping is
                copied before the client adds its own headers.

        Returns:
            The JSON response body, decoded with
            ``object_pairs_hook=OrderedDict``.

        Side Effects:
            May request a new access token, sets ``self.base_url``, logs the
            URL and records an HTTP request timer metric.

        Raises:
            Server5xxError: On an HTTP status >= 500 (retried by ``backoff``).
            Server429Error: On HTTP 429 (retried by ``backoff``).
            GoogleError: Via ``raise_for_error`` for other non-200 statuses.
        """
        self.get_access_token()

        # Construct the URL to make a request to
        self.base_url = 'https://sheets.googleapis.com/v4'
        if api == 'files':
            self.base_url = 'https://www.googleapis.com/drive/v3'

        if not url and path:
            url = '{}/{}'.format(self.base_url, path)

        endpoint = kwargs.pop('endpoint', None)
        LOGGER.info('%s URL = %s', endpoint, url)

        # Construct the headers arg for requests.request(). Work on a copy so
        # the caller's dict is not modified.
        headers = dict(kwargs.get('headers') or {})
        headers['Authorization'] = 'Bearer {}'.format(self.__access_token)
        if self.__user_agent:
            headers['User-Agent'] = self.__user_agent
        if method == 'POST':
            headers['Content-Type'] = 'application/json'
        kwargs['headers'] = headers

        # Make request
        with metrics.http_request_timer(endpoint) as timer:
            response = self.__session.request(method, url, **kwargs)
            timer.tags[metrics.Tag.http_status_code] = response.status_code

        # Start backoff logic
        if response.status_code >= 500:
            raise Server5xxError()

        if response.status_code == 429:
            raise Server429Error()

        if response.status_code != 200:
            raise_for_error(response)

        # Ensure keys and rows are ordered as received from API.
        # QUESTION: But why??
        return response.json(object_pairs_hook=OrderedDict)

    def get(self, path, api, **kwargs):
        """Send a GET request; see ``request`` for arguments and return value."""
        return self.request(method='GET', path=path, api=api, **kwargs)

    def post(self, path, api, **kwargs):
        """Send a POST request; see ``request`` for arguments and return value."""
        return self.request(method='POST', path=path, api=api, **kwargs)
# Footnote: here's the output of the weird one,
# .json(object_pairs_hook=OrderedDict):
#
#     OrderedDict([('file', 'this is my file')])
#
# Here's a more complex example:
#
#     { "deleted": false,
#       "__v": 0,
#       "_id": "5887e1d85c873e0011036889",
#       "text": "Cats make about 100 different sounds. Dogs make only about 10.",
#       "createdAt": "2018-01-15T21:20:00.003Z",
#       "updatedAt": "2020-09-03T16:39:39.578Z",
#       "used": true,
#       "status": {
#         "sentCount": 1,
#         "feedback": "",
#         "verified": true
#       },
#       "type": "cat",
#       "user": "5a9ac18c7478810ea6c06381",
#       "source": "user"}
#
# Versus .json(object_pairs_hook=OrderedDict):
#
#     OrderedDict([('status', OrderedDict([('verified', True),
#                                          ('sentCount', 1),
#                                          ('feedback', '')])),
#                  ('type', 'cat'),
#                  ('deleted', False),
#                  ('_id', '5887e1d85c873e0011036889'),
#                  ('user', '5a9ac18c7478810ea6c06381'),
#                  ('text', 'Cats make about 100 different sounds. Dogs make only about 10.'),
#                  ('__v', 0),
#                  ('source', 'user'),
#                  ('updatedAt', '2020-09-03T16:39:39.578Z'),
#                  ('createdAt', '2018-01-15T21:20:00.003Z'),
#                  ('used', True)])