client.py

#
from datetime import datetime, timedelta
from collections import OrderedDict
import backoff
import requests
import singer
from singer import metrics
from singer import utils

BASE_URL = 'https://www.googleapis.com'
GOOGLE_TOKEN_URI = 'https://oauth2.googleapis.com/token'
LOGGER = singer.get_logger()
#
class Server5xxError(Exception):
    """Raised for an HTTP status of 500 or above; retried with backoff."""


class Server429Error(Exception):
    """Raised for an HTTP 429 response; retried with backoff by ``request``."""
#
class GoogleError(Exception):
    """Base class for every tap-specific error built from an API response."""


class GoogleBadRequestError(GoogleError):
    """Mapped from HTTP 400."""


class GoogleUnauthorizedError(GoogleError):
    """Mapped from HTTP 401."""


class GooglePaymentRequiredError(GoogleError):
    """Mapped from HTTP 402."""


class GoogleNotFoundError(GoogleError):
    """Mapped from HTTP 404."""


class GoogleMethodNotAllowedError(GoogleError):
    """Mapped from HTTP 405."""


class GoogleConflictError(GoogleError):
    """Mapped from HTTP 409."""


class GoogleGoneError(GoogleError):
    """Mapped from HTTP 410."""


class GooglePreconditionFailedError(GoogleError):
    """Mapped from HTTP 412."""


class GoogleRequestEntityTooLargeError(GoogleError):
    """Mapped from HTTP 413."""


class GoogleRequestedRangeNotSatisfiableError(GoogleError):
    """Mapped from HTTP 416."""


class GoogleExpectationFailedError(GoogleError):
    """Mapped from HTTP 417."""


class GoogleForbiddenError(GoogleError):
    """Mapped from HTTP 403."""


class GoogleUnprocessableEntityError(GoogleError):
    """Mapped from HTTP 422."""


class GooglePreconditionRequiredError(GoogleError):
    """Mapped from HTTP 428."""


class GoogleInternalServiceError(GoogleError):
    """Mapped from HTTP 500."""
#

Error Codes: https://developers.google.com/webmaster-tools/search-console-api-original/v3/errors

# HTTP status code -> tap-specific exception class raised for that status.
ERROR_CODE_EXCEPTION_MAPPING = {
    # Client errors (4xx)
    400: GoogleBadRequestError,
    401: GoogleUnauthorizedError,
    402: GooglePaymentRequiredError,
    403: GoogleForbiddenError,
    404: GoogleNotFoundError,
    405: GoogleMethodNotAllowedError,
    409: GoogleConflictError,
    410: GoogleGoneError,
    412: GooglePreconditionFailedError,
    413: GoogleRequestEntityTooLargeError,
    416: GoogleRequestedRangeNotSatisfiableError,
    417: GoogleExpectationFailedError,
    422: GoogleUnprocessableEntityError,
    428: GooglePreconditionRequiredError,
    # Server errors (5xx)
    500: GoogleInternalServiceError,
}
#
def get_exception_for_error_code(error_code):
    """Return the exception class registered for ``error_code``.

    Codes without an entry in ``ERROR_CODE_EXCEPTION_MAPPING`` (including
    ``None``) resolve to the generic ``GoogleError``.
    """
    if error_code in ERROR_CODE_EXCEPTION_MAPPING:
        return ERROR_CODE_EXCEPTION_MAPPING[error_code]
    return GoogleError
#

#

client.py:raise_for_error() calls the raise_for_status() function from the requests library and catches all requests.HTTPError and requests.ConnectionError. Note the name difference.

#
Thoughts

I believe there are 5 code paths out of this function, and it’s worth skimming them just to understand the structure. As I note below, though, I think only two of them can actually be taken.

#
  1. If the length of the response content is 0, then we just leave
    • I believe this results in us swallowing the requests.HTTPError and successfully returns to the calling function
    • I believe it’s possible to leave the function this way
  2. If you can call response.json(), then we attempt to create a specific error message via client.py:get_exception_for_error_code(), which just looks up a code found in response.json()
    • I am not convinced this ever works for this tap, because my understanding of raise_for_status() is that if raise_for_status() is unsuccessful then response.json() will also be unsuccessful. So, because we are in the exception handling for raise_for_status(), I think we never make it past response = response.json() on client.py:118
    • I believe it’s possible to leave the function this way
  3. Assuming response.json() does fail, then that function will raise a simplejson.scanner.JSONDecodeError with an error message like "Expecting value: line 1 column 1 (char 0)"
  4. Assuming response.json() worked, but the result lacks both an error key and an errorCode key, we re-raise whatever was caught from raise_for_status()
  5. We also re-raise whatever was caught from raise_for_status() if a ValueError or TypeError occurs in trying to handle the raise_for_status() error
#

#

Try to catch API errors to rethrow as tap specific errors

def raise_for_error(response):
    """Try to catch API errors and rethrow them as tap-specific errors.

    Inputs:
      - response: A requests.Response object

    Returns:
      - None, when ``response.raise_for_status()`` does not raise, or when
        the failed response has an empty body (in that case the failure is
        not raised here and the caller continues with the response).

    Side Effects:
      - Raises a GoogleError (or the subclass registered for the error code
        found in the JSON body) for a failed response with a body.
    """
    try:
        response.raise_for_status()
    except (requests.HTTPError, requests.ConnectionError) as error:
        try:
            if len(response.content) == 0:
                # No body to inspect; leave without raising.
                return

            payload = response.json()
            if ('error' in payload) or ('errorCode' in payload):
                error_body = payload.get('error')
                message = '%s: %s' % (payload.get('error', str(error)),
                                      payload.get('message', 'Unknown Error'))
                # 'error' is not always an object: it can be a plain string
                # (e.g. {"error": "invalid_grant"}), which has no .get().
                # Without a code the lookup falls back to GoogleError.
                if isinstance(error_body, dict):
                    error_code = error_body.get('code')
                else:
                    error_code = None
                raise get_exception_for_error_code(error_code)(message) from error
            raise GoogleError(error) from error
        except (ValueError, TypeError, AttributeError):
            # The body was not JSON, or not shaped like an error object:
            # fall back to the original HTTP error text.
            raise GoogleError(error) from error
#

#

Handling a successful response

#

A successful response is defined as anything that returns a HTTP 200.

#

On a successful response, we store the access_token returned on a private field, GoogleClient.__access_token, and we update GoogleClient.__expires to be the time this access_token expires. GoogleClient.__expires is a datetime object in UTC.

#

Handling an unsuccessful response

#

To handle unsuccessful requests, the tap has the following pattern

#
if response.status_code >= 500:
    raise Server5xxError()

if response.status_code != 200:
    raise_for_error(response)
#

The client.py:Server5xxError is caught by backoff and we exponentially backoff the request.

#

#

This is a class implemented in the tap in client.py. We initialize it once, as a context manager, in __init__.py:main()

class GoogleClient: # pylint: disable=too-many-instance-attributes
#
#

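To create the GoogleClient object, we have to pass in the three OAuth2 variables (client_id, client_secret and refresh_token) plus an access_token, which may be None if no token has been fetched yet. Optionally we can include the user_agent.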

Side Effects:

  • All of this gets stored in private fields by the constructor.
  • The constructor also initializes a requests.Session.
    def __init__(self, client_id, client_secret, refresh_token, access_token=None, user_agent=None):
        """Store the OAuth2 credentials and open a ``requests.Session``.

        Inputs:
          - client_id, client_secret, refresh_token: the OAuth2 values sent
            to the token endpoint by ``get_access_token()``.
          - access_token: an existing access token, or None (the default) to
            have ``get_access_token()`` fetch one.
          - user_agent: optional value for the ``User-Agent`` header.

        Side Effects:
          - Everything is stored in private fields.
          - A ``requests.Session`` is created.
        """
        self.__client_id = client_id
        self.__client_secret = client_secret
        self.__refresh_token = refresh_token
        self.__user_agent = user_agent
        self.__access_token = access_token
        # Expiry time of the access token; not known until
        # get_access_token() fetches a token and records it.
        self.__expires = None
        self.__session = requests.Session()
        self.base_url = None
#

On enter, get a new access token

    def __enter__(self):
        """Enter the context: call ``get_access_token()``, then hand back the client."""
        self.get_access_token()
        return self
#

On exit, close the Requests Session

    def __exit__(self, exception_type, exception_value, traceback):
        """Leave the context by closing the ``requests.Session``.

        Returns None, so an exception raised inside the ``with`` body is not
        suppressed.
        """
        self.__session.close()
#

get_access_token() will POST to client.py:GOOGLE_TOKEN_URI which is just https://oauth2.googleapis.com/token. The body of the POST looks like

{
  "grant_type": "refresh_token",
  "client_id": my_client_id,
  "client_secret": my_client_secret,
  "refresh_token": my_refresh_token
}

Side Effects:

  • Store the access token and time it expires in private fields on the Client object
    @backoff.on_exception(backoff.expo,
                          Server5xxError,
                          max_tries=5,
                          factor=2)
    def get_access_token(self):
        """Make sure a usable access token is stored on the client.

        Returns immediately when a token is already stored and is not known
        to have expired. Otherwise POSTs the refresh-token grant to
        ``GOOGLE_TOKEN_URI`` and stores the result.

        Side Effects:
          - Stores the access token and the time it expires in private
            fields on the client.

        Raises:
          - Server5xxError for a status of 500 or above (retried by the
            backoff decorator).
          - Whatever ``raise_for_error()`` raises for other non-200 statuses.
        """
        # Look the expiry up by its mangled name with a default: it only
        # exists once a token has been fetched (or the constructor set it).
        expires = getattr(self, '_GoogleClient__expires', None)
        # A token whose expiry is unknown is assumed to still be valid.
        still_valid = expires is None or expires > datetime.utcnow()
        if self.__access_token is not None and still_valid:
            return

        headers = {}
        if self.__user_agent:
            headers['User-Agent'] = self.__user_agent

        response = self.__session.post(
            url=GOOGLE_TOKEN_URI,
            headers=headers,
            data={
                'grant_type': 'refresh_token',
                'client_id': self.__client_id,
                'client_secret': self.__client_secret,
                'refresh_token': self.__refresh_token,
            })

        if response.status_code >= 500:
            raise Server5xxError()

        if response.status_code != 200:
            raise_for_error(response)

        data = response.json()
        self.__access_token = data['access_token']
        # Naive datetime in UTC, compared against datetime.utcnow() above.
        self.__expires = datetime.utcnow() + timedelta(seconds=data['expires_in'])
        LOGGER.info('Authorized, token expires = {}'.format(self.__expires))
#

#

This function starts with a call to GoogleClient.get_access_token() which likely returns immediately most of the time.

#

Then we decide what url we are sending the request to. Sometimes it’s https://sheets.googleapis.com/v4 and sometimes it’s https://www.googleapis.com/drive/v3.

#
  • It seems like a mistake to decide this so deep into the code. Why doesn’t the caller decide where the request goes?
#

Then we set up the request headers. The authorization, user-agent, and content-type keys come into play here

#
  • One benefit of a requests.Session is that you can set the headers for the session. I’m not sure why we don’t do that here
  • If we did that, we wouldn’t have to think about the access_token making it into the headers here. They would just already be there
#

Then we make the request, timing how long it takes with a singer.metrics.http_request_timer.

#

The chunk of code after making the request handles an unsuccessful response. We will retry HTTP 500 and HTTP 429 errors, and client.py:raise_for_error for everything else.

#

The most unique thing of this tap happens here: we return an OrderedDict of the response with this line

#
return response.json(object_pairs_hook=OrderedDict)
#

where object_pairs_hook is a kwarg passed to the JSON parser used by requests.

#

This turns every JSON object in the response, including nested ones, into an OrderedDict whose keys keep the order in which they appeared in the response.

#

Why do we do this? I don’t know. See the footnote for code examples

#

#

Rate Limit: https://developers.google.com/sheets/api/limits 100 request per 100 seconds per User

    # requests.ConnectionError does not derive from the builtin
    # ConnectionError, so both are listed to get connection failures retried.
    @backoff.on_exception(backoff.expo,
                          (Server5xxError,
                           ConnectionError,
                           requests.ConnectionError,
                           Server429Error),
                          max_tries=7,
                          factor=3)
    @utils.ratelimit(100, 100)
    def request(self, method, path=None, url=None, api=None, **kwargs):
        """Make a request to the API.

        Inputs:
          - method: "GET" or "POST"
          - path: appended to the API base URL when ``url`` is not given
          - url: the full URL to request; takes precedence over ``path``
          - api: ``'files'`` selects the Drive v3 base URL, anything else
            the Sheets v4 base URL
          - kwargs: passed on to ``requests.Session.request``. An
            ``endpoint`` entry is removed first and used as the label for
            the log line and the request timer.

        Returns:
          - The decoded JSON body, with every JSON object as an OrderedDict

        Raises:
          - Server5xxError / Server429Error for statuses >= 500 / == 429
            (both retried by the backoff decorator)
          - Whatever ``raise_for_error()`` raises for other non-200 statuses

        Side Effects:
          - Might store a new access token
          - Sets ``self.base_url``
        """
        self.get_access_token()

        # Construct the URL to make a request to
        self.base_url = 'https://sheets.googleapis.com/v4'
        if api == 'files':
            self.base_url = 'https://www.googleapis.com/drive/v3'

        if not url and path:
            url = '{}/{}'.format(self.base_url, path)

        endpoint = kwargs.pop('endpoint', None)
        LOGGER.info('{} URL = {}'.format(endpoint, url))

        # Construct the headers arg for requests.request(). Work on a copy
        # so a headers dict supplied by the caller is not modified.
        headers = dict(kwargs.pop('headers', None) or {})
        headers['Authorization'] = 'Bearer {}'.format(self.__access_token)

        if self.__user_agent:
            headers['User-Agent'] = self.__user_agent

        if method == 'POST':
            headers['Content-Type'] = 'application/json'
        kwargs['headers'] = headers

        # Make request
        with metrics.http_request_timer(endpoint) as timer:
            response = self.__session.request(method, url, **kwargs)
            timer.tags[metrics.Tag.http_status_code] = response.status_code

        # Start backoff logic
        if response.status_code >= 500:
            raise Server5xxError()

        if response.status_code == 429:
            raise Server429Error()

        if response.status_code != 200:
            raise_for_error(response)

        # Ensure keys and rows are ordered as received from API.
        # QUESTION: But why??
        return response.json(object_pairs_hook=OrderedDict)
#

Syntactic Sugar

    def get(self, path, api, **kwargs):
        """Shorthand for ``request()`` with ``method='GET'``."""
        return self.request(
            method='GET',
            path=path,
            api=api,
            **kwargs
        )
#
    def post(self, path, api, **kwargs):
        """Shorthand for ``request()`` with ``method='POST'``."""
        return self.request(
            method='POST',
            path=path,
            api=api,
            **kwargs
        )
#

#

Footnotes

#

Here’s a normal .json()‘s output

{"file": "this is my file"}
#

Here’s the weird one’s .json(object_pairs_hook=OrderedDict) output

OrderedDict([('file', 'this is my file')])
#

Here’s a more complex example:

#
{ "deleted": false,
  "__v": 0,
  "_id": "5887e1d85c873e0011036889",
  "text": "Cats make about 100 different sounds. Dogs make only about 10.",
  "createdAt": "2018-01-15T21:20:00.003Z",
  "updatedAt": "2020-09-03T16:39:39.578Z",
  "used": true,
  "status": {
      "sentCount": 1,
      "feedback": "",
      "verified": true
  },
  "type": "cat",
  "user": "5a9ac18c7478810ea6c06381",
  "source": "user"}
#

Versus .json(object_pairs_hook=OrderedDict)

OrderedDict([('status', OrderedDict([('verified', True),
                                     ('sentCount', 1),
                                     ('feedback', '')])),
             ('type', 'cat'),
             ('deleted', False),
             ('_id', '5887e1d85c873e0011036889'),
             ('user', '5a9ac18c7478810ea6c06381'),
             ('text', 'Cats make about 100 different sounds. Dogs make only about 10.'),
             ('__v', 0),
             ('source', 'user'),
             ('updatedAt', '2020-09-03T16:39:39.578Z'),
             ('createdAt', '2018-01-15T21:20:00.003Z'),
             ('used', True)])