From 4bf194076d39d516c3cd0f5c3559954ebe8a12f2 Mon Sep 17 00:00:00 2001 From: Paul B Date: Thu, 19 Nov 2020 12:35:22 +0100 Subject: feat: use the official Google API python library These changes will make use of the official `google-api-python-client` library instead of relying on manual HTTP requests. Therer are two main advantages of these changes: - the Tap doesn't need to worry about the Google API interaction details as its hidden away by the Google official lib. - We can use the authentication helpers from the lib to ease the credentials management for the user. In that way the current PR implements two auth mean: installed OAuth client authentication or Service Accounts authentication. The only downside of this change is that it breaks the current `config.json` parameters for existing users. --- README.md | 34 +++--- config.json.example | 7 +- setup.py | 6 +- tap_google_sheets/__init__.py | 12 +-- tap_google_sheets/client.py | 239 +++++++++++++++++++++--------------------- tap_google_sheets/schema.py | 24 ++--- tap_google_sheets/streams.py | 28 +++-- tap_google_sheets/sync.py | 37 ++----- 8 files changed, 176 insertions(+), 211 deletions(-) mode change 100644 => 100755 setup.py mode change 100644 => 100755 tap_google_sheets/__init__.py diff --git a/README.md b/README.md index 9470411..04e7667 100644 --- a/README.md +++ b/README.md @@ -67,16 +67,21 @@ This tap: - Process/send records to target ## Authentication -The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=1FojlvtLwS0-BzGS37R0jEXtwSHqSiO1Uw-7RKQQO-C4) Google Doc provides instructions show how to configure the Google Cloud API credentials to enable Google Drive and Google Sheets APIs, configure Google Cloud to authorize/verify your domain ownership, generate an API key (client_id, client_secret), authenticate and generate a refresh_token, and prepare your tap config.json with the necessary parameters. -- Enable Googe Drive APIs and Authorization Scope: https://www.googleapis.com/auth/drive.metadata.readonly -- Enable Google Sheets API and Authorization Scope: https://www.googleapis.com/auth/spreadsheets.readonly -- Tap config.json parameters: - - client_id: identifies your application - - client_secret: authenticates your application - - refresh_token: generates an access token to authorize your session - - spreadsheet_id: unique identifier for each spreadsheet in Google Drive - - start_date: absolute minimum start date to check file modified - - user_agent: tap-name and email address; identifies your application in the Remote API server logs + +You will need a Google developer project to use this tool. After [creating a project](https://console.developers.google.com/projectcreate) (or selecting an existing one) in your Google developers console the authentication can be configured in two different ways: + +- Via an OAuth client which will ask the user to login to its Google user account. + + Please check the [“Creating application credentials”](https://github.com/googleapis/google-api-python-client/blob/d0110cf4f7aaa93d6f56fc028cd6a1e3d8dd300a/docs/oauth-installed.md#creating-application-credentials) paragraph of the Google Python library to download your Google credentials file. + +- Via a Service account (ideal for server-to-server communication) + + Please check the [“Creating a service account”](https://github.com/googleapis/google-api-python-client/blob/d0110cf4f7aaa93d6f56fc028cd6a1e3d8dd300a/docs/oauth-server.md#creating-a-service-account) paragraph of the Google Python library to download your Google Service Account key file. + +- Tap `config.json` parameters: + - `credentials_file`: the path to a valid Google credentials file (Either an OAuth client secrets file or a Service Account key file) + - `spreadsheet_id`: unique identifier for each spreadsheet in Google Drive + - `start_date`: absolute minimum start date to check file modified ## Quick Start @@ -103,16 +108,13 @@ The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id= - [singer-tools](https://github.com/singer-io/singer-tools) - [target-stitch](https://github.com/singer-io/target-stitch) -3. Create your tap's `config.json` file. Include the client_id, client_secret, refresh_token, site_urls (website URL properties in a comma delimited list; do not include the domain-level property in the list), start_date (UTC format), and user_agent (tap name with the api user email address). +3. Create your tap's `config.json` file. Include the `credentials_file` path to your google secrets file as described in the [Authentication](#authentication) paragraph. ```json { - "client_id": "YOUR_CLIENT_ID", - "client_secret": "YOUR_CLIENT_SECRET", - "refresh_token": "YOUR_REFRESH_TOKEN", + "credentials_file": "PATH_TO_YOUR_GOOGLE_CREDENTIALS_FILE", "spreadsheet_id": "YOUR_GOOGLE_SPREADSHEET_ID", - "start_date": "2019-01-01T00:00:00Z", - "user_agent": "tap-google-sheets " + "start_date": "2019-01-01T00:00:00Z" } ``` diff --git a/config.json.example b/config.json.example index 159aafb..fba0b17 100644 --- a/config.json.example +++ b/config.json.example @@ -1,8 +1,5 @@ { - "client_id": "YOUR_CLIENT_ID", - "client_secret": "YOUR_CLIENT_SECRET", - "refresh_token": "YOUR_REFRESH_TOKEN", + "credentials_file": "client-secrets.json", "spreadsheet_id": "YOUR_GOOGLE_SPREADSHEET_ID", - "start_date": "2019-01-01T00:00:00Z", - "user_agent": "tap-google-search-console " + "start_date": "2019-01-01T00:00:00Z" } diff --git a/setup.py b/setup.py old mode 100644 new mode 100755 index f3fd2c3..ddea949 --- a/setup.py +++ b/setup.py @@ -10,8 +10,10 @@ setup(name='tap-google-sheets', py_modules=['tap_google_sheets'], install_requires=[ 'backoff==1.8.0', - 'requests==2.22.0', - 'singer-python==5.9.0' + 'singer-python==5.9.0', + 'google-api-python-client==1.12.5', + 'google-auth==1.23.0', + 'google-auth-oauthlib==0.4.2', ], extras_require={ 'dev': [ diff --git a/tap_google_sheets/__init__.py b/tap_google_sheets/__init__.py old mode 100644 new mode 100755 index f97d4b8..15db05f --- a/tap_google_sheets/__init__.py +++ b/tap_google_sheets/__init__.py @@ -12,12 +12,9 @@ from tap_google_sheets.sync import sync LOGGER = singer.get_logger() REQUIRED_CONFIG_KEYS = [ - 'client_id', - 'client_secret', - 'refresh_token', + 'credentials_file', 'spreadsheet_id', - 'start_date', - 'user_agent' + 'start_date' ] def do_discover(client, spreadsheet_id): @@ -33,10 +30,7 @@ def main(): parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS) - with GoogleClient(parsed_args.config['client_id'], - parsed_args.config['client_secret'], - parsed_args.config['refresh_token'], - parsed_args.config['user_agent']) as client: + with GoogleClient(parsed_args.config['credentials_file']) as client: state = {} if parsed_args.state: diff --git a/tap_google_sheets/client.py b/tap_google_sheets/client.py index 4f38352..0cbad98 100644 --- a/tap_google_sheets/client.py +++ b/tap_google_sheets/client.py @@ -1,16 +1,21 @@ from datetime import datetime, timedelta from collections import OrderedDict import backoff -import requests import singer +import logging +import pickle +import json +import os from singer import metrics from singer import utils +from google.oauth2 import service_account +from google_auth_oauthlib.flow import InstalledAppFlow +from google.auth.transport.requests import Request +from googleapiclient.errors import HttpError +import googleapiclient.discovery -BASE_URL = 'https://www.googleapis.com' -GOOGLE_TOKEN_URI = 'https://oauth2.googleapis.com/token' LOGGER = singer.get_logger() - class Server5xxError(Exception): pass @@ -101,90 +106,85 @@ ERROR_CODE_EXCEPTION_MAPPING = { 428: GooglePreconditionRequiredError, 500: GoogleInternalServiceError} - -def get_exception_for_error_code(error_code): - return ERROR_CODE_EXCEPTION_MAPPING.get(error_code, GoogleError) - -def raise_for_error(response): - try: - response.raise_for_status() - except (requests.HTTPError, requests.ConnectionError) as error: - try: - content_length = len(response.content) - if content_length == 0: - # There is nothing we can do here since Google has neither sent - # us a 2xx response nor a response content. - return - response = response.json() - if ('error' in response) or ('errorCode' in response): - message = '%s: %s' % (response.get('error', str(error)), - response.get('message', 'Unknown Error')) - error_code = response.get('error', {}).get('code') - ex = get_exception_for_error_code(error_code) - raise ex(message) - raise GoogleError(error) - except (ValueError, TypeError): - raise GoogleError(error) - class GoogleClient: # pylint: disable=too-many-instance-attributes - def __init__(self, - client_id, - client_secret, - refresh_token, - user_agent=None): - self.__client_id = client_id - self.__client_secret = client_secret - self.__refresh_token = refresh_token - self.__user_agent = user_agent - self.__access_token = None - self.__expires = None - self.__session = requests.Session() - self.base_url = None - + SCOPES = [ + "https://www.googleapis.com/auth/drive.metadata.readonly", + "https://www.googleapis.com/auth/spreadsheets.readonly" + ] + + def __init__(self, credentials_file): + self.__credentials = self.fetchCredentials(credentials_file) + self.__sheets_service = googleapiclient.discovery.build( + 'sheets', + 'v4', + credentials=self.__credentials, + cache_discovery=False + ) + self.__drive_service = googleapiclient.discovery.build( + 'drive', + 'v3', + credentials=self.__credentials, + cache_discovery=False + ) + + def fetchCredentials(self, credentials_file): + LOGGER.debug('authenticate with google') + data = None + + # Check a credentials file exist + if not os.path.exists(credentials_file): + raise Exception("The configured Google credentials file {} doesn't exist".format(credentials_file)) + + # Load credentials json file + with open(credentials_file) as json_file: + data = json.load(json_file) + + if data.get('type', '') == 'service_account': + return self.fetchServiceAccountCredentials(credentials_file) + elif data.get('installed'): + return self.fetchInstalledOAuthCredentials(credentials_file) + else: + raise Exception("""This Google credentials file is not yet recognize. + + Please use either: + - a Service Account (https://github.com/googleapis/google-api-python-client/blob/d0110cf4f7aaa93d6f56fc028cd6a1e3d8dd300a/docs/oauth-server.md) + - an installed OAuth client (https://github.com/googleapis/google-api-python-client/blob/d0110cf4f7aaa93d6f56fc028cd6a1e3d8dd300a/docs/oauth-installed.md)""" + ) + + def fetchServiceAccountCredentials(self, credentials_file): + # The service account credentials file can be used for server-to-server applications + return service_account.Credentials.from_service_account_file( + credentials_file, scopes=GoogleClient.SCOPES) + + def fetchInstalledOAuthCredentials(self, credentials_file): + creds = None + + # The file token.pickle stores the user's access and refresh tokens, and is + # created automatically when the authorization flow completes for the first + # time. + if os.path.exists('token.pickle'): + with open('token.pickle', 'rb') as token: + creds = pickle.load(token) + + # If there are no (valid) credentials available, let the user log in. + if not creds or not creds.valid: + if creds and creds.expired and creds.refresh_token: + creds.refresh(Request()) + else: + flow = InstalledAppFlow.from_client_secrets_file( + credentials_file, GoogleClient.SCOPES) + creds = flow.run_local_server(port=0) + # Save the credentials for the next run + with open('token.pickle', 'wb') as token: + pickle.dump(creds, token) + + return creds def __enter__(self): - self.get_access_token() return self def __exit__(self, exception_type, exception_value, traceback): - self.__session.close() - - - @backoff.on_exception(backoff.expo, - Server5xxError, - max_tries=5, - factor=2) - def get_access_token(self): - # The refresh_token never expires and may be used many times to generate each access_token - # Since the refresh_token does not expire, it is not included in get access_token response - if self.__access_token is not None and self.__expires > datetime.utcnow(): - return - - headers = {} - if self.__user_agent: - headers['User-Agent'] = self.__user_agent - - response = self.__session.post( - url=GOOGLE_TOKEN_URI, - headers=headers, - data={ - 'grant_type': 'refresh_token', - 'client_id': self.__client_id, - 'client_secret': self.__client_secret, - 'refresh_token': self.__refresh_token, - }) - - if response.status_code >= 500: - raise Server5xxError() - - if response.status_code != 200: - raise_for_error(response) - - data = response.json() - self.__access_token = data['access_token'] - self.__expires = datetime.utcnow() + timedelta(seconds=data['expires_in']) - LOGGER.info('Authorized, token expires = {}'.format(self.__expires)) - + LOGGER.debug('exiting google client') # Rate Limit: https://developers.google.com/sheets/api/limits # 100 request per 100 seconds per User @@ -193,53 +193,48 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes max_tries=7, factor=3) @utils.ratelimit(100, 100) - def request(self, method, path=None, url=None, api=None, **kwargs): - self.get_access_token() - self.base_url = 'https://sheets.googleapis.com/v4' - if api == 'files': - self.base_url = 'https://www.googleapis.com/drive/v3' - - if not url and path: - url = '{}/{}'.format(self.base_url, path) - - # endpoint = stream_name (from sync.py API call) - if 'endpoint' in kwargs: - endpoint = kwargs['endpoint'] - del kwargs['endpoint'] + def request(self, endpoint=None, params={}, **kwargs): + formatted_params = {} + for (key, value) in params.items(): + # API parameters interpolation + # will raise a KeyError in case a necessary argument is missing + formatted_params[key] = value.format(**kwargs) + + # Call the correct Google API depending on the stream name + if endpoint == 'spreadsheet_metadata' or endpoint == 'sheet_metadata': + # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/get + request = self.__sheets_service.spreadsheets().get(**formatted_params) + elif endpoint == 'sheets_loaded': + # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get + request = self.__sheets_service.spreadsheets().values().get(**formatted_params) + elif endpoint == 'file_metadata': + # https://developers.google.com/drive/api/v3/reference/files/get + request = self.__drive_service.files().get(**formatted_params) else: - endpoint = None - LOGGER.info('{} URL = {}'.format(endpoint, url)) - - if 'headers' not in kwargs: - kwargs['headers'] = {} - kwargs['headers']['Authorization'] = 'Bearer {}'.format(self.__access_token) + raise Exception('{} not implemented yet!'.format(endpoint)) - if self.__user_agent: - kwargs['headers']['User-Agent'] = self.__user_agent + with metrics.http_request_timer(endpoint) as timer: + error = None + status_code = 400 - if method == 'POST': - kwargs['headers']['Content-Type'] = 'application/json' + try: + response = request.execute() + status_code = 200 + except HttpError as e: + status_code = e.resp.status or status_code + error = e - with metrics.http_request_timer(endpoint) as timer: - response = self.__session.request(method, url, **kwargs) - timer.tags[metrics.Tag.http_status_code] = response.status_code + timer.tags[metrics.Tag.http_status_code] = status_code - if response.status_code >= 500: + if status_code >= 500: raise Server5xxError() - #Use retry functionality in backoff to wait and retry if - #response code equals 429 because rate limit has been exceeded - if response.status_code == 429: + # Use retry functionality in backoff to wait and retry if + # response code equals 429 because rate limit has been exceeded + if status_code == 429: raise Server429Error() - if response.status_code != 200: - raise_for_error(response) - - # Ensure keys and rows are ordered as received from API - return response.json(object_pairs_hook=OrderedDict) - - def get(self, path, api, **kwargs): - return self.request(method='GET', path=path, api=api, **kwargs) + if status_code != 200: + raise error - def post(self, path, api, **kwargs): - return self.request(method='POST', path=path, api=api, **kwargs) + return response diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py index fcaccf9..56d2fb9 100644 --- a/tap_google_sheets/schema.py +++ b/tap_google_sheets/schema.py @@ -224,16 +224,13 @@ def get_sheet_metadata(sheet, spreadsheet_id, client): stream_name = 'sheet_metadata' stream_metadata = STREAMS.get(stream_name) - api = stream_metadata.get('api', 'sheets') params = stream_metadata.get('params', {}) - sheet_title_encoded = urllib.parse.quote_plus(sheet_title) - sheet_title_escaped = re.escape(sheet_title) - querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in \ - params.items()]).replace('{sheet_title}', sheet_title_encoded) - path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \ - spreadsheet_id), querystring) - - sheet_md_results = client.get(path=path, api=api, endpoint=sheet_title_escaped) + + # GET sheet_metadata + sheet_md_results = client.request(endpoint=stream_name, + spreadsheet_id=spreadsheet_id, + sheet_title=sheet_title, + params=params) # sheet_metadata: 1st `sheets` node in results sheet_metadata = sheet_md_results.get('sheets')[0] @@ -275,15 +272,12 @@ def get_schemas(client, spreadsheet_id): field_metadata[stream_name] = mdata if stream_name == 'spreadsheet_metadata': - api = stream_metadata.get('api', 'sheets') params = stream_metadata.get('params', {}) - querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]) - path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \ - spreadsheet_id), querystring) # GET spreadsheet_metadata, which incl. sheets (basic metadata for each worksheet) - spreadsheet_md_results = client.get(path=path, params=querystring, api=api, \ - endpoint=stream_name) + spreadsheet_md_results = client.request(endpoint=stream_name, + spreadsheet_id=spreadsheet_id, + params=params) sheets = spreadsheet_md_results.get('sheets') if sheets: diff --git a/tap_google_sheets/streams.py b/tap_google_sheets/streams.py index ad5529f..f7bf8ac 100644 --- a/tap_google_sheets/streams.py +++ b/tap_google_sheets/streams.py @@ -2,9 +2,7 @@ from collections import OrderedDict # streams: API URL endpoints to be called # properties: -# : Plural stream name for the endpoint -# path: API endpoint relative path, when added to the base URL, creates the full path, -# default = stream_name +# : Plural stream name which will condition the endpoint called # key_properties: Primary key fields for identifying an endpoint record. # replication_method: INCREMENTAL or FULL_TABLE # replication_keys: bookmark_field(s), typically a date-time, used for filtering the results @@ -15,51 +13,51 @@ from collections import OrderedDict # file_metadata: Queries Google Drive API to get file information and see if file has been modified # Provides audit info about who and when last changed the file. +# cf https://developers.google.com/drive/api/v3/reference/files/get FILE_METADATA = { - "api": "files", - "path": "files/{spreadsheet_id}", "key_properties": ["id"], "replication_method": "INCREMENTAL", "replication_keys": ["modifiedTime"], "params": { + "fileId": "{spreadsheet_id}", "fields": "id,name,createdTime,modifiedTime,version,teamDriveId,driveId,lastModifyingUser" } } # spreadsheet_metadata: Queries spreadsheet to get basic information on spreadhsheet and sheets +# cf https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/get SPREADSHEET_METADATA = { - "api": "sheets", - "path": "spreadsheets/{spreadsheet_id}", "key_properties": ["spreadsheetId"], "replication_method": "FULL_TABLE", "params": { - "includeGridData": "false" + "spreadsheetId": "{spreadsheet_id}" } } # sheet_metadata: Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet. -# This endpoint includes detailed metadata about each cell in the header and first data row -# incl. data type, formatting, etc. +# This endpoint includes detailed metadata about each cell in the header and first data row +# incl. data type, formatting, etc. +# cf https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/get SHEET_METADATA = { - "api": "sheets", - "path": "spreadsheets/{spreadsheet_id}", "key_properties": ["sheetId"], "replication_method": "FULL_TABLE", "params": { + "spreadsheetId": "{spreadsheet_id}", "includeGridData": "true", "ranges": "'{sheet_title}'!1:2" } } # sheets_loaded: Queries a batch of Rows for each Sheet in the Spreadsheet. -# Each query uses the `values` endpoint, to get data-only, w/out the formatting/type metadata. +# Each query uses the `values` endpoint, to get data-only, w/out the formatting/type metadata. +# cf https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get SHEETS_LOADED = { - "api": "sheets", - "path": "spreadsheets/{spreadsheet_id}/values/'{sheet_title}'!{range_rows}", "data_key": "values", "key_properties": ["spreadsheetId", "sheetId", "loadDate"], "replication_method": "FULL_TABLE", "params": { + "spreadsheetId": "{spreadsheet_id}", + "range": "'{sheet_title}'!{range_rows}", "dateTimeRenderOption": "SERIAL_NUMBER", "valueRenderOption": "UNFORMATTED_VALUE", "majorDimension": "ROWS" diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py index 26c2d19..c67055a 100644 --- a/tap_google_sheets/sync.py +++ b/tap_google_sheets/sync.py @@ -141,35 +141,17 @@ def get_selected_fields(catalog, stream_name): pass return selected_fields - def get_data(stream_name, endpoint_config, client, - spreadsheet_id, - range_rows=None): - if not range_rows: - range_rows = '' - # Replace {placeholder} variables in path - # Encode stream_name: fixes issue w/ special characters in sheet name - stream_name_escaped = re.escape(stream_name) - stream_name_encoded = urllib.parse.quote_plus(stream_name) - path = endpoint_config.get('path', stream_name).replace( - '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace( - '{range_rows}', range_rows) + **kwargs): params = endpoint_config.get('params', {}) - api = endpoint_config.get('api', 'sheets') - # Add in querystring parameters and replace {placeholder} variables - # querystring function ensures parameters are added but not encoded causing API errors - querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace( - '{sheet_title}', stream_name_encoded) - LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring)) - data = {} + LOGGER.info('GET {}'.format(stream_name)) time_extracted = utils.now() - data = client.get( - path=path, - api=api, - params=querystring, - endpoint=stream_name_escaped) + data = client.request( + endpoint=stream_name, + params=params, + **kwargs) return data, time_extracted @@ -382,7 +364,7 @@ def sync(client, config, catalog, state): file_metadata_config = STREAMS.get(stream_name) # GET file_metadata - LOGGER.info('GET file_meatadata') + LOGGER.info('GET file_metadata') file_metadata, time_extracted = get_data(stream_name=stream_name, endpoint_config=file_metadata_config, client=client, @@ -497,11 +479,12 @@ def sync(client, config, catalog, state): while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row: range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row) - # GET sheet_data for a worksheet tab + # GET sheets_loaded for a worksheet tab sheet_data, time_extracted = get_data( - stream_name=sheet_title, + stream_name='sheets_loaded', endpoint_config=sheets_loaded_config, client=client, + sheet_title=sheet_title, spreadsheet_id=spreadsheet_id, range_rows=range_rows) # Data is returned as a list of arrays, an array of values for each row -- cgit v1.2.3