From 89643ba6fa98db82efd3246805ef801a8bfb5c81 Mon Sep 17 00:00:00 2001
From: Jeff Huth
Date: Wed, 13 Nov 2019 17:03:56 -0800
Subject: Initial commit

Discovery mode works. Still working on normal sync.
---
 tap_google_sheets/__init__.py                 |  57 +++++
 tap_google_sheets/client.py                   | 247 ++++++++++++++++++
 tap_google_sheets/discover.py                 |  26 ++
 tap_google_sheets/schema.py                   | 228 +++++++++++++++++
 tap_google_sheets/schemas/file_metadata.json  |  44 ++++
 tap_google_sheets/schemas/sheet_metadata.json |  89 +++++++
 tap_google_sheets/schemas/sheets_loaded.json  |  22 ++
 .../schemas/spreadsheet_metadata.json         |  30 +++
 tap_google_sheets/streams.py                  |  66 +++++
 tap_google_sheets/sync.py                     | 281 +++++++++++++++++++++
 10 files changed, 1090 insertions(+)
 create mode 100644 tap_google_sheets/__init__.py
 create mode 100644 tap_google_sheets/client.py
 create mode 100644 tap_google_sheets/discover.py
 create mode 100644 tap_google_sheets/schema.py
 create mode 100644 tap_google_sheets/schemas/file_metadata.json
 create mode 100644 tap_google_sheets/schemas/sheet_metadata.json
 create mode 100644 tap_google_sheets/schemas/sheets_loaded.json
 create mode 100644 tap_google_sheets/schemas/spreadsheet_metadata.json
 create mode 100644 tap_google_sheets/streams.py
 create mode 100644 tap_google_sheets/sync.py

diff --git a/tap_google_sheets/__init__.py b/tap_google_sheets/__init__.py
new file mode 100644
index 0000000..f97d4b8
--- /dev/null
+++ b/tap_google_sheets/__init__.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python3
+
+import sys
+import json
+import argparse
+import singer
+from singer import metadata, utils
+from tap_google_sheets.client import GoogleClient
+from tap_google_sheets.discover import discover
+from tap_google_sheets.sync import sync
+
+LOGGER = singer.get_logger()
+
+REQUIRED_CONFIG_KEYS = [
+    'client_id',
+    'client_secret',
+    'refresh_token',
+    'spreadsheet_id',
+    'start_date',
+    'user_agent'
+]
+
+def do_discover(client, spreadsheet_id):
+
+    LOGGER.info('Starting discover')
+    catalog = discover(client, spreadsheet_id)
+    json.dump(catalog.to_dict(), sys.stdout, indent=2)
+    LOGGER.info('Finished discover')
+
+
+@singer.utils.handle_top_exception(LOGGER)
+def main():
+
+    parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
+
+    with GoogleClient(parsed_args.config['client_id'],
+                      parsed_args.config['client_secret'],
+                      parsed_args.config['refresh_token'],
+                      parsed_args.config['user_agent']) as client:
+
+        state = {}
+        if parsed_args.state:
+            state = parsed_args.state
+
+        config = parsed_args.config
+        spreadsheet_id = config.get('spreadsheet_id')
+
+        if parsed_args.discover:
+            do_discover(client, spreadsheet_id)
+        elif parsed_args.catalog:
+            sync(client=client,
+                 config=config,
+                 catalog=parsed_args.catalog,
+                 state=state)
+
+if __name__ == '__main__':
+    main()
diff --git a/tap_google_sheets/client.py b/tap_google_sheets/client.py
new file mode 100644
index 0000000..12f0811
--- /dev/null
+++ b/tap_google_sheets/client.py
@@ -0,0 +1,247 @@
+from datetime import datetime, timedelta
+import backoff
+import requests
+from collections import OrderedDict
+
+import singer
+from singer import metrics
+from singer import utils
+
+BASE_URL = 'https://www.googleapis.com'
+GOOGLE_TOKEN_URI = 'https://oauth2.googleapis.com/token'
+LOGGER = singer.get_logger()
+
+
+class Server5xxError(Exception):
+    pass
+
+
+class Server429Error(Exception):
+    pass
+
+
+class GoogleError(Exception):
+    pass
+
+
+class GoogleBadRequestError(GoogleError):
+    pass
+
+
+class GoogleUnauthorizedError(GoogleError):
+    pass
+
+
+class GooglePaymentRequiredError(GoogleError):
+    pass
+
+
+class GoogleNotFoundError(GoogleError):
+    pass
+
+
+class GoogleMethodNotAllowedError(GoogleError):
+    pass
+
+
+class GoogleConflictError(GoogleError):
+    pass
+
+
+class GoogleGoneError(GoogleError):
+    pass
+
+
+class GooglePreconditionFailedError(GoogleError):
+    pass
+
+
+class GoogleRequestEntityTooLargeError(GoogleError):
+    pass
+
+
+class GoogleRequestedRangeNotSatisfiableError(GoogleError):
+    pass
+
+
+class GoogleExpectationFailedError(GoogleError):
+    pass
+
+
+class GoogleForbiddenError(GoogleError):
+    pass
+
+
+class GoogleUnprocessableEntityError(GoogleError):
+    pass
+
+
+class GooglePreconditionRequiredError(GoogleError):
+    pass
+
+
+class GoogleInternalServiceError(GoogleError):
+    pass
+
+
+# Error Codes: https://developers.google.com/webmaster-tools/search-console-api-original/v3/errors
+ERROR_CODE_EXCEPTION_MAPPING = {
+    400: GoogleBadRequestError,
+    401: GoogleUnauthorizedError,
+    402: GooglePaymentRequiredError,
+    403: GoogleForbiddenError,
+    404: GoogleNotFoundError,
+    405: GoogleMethodNotAllowedError,
+    409: GoogleConflictError,
+    410: GoogleGoneError,
+    412: GooglePreconditionFailedError,
+    413: GoogleRequestEntityTooLargeError,
+    416: GoogleRequestedRangeNotSatisfiableError,
+    417: GoogleExpectationFailedError,
+    422: GoogleUnprocessableEntityError,
+    428: GooglePreconditionRequiredError,
+    500: GoogleInternalServiceError}
+
+
+def get_exception_for_error_code(error_code):
+    return ERROR_CODE_EXCEPTION_MAPPING.get(error_code, GoogleError)
+
+def raise_for_error(response):
+    try:
+        response.raise_for_status()
+    except (requests.HTTPError, requests.ConnectionError) as error:
+        try:
+            content_length = len(response.content)
+            if content_length == 0:
+                # There is nothing we can do here since Google has neither sent
+                # us a 2xx response nor a response content.
+                return
+            response = response.json()
+            if ('error' in response) or ('errorCode' in response):
+                message = '%s: %s' % (response.get('error', str(error)),
+                                      response.get('message', 'Unknown Error'))
+                error_code = response.get('error', {}).get('code')
+                ex = get_exception_for_error_code(error_code)
+                raise ex(message)
+            else:
+                raise GoogleError(error)
+        except (ValueError, TypeError):
+            raise GoogleError(error)
+
+class GoogleClient: # pylint: disable=too-many-instance-attributes
+    def __init__(self,
+                 client_id,
+                 client_secret,
+                 refresh_token,
+                 user_agent=None):
+        self.__client_id = client_id
+        self.__client_secret = client_secret
+        self.__refresh_token = refresh_token
+        self.__user_agent = user_agent
+        self.__access_token = None
+        self.__expires = None
+        self.__session = requests.Session()
+        self.base_url = None
+
+
+    def __enter__(self):
+        self.get_access_token()
+        return self
+
+    def __exit__(self, exception_type, exception_value, traceback):
+        self.__session.close()
+
+    @backoff.on_exception(backoff.expo,
+                          Server5xxError,
+                          max_tries=5,
+                          factor=2)
+    def get_access_token(self):
+        # The refresh_token never expires and may be used many times to generate each access_token
+        # Since the refresh_token does not expire, it is not included in get access_token response
+        if self.__access_token is not None and self.__expires > datetime.utcnow():
+            return
+
+        headers = {}
+        if self.__user_agent:
+            headers['User-Agent'] = self.__user_agent
+
+        response = self.__session.post(
+            url=GOOGLE_TOKEN_URI,
+            headers=headers,
+            data={
+                'grant_type': 'refresh_token',
+                'client_id': self.__client_id,
+                'client_secret': self.__client_secret,
+                'refresh_token': self.__refresh_token,
+            })
+
+        if response.status_code >= 500:
+            raise Server5xxError()
+
+        if response.status_code != 200:
+            raise_for_error(response)
+
+        data = response.json()
+        self.__access_token = data['access_token']
+        self.__expires = datetime.utcnow() + timedelta(seconds=data['expires_in'])
+        LOGGER.info('Authorized, token expires = {}'.format(self.__expires))
+
+
+    @backoff.on_exception(backoff.expo,
+                          (Server5xxError, ConnectionError, Server429Error),
+                          max_tries=7,
+                          factor=3)
+    # Rate Limit:
+    # https://developers.google.com/webmaster-tools/search-console-api-original/v3/limits
+    @utils.ratelimit(1200, 60)
+    def request(self, method, path=None, url=None, api=None, **kwargs):
+
+        self.get_access_token()
+
+        self.base_url = 'https://sheets.googleapis.com/v4'
+        if api == 'files':
+            self.base_url = 'https://www.googleapis.com/drive/v3'
+
+        if not url and path:
+            url = '{}/{}'.format(self.base_url, path)
+
+        # endpoint = stream_name (from sync.py API call)
+        if 'endpoint' in kwargs:
+            endpoint = kwargs['endpoint']
+            del kwargs['endpoint']
+        else:
+            endpoint = None
+
+        if 'headers' not in kwargs:
+            kwargs['headers'] = {}
+        kwargs['headers']['Authorization'] = 'Bearer {}'.format(self.__access_token)
+
+        if self.__user_agent:
+            kwargs['headers']['User-Agent'] = self.__user_agent
+
+        if method == 'POST':
+            kwargs['headers']['Content-Type'] = 'application/json'
+
+        with metrics.http_request_timer(endpoint) as timer:
+            response = self.__session.request(method, url, **kwargs)
+            timer.tags[metrics.Tag.http_status_code] = response.status_code
+
+        if response.status_code >= 500:
+            raise Server5xxError()
+
+        # Use retry functionality in backoff to wait and retry if
+        # response code equals 429 because rate limit has been exceeded
+        if response.status_code == 429:
+            raise Server429Error()
+
+        if response.status_code != 200:
+            raise_for_error(response)
+
+        # Ensure keys and rows are ordered as received from API
+        return response.json(object_pairs_hook=OrderedDict)
+
+    def get(self, path, api, **kwargs):
+        return self.request(method='GET', path=path, api=api, **kwargs)
+
+    def post(self, path, api, **kwargs):
+        return self.request(method='POST', path=path, api=api, **kwargs)
diff --git a/tap_google_sheets/discover.py b/tap_google_sheets/discover.py
new file mode 100644
index 0000000..6477a5f
--- /dev/null
+++ b/tap_google_sheets/discover.py
@@ -0,0 +1,26 @@
+from singer.catalog import Catalog, CatalogEntry, Schema
+from tap_google_sheets.schema import get_schemas, STREAMS
+
+
+def discover(client, spreadsheet_id):
+    schemas, field_metadata = get_schemas(client, spreadsheet_id)
+    catalog = Catalog([])
+
+    for stream_name, schema_dict in schemas.items():
+        schema = Schema.from_dict(schema_dict)
+        mdata = field_metadata[stream_name]
+        key_properties = None
+        for md in mdata:
+            table_key_properties = md.get('metadata', {}).get('table-key-properties')
+            if table_key_properties:
+                key_properties = table_key_properties
+
+        catalog.streams.append(CatalogEntry(
+            stream=stream_name,
+            tap_stream_id=stream_name,
+            key_properties=STREAMS.get(stream_name, {}).get('key_properties', key_properties),
+            schema=schema,
+            metadata=mdata
+        ))
+
+    return catalog
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
new file mode 100644
index 0000000..237ab06
--- /dev/null
+++ b/tap_google_sheets/schema.py
@@ -0,0 +1,228 @@
+import os
+import json
+from collections import OrderedDict
+import singer
+from singer import metadata
+from tap_google_sheets.streams import STREAMS
+
+LOGGER = singer.get_logger()
+
+# Reference:
+# https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata
+
+# Convert column index to column letter
+def colnum_string(n):
+    string = ""
+    while n > 0:
+        n, remainder = divmod(n - 1, 26)
+        string = chr(65 + remainder) + string
+    return string
+
+
+# Create sheet_metadata_json with columns from sheet
+def get_sheet_schema_columns(sheet, spreadsheet_id, client):
+    sheet_json_schema = OrderedDict()
+    data = next(iter(sheet.get('data', [])), {})
+    row_data = data.get('rowData', [])
+    # spreadsheet is an OrderedDict, with ordered sheets and rows in the response
+
+    headers = row_data[0].get('values', [])
+    first_values = row_data[1].get('values', [])
+    # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
+
+    sheet_json_schema['type'] = 'object'
+    sheet_json_schema['additionalProperties'] = False
+    sheet_json_schema = {
+        'type': 'object',
+        'additionalProperties': False,
+        'properties': {
+            '__sdc_spreadsheet_id': {
+                'type': ['null', 'string']
+            },
+            '__sdc_sheet_id': {
+                'type': ['null', 'integer']
+            },
+            '__sdc_row': {
+                'type': ['null', 'integer']
+            }
+        }
+    }
+
+    header_list = [] # used for checking uniqueness
+    columns = []
+    prior_header = None
+    i = 0
+    skipped = 0
+    # Read column headers until end or 2 consecutive skipped headers
+    for header in headers:
+        # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True)))
+        column_index = i + 1
+        column_letter = colnum_string(column_index)
+        header_value = header.get('formattedValue')
+        if header_value: # NOT skipped
+            column_is_skipped = False
+            skipped = 0
+            column_name = '{}'.format(header_value)
+            if column_name in header_list:
+                raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name))
+            else:
+                header_list.append(column_name)
+
+            first_value = first_values[i]
+            # LOGGER.info('first_value[{}] = {}'.format(i, json.dumps(first_value, indent=2, sort_keys=True)))
+
+            column_effective_value = first_value.get('effectiveValue', {})
+            for key in column_effective_value.keys():
+                if key in ('numberValue', 'stringValue', 'boolValue', 'errorValue', 'formulaValue'):
+                    column_effective_value_type = key
+
+            column_number_format = first_values[i].get('effectiveFormat', {}).get('numberFormat', {})
+            column_number_format_type = column_number_format.get('type')
+
+            # Determine datatype for sheet_json_schema
+            #
+            # column_effective_value_type = numberValue, stringValue, boolValue; INVALID: errorValue, formulaValue
+            # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
+            #
+            # column_number_format_type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, SCIENTIFIC
+            # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
+            #
+            column_format = None # Default
+            # column_multiple_of = None # Default
+            if column_effective_value_type in ('formulaValue', 'errorValue'):
+                raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name, column_effective_value))
+            elif column_effective_value_type == 'stringValue':
+                column_type = ['null', 'string']
+                column_gs_type = 'stringValue'
+            elif column_effective_value_type == 'boolValue':
+                column_type = ['null', 'boolean', 'string']
+                column_gs_type = 'boolValue'
+            elif column_effective_value_type == 'numberValue':
+                if column_number_format_type == 'DATE_TIME':
+                    column_type = ['null', 'string']
+                    column_format = 'date-time'
+                    column_gs_type = 'numberType.DATE_TIME'
+                elif column_number_format_type == 'DATE':
+                    column_type = ['null', 'string']
+                    column_format = 'date'
+                    column_gs_type = 'numberType.DATE'
+                elif column_number_format_type == 'TIME':
+                    column_type = ['null', 'string']
+                    column_format = 'time'
+                    column_gs_type = 'numberType.TIME'
+                elif column_number_format_type == 'TEXT':
+                    column_type = ['null', 'string']
+                    column_gs_type = 'stringValue'
+                else:
+                    column_type = ['null', 'number', 'string']
+                    column_gs_type = 'numberType'
+
+        else: # skipped
+            column_is_skipped = True
+            skipped = skipped + 1
+            column_index_str = str(column_index).zfill(2)
+            column_name = '__sdc_skip_col_{}'.format(column_index_str)
+            column_type = ['null', 'string']
+            column_format = None
+            column_gs_type = 'stringValue'
+
+        if skipped >= 2:
+            # skipped = 2 consecutive skipped headers
+            # Remove prior_header column_name
+            sheet_json_schema['properties'].pop(prior_header, None)
+            column_count = i - 1
+            break
+
+        else:
+            column = {}
+            column = {
+                'columnIndex': column_index,
+                'columnLetter': column_letter,
+                'columnName': column_name,
+                'columnType': column_gs_type,
+                'columnSkipped': column_is_skipped
+            }
+            columns.append(column)
+
+            sheet_json_schema['properties'][column_name] = column
+            sheet_json_schema['properties'][column_name]['type'] = column_type
+            if column_format:
+                sheet_json_schema['properties'][column_name]['format'] = column_format
+
+        prior_header = column_name
+        i = i + 1
+
+    return sheet_json_schema, columns
+
+
+def get_sheet_metadata(sheet, spreadsheet_id, client):
+    sheet_id = sheet.get('properties', {}).get('sheetId')
+    sheet_title = sheet.get('properties', {}).get('title')
+    LOGGER.info('sheet_id = {}, sheet_title = {}'.format(sheet_id, sheet_title))
+
+    stream_name = 'sheet_metadata'
+    stream_metadata = STREAMS.get(stream_name)
+    api = stream_metadata.get('api', 'sheets')
+    params = stream_metadata.get('params', {})
querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace('{sheet_title}', sheet_title) + path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring) + + sheet_md_results = client.get(path=path, api=api, endpoint=stream_name) + sheet_cols = sheet_md_results.get('sheets')[0] + sheet_schema, columns = get_sheet_schema_columns(sheet_cols, spreadsheet_id, client) + + return sheet_schema, columns + + +def get_abs_path(path): + return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) + +def get_schemas(client, spreadsheet_id): + schemas = {} + field_metadata = {} + + for stream_name, stream_metadata in STREAMS.items(): + schema_path = get_abs_path('schemas/{}.json'.format(stream_name)) + with open(schema_path) as file: + schema = json.load(file) + schemas[stream_name] = schema + mdata = metadata.new() + + # Documentation: + # https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#singer-python-helper-functions + # Reference: + # https://github.com/singer-io/singer-python/blob/master/singer/metadata.py#L25-L44 + mdata = metadata.get_standard_metadata( + schema=schema, + key_properties=stream_metadata.get('key_properties', None), + valid_replication_keys=stream_metadata.get('replication_keys', None), + replication_method=stream_metadata.get('replication_method', None) + ) + field_metadata[stream_name] = mdata + + if stream_name == 'spreadsheet_metadata': + api = stream_metadata.get('api', 'sheets') + params = stream_metadata.get('params', {}) + querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]) + path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring) + + spreadsheet_md_results = client.get(path=path, params=querystring, api=api, endpoint=stream_name) + + sheets = spreadsheet_md_results.get('sheets') + if sheets: + for sheet in sheets: + sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) + # LOGGER.info('sheet_schema = {}'.format(json.dumps(sheet_schema, indent=2, sort_keys=True))) + + sheet_title = sheet.get('properties', {}).get('title') + schemas[sheet_title] = sheet_schema + sheet_mdata = metadata.new() + sheet_mdata = metadata.get_standard_metadata( + schema=sheet_schema, + key_properties=['__sdc_row'], + valid_replication_keys=None, + replication_method='FULL_TABLE' + ) + field_metadata[sheet_title] = sheet_mdata + + return schemas, field_metadata diff --git a/tap_google_sheets/schemas/file_metadata.json b/tap_google_sheets/schemas/file_metadata.json new file mode 100644 index 0000000..25c19c4 --- /dev/null +++ b/tap_google_sheets/schemas/file_metadata.json @@ -0,0 +1,44 @@ +{ + "type": "object", + "additionalProperties": false, + "properties": { + "id": { + "type": ["null", "string"] + }, + "name": { + "type": ["null", "string"] + }, + "version": { + "type": ["null", "integer"] + }, + "createdTime": { + "type": ["null", "string"], + "format": "date-time" + }, + "modifiedTime": { + "type": ["null", "string"], + "format": "date-time" + }, + "teamDriveId": { + "type": ["null", "string"] + }, + "driveId": { + "type": ["null", "string"] + }, + "lastModifyingUser": { + "type": ["null", "object"], + "additionalProperties": false, + "properties": { + "kind": { + "type": ["null", "integer"] + }, + "displayName": { + "type": ["null", "string"] + }, + "emailAdress": { + "type": ["null", "string"] + } + } + } + } +} diff --git a/tap_google_sheets/schemas/sheet_metadata.json 
b/tap_google_sheets/schemas/sheet_metadata.json new file mode 100644 index 0000000..c3f2ac2 --- /dev/null +++ b/tap_google_sheets/schemas/sheet_metadata.json @@ -0,0 +1,89 @@ +{ + "type": "object", + "additionalProperties": false, + "properties": { + "spreadsheetId": { + "type": ["null", "string"] + }, + "sheetId": { + "type": ["null", "integer"] + }, + "title": { + "type": ["null", "string"] + }, + "index": { + "type": ["null", "integer"] + }, + "sheetType": { + "type": ["null", "string"] + }, + "sheetUrl": { + "type": ["null", "string"] + }, + "gridProperties": { + "type": ["null", "object"], + "additionalProperties": false, + "properties": { + "rowCount": { + "type": ["null", "integer"] + }, + "columnCount": { + "type": ["null", "integer"] + }, + "frozenRowCount": { + "type": ["null", "integer"] + }, + "frozenColumnCount": { + "type": ["null", "integer"] + } + } + }, + "columns": { + "anyOf": [ + { + "type": "array", + "items": { + "type": ["null", "object"], + "additionalProperties": false, + "properties": { + "columnIndex": { + "type": ["null", "integer"] + }, + "columnLetter": { + "type": ["null", "string"] + }, + "columnName": { + "type": ["null", "string"] + }, + "columnType": { + "type": ["null", "string"] + }, + "columnSkipped": { + "type": ["null", "boolean"] + }, + "type": { + "anyOf": [ + { + "type": "array", + "items": { + "type": "string" + } + }, + { + "type": "null" + } + ] + }, + "format": { + "type": ["null", "string"] + } + } + } + }, + { + "type": "null" + } + ] + } + } +} diff --git a/tap_google_sheets/schemas/sheets_loaded.json b/tap_google_sheets/schemas/sheets_loaded.json new file mode 100644 index 0000000..12f967a --- /dev/null +++ b/tap_google_sheets/schemas/sheets_loaded.json @@ -0,0 +1,22 @@ +{ + "type": "object", + "additionalProperties": false, + "properties": { + "spreadsheetId": { + "type": ["null", "string"] + }, + "sheetId": { + "type": ["null", "integer"] + }, + "sheetTitle": { + "type": ["null", "string"] + }, + "loadDate": { + "type": ["null", "string"], + "format": "date-time" + }, + "lastRowNumber": { + "type": ["null", "integer"] + } + } +} diff --git a/tap_google_sheets/schemas/spreadsheet_metadata.json b/tap_google_sheets/schemas/spreadsheet_metadata.json new file mode 100644 index 0000000..852cb76 --- /dev/null +++ b/tap_google_sheets/schemas/spreadsheet_metadata.json @@ -0,0 +1,30 @@ +{ + "type": "object", + "additionalProperties": false, + "properties": { + "spreadsheetId": { + "type": ["null", "string"] + }, + "properties": { + "type": ["null", "object"], + "additionalProperties": false, + "properties": { + "title": { + "type": ["null", "string"] + }, + "locale": { + "type": ["null", "string"] + }, + "autoRecalc": { + "type": ["null", "string"] + }, + "timeZone": { + "type": ["null", "string"] + } + } + }, + "spreadsheetUrl": { + "type": ["null", "string"] + } + } +} diff --git a/tap_google_sheets/streams.py b/tap_google_sheets/streams.py new file mode 100644 index 0000000..299326a --- /dev/null +++ b/tap_google_sheets/streams.py @@ -0,0 +1,66 @@ +from collections import OrderedDict + +# streams: API URL endpoints to be called +# properties: +# : Plural stream name for the endpoint +# path: API endpoint relative path, when added to the base URL, creates the full path, +# default = stream_name +# key_properties: Primary key fields for identifying an endpoint record. 
+# replication_method: INCREMENTAL or FULL_TABLE +# replication_keys: bookmark_field(s), typically a date-time, used for filtering the results +# and setting the state +# params: Query, sort, and other endpoint specific parameters; default = {} +# data_key: JSON element containing the results list for the endpoint; default = root (no data_key) +# bookmark_query_field: From date-time field used for filtering the query +# bookmark_type: Data type for bookmark, integer or datetime + +FILE_METADATA = { + "api": "files", + "path": "files/{spreadsheet_id}", + "key_properties": ["id"], + "replication_method": "FULL_TABLE", + "params": { + "fields": "id,name,createdTime,modifiedTime,version,teamDriveId,driveId,lastModifyingUser" + } +} + +SPREADSHEET_METADATA = { + "api": "sheets", + "path": "spreadsheets/{spreadsheet_id}", + "key_properties": ["spreadsheetId"], + "replication_method": "FULL_TABLE", + "params": { + "includeGridData": "false" + } +} + +SHEET_METADATA = { + "api": "sheets", + "path": "spreadsheets/{spreadsheet_id}", + "key_properties": ["sheetId"], + "replication_method": "FULL_TABLE", + "params": { + "includeGridData": "true", + "ranges": "'{sheet_title}'!1:2" + } +} + +SHEETS_LOADED = { + "api": "sheets", + "path": "spreadsheets/{spreadsheet_id}/values/'{sheet_title}'!{range_rows}", + "data_key": "values", + "key_properties": ["spreadsheetId", "sheetId", "loadDate"], + "replication_method": "FULL_TABLE", + "params": { + "dateTimeRenderOption": "SERIAL_NUMBER", + "valueRenderOption": "UNFORMATTED_VALUE", + "majorDimension": "ROWS" + } +} + +# Ensure streams are ordered logically +STREAMS = OrderedDict() +STREAMS['file_metadata'] = FILE_METADATA +STREAMS['spreadsheet_metadata'] = SPREADSHEET_METADATA +STREAMS['sheet_metadata'] = SHEET_METADATA +STREAMS['sheets_loaded'] = SHEETS_LOADED diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py new file mode 100644 index 0000000..a8b02d0 --- /dev/null +++ b/tap_google_sheets/sync.py @@ -0,0 +1,281 @@ +import time +import math +import singer +import json +from collections import OrderedDict +from singer import metrics, metadata, Transformer, utils +from singer.utils import strptime_to_utc, strftime +from tap_google_sheets.transform import transform_json +from tap_google_sheets.streams import STREAMS +from tap_google_sheets.schema import get_sheet_metadata + +LOGGER = singer.get_logger() + + +def write_schema(catalog, stream_name): + stream = catalog.get_stream(stream_name) + schema = stream.schema.to_dict() + try: + singer.write_schema(stream_name, schema, stream.key_properties) + except OSError as err: + LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) + raise err + + +def write_record(stream_name, record, time_extracted): + try: + singer.messages.write_record(stream_name, record, time_extracted=time_extracted) + except OSError as err: + LOGGER.info('OS Error writing record for: {}'.format(stream_name)) + LOGGER.info('record: {}'.format(record)) + raise err + + +def get_bookmark(state, stream, default): + if (state is None) or ('bookmarks' not in state): + return default + return ( + state + .get('bookmarks', {}) + .get(stream, default) + ) + + +def write_bookmark(state, stream, value): + if 'bookmarks' not in state: + state['bookmarks'] = {} + state['bookmarks'][stream] = value + LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value)) + singer.write_state(state) + + +# def transform_datetime(this_dttm): +def transform_datetime(this_dttm): + with Transformer() as transformer: + new_dttm = 
transformer._transform_datetime(this_dttm) + return new_dttm + + +def process_records(catalog, #pylint: disable=too-many-branches + stream_name, + records, + time_extracted, + bookmark_field=None, + bookmark_type=None, + max_bookmark_value=None, + last_datetime=None, + last_integer=None, + parent=None, + parent_id=None): + stream = catalog.get_stream(stream_name) + schema = stream.schema.to_dict() + stream_metadata = metadata.to_map(stream.metadata) + + with metrics.record_counter(stream_name) as counter: + for record in records: + # If child object, add parent_id to record + if parent_id and parent: + record[parent + '_id'] = parent_id + + # Transform record for Singer.io + with Transformer() as transformer: + transformed_record = transformer.transform( + record, + schema, + stream_metadata) + # Reset max_bookmark_value to new value if higher + if transformed_record.get(bookmark_field): + if max_bookmark_value is None or \ + transformed_record[bookmark_field] > transform_datetime(max_bookmark_value): + max_bookmark_value = transformed_record[bookmark_field] + + if bookmark_field and (bookmark_field in transformed_record): + if bookmark_type == 'integer': + # Keep only records whose bookmark is after the last_integer + if transformed_record[bookmark_field] >= last_integer: + write_record(stream_name, transformed_record, \ + time_extracted=time_extracted) + counter.increment() + elif bookmark_type == 'datetime': + last_dttm = transform_datetime(last_datetime) + bookmark_dttm = transform_datetime(transformed_record[bookmark_field]) + # Keep only records whose bookmark is after the last_datetime + if bookmark_dttm >= last_dttm: + write_record(stream_name, transformed_record, \ + time_extracted=time_extracted) + counter.increment() + else: + write_record(stream_name, transformed_record, time_extracted=time_extracted) + counter.increment() + + return max_bookmark_value, counter.value + + +# Currently syncing sets the stream currently being delivered in the state. +# If the integration is interrupted, this state property is used to identify +# the starting point to continue from. 
+# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46 +def update_currently_syncing(state, stream_name): + if (stream_name is None) and ('currently_syncing' in state): + del state['currently_syncing'] + else: + singer.set_currently_syncing(state, stream_name) + singer.write_state(state) + + +# List selected fields from stream catalog +def get_selected_fields(catalog, stream_name): + stream = catalog.get_stream(stream_name) + mdata = metadata.to_map(stream.metadata) + mdata_list = singer.metadata.to_list(mdata) + selected_fields = [] + for entry in mdata_list: + field = None + try: + field = entry['breadcrumb'][1] + if entry.get('metadata', {}).get('selected', False): + selected_fields.append(field) + except IndexError: + pass + return selected_fields + + +def get_data(stream_name, + endpoint_config, + client, + spreadsheet_id, + range_rows=None): + if not range_rows: + range_rows = '' + path = endpoint_config.get('path', stream_name).replace( + '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace( + '{range_rows}', range_rows) + params = endpoint_config.get('params', {}) + api = endpoint_config.get('api', 'sheets') + querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace( + '{sheet_title}', stream_name) + data = {} + data = client.get( + path=path, + api=api, + params=querystring, + endpoint=stream_name) + return data + + +def transform_file_metadata(file_metadata): + # Convert to dict + file_metadata_tf = json.loads(json.dumps(file_metadata)) + # Remove keys + if file_metadata_tf.get('lastModifyingUser'): + file_metadata_tf['lastModifyingUser'].pop('photoLink', None) + file_metadata_tf['lastModifyingUser'].pop('me', None) + file_metadata_tf['lastModifyingUser'].pop('permissionId', None) + # Add record to an array of 1 + file_metadata_arr = [] + file_metadata_arr.append(file_metadata_tf) + return file_metadata_arr + + +def transform_spreadsheet_metadata(spreadsheet_metadata): + # Convert to dict + spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata)) + # Remove keys + if spreadsheet_metadata_tf.get('properties'): + spreadsheet_metadata_tf['properties'].pop('defaultFormat', None) + spreadsheet_metadata_tf.pop('sheets', None) + # Add record to an array of 1 + spreadsheet_metadata_arr = [] + spreadsheet_metadata_arr.append(spreadsheet_metadata_tf) + return spreadsheet_metadata_arr + + +def transform_sheet_metadata(spreadsheet_id, sheet, columns): + # Convert to properties to dict + sheet_metadata = sheet.get('properties') + sheet_metadata_tf = json.loads(json.dumps(sheet_metadata)) + sheet_id = sheet_metadata_tf.get('sheetId') + sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format( + spreadsheet_id, sheet_id) + sheet_metadata_tf['spreadsheetId'] = spreadsheet_id + sheet_metadata_tf['sheetUrl'] = sheet_url + sheet_metadata_tf['columns'] = columns + return sheet_metadata_tf + + +def sync(client, config, catalog, state): + start_date = config.get('start_date') + spreadsheet_id = config.get('spreadsheet_id') + + # Get selected_streams from catalog, based on state last_stream + # last_stream = Previous currently synced stream, if the load was interrupted + last_stream = singer.get_currently_syncing(state) + LOGGER.info('last/currently syncing stream: {}'.format(last_stream)) + selected_streams = [] + for stream in catalog.get_selected_streams(state): + selected_streams.append(stream.stream) + LOGGER.info('selected_streams: {}'.format(selected_streams)) 
+ + if not selected_streams: + return + + # Get file_metadata + file_metadata = {} + file_metadata_config = STREAMS.get('file_metadata') + file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id) + file_metadata_tf = transform_file_metadata(file_metadata) + # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf)) + last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date)) + this_datetime = strptime_to_utc(file_metadata.get('modifiedTime')) + LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime)) + if this_datetime <= last_datetime: + LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.') + return 0 + + # Get spreadsheet_metadata + spreadsheet_metadata = {} + spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata') + spreadsheet_metadata = get_data('spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id) + spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata) + # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf)) + + # Get sheet_metadata + sheets = spreadsheet_metadata.get('sheets') + sheet_metadata = [] + sheets_loaded = [] + sheets_loaded_config = STREAMS['sheets_loaded'] + if sheets: + for sheet in sheets: + sheet_title = sheet.get('properties', {}).get('title') + sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) + sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) + # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf)) + sheet_metadata.append(sheet_metadata_tf) + + # Determine range of rows and columns for "paging" through batch rows of data + sheet_last_col_index = 1 + sheet_last_col_letter = 'A' + for col in columns: + col_index = col.get('columnIndex') + col_letter = col.get('columnLetter') + if col_index > sheet_last_col_index: + sheet_last_col_index = col_index + sheet_last_col_letter = col_letter + sheet_max_row = sheet.get('gridProperties', {}).get('rowCount') + is_empty_row = False + batch_rows = 200 + from_row = 2 + if sheet_max_row < batch_rows: + to_row = sheet_max_row + else: + to_row = batch_rows + + while not is_empty_row and to_row <= sheet_max_row: + range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row) + + sheet_data = get_data( + stream_name=sheet_title, + endpoint_config=sheets_loaded_config, + client=client, + spreadsheet_id=spreadsheet_id, + range_rows=range_rows) -- cgit v1.2.3
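
For reference, discovery mode (which this commit describes as working) takes a config.json covering the six REQUIRED_CONFIG_KEYS defined in __init__.py. The values below are placeholders, and the tap-google-sheets console script is assumed to be registered by the package's setup.py, which is not part of this commit:

    {
      "client_id": "OAUTH_CLIENT_ID",
      "client_secret": "OAUTH_CLIENT_SECRET",
      "refresh_token": "OAUTH_REFRESH_TOKEN",
      "spreadsheet_id": "GOOGLE_SPREADSHEET_ID",
      "start_date": "2019-01-01T00:00:00Z",
      "user_agent": "tap-google-sheets <api_user_email@example.com>"
    }

    tap-google-sheets --config config.json --discover > catalog.json

do_discover() writes the catalog JSON to stdout, so redirecting it to a file produces the catalog that a later sync run would consume via --catalog.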
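
The row "paging" loop at the end of sync.py is left unfinished in this commit (per the commit message, normal sync is still in progress). As a minimal sketch only, assuming from_row/to_row advance in batch_rows steps until sheet_max_row is reached, the successive A1 ranges passed as range_rows would look like the output of this illustrative helper (not part of the commit):

    def batch_ranges(last_col_letter, max_row, batch_rows=200, from_row=2):
        # Yield A1-style ranges such as 'A2:K201', 'A202:K401', ... up to max_row
        start = from_row
        while start <= max_row:
            end = min(start + batch_rows - 1, max_row)
            yield 'A{}:{}{}'.format(start, last_col_letter, end)
            start = end + 1

    # Example: a sheet with columns A..K and 450 data rows
    # list(batch_ranges('K', 450)) == ['A2:K201', 'A202:K401', 'A402:K450']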