"""HTTP client for the Google Sheets (v4) and Google Drive (v3) REST APIs.

Provides OAuth2 refresh-token authentication, retry with exponential backoff,
client-side rate limiting and translation of Google error responses into a
typed exception hierarchy rooted at ``GoogleError``.
"""
from datetime import datetime, timedelta
from collections import OrderedDict

import backoff
import requests
import singer
from singer import metrics
from singer import utils

BASE_URL = 'https://www.googleapis.com'
GOOGLE_TOKEN_URI = 'https://oauth2.googleapis.com/token'
LOGGER = singer.get_logger()


class Server5xxError(Exception):
    """Raised when a response has an HTTP status >= 500 (retried by backoff)."""


class Server429Error(Exception):
    """Raised when a response has HTTP status 429 (retried by backoff)."""


class GoogleError(Exception):
    """Base class for every non-retryable error reported by a Google API."""


class GoogleBadRequestError(GoogleError):
    pass


class GoogleUnauthorizedError(GoogleError):
    pass


class GooglePaymentRequiredError(GoogleError):
    pass


class GoogleNotFoundError(GoogleError):
    pass


class GoogleMethodNotAllowedError(GoogleError):
    pass


class GoogleConflictError(GoogleError):
    pass


class GoogleGoneError(GoogleError):
    pass


class GooglePreconditionFailedError(GoogleError):
    pass


class GoogleRequestEntityTooLargeError(GoogleError):
    pass


class GoogleRequestedRangeNotSatisfiableError(GoogleError):
    pass


class GoogleExpectationFailedError(GoogleError):
    pass


class GoogleForbiddenError(GoogleError):
    pass


class GoogleUnprocessableEntityError(GoogleError):
    pass


class GooglePreconditionRequiredError(GoogleError):
    pass


class GoogleInternalServiceError(GoogleError):
    pass


# Error Codes: https://developers.google.com/webmaster-tools/search-console-api-original/v3/errors
ERROR_CODE_EXCEPTION_MAPPING = {
    400: GoogleBadRequestError,
    401: GoogleUnauthorizedError,
    402: GooglePaymentRequiredError,
    403: GoogleForbiddenError,
    404: GoogleNotFoundError,
    405: GoogleMethodNotAllowedError,
    409: GoogleConflictError,
    410: GoogleGoneError,
    412: GooglePreconditionFailedError,
    413: GoogleRequestEntityTooLargeError,
    416: GoogleRequestedRangeNotSatisfiableError,
    417: GoogleExpectationFailedError,
    422: GoogleUnprocessableEntityError,
    428: GooglePreconditionRequiredError,
    500: GoogleInternalServiceError}


def get_exception_for_error_code(error_code):
    """Return the exception class mapped to *error_code*.

    Falls back to ``GoogleError`` when the code has no dedicated class.
    """
    return ERROR_CODE_EXCEPTION_MAPPING.get(error_code, GoogleError)


def raise_for_error(response):
    """Raise a ``GoogleError`` (or subclass) describing a failed response.

    Does nothing when ``response.raise_for_status()`` does not raise, and
    returns silently when the failed response carries no body.

    Args:
        response: the ``requests`` response to inspect.

    Raises:
        GoogleError: or the subclass mapped to the error code, whenever the
            response is an HTTP error with a non-empty body.
    """
    try:
        response.raise_for_status()
    except (requests.HTTPError, requests.ConnectionError) as error:
        try:
            content_length = len(response.content)
            if content_length == 0:
                # There is nothing we can do here since Google has neither sent
                # us a 2xx response nor a response content.
                return
            payload = response.json()
            # Only a JSON object can carry the error fields; any other JSON
            # value (list, string, number) gets the generic error below.
            if isinstance(payload, dict) and (
                    ('error' in payload) or ('errorCode' in payload)):
                message = '%s: %s' % (payload.get('error', str(error)),
                                      payload.get('message', 'Unknown Error'))
                # API errors nest the code ({"error": {"code": 404, ...}}) but
                # the OAuth token endpoint sends a plain string
                # ({"error": "invalid_grant"}); calling .get() on that string
                # used to escape as an AttributeError.
                error_detail = payload.get('error')
                error_code = None
                if isinstance(error_detail, dict):
                    error_code = error_detail.get('code')
                if error_code is None:
                    # The mapping is keyed by HTTP status, so use the status
                    # when the body does not carry a code of its own.
                    error_code = response.status_code
                ex = get_exception_for_error_code(error_code)
                raise ex(message)
            raise GoogleError(error) from error
        except (ValueError, TypeError):
            # Body was not valid JSON (or had an unusable shape).
            raise GoogleError(error) from error


class GoogleClient: # pylint: disable=too-many-instance-attributes
    """Authenticated session for the Google Sheets and Drive REST APIs.

    Usable as a context manager: entering fetches an access token, exiting
    closes the underlying ``requests`` session.
    """

    def __init__(self,
                 client_id,
                 client_secret,
                 refresh_token,
                 user_agent=None):
        """Store the OAuth2 credentials and open a ``requests`` session.

        Args:
            client_id: OAuth2 client id.
            client_secret: OAuth2 client secret.
            refresh_token: OAuth2 refresh token used to obtain access tokens.
            user_agent: optional ``User-Agent`` header value sent on requests.
        """
        self.__client_id = client_id
        self.__client_secret = client_secret
        self.__refresh_token = refresh_token
        self.__user_agent = user_agent
        self.__access_token = None
        self.__expires = None
        self.__session = requests.Session()
        self.base_url = None

    def __enter__(self):
        self.get_access_token()
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.__session.close()

    @backoff.on_exception(backoff.expo,
                          Server5xxError,
                          max_tries=5,
                          factor=2)
    def get_access_token(self):
        """Refresh the cached access token unless it is still valid.

        Raises:
            Server5xxError: the token endpoint answered with a status >= 500
                (retried by the decorator, up to 5 tries).
            GoogleError: the token endpoint rejected the request.
        """
        # The refresh_token never expires and may be used many times to generate each access_token
        # Since the refresh_token does not expire, it is not included in get access_token response
        # NOTE: datetime.utcnow() is deprecated since Python 3.12; kept so the
        # stored expiry stays a naive UTC datetime.
        if self.__access_token is not None and self.__expires > datetime.utcnow():
            return

        headers = {}
        if self.__user_agent:
            headers['User-Agent'] = self.__user_agent

        response = self.__session.post(
            url=GOOGLE_TOKEN_URI,
            headers=headers,
            data={
                'grant_type': 'refresh_token',
                'client_id': self.__client_id,
                'client_secret': self.__client_secret,
                'refresh_token': self.__refresh_token,
            })

        if response.status_code >= 500:
            raise Server5xxError()

        if response.status_code != 200:
            raise_for_error(response)

        data = response.json()
        self.__access_token = data['access_token']
        self.__expires = datetime.utcnow() + timedelta(seconds=data['expires_in'])
        LOGGER.info('Authorized, token expires = %s', self.__expires)

    # Rate Limit: https://developers.google.com/sheets/api/limits
    #   100 request per 100 seconds per User
    # requests.exceptions.ConnectionError is listed explicitly because it does
    # not derive from the builtin ConnectionError, so connection failures
    # raised by `requests` would otherwise never be retried.
    @backoff.on_exception(backoff.expo,
                          (Server5xxError,
                           ConnectionError,
                           requests.exceptions.ConnectionError,
                           Server429Error),
                          max_tries=7,
                          factor=3)
    @utils.ratelimit(100, 100)
    def request(self, method, path=None, url=None, api=None, **kwargs):
        """Send an authenticated request and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``'GET'`` or ``'POST'``.
            path: path appended to the API base URL when *url* is not given.
            url: full URL; takes precedence over *path*.
            api: ``'files'`` selects the Drive v3 base URL; any other value
                selects the Sheets v4 base URL.
            **kwargs: forwarded to ``requests.Session.request``. An optional
                ``endpoint`` entry is removed and used only as the
                metrics/log label.

        Returns:
            The response JSON, decoded with ``OrderedDict`` so key order is
            preserved as received.

        Raises:
            Server5xxError: status >= 500 (retried by the decorator).
            Server429Error: status 429 (retried by the decorator).
            GoogleError: any other HTTP error response with a body.
        """
        self.get_access_token()
        self.base_url = 'https://sheets.googleapis.com/v4'
        if api == 'files':
            self.base_url = 'https://www.googleapis.com/drive/v3'

        if not url and path:
            url = '{}/{}'.format(self.base_url, path)

        # endpoint = stream_name (from sync.py API call)
        endpoint = kwargs.pop('endpoint', None)
        LOGGER.info('%s URL = %s', endpoint, url)

        # Work on a copy so the caller's headers dict is never mutated.
        headers = dict(kwargs.get('headers') or {})
        headers['Authorization'] = 'Bearer {}'.format(self.__access_token)

        if self.__user_agent:
            headers['User-Agent'] = self.__user_agent

        if method == 'POST':
            headers['Content-Type'] = 'application/json'

        kwargs['headers'] = headers

        with metrics.http_request_timer(endpoint) as timer:
            response = self.__session.request(method, url, **kwargs)
            timer.tags[metrics.Tag.http_status_code] = response.status_code

        if response.status_code >= 500:
            raise Server5xxError()

        # Use retry functionality in backoff to wait and retry if
        # response code equals 429 because rate limit has been exceeded
        if response.status_code == 429:
            raise Server429Error()

        if response.status_code != 200:
            raise_for_error(response)

        # Ensure keys and rows are ordered as received from API
        return response.json(object_pairs_hook=OrderedDict)

    def get(self, path, api, **kwargs):
        """Issue a GET request; see ``request`` for arguments and errors."""
        return self.request(method='GET', path=path, api=api, **kwargs)

    def post(self, path, api, **kwargs):
        """Issue a POST request; see ``request`` for arguments and errors."""
        return self.request(method='POST', path=path, api=api, **kwargs)