From 55ac5a86eae5ef9079e3312fd589f1bda20e43f2 Mon Sep 17 00:00:00 2001 From: Andy Lu Date: Sun, 21 Feb 2021 21:09:11 -0600 Subject: Add files from pycco --- docs/__init__.html | 225 +++++++ docs/client.html | 1201 +++++++++++++++++++++++++++++++++++++ docs/discover.html | 127 ++++ docs/pycco.css | 191 ++++++ docs/schema.html | 949 +++++++++++++++++++++++++++++ docs/streams.html | 185 ++++++ docs/sync.html | 1680 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 4558 insertions(+) create mode 100644 docs/__init__.html create mode 100644 docs/client.html create mode 100644 docs/discover.html create mode 100644 docs/pycco.css create mode 100644 docs/schema.html create mode 100644 docs/streams.html create mode 100644 docs/sync.html diff --git a/docs/__init__.html b/docs/__init__.html new file mode 100644 index 0000000..eb5d3b3 --- /dev/null +++ b/docs/__init__.html @@ -0,0 +1,225 @@ + + + + + __init__.py + + + +
+
+
+

__init__.py

+
+
+
+
+
+ # +
+

This project syncs data from the v4 Google Sheets API.

+

Discovery Mode

+

There are a few static streams ("file_metadata", "spreadsheet_metadata", "sheet_metadata", "sheets_loaded") and any number of dynamic streams: one dynamic stream per sheet in the configured Google Sheets document.

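For illustration, here is what the full set of streams might look like for a hypothetical document with two sheets named "Sheet1" and "Expenses" (the sheet names are made up):

expected_streams = [
    'file_metadata',          # static
    'spreadsheet_metadata',   # static
    'sheet_metadata',         # static
    'sheets_loaded',          # static
    'Sheet1',                 # dynamic: one stream per sheet
    'Expenses',               # dynamic: one stream per sheet
]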
+

Sync Mode

+
+
+
import sys
+import json
+import argparse # unused import
+import singer
+from singer import metadata, utils
+from tap_google_sheets.client import GoogleClient
+from tap_google_sheets.discover import discover
+from tap_google_sheets.sync import sync
+
+LOGGER = singer.get_logger()
+
+
+
+
+
+
+ # +
+

Configuration

+
+
+
+
+
+
+
+
+
+ # +
+

This is a typical OAuth2 tap. So in a config file we expect the following keys.

+
    +
  • +

    OAuth Related:

    +
      +
    • client_id
    • +
    • client_secret
    • +
    • refresh_token
    • +
    +
  • +
  • +

    Tap related:

    +
      +
    • spreadsheet_id
    • +
    • start_date
    • +
    • user_agent
    • +
    +
  • +
+
+
+
REQUIRED_CONFIG_KEYS = [
+    'client_id',
+    'client_secret',
+    'refresh_token',
+    'spreadsheet_id',
+    'start_date',
+    'user_agent'
+]
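For illustration, a hypothetical config (shown here as a Python dict; the real thing is a JSON file passed via --config). The keys mirror REQUIRED_CONFIG_KEYS and every value is a placeholder:

config = {
    "client_id": "my-client-id",
    "client_secret": "my-client-secret",
    "refresh_token": "my-refresh-token",
    "spreadsheet_id": "my-spreadsheet-id",
    "start_date": "2021-01-01T00:00:00Z",
    "user_agent": "tap-google-sheets placeholder-agent"
}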
+
+
+
+
+
+
+ # +
+

Discovery Mode

+
+
+
+
+
+
+
+
+
+ # +
+

Creates a Singer Catalog and writes it to STDOUT

+
+
+
def do_discover(client, spreadsheet_id):
+
+
+
+
+
+
+ # +
+

Inputs:

+
    +
  • client
  • +
  • An instance of the GoogleClient class
  • +
  • spreadsheet_id
  • +
  • The id of the Google Sheet
  • +
+

Returns:

+
    +
  • None
  • +
+

Side Effects:

+
    +
  • Writes to STDOUT
  • +
+
+
+
    LOGGER.info('Starting discover')
+    catalog = discover(client, spreadsheet_id)
+    json.dump(catalog.to_dict(), sys.stdout, indent=2)
+    LOGGER.info('Finished discover')
+
+
+
+
+
+
+ # +
+

Entrypoint

+
+
+
+
+
+
+
+
+
+ # +
+

Read a config, then run discovery mode or sync mode

+
+
+
@singer.utils.handle_top_exception(LOGGER)
+def main():
+
+
+
+
+
+
+ # +
+

Inputs:

+
    +
  • None
  • +
+

Returns:

+
    +
  • None
  • +
+

Side Effects:

+
    +
  • Writes to STDOUT
  • +
+
+
+
    parsed_args = singer.utils.parse_args(REQUIRED_CONFIG_KEYS)
+
+    with GoogleClient(parsed_args.config['access_token'],
+                      parsed_args.config['user_agent']) as client:
+
+        state = {}
+        if parsed_args.state:
+            state = parsed_args.state
+
+        config = parsed_args.config
+        spreadsheet_id = config.get('spreadsheet_id')
+
+        if parsed_args.discover:
+            do_discover(client, spreadsheet_id)
+        elif parsed_args.catalog:
+            sync(client=client,
+                 config=config,
+                 catalog=parsed_args.catalog,
+                 state=state)
+
+
+
+
+
+
+ # +
+

Unused

+
+
+
if __name__ == '__main__':
+    main()
+
+
+
+
+
+
+ diff --git a/docs/client.html b/docs/client.html new file mode 100644 index 0000000..4e69b89 --- /dev/null +++ b/docs/client.html @@ -0,0 +1,1201 @@ + + + + + client.py + + + +
+
+
+

client.py

+
+
+
+
+
+ # +
+ +
+
+
from datetime import datetime, timedelta
+from collections import OrderedDict
+import backoff
+import requests
+import singer
+from singer import metrics
+from singer import utils
+
+BASE_URL = 'https://www.googleapis.com'
+GOOGLE_TOKEN_URI = 'https://oauth2.googleapis.com/token'
+LOGGER = singer.get_logger()
+
+
+
+
+
+
+ # +
+ +
+
+
class Server5xxError(Exception):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class Server429Error(Exception):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleError(Exception):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleBadRequestError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleUnauthorizedError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GooglePaymentRequiredError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleNotFoundError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleMethodNotAllowedError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleConflictError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleGoneError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GooglePreconditionFailedError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleRequestEntityTooLargeError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleRequestedRangeNotSatisfiableError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleExpectationFailedError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleForbiddenError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleUnprocessableEntityError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GooglePreconditionRequiredError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+ +
+
+
class GoogleInternalServiceError(GoogleError):
+    pass
+
+
+
+
+
+
+ # +
+

Error Codes: https://developers.google.com/webmaster-tools/search-console-api-original/v3/errors

+
+
+
ERROR_CODE_EXCEPTION_MAPPING = {
+    400: GoogleBadRequestError,
+    401: GoogleUnauthorizedError,
+    402: GooglePaymentRequiredError,
+    403: GoogleForbiddenError,
+    404: GoogleNotFoundError,
+    405: GoogleMethodNotAllowedError,
+    409: GoogleConflictError,
+    410: GoogleGoneError,
+    412: GooglePreconditionFailedError,
+    413: GoogleRequestEntityTooLargeError,
+    416: GoogleRequestedRangeNotSatisfiableError,
+    417: GoogleExpectationFailedError,
+    422: GoogleUnprocessableEntityError,
+    428: GooglePreconditionRequiredError,
+    500: GoogleInternalServiceError}
+
+
+
+
+
+
+ # +
+ +
+
+
def get_exception_for_error_code(error_code):
+    return ERROR_CODE_EXCEPTION_MAPPING.get(error_code, GoogleError)
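A quick illustration of the lookup and its fallback (assuming the tap package is importable):

from tap_google_sheets.client import (
    get_exception_for_error_code, GoogleNotFoundError, GoogleError)

# Mapped codes return their specific subclass; unmapped codes fall back to GoogleError.
assert get_exception_for_error_code(404) is GoogleNotFoundError
assert get_exception_for_error_code(418) is GoogleError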
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

client.py:raise_for_error() calls the raise_for_status() method from the requests library and catches all requests.HTTPError and requests.ConnectionError. Note the name difference.

+
+
+
+
+
+
+
+
+
+ # +
+
Thoughts
+

On paper there are 5 ways to leave this function; it's worth skimming this just to understand the structure. I'll note below which ones seem reachable, but I think only two of them are ways we actually leave the function in practice.

+
+
+
+
+
+
+
+
+
+ # +
+
    +
  1. If the length of the response content is 0, then we just return.
    • I believe this results in us swallowing the requests.HTTPError and successfully returning to the calling function.
    • I believe it's possible to leave the function this way.
  2. If response.json() succeeds, then we attempt to create a specific error message via client.py:get_exception_for_error_code(), which just looks up a code found in response.json().
    • I am not convinced this ever works for this tap, because my understanding is that if raise_for_status() raised, then response.json() will also fail. So, because we are in the exception handling for raise_for_status(), I think we never make it past response = response.json() on client.py:118.
    • I believe it's possible to leave the function this way.
  3. Assuming response.json() does fail, then that call will raise a simplejson.scanner.JSONDecodeError with an error message like "Expecting value: line 1 column 1 (char 0)".
  4. Assuming response.json() worked, but the payload lacks both an error key and an errorCode key, we re-raise whatever was caught from raise_for_status().
  5. We also re-raise whatever was caught from raise_for_status() if a ValueError or TypeError occurs while trying to handle the raise_for_status() error.
+
+
+
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

Try to catch API errors to rethrow as tap specific errors

+
+
+
def raise_for_error(response):
+
+
+
+
+
+
+ # +
+

Inputs:

+
    +
  • response: A requests.Response object
  • +
+

Returns:

+
    +
  • None
  • +
+

Side Effects:

+
    +
  • Raises a GoogleError
  • +
+
+
+
    try:
+        response.raise_for_status()
+    except (requests.HTTPError, requests.ConnectionError) as error:
+        try:
+            content_length = len(response.content)
+            if content_length == 0:
+                return
+            response = response.json()
+            if ('error' in response) or ('errorCode' in response):
+                message = '%s: %s' % (response.get('error', str(error)),
+                                      response.get('message', 'Unknown Error'))
+                error_code = response.get('error', {}).get('code')
+                ex = get_exception_for_error_code(error_code)
+                raise ex(message)
+            raise GoogleError(error)
+        except (ValueError, TypeError):
+            raise GoogleError(error)
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

Handling a successful response

+
+
+
+
+
+
+
+
+
+ # +
+

A successful response is defined as anything that returns a HTTP 200.

+
+
+
+
+
+
+
+
+
+ # +
+

On a successful response, we store the access_token returned on a private field, +GoogleClient.__access_token, and we update GoogleClient.__expires to be the time this +access_token expires. GoogleClient.__expires is a datetime object in UTC.

+
+
+
+
+
+
+
+
+
+ # +
+

Handling an unsuccessful response

+
+
+
+
+
+
+
+
+
+ # +
+

To handle unsuccessful requests, the tap has the following pattern

+
+
+
+
+
+
+
+
+
+ # +
+
if response.status_code >= 500:
+    raise Server5xxError()
+
+if response.status_code != 200:
+    raise_for_error(response)
+
+
+
+
+
+
+
+
+
+
+ # +
+

The client.py:Server5xxError is caught by backoff and we exponentially backoff the request.

+
+
+
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

This is a class implemented in the tap in client.py. We initialize it once, as a context manager, in __init__.py:main().

+
+
+
class GoogleClient: # pylint: disable=too-many-instance-attributes
+
+
+
+
+
+
+ # +
+ +
+
+
+
+
+
+
+
+
+ # +
+

To create the GoogleClient object, we have to pass in the three OAuth2 variables. Optionally we +can include the user_agent.

+

Side Effects:

+
    +
  • All of this gets stored in private fields by the constructor.
  • +
  • The constructor also initializes a requests.Session.
  • +
+
+
+
    def __init__(self, client_id, client_secret, refresh_token, access_token, user_agent=None):
+
+
+
+
+
+
+ # +
+ +
+
+
        self.__client_id = client_id
+        self.__client_secret = client_secret
+        self.__refresh_token = refresh_token
+        self.__user_agent = user_agent
+        self.__access_token = access_token
+        self.__session = requests.Session()
+        self.base_url = None
+
+
+
+
+
+
+ # +
+

On enter, get a new access token

+
+
+
    def __enter__(self):
+
+
+
+
+
+
+ # +
+ +
+
+
        self.get_access_token()
+        return self
+
+
+
+
+
+
+ # +
+

On exit, close the Requests Session

+
+
+
    def __exit__(self, exception_type, exception_value, traceback):
+
+
+
+
+
+
+ # +
+ +
+
+
        self.__session.close()
+
+
+
+
+
+
+ # +
+

get_access_token() will POST to client.py:GOOGLE_TOKEN_URI which is just +https://oauth2.googleapis.com/token. The body of the POST looks like

+
{
+  "grant_type": "refresh_token",
+  "client_id": my_client_id,
+  "client_secret": my_client_secret,
+  "refresh_token": my_refresh_token
+}
+
+

Side Effects:

+
    +
  • Store the access token and time it expires in private fields on the Client object
  • +
+
+
+
    @backoff.on_exception(backoff.expo,
+                          Server5xxError,
+                          max_tries=5,
+                          factor=2)
+    def get_access_token(self):
+
+
+
+
+
+
+ # +
+ +
+
+
        if self.__access_token is not None:
+            return
+
+        headers = {}
+        if self.__user_agent:
+            headers['User-Agent'] = self.__user_agent
+
+        response = self.__session.post(
+            url=GOOGLE_TOKEN_URI,
+            headers=headers,
+            data={
+                'grant_type': 'refresh_token',
+                'client_id': self.__client_id,
+                'client_secret': self.__client_secret,
+                'refresh_token': self.__refresh_token,
+            })
+
+        if response.status_code >= 500:
+            raise Server5xxError()
+
+        if response.status_code != 200:
+            raise_for_error(response)
+
+        data = response.json()
+        self.__access_token = data['access_token']
+        self.__expires = datetime.utcnow() + timedelta(seconds=data['expires_in'])
+        LOGGER.info('Authorized, token expires = {}'.format(self.__expires))
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

This function starts with a call to GoogleClient.get_access_token() which likely returns +immediately most of the time.

+
+
+
+
+
+
+
+
+
+ # +
+

Then we decide what url we are sending the request to. Sometimes it’s +https://sheets.googleapis.com/v4 and sometimes it’s https://www.googleapis.com/drive/v3.

+
+
+
+
+
+
+
+
+
+ # +
+
    +
  • It seems like a mistake to decide this so deep into the code. Why doesn’t the caller decide + where the request goes?
  • +
+
+
+
+
+
+
+
+
+
+ # +
+

Then we set up the request headers. The authorization, user-agent, and content-type keys come +into play here

+
+
+
+
+
+
+
+
+
+ # +
+
    +
  • One benefit of a requests.Session is that you can set the headers once for the whole session. I'm not sure why we don't do that here; see the sketch below.
  • +
  • If we did that, we wouldn't have to think about the access_token making it into the headers here. They would just already be there.
  • +
+
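A minimal sketch of that suggestion (hypothetical, not the tap's code): register the headers on the session once, right after the token is obtained, and every request sent through that session then carries them automatically.

import requests

session = requests.Session()
access_token = 'my-access-token'                     # placeholder
user_agent = 'tap-google-sheets placeholder-agent'   # placeholder

# Set once; requests merges these into every request made with this session.
session.headers['Authorization'] = 'Bearer {}'.format(access_token)
session.headers['User-Agent'] = user_agent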
+
+
+
+
+
+
+
+
+ # +
+

Then we make the request, timing how long it takes with a singer.metrics.http_request_timer.

+
+
+
+
+
+
+
+
+
+ # +
+

The chunk of code after making the request handles an unsuccessful response. We retry HTTP 5xx and HTTP 429 errors, and call client.py:raise_for_error() for any other non-200 status.

+
+
+
+
+
+
+
+
+
+ # +
+

The most unusual thing in this tap happens here: we return the response parsed into an OrderedDict with this line

+
+
+
+
+
+
+
+
+
+ # +
+
return response.json(object_pairs_hook=OrderedDict)
+
+
+
+
+
+
+
+
+
+
+ # +
+

where object_pairs_hook is a kwarg passed to the JSON parser used by requests.

+
+
+
+
+
+
+
+
+
+ # +
+

This makes every JSON object in the response decode into an OrderedDict instead of a plain dict.

+
+
+
+
+
+
+
+
+
+ # +
+

Why do we do this? I don’t know. See the footnote for code examples

+
+
+
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

Rate limit: https://developers.google.com/sheets/api/limits (100 requests per 100 seconds per user)

+
+
+
    @backoff.on_exception(backoff.expo,
+                          (Server5xxError, ConnectionError, Server429Error),
+                          max_tries=7,
+                          factor=3)
+    @utils.ratelimit(100, 100)
+    def request(self, method, path=None, url=None, api=None, **kwargs):
+
+
+
+
+
+
+ # +
+

Make a request to the API

+

Inputs:

+
    +
  • method: “GET” or “POST”
  • +
  • url: The start of the url to make the request to
  • +
  • path: The endpoint-relative path; when url is not given, it is appended to the base URL chosen by api
  • +
+

Returns:

+
    +
  • The response body parsed into an OrderedDict (not a raw requests.Response)
  • +
+

Side Effects:

+
    +
  • Might store a new access token
  • +
+
+
+
        self.get_access_token()
+
+
+
+
+
+
+ # +
+

Construct the URL to make a request to

+
+
+
        self.base_url = 'https://sheets.googleapis.com/v4'
+        if api == 'files':
+            self.base_url = 'https://www.googleapis.com/drive/v3'
+
+        if not url and path:
+            url = '{}/{}'.format(self.base_url, path)
+
+        if 'endpoint' in kwargs:
+            endpoint = kwargs['endpoint']
+            del kwargs['endpoint']
+        else:
+            endpoint = None
+        LOGGER.info('{} URL = {}'.format(endpoint, url))
+
+
+
+
+
+
+ # +
+

Construct the headers arg for requests.request()

+
+
+
        if 'headers' not in kwargs:
+            kwargs['headers'] = {}
+        kwargs['headers']['Authorization'] = 'Bearer {}'.format(self.__access_token)
+
+        if self.__user_agent:
+            kwargs['headers']['User-Agent'] = self.__user_agent
+
+        if method == 'POST':
+            kwargs['headers']['Content-Type'] = 'application/json'
+
+
+
+
+
+
+ # +
+

Make request

+
+
+
        with metrics.http_request_timer(endpoint) as timer:
+            response = self.__session.request(method, url, **kwargs)
+            timer.tags[metrics.Tag.http_status_code] = response.status_code
+
+
+
+
+
+
+ # +
+

Start backoff logic

+
+
+
        if response.status_code >= 500:
+            raise Server5xxError()
+
+        if response.status_code == 429:
+            raise Server429Error()
+
+        if response.status_code != 200:
+            raise_for_error(response)
+
+
+
+
+
+
+ # +
+

Ensure keys and rows are ordered as received from the API. QUESTION: But why??

+
+
+
        return response.json(object_pairs_hook=OrderedDict)
+
+
+
+
+
+
+ # +
+

Syntactic Sugar

+
+
+
    def get(self, path, api, **kwargs):
+        return self.request(method='GET', path=path, api=api, **kwargs)
+
+
+
+
+
+
+ # +
+ +
+
+
    def post(self, path, api, **kwargs):
+        return self.request(method='POST', path=path, api=api, **kwargs)
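For illustration, a hypothetical call through the get() helper, mirroring what the spreadsheet_metadata stream requests. It assumes client is the GoogleClient opened in __init__.py:main() and spreadsheet_id comes from the config:

spreadsheet = client.get(
    path='spreadsheets/{}?includeGridData=false'.format(spreadsheet_id),
    api='sheets',
    endpoint='spreadsheet_metadata')
sheets = spreadsheet.get('sheets', [])   # the parsed body comes back as an OrderedDict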
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

Footnotes

+
+
+
+
+
+
+
+
+
+ # +
+

Here’s a normal .json()‘s output

+
{"file": "this is my file"}
+
+
+
+
+
+
+
+
+
+
+ # +
+

Here’s the weird one’s .json(object_pairs_hook=OrderedDict) output

+
OrderedDict([('file', 'this is my file')])
+
+
+
+
+
+
+
+
+
+
+ # +
+

Here’s a more complex example:

+
+
+
+
+
+
+
+
+
+ # +
+
{ "deleted": false,
+  "__v": 0,
+  "_id": "5887e1d85c873e0011036889",
+  "text": "Cats make about 100 different sounds. Dogs make only about 10.",
+  "createdAt": "2018-01-15T21:20:00.003Z",
+  "updatedAt": "2020-09-03T16:39:39.578Z",
+  "used": true,
+  "status": {
+      "sentCount": 1,
+      "feedback": "",
+      "verified": true
+  },
+  "type": "cat",
+  "user": "5a9ac18c7478810ea6c06381",
+  "source": "user"}
+
+
+
+
+
+
+
+
+
+
+ # +
+

Versus .json(object_pairs_hook=OrderedDict)

+
OrderedDict([('status', OrderedDict([('verified', True),
+                                     ('sentCount', 1),
+                                     ('feedback', '')])),
+             ('type', 'cat'),
+             ('deleted', False),
+             ('_id', '5887e1d85c873e0011036889'),
+             ('user', '5a9ac18c7478810ea6c06381'),
+             ('text', 'Cats make about 100 different sounds. Dogs make only about 10.'),
+             ('__v', 0),
+             ('source', 'user'),
+             ('updatedAt', '2020-09-03T16:39:39.578Z'),
+             ('createdAt', '2018-01-15T21:20:00.003Z'),
+             ('used', True)])
+
+
+
+
+
+
+
+
+ diff --git a/docs/discover.html b/docs/discover.html new file mode 100644 index 0000000..aecc2b6 --- /dev/null +++ b/docs/discover.html @@ -0,0 +1,127 @@ + + + + + discover.py + + + +
+
+
+

discover.py

+
+
+
+
+
+ # +
+ +
+
+
from singer.catalog import Catalog, CatalogEntry, Schema
+from tap_google_sheets.schema import get_schemas, STREAMS
+
+
+
+
+
+
+ # +
+

Construct a Catalog Entry for each stream

+

Inputs:

+
    +
  • client: A GoogleClient object
  • +
  • spreadsheet_id: the ID of a Google Sheet Doc
  • +
+

Returns:

+
    +
  • A singer.Catalog object
  • +
+
+
+
def discover(client, spreadsheet_id):
+
+
+
+
+
+
+ # +
+

It’s typical for taps in this style to call schema.py:get_schemas() to get schemas and +field_metadata.

+
+
+
+
+
+
+
+
+
+ # +
+

Here schemas is a dictionary of stream name to JSON schema, and field_metadata is a dictionary of stream name to that stream's list of metadata entries. In this tap, it seems that discover.py:discover() only cares about sometimes getting table-key-properties from field_metadata.

+
+
+
+
+
+
+
+
+
+ # +
+
    +
  • This could be a point of confusion because table-key-properties is stream/table-level metadata, which you may or may not expect to be returned and stored in field_metadata.
  • +
+
+
+
    schemas, field_metadata = get_schemas(client, spreadsheet_id)
+    catalog = Catalog([])
+
+    for stream_name, schema_dict in schemas.items():
+        schema = Schema.from_dict(schema_dict)
+        mdata = field_metadata[stream_name]
+        key_properties = None
+        for mdt in mdata:
+            table_key_properties = mdt.get('metadata', {}).get('table-key-properties')
+            if table_key_properties:
+                key_properties = table_key_properties
+
+
+
+
+
+
+ # +
+

Once we have the stream_name, the value of table-key-properties, the schema, and the metadata for a stream, we pass all of that to the singer.CatalogEntry constructor and append the result to the singer.Catalog object initialized at the start of discover.py:discover().

+
+
+
        catalog.streams.append(CatalogEntry(
+            stream=stream_name,
+            tap_stream_id=stream_name,
+            key_properties=STREAMS.get(stream_name, {}).get('key_properties', key_properties),
+            schema=schema,
+            metadata=mdata
+        ))
+
+    return catalog
+
+
+
+
+
+
+ diff --git a/docs/pycco.css b/docs/pycco.css new file mode 100644 index 0000000..a45d4c0 --- /dev/null +++ b/docs/pycco.css @@ -0,0 +1,191 @@ +/*--------------------- Layout and Typography ----------------------------*/ +body { + font-family: 'Palatino Linotype', 'Book Antiqua', Palatino, FreeSerif, serif; + font-size: 16px; + line-height: 24px; + color: #252519; + margin: 0; padding: 0; + background: #f5f5ff; +} +a { + color: #261a3b; +} + a:visited { + color: #261a3b; + } +p { + margin: 0 0 15px 0; +} +h1, h2, h3, h4, h5, h6 { + margin: 40px 0 15px 0; +} +h2, h3, h4, h5, h6 { + margin-top: 0; + } +#container { + background: white; + } +#container, div.section { + position: relative; +} +#background { + position: absolute; + top: 0; left: 580px; right: 0; bottom: 0; + background: #f5f5ff; + border-left: 1px solid #e5e5ee; + z-index: 0; +} +#jump_to, #jump_page { + background: white; + -webkit-box-shadow: 0 0 25px #777; -moz-box-shadow: 0 0 25px #777; + -webkit-border-bottom-left-radius: 5px; -moz-border-radius-bottomleft: 5px; + font: 10px Arial; + text-transform: uppercase; + cursor: pointer; + text-align: right; +} +#jump_to, #jump_wrapper { + position: fixed; + right: 0; top: 0; + padding: 5px 10px; +} + #jump_wrapper { + padding: 0; + display: none; + } + #jump_to:hover #jump_wrapper { + display: block; + } + #jump_page { + padding: 5px 0 3px; + margin: 0 0 25px 25px; + } + #jump_page .source { + display: block; + padding: 5px 10px; + text-decoration: none; + border-top: 1px solid #eee; + } + #jump_page .source:hover { + background: #f5f5ff; + } + #jump_page .source:first-child { + } +div.docs { + float: left; + max-width: 500px; + min-width: 500px; + min-height: 5px; + padding: 10px 25px 1px 50px; + vertical-align: top; + text-align: left; +} + .docs pre { + margin: 15px 0 15px; + padding-left: 15px; + overflow-y: scroll; + } + .docs p tt, .docs p code { + background: #f8f8ff; + border: 1px solid #dedede; + font-size: 12px; + padding: 0 0.2em; + } + .octowrap { + position: relative; + } + .octothorpe { + font: 12px Arial; + text-decoration: none; + color: #454545; + position: absolute; + top: 3px; left: -20px; + padding: 1px 2px; + opacity: 0; + -webkit-transition: opacity 0.2s linear; + } + div.docs:hover .octothorpe { + opacity: 1; + } +div.code { + margin-left: 580px; + padding: 14px 15px 16px 50px; + vertical-align: top; +} + .code pre, .docs p code { + font-size: 12px; + } + pre, tt, code { + line-height: 18px; + font-family: Monaco, Consolas, "Lucida Console", monospace; + margin: 0; padding: 0; + } +div.clearall { + clear: both; +} + + +/*---------------------- Syntax Highlighting -----------------------------*/ +td.linenos { background-color: #f0f0f0; padding-right: 10px; } +span.lineno { background-color: #f0f0f0; padding: 0 5px 0 5px; } +body .hll { background-color: #ffffcc } +body .c { color: #408080; font-style: italic } /* Comment */ +body .err { border: 1px solid #FF0000 } /* Error */ +body .k { color: #954121 } /* Keyword */ +body .o { color: #666666 } /* Operator */ +body .cm { color: #408080; font-style: italic } /* Comment.Multiline */ +body .cp { color: #BC7A00 } /* Comment.Preproc */ +body .c1 { color: #408080; font-style: italic } /* Comment.Single */ +body .cs { color: #408080; font-style: italic } /* Comment.Special */ +body .gd { color: #A00000 } /* Generic.Deleted */ +body .ge { font-style: italic } /* Generic.Emph */ +body .gr { color: #FF0000 } /* Generic.Error */ +body .gh { color: #000080; font-weight: bold } /* Generic.Heading */ +body .gi { color: 
#00A000 } /* Generic.Inserted */ +body .go { color: #808080 } /* Generic.Output */ +body .gp { color: #000080; font-weight: bold } /* Generic.Prompt */ +body .gs { font-weight: bold } /* Generic.Strong */ +body .gu { color: #800080; font-weight: bold } /* Generic.Subheading */ +body .gt { color: #0040D0 } /* Generic.Traceback */ +body .kc { color: #954121 } /* Keyword.Constant */ +body .kd { color: #954121; font-weight: bold } /* Keyword.Declaration */ +body .kn { color: #954121; font-weight: bold } /* Keyword.Namespace */ +body .kp { color: #954121 } /* Keyword.Pseudo */ +body .kr { color: #954121; font-weight: bold } /* Keyword.Reserved */ +body .kt { color: #B00040 } /* Keyword.Type */ +body .m { color: #666666 } /* Literal.Number */ +body .s { color: #219161 } /* Literal.String */ +body .na { color: #7D9029 } /* Name.Attribute */ +body .nb { color: #954121 } /* Name.Builtin */ +body .nc { color: #0000FF; font-weight: bold } /* Name.Class */ +body .no { color: #880000 } /* Name.Constant */ +body .nd { color: #AA22FF } /* Name.Decorator */ +body .ni { color: #999999; font-weight: bold } /* Name.Entity */ +body .ne { color: #D2413A; font-weight: bold } /* Name.Exception */ +body .nf { color: #0000FF } /* Name.Function */ +body .nl { color: #A0A000 } /* Name.Label */ +body .nn { color: #0000FF; font-weight: bold } /* Name.Namespace */ +body .nt { color: #954121; font-weight: bold } /* Name.Tag */ +body .nv { color: #19469D } /* Name.Variable */ +body .ow { color: #AA22FF; font-weight: bold } /* Operator.Word */ +body .w { color: #bbbbbb } /* Text.Whitespace */ +body .mf { color: #666666 } /* Literal.Number.Float */ +body .mh { color: #666666 } /* Literal.Number.Hex */ +body .mi { color: #666666 } /* Literal.Number.Integer */ +body .mo { color: #666666 } /* Literal.Number.Oct */ +body .sb { color: #219161 } /* Literal.String.Backtick */ +body .sc { color: #219161 } /* Literal.String.Char */ +body .sd { color: #219161; font-style: italic } /* Literal.String.Doc */ +body .s2 { color: #219161 } /* Literal.String.Double */ +body .se { color: #BB6622; font-weight: bold } /* Literal.String.Escape */ +body .sh { color: #219161 } /* Literal.String.Heredoc */ +body .si { color: #BB6688; font-weight: bold } /* Literal.String.Interpol */ +body .sx { color: #954121 } /* Literal.String.Other */ +body .sr { color: #BB6688 } /* Literal.String.Regex */ +body .s1 { color: #219161 } /* Literal.String.Single */ +body .ss { color: #19469D } /* Literal.String.Symbol */ +body .bp { color: #954121 } /* Name.Builtin.Pseudo */ +body .vc { color: #19469D } /* Name.Variable.Class */ +body .vg { color: #19469D } /* Name.Variable.Global */ +body .vi { color: #19469D } /* Name.Variable.Instance */ +body .il { color: #666666 } /* Literal.Number.Integer.Long */ diff --git a/docs/schema.html b/docs/schema.html new file mode 100644 index 0000000..0564f26 --- /dev/null +++ b/docs/schema.html @@ -0,0 +1,949 @@ + + + + + schema.py + + + +
+
+
+

schema.py

+
+
+
+
+
+ # +
+ +
+
+
import os
+import json
+import re
+import urllib.parse
+from collections import OrderedDict
+import singer
+from singer import metadata
+from tap_google_sheets.streams import STREAMS
+
+LOGGER = singer.get_logger()
+
+
+
+
+
+
+ # +
+

Convert column index to column letter

+
+
+
def colnum_string(num):
+
+
+
+
+
+
+ # +
+ +
+
+
    string = ""
+    while num > 0:
+        num, remainder = divmod(num - 1, 26)
+        string = chr(65 + remainder) + string
+    return string
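A quick sanity check of the 1-based index to letter mapping (assuming the function is importable from tap_google_sheets.schema):

from tap_google_sheets.schema import colnum_string

assert colnum_string(1) == 'A'
assert colnum_string(26) == 'Z'
assert colnum_string(27) == 'AA'
assert colnum_string(28) == 'AB'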
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

The goal of this function is to get the JSON schema of the sheet you pass in. Our return values here +are sheet_json_schema and columns, an OrderedDict and a list respectively.

+
+
+
+
+
+
+
+
+
+ # +
+

This function is massive and we will discuss it in the following parts:

+
+
+
+
+
+
+
+
+
+ # +
+
    +
  • Part 1
  • +
  • Part 2
      +
    • Part 2A
    • +
    • Part 2B
        +
      • Part 3
      • +
      • Part 4
      • +
      +
    • +
    +
  • +
+
+
+
+
+
+
+
+
+
+ # +
+

Part 1 is just setting up constants and variables. We can skim through this part.

+
+
+
+
+
+
+
+
+
+ # +
+

Part 2 is split into two parts because it’s a loop over the column and there’s two ways to handle a +column.

+
+
+
+
+
+
+
+
+
+ # +
+

We’ll consider 2A to be the “skip this column” case.

+
+
+
+
+
+
+
+
+
+ # +
+

We’ll consider 2B as the “not skipped” case. In which we determine a field’s type (Part 3) and then +use the type to decide the JSON Schema (Part 4).

+
+
+
+
+
+
+
+
+
+ # +
+
+

Create sheet_metadata_json with columns from sheet

+
+
+
def get_sheet_schema_columns(sheet):
+
+
+
+
+
+
+ # +
+

The input to this function is shaped like

+
{
+  "data" : [
+    {
+      "rowData": [
+        {"values": <thing 1>},
+        {"values": <thing 2>}
+      ]
+    }
+  ]
+}
+
+

Return Values

+
    +
  • +

    columns

    +
      +
    • A column that goes into columns is a dictionary with keys "columnIndex", +"columnLetter", "columnName", "columnType", and "columnSkipped".
    • +
    +
  • +
  • +

    sheet_json_schema

    +
      +
    • A col_properties that goes into sheet_json_schema['properties'][column_name] is the JSON +schema of column_name.
    • +
    +
  • +
+
+
+
    sheet_title = sheet.get('properties', {}).get('title')
+    sheet_json_schema = OrderedDict()
+    data = next(iter(sheet.get('data', [])), {})
+    row_data = data.get('rowData', [])
+    if row_data == []:
+        LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+        return None, None
+
+
+
+
+
+
+ # +
+

So this function starts by unpacking it into two lists, headers and first_values, which is +“thing 1” and “thing 2” respectively.

+
+
+
    headers = row_data[0].get('values', [])
+    first_values = row_data[1].get('values', [])
+
+
+
+
+
+
+ # +
+

All of the objects in headers and first_values have the following shape:

+
+
+
+
+
+
+
+
+
+ # +
+
{
+    "userEnteredValue": {"stringValue": "time1"},
+    "effectiveValue": {"stringValue": "time1"},
+    "formattedValue": "time1",
+    "userEnteredFormat": {...},
+    "effectiveFormat": {}
+}
+
+
+
+
+
+
+
+
+
+
+ # +
+

The base Sheet schema

+
+
+
    sheet_json_schema = {
+        'type': 'object',
+        'additionalProperties': False,
+        'properties': {
+            '__sdc_spreadsheet_id': {
+                'type': ['null', 'string']
+            },
+            '__sdc_sheet_id': {
+                'type': ['null', 'integer']
+            },
+            '__sdc_row': {
+                'type': ['null', 'integer']
+            }
+        }
+    }
+
+    header_list = [] # used for checking uniqueness
+    columns = []
+    prior_header = None
+    i = 0
+    skipped = 0
+
+
+
+
+
+
+ # +
+

We loop over the columns in the headers list and accumulate an object in each return variable.

+
+
+
    for header in headers:
+        column_index = i + 1
+        column_letter = colnum_string(column_index)
+        header_value = header.get('formattedValue')
+        if header_value: # NOT skipped
+
+
+
+
+
+
+ # +
+

Assuming the column we are looking at does not get skipped, we have to figure out the +schema.

+
+
+
            column_is_skipped = False
+
+
+
+
+
+
+ # +
+

First we reset the counter for consecutive skipped columns.

+
+
+
            skipped = 0
+
+
+
+
+
+
+ # +
+

Then we let the name of this column be the value of formattedValue from the header +object we are looking at. This seems to be the value rendered in Google Sheets in the +cell.

+
+
+
            column_name = '{}'.format(header_value)
+
+
+
+
+
+
+ # +
+

We assert that this column name is unique or else we raise a “Duplicate Header Error”.

+
+
+
            if column_name in header_list:
+                raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
+                    sheet_title, column_name, column_letter))
+            header_list.append(column_name)
+
+
+
+
+
+
+ # +
+

We attempt to grab the value in the second row of the sheet (the first row of data) +associated with this column. Remember this row we are looking at is stored in +first_values. Note again that headers and first_values have the same shape.

+
+
+
            first_value = None
+            try:
+                first_value = first_values[i]
+            except IndexError as err:
+                LOGGER.info('NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
+                    sheet_title, column_name, column_letter, err))
+                first_value = {}
+                first_values.append(first_value)
+                pass
+
+            column_effective_value = first_value.get('effectiveValue', {})
+
+            col_val = None
+            if column_effective_value == {}:
+                column_effective_value_type = 'stringValue'
+                LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
+                    sheet_title, column_name, column_letter))
+                LOGGER.info('   Setting column datatype to STRING')
+            else:
+
+
+
+
+
+
+ # +
+

The tap calls the value of "effectiveValue" the column_effective_value. This +dictionary can be empty or it can have a key1 that looks like "numberValue", +"stringValue", or "boolValue". If the dictionary is empty, we force key1 to +be "stringValue".

+
+
+
                for key, val in column_effective_value.items():
+                    if key in ('numberValue', 'stringValue', 'boolValue'):
+                        column_effective_value_type = key
+                        col_val = str(val)
+
+
+
+
+
+
+ # +
+

Sometimes key1 also looks like "errorType" or "formulaType", but in +these cases, we raise a “Data Type Error” error immediately.

+
+
+
                    elif key in ('errorType', 'formulaType'):
+                        col_val = str(val)
+                        raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+                            sheet_title, column_name, column_letter, key, col_val))
+
+            column_number_format = first_values[i].get('effectiveFormat', {}).get(
+                'numberFormat', {})
+
+
+
+
+
+
+ # +
+

column_number_format_type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE

+
    +
  • TIME, DATE_TIME, SCIENTIFIC
  • +
  • https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
  • +
+
+
+
            column_number_format_type = column_number_format.get('type')
+
+
+
+
+
+
+ # +
+

the giant if-elif-else block: All it does is set a variable col_properties and +column_gs_type based on the values of column_effective_value_type and +column_number_format_type.

+
+
+
            column_format = None
+            if column_effective_value == {}:
+                col_properties = {'type': ['null', 'string']}
+                column_gs_type = 'stringValue'
+                LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
+                        sheet_title, column_name, column_letter))
+                LOGGER.info('   Setting column datatype to STRING')
+
+
+
+
+
+
+ # +
+

column_effective_value_type = numberValue, stringValue, boolValue

+
    +
  • INVALID: errorType, formulaType
  • +
  • https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
  • +
+
+
+
            elif column_effective_value_type == 'stringValue':
+                col_properties = {'type': ['null', 'string']}
+                column_gs_type = 'stringValue'
+            elif column_effective_value_type == 'boolValue':
+                col_properties = {'type': ['null', 'boolean', 'string']}
+                column_gs_type = 'boolValue'
+            elif column_effective_value_type == 'numberValue':
+                if column_number_format_type == 'DATE_TIME':
+                    col_properties = {
+                        'type': ['null', 'string'],
+                        'format': 'date-time'
+                    }
+                    column_gs_type = 'numberType.DATE_TIME'
+                elif column_number_format_type == 'DATE':
+                    col_properties = {
+                        'type': ['null', 'string'],
+                        'format': 'date'
+                    }
+                    column_gs_type = 'numberType.DATE'
+                elif column_number_format_type == 'TIME':
+                    col_properties = {
+                        'type': ['null', 'string'],
+                        'format': 'time'
+                    }
+                    column_gs_type = 'numberType.TIME'
+                elif column_number_format_type == 'TEXT':
+                    col_properties = {'type': ['null', 'string']}
+                    column_gs_type = 'stringValue'
+                else:
+                    col_properties = {'type': 'number', 'multipleOf': 1e-15}
+                    column_gs_type = 'numberType'
+            else:
+                col_properties = {'type': ['null', 'string']}
+                column_gs_type = 'unsupportedValue'
+                LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
+                        sheet_title, column_name, column_letter, column_effective_value_type, col_val))
+                LOGGER.info('Converting to string.')
+        else: # skipped
+
+
+
+
+
+
+ # +
+

We note that we are skipping this column. It still gets added to the schema, though, as a string field. The only other notable thing about skipped columns is that we create the field name for it, and it looks like "__sdc_skip_col_XY", where XY is the column index zero-padded to two digits ("01", "02", ...).

+
+
+
            column_is_skipped = True
+            skipped = skipped + 1
+            column_index_str = str(column_index).zfill(2)
+            column_name = '__sdc_skip_col_{}'.format(column_index_str)
+            col_properties = {'type': ['null', 'string']}
+            column_gs_type = 'stringValue'
+            LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
+                sheet_title, column_name, column_letter))
+            LOGGER.info('  This column will be skipped during data loading.')
+
+        if skipped >= 2:
+            sheet_json_schema['properties'].pop(prior_header, None)
+            LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
+                sheet_title, column_name, column_letter))
+            break
+
+        else:
+            column = {}
+            column = {
+                'columnIndex': column_index,
+                'columnLetter': column_letter,
+                'columnName': column_name,
+                'columnType': column_gs_type,
+                'columnSkipped': column_is_skipped
+            }
+            columns.append(column)
+
+            if column_gs_type in {'numberType.DATE_TIME', 'numberType.DATE', 'numberType.TIME', 'numberType'}:
+                col_properties = {
+                    'anyOf': [
+                        col_properties,
+                        {'type': ['null', 'string']}
+                    ]
+                }
+
+            sheet_json_schema['properties'][column_name] = col_properties
+
+        prior_header = column_name
+        i = i + 1
+
+    return sheet_json_schema, columns
+
+
+
+
+
+
+ # +
+

The point of this function seems to be (1) make a request to get a sheet (2) return the schema +generated for this sheet by schema.py:get_sheet_schema_columns.

+

get_sheet_metadata() sets up a lot of variables to ultimately make a request to

+
https://sheets.googleapis.com/v4/spreadsheets/my-spreadsheet-id?includeGridData=true&ranges='my-sheet-title'!1:2
+
+
+
+
+
+
+
+
+
+
+ # +
+

Let’s dissect the query params here a bit.

+
+
+
+
+
+
+
+
+
+ # +
+

includeGridData is false by default and setting this to true lets us get “Grid data”. If you +compare the same request but with that value flipped, then you’ll notice the includeGridData=false +gives you a relatively small response with no data in it. It seems like just a bunch of metadata.

+
+
+
+
+
+
+
+
+
+ # +
+

ranges controls the rows returned.

+
+
+
def get_sheet_metadata(sheet, spreadsheet_id, client):
+
+
+
+
+
+
+ # +
+

Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query

+
    +
  • endpoint: spreadsheets/{spreadsheet_id}
  • +
  • params: includeGridData = true, ranges = ‘{sheet_title}’!1:2 +This endpoint includes detailed metadata about each cell - incl. data type, formatting, etc.
  • +
+
+
+
    sheet_id = sheet.get('properties', {}).get('sheetId')
+    sheet_title = sheet.get('properties', {}).get('title')
+    LOGGER.info('sheet_id = {}, sheet_title = {}'.format(sheet_id, sheet_title))
+
+    stream_name = 'sheet_metadata'
+    stream_metadata = STREAMS.get(stream_name)
+    api = stream_metadata.get('api', 'sheets')
+    params = stream_metadata.get('params', {})
+    sheet_title_encoded = urllib.parse.quote_plus(sheet_title)
+    sheet_title_escaped = re.escape(sheet_title)
+    querystring = '&'.join(
+        ['%s=%s' % (key, value) for (key, value) in params.items()]
+    ).replace('{sheet_title}', sheet_title_encoded)
+    path = '{}?{}'.format(
+        stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id),
+        querystring
+    )
+
+
+
+
+
+
+ # +
+

See the Footnotes for this response shape

+
+
+
    sheet_md_results = client.get(path=path, api=api, endpoint=sheet_title_escaped)
+    sheet_metadata = sheet_md_results.get('sheets')[0]
+
+
+    try:
+
+
+
+
+
+
+ # +
+

Create sheet_json_schema (for discovery/catalog) and columns (for sheet_metadata results)

+
+
+
        sheet_json_schema, columns = get_sheet_schema_columns(sheet_metadata)
+    except Exception as err:
+        LOGGER.warning('{}'.format(err))
+        LOGGER.warning('SKIPPING Malformed sheet: {}'.format(sheet_title))
+        sheet_json_schema, columns = None, None
+
+    return sheet_json_schema, columns
+
+
+
+
+
+
+ # +
+ +
+
+
def get_abs_path(path):
+    return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)
+
+
+
+
+
+
+ # +
+

We initialize our return variables, schemas and field_metadata to empty dictionaries.

+

We loop over each stream in streams.py:STREAMS. We load the static JSON file into memory - all +four streams currently have some static schema. We store this on our return variable schemas +under the stream name.

+

We then call singer.metadata.get_standard_metadata() passing in whatever metadata we do have +(key properties, valid replication keys, the replication method). The return value here is +stored on our return variable field_metadata under the stream name.

+
+
+
def get_schemas(client, spreadsheet_id):
+
+
+
+
+
+
+ # +
+ +
+
+
    schemas = {}
+    field_metadata = {}
+
+    for stream_name, stream_metadata in STREAMS.items():
+        schema_path = get_abs_path('schemas/{}.json'.format(stream_name))
+        with open(schema_path) as file:
+            schema = json.load(file)
+        schemas[stream_name] = schema
+        mdata = metadata.new()
+
+        mdata = metadata.get_standard_metadata(
+            schema=schema,
+            key_properties=stream_metadata.get('key_properties', None),
+            valid_replication_keys=stream_metadata.get('replication_keys', None),
+            replication_method=stream_metadata.get('replication_method', None)
+        )
+        field_metadata[stream_name] = mdata
+
+
+
+
+
+
+ # +
+

If we are handling the "spreadsheet_metadata" stream, we do some extra work to build the dynamic schemas of each Sheet we want to sync. Otherwise, that's it.

+
+
+
        if stream_name == 'spreadsheet_metadata':
+
+
+
+
+
+
+ # +
+

We ultimately end up making a GET to

+
+
+
+
+
+
+
+
+
+ # +
+
https://sheets.googleapis.com/v4/spreadsheets/my-spreadsheet-id?includeGridData=false
+
+
+
+
+
+
+
+
+
+
+ # +
+

Notice this is base_url + path + query_string. There’s code here to figure out and +properly format path and query_string. I’m not sure why we don’t let requests +handle this.

+
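A sketch of the alternative hinted at here (hypothetical, not the tap's code): pass the query parameters as a dict and let requests encode them, relying on GoogleClient.request() forwarding **kwargs to requests.Session.request():

spreadsheet_md_results = client.get(
    path='spreadsheets/{}'.format(spreadsheet_id),
    api='sheets',
    params={'includeGridData': 'false'},   # requests builds the query string for us
    endpoint=stream_name)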
+
+
+
+
+
+
+
+
+ # +
+

We assume this request is successful and we store the OrderedDict return value as +spreadsheet_md_results.

+
+
+
            api = stream_metadata.get('api', 'sheets')
+            params = stream_metadata.get('params', {})
+            querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
+            path = '{}?{}'.format(
+                stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id),
+                querystring
+            )
+
+            spreadsheet_md_results = client.get(
+                path=path,
+                params=querystring,
+                api=api,
+                endpoint=stream_name
+            )
+
+
+
+
+
+
+ # +
+

The response here is one of those “envelope” kinds. The data we care about is under +the "sheets" key.

+
+
+
            sheets = spreadsheet_md_results.get('sheets')
+            if sheets:
+
+
+
+
+
+
+ # +
+

Looping over this array, we call schema.py:get_sheet_metadata. This gets the +JSON schema of each sheet found in this Google Doc. We use the sheet’s title as +the stream name here.

+
+
+
                for sheet in sheets:
+                    sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+
+                    if sheet_json_schema and columns:
+                        sheet_title = sheet.get('properties', {}).get('title')
+                        schemas[sheet_title] = sheet_json_schema
+                        sheet_mdata = metadata.new()
+                        sheet_mdata = metadata.get_standard_metadata(
+                            schema=sheet_json_schema,
+                            key_properties=['__sdc_row'],
+                            valid_replication_keys=None,
+                            replication_method='FULL_TABLE'
+                        )
+                        field_metadata[sheet_title] = sheet_mdata
+
+    return schemas, field_metadata
+
+
+
+
+
+
+ # +
+

Footnotes

+

The shape of the response is like the following; note the tap stores this in the recursive OrderedDict structure.

+
+
+
+
+
+
+
+
+
+ # +
+
{
+    "spreadsheetid": "my-id",
+    "properties": {...},
+    "sheets": [
+        {
+            "properties": {},
+            "data": [
+                {
+                    "rowData": [
+                        {
+                            "values": [
+                                {
+                                    "userEnteredValue": {"stringValue": "time1"},
+                                    "effectiveValue": {"stringValue": "time1"},
+                                    "formattedValue": "time1",
+                                    "userEnteredFormat": {...},
+                                    "effectiveFormat": {}
+                                },
+                                ...
+                            ],
+                        },
+                        ...
+                    ]
+                }
+            ]
+        },
+    ]
+}
+
+
+
+
+
+
+
+
+ diff --git a/docs/streams.html b/docs/streams.html new file mode 100644 index 0000000..8c9b6d9 --- /dev/null +++ b/docs/streams.html @@ -0,0 +1,185 @@ + + + + + streams.py + + + +
+
+
+

streams.py

+
+
+
+
+
+ # +
+

streams.py:STREAMS is an OrderedDict only because we want to loop over it in the same order every time.

+

It’s still the same global variable found in taps of this style. It maps stream names to a +dictionary describing the stream.

+

Some notable things we learn in this file:

+
  • api is either "files" or "sheets"
    • We saw this used in client.py:GoogleClient.request() to switch the base url of the request
  • "file_metadata" is the only incremental stream
  • Full table streams include:
    • "spreadsheet_metadata"
    • "sheet_metadata"
    • "sheets_loaded"
  • "sheets_loaded" is the only stream with a "data_key"
    • We typically see data_key be the name of the key to get data out of "envelope" responses
+
+
+
from collections import OrderedDict
+
+
+
+
+
+
+ # +
+

streams: API URL endpoints to be called
properties:
  <root node>: Plural stream name for the endpoint
  path: API endpoint relative path; when added to the base URL, creates the full path (default = stream_name)
  key_properties: Primary key fields for identifying an endpoint record
  replication_method: INCREMENTAL or FULL_TABLE
  replication_keys: bookmark_field(s), typically a date-time, used for filtering the results and setting the state
  params: Query, sort, and other endpoint specific parameters (default = {})
  data_key: JSON element containing the results list for the endpoint (default = root, i.e. no data_key)

+
+
+
+
+
+
+
+
+
+ # +
+

file_metadata: Queries the Google Drive API to get file information and see if the file has been modified. Provides audit info about who last changed the file and when.

+
+
+
FILE_METADATA = {
+    "api": "files",
+    "path": "files/{spreadsheet_id}",
+    "key_properties": ["id"],
+    "replication_method": "INCREMENTAL",
+    "replication_keys": ["modifiedTime"],
+    "params": {
+        "fields": "id,name,createdTime,modifiedTime,version,teamDriveId,driveId,lastModifyingUser"
+    }
+}
+
+
+
+
+
+
+ # +
+

spreadsheet_metadata: Queries the spreadsheet to get basic information on the spreadsheet and its sheets

+
+
+
SPREADSHEET_METADATA = {
+    "api": "sheets",
+    "path": "spreadsheets/{spreadsheet_id}",
+    "key_properties": ["spreadsheetId"],
+    "replication_method": "FULL_TABLE",
+    "params": {
+        "includeGridData": "false"
+    }
+}
+
+
+
+
+
+
+ # +
+

sheet_metadata: Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet. +This endpoint includes detailed metadata about each cell in the header and first data row + incl. data type, formatting, etc.

+
+
+
SHEET_METADATA = {
+    "api": "sheets",
+    "path": "spreadsheets/{spreadsheet_id}",
+    "key_properties": ["sheetId"],
+    "replication_method": "FULL_TABLE",
+    "params": {
+        "includeGridData": "true",
+        "ranges": "'{sheet_title}'!1:2"
+    }
+}
+
+
+
+
+
+
+ # +
+

sheets_loaded: Queries a batch of Rows for each Sheet in the Spreadsheet. +Each query uses the values endpoint, to get data-only, w/out the formatting/type metadata.

+
+
+
SHEETS_LOADED = {
+    "api": "sheets",
+    "path": "spreadsheets/{spreadsheet_id}/values/'{sheet_title}'!{range_rows}",
+    "data_key": "values",
+    "key_properties": ["spreadsheetId", "sheetId", "loadDate"],
+    "replication_method": "FULL_TABLE",
+    "params": {
+        "dateTimeRenderOption": "SERIAL_NUMBER",
+        "valueRenderOption": "UNFORMATTED_VALUE",
+        "majorDimension": "ROWS"
+    }
+}
+
+
+
+
+
+
+ # +
+

Ensure streams are ordered sequentially, logically.

+
+
+
STREAMS = OrderedDict()
+STREAMS['file_metadata'] = FILE_METADATA
+STREAMS['spreadsheet_metadata'] = SPREADSHEET_METADATA
+STREAMS['sheet_metadata'] = SHEET_METADATA
+STREAMS['sheets_loaded'] = SHEETS_LOADED
+
+
+
+
+
+
+ diff --git a/docs/sync.html b/docs/sync.html new file mode 100644 index 0000000..97ef9fa --- /dev/null +++ b/docs/sync.html @@ -0,0 +1,1680 @@ + + + + + sync.py + + + +
+
+
+

sync.py

+
+
+
+
+
+ # +
+

This module contains the logic to sync data from the API.

+
+

Syncable streams: The tap seems to care about syncing the streams in this order.

+
    +
  1. file_metadata
  2. spreadsheet_metadata
  3. N Sheets
  4. sheet_metadata
  5. sheets_loaded
+
+

The flow through this module is:

+
    +
  1. Entrypoint: sync()
  2. Sync file_metadata
    1. get_data()
    2. transform_file_metadata()
    3. Maybe exit the sync
    4. sync_stream()
  3. Sync spreadsheet_metadata
    1. get_data()
    2. transform_spreadsheet_metadata()
    3. sync_stream()
  4. Sync all of the Sheets. Here's the process for a single Sheet:
    1. get_sheet_metadata()
    2. transform_sheet_metadata()
    3. get_data()
    4. transform_sheet_data()
    5. process_records()
  5. Sync sheet_metadata
    1. sync_stream()
  6. Sync sheets_loaded
    1. sync_stream()
+
+
+
import time
+import math
+import json
+import re
+import urllib.parse
+from datetime import datetime, timedelta
+import pytz
+import singer
+from singer import metrics, metadata, Transformer, utils
+from singer.utils import strptime_to_utc, strftime
+from singer.messages import RecordMessage
+from tap_google_sheets.streams import STREAMS
+from tap_google_sheets.schema import get_sheet_metadata
+
+LOGGER = singer.get_logger()
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

Helper Functions

+
+
+
+
+
+
+
+
+
+ # +
+
+

Log that we write a schema via singer.write_schema

+
+
+
def write_schema(catalog, stream_name):
+
+
+
+
+
+
+ # +
+ +
+
+
    stream = catalog.get_stream(stream_name)
+    schema = stream.schema.to_dict()
+    try:
+        singer.write_schema(stream_name, schema, stream.key_properties)
+        LOGGER.info('Writing schema for: {}'.format(stream_name))
+    except OSError as err:
+
+
+
+
+
+
+ # +
+

QUESTION: When do we encounter an OSError?

+
+
+
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
+        raise err
+
+
+
+
+
+
+ # +
+

Write a RecordMessage, with the given version if it was passed in

+
+
+
def write_record(stream_name, record, time_extracted, version=None):
+
+
+
+
+
+
+ # +
+ +
+
+
    try:
+        if version:
+            singer.messages.write_message(
+                RecordMessage(
+                    stream=stream_name,
+                    record=record,
+                    version=version,
+                    time_extracted=time_extracted))
+        else:
+            singer.messages.write_record(
+                stream_name=stream_name,
+                record=record,
+                time_extracted=time_extracted)
+    except OSError as err:
+
+
+
+
+
+
+ # +
+

QUESTION: When do we encounter an OSError?

+
+
+
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
+        LOGGER.info('record: {}'.format(record))
+        raise err
+
+
+
+
+
+
+ # +
+

Safely get a bookmark from state.

+
+
+
def get_bookmark(state, stream, default):
+
+
+
+
+
+
+ # +
+

Hides an error though if state turns out to be None

+
+
+
    if (state is None) or ('bookmarks' not in state):
+        return default
+
+
+
+
+
+
+ # +
+

This is also short enough for one line; is the multi-line form supposed to be more readable?

+
+
+
    return (
+        state
+        .get('bookmarks', {})
+        .get(stream, default)
+    )
+
+
+
+
+
+
+ # +
+

Update and write state

+
+
+
def write_bookmark(state, stream, value):
+
+
+
+
+
+
+ # +
+ +
+
+
    if 'bookmarks' not in state:
+        state['bookmarks'] = {}
+    state['bookmarks'][stream] = value
+    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
+    singer.write_state(state)
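
As a concrete illustration of get_bookmark() and write_bookmark(), here is a minimal sketch of the state shape they read and write (the stream names and bookmark value below are made up for illustration):

state = {}

# write_bookmark creates the 'bookmarks' node on first use and emits a STATE message
write_bookmark(state, 'file_metadata', '2021-02-21T00:00:00.000000Z')
# state is now {'bookmarks': {'file_metadata': '2021-02-21T00:00:00.000000Z'}}

# get_bookmark falls back to the default when a stream has no bookmark yet
get_bookmark(state, 'Sheet1', 0)             # -> 0
get_bookmark(state, 'file_metadata', None)   # -> '2021-02-21T00:00:00.000000Z'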
+
+
+
+
+
+
+ # +
+

Upserts or deletes the ‘currently_syncing’ stream

+
+
+
def update_currently_syncing(state, stream_name):
+
+
+
+
+
+
+ # +
+

Why do we care if stream_name is passed in to delete currently_syncing?

+
+
+
    if (stream_name is None) and ('currently_syncing' in state):
+        del state['currently_syncing']
+    else:
+        singer.set_currently_syncing(state, stream_name)
+    singer.write_state(state)
+
+
+
+
+
+
+ # +
+

Get a list of selected, top-level fields for stream_name

+
+
+
def get_selected_fields(catalog, stream_name):
+
+
+
+
+
+
+ # +
+ +
+
+
    stream = catalog.get_stream(stream_name)
+    mdata = metadata.to_map(stream.metadata)
+    mdata_list = singer.metadata.to_list(mdata)
+    selected_fields = []
+    for entry in mdata_list:
+        field = None
+        try:
+            field = entry['breadcrumb'][1]
+            if entry.get('metadata', {}).get('selected', False):
+                selected_fields.append(field)
+        except IndexError:
+
+
+
+
+
+
+ # +
+

Swallow the error for the Stream level metadata

+
+
+
            pass
+    return selected_fields
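
To make the breadcrumb handling above concrete, here is a hypothetical metadata list (hand-written, not from discovery) and the result get_selected_fields() would produce for it:

example_metadata = [
    # Stream-level entry: the breadcrumb has no field name, so the IndexError branch skips it
    {'breadcrumb': [], 'metadata': {'selected': True}},
    # Field-level entries: the breadcrumb looks like ['properties', '<field name>']
    {'breadcrumb': ['properties', 'id'], 'metadata': {'selected': True}},
    {'breadcrumb': ['properties', 'modifiedTime'], 'metadata': {'selected': False}},
]
# Only 'id' is selected at the field level, so the function would return ['id']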
+
+
+
+
+
+
+ # +
+

Construct the request we want to make, make the request, and return the Response

+
+
+
def get_data(stream_name, endpoint_config, client, spreadsheet_id, range_rows=None):
+
+
+
+
+
+
+ # +
+ +
+
+
+
+
+
+
+
+
+ # +
+

Build the query

+
+
+
    stream_name_escaped = re.escape(stream_name)
+
+
+
+
+
+
+ # +
+

Encode stream_name to fix issues with special characters in stream_name.
QUESTION: If there are special characters here, how do databases handle them?

+
+
+
    stream_name_encoded = urllib.parse.quote_plus(stream_name)
+
+    if not range_rows:
+        range_rows = ''
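
For example (the sheet title is made up), quote_plus() percent-encodes anything that is not URL-safe and turns spaces into '+':

urllib.parse.quote_plus('Sales & Pipeline 2021')   # -> 'Sales+%26+Pipeline+2021'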
+
+
+
+
+
+
+ # +
+

QUESTION: Why is this not a string.format() with keywords?

+
+
+
    path = endpoint_config.get('path', stream_name).replace(
+        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
+            '{range_rows}', range_rows)
+    params = endpoint_config.get('params', {})
+    api = endpoint_config.get('api', 'sheets')
+    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
+        '{sheet_title}', stream_name_encoded)
+    LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))
+    data = {}
+    time_extracted = utils.now()
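
Regarding the QUESTION above, a keyword-based str.format() version of the same path construction might look like this sketch (not the tap’s code; it assumes the only placeholders a path template can contain are the three shown):

path_template = endpoint_config.get('path', stream_name)
path = path_template.format(
    spreadsheet_id=spreadsheet_id,
    sheet_title=stream_name_encoded,
    range_rows=range_rows)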
+
+
+
+
+
+
+ # +
+

Make the query

+
+
+
    data = client.get(
+        path=path,
+        api=api,
+        params=querystring,
+        endpoint=stream_name_escaped)
+
+
+
+
+
+
+ # +
+

Return the Response.json()

+
+
+
    return data, time_extracted
+
+
+
+
+
+
+ # +
+
+

Transform Functions

+

There’s a line of code that shows up in each of these functions that is a bit confusing:

+
+
+
+
+
+
+
+
+
+ # +
+
json.loads(json.dumps(some_object))
+
+
+
+
+
+
+
+
+
+
+ # +
+

I don’t see the use here. We turn a Python object into a JSON string and back again. The only effect I could see in the REPL is that integer keys get stringified.

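
A quick REPL check of that observation:

>>> import json
>>> json.loads(json.dumps({1: 'a', 'b': [2, 3]}))
{'1': 'a', 'b': [2, 3]}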
+
+
+
+
+
+
+
+
+
+ # +
+

In general, the transform functions just look like “maybe pop some stuff”, “maybe add some stuff”, and return the input in a list

+
+
+
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

remove nodes from lastModifyingUser, format as array

+
+
+
def transform_file_metadata(file_metadata):
+
+
+
+
+
+
+ # +
+ +
+
+
    file_metadata_tf = json.loads(json.dumps(file_metadata))
+
+    if file_metadata_tf.get('lastModifyingUser'):
+        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
+        file_metadata_tf['lastModifyingUser'].pop('me', None)
+        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
+
+    file_metadata_arr = []
+    file_metadata_arr.append(file_metadata_tf)
+    return file_metadata_arr
+
+
+
+
+
+
+ # +
+

remove defaultFormat and sheets nodes, format as array

+
+
+
def transform_spreadsheet_metadata(spreadsheet_metadata):
+
+
+
+
+
+
+ # +
+ +
+
+
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
+
+    if spreadsheet_metadata_tf.get('properties'):
+        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
+    spreadsheet_metadata_tf.pop('sheets', None)
+
+    spreadsheet_metadata_arr = []
+    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
+    return spreadsheet_metadata_arr
+
+
+
+
+
+
+ # +
+

add spreadsheetId, sheetUrl, and columns metadata

+
+
+
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
+
+
+
+
+
+
+ # +
+ +
+
+
    sheet_metadata = sheet.get('properties')
+    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
+    sheet_id = sheet_metadata_tf.get('sheetId')
+    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(spreadsheet_id, sheet_id)
+    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
+    sheet_metadata_tf['sheetUrl'] = sheet_url
+    sheet_metadata_tf['columns'] = columns
+    return sheet_metadata_tf
+
+
+
+
+
+
+ # +
+

Convert an Excel Date Serial Number (excel_date_sn) to a datetime string. timezone_str defaults to

+
+
+
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
+
+
+
+
+
+
+ # +
+

UTC (which we assume is the timezone for ALL datetimes)

+
+
+
    if not timezone_str:
+        timezone_str = 'UTC'
+    tzn = pytz.timezone(timezone_str)
+    epoch_dttm = datetime(1970, 1, 1)
+
+    sec_per_day = 86400
+
+
+
+
+
+
+ # +
+

1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date

+
+
+
    excel_epoch = 25569
+
+
+
+
+
+
+ # +
+

(The serial number minus the Excel epoch) gives days since the Epoch; times the seconds per day gives seconds since the Epoch

+
+
+
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
+
+    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
+    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
+    utc_dttm_str = singer.utils.strftime(utc_dttm)
+    return utc_dttm_str
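
As a usage example (the serial number is arbitrary and the arithmetic is worked out by hand): 44237.5 minus the excel_epoch constant (25569) is 18668.5 days after 1970-01-01, which lands on noon of 2021-02-10.

excel_to_dttm_str(44237.5)   # -> '2021-02-10T12:00:00.000000Z'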
+
+
+
+
+
+
+ # +
+
+

WARNING: This next function is confusing

+
+
+
+
+
+
+
+
+
+ # +
+

In general, the point of the function is to transform the field based on the data type that the API tells us. It loops over every row and then every column in the row.

+
+
+
+
+
+
+
+
+
+ # +
+

For the TIME fields, there’s no reason it should work. And for some cases, the value returned is just wrong.

+
+
+
+
+
+
+
+
+
+ # +
+

You can look at the code for timedelta and you would see that this constructor wants to normalize its input units: you can create the object with weeks, days, hours, minutes, seconds, milliseconds, and microseconds, but it will convert the values into just days, seconds, and microseconds.

+
+
+
+
+
+
+
+
+
+ # +
+

Disclaimer: this is a paraphrase of the implementation, but the spirit of the idea is here.

+
+
+
+
+
+
+
+
+
+ # +
+

When we pass in seconds here, as the value we get from the API times the number of seconds in a day, the way timedelta does its normalization determines what we get back, and for values of a day or more it is wrong as a time. The constructor takes the seconds input and passes it to divmod(), which returns a 2-tuple: the first element is our input divided by the number of seconds in a day, and the second element is our input mod the number of seconds in a day. These results are added to the rest of the normalization, and the time-of-day portion comes back out correctly. Since we don’t pass in a days argument, our divmod’s days output is just added to zero. The __str__() for timedelta must be something like "{my_days} days, {time_since_midnight(my_seconds)}", which is essentially what we get after this transform function (see the example below).

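
For instance, a pure time-of-day fraction of a day stringifies fine, but a value of a day or more picks up a “days” prefix that is not a valid time:

str(timedelta(seconds=0.75 * 86400))   # -> '18:00:00'
str(timedelta(seconds=1.5 * 86400))    # -> '1 day, 12:00:00'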
+
+
+
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

Add spreadsheet_id, sheet_id, and row; convert dates/times; convert from an array of values to

+
+
+
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
+
+
+
+
+
+
+ # +
+

JSON with column names as keys

+
+
+
    sheet_data_tf = []
+    row_num = from_row
+
+
+
+
+
+
+ # +
+

Create sorted list of columns based on columnIndex

+
+
+
    cols = sorted(columns, key=lambda i: i['columnIndex'])
+
+    for row in sheet_data_rows:
+
+
+
+
+
+
+ # +
+

If empty row, SKIP

+
+
+
        if row == []:
+            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
+        else:
+            sheet_data_row_tf = {}
+
+
+
+
+
+
+ # +
+

Add spreadsheet_id, sheet_id, and row

+
+
+
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
+            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
+            sheet_data_row_tf['__sdc_row'] = row_num
+            col_num = 1
+            for value in row:
+
+
+
+
+
+
+ # +
+

Select column metadata based on column index

+
+
+
                col = cols[col_num - 1]
+                col_skipped = col.get('columnSkipped')
+                if not col_skipped:
+
+
+
+
+
+
+ # +
+

Get column metadata

+
+
+
                    col_name = col.get('columnName')
+                    col_type = col.get('columnType')
+                    col_letter = col.get('columnLetter')
+
+
+
+
+
+
+ # +
+

NULL values

+
+
+
                    if value is None or value == '':
+                        col_val = None
+
+
+
+
+
+
+ # +
+

Convert dates/times from Lotus Notes Serial Numbers
DATE-TIME

+
+
+
                    elif col_type == 'numberType.DATE_TIME':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)
+                        else:
+                            col_val = str(value)
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
+
+
+
+
+
+
+ # +
+

DATE

+
+
+
                    elif col_type == 'numberType.DATE':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)[:10]
+                        else:
+                            col_val = str(value)
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
+
+
+
+
+
+
+ # +
+

TIME ONLY (NO DATE)

+
+
+
                    elif col_type == 'numberType.TIME':
+
+                        if isinstance(value, (int, float)):
+                            try:
+                                total_secs = value * 86400
+
+
+
+
+
+
+ # +
+

Create string formatted like HH:MM:SS

+
+
+
                                col_val = str(timedelta(seconds=total_secs))
+                            except ValueError:
+                                col_val = str(value)
+                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
+                        else:
+                            col_val = str(value)
+
+
+
+
+
+
+ # +
+

NUMBER (INTEGER AND FLOAT)

+
+
+
                    elif col_type == 'numberType':
+                        if isinstance(value, int):
+                            col_val = int(value)
+                        elif isinstance(value, float):
+
+
+
+
+
+
+ # +
+

Determine float decimal digits

+
+
+
                            decimal_digits = str(value)[::-1].find('.')
+                            if decimal_digits > 15:
+                                try:
+
+
+
+
+
+
+ # +
+

ROUND to multipleOf: 1e-15

+
+
+
                                    col_val = float(round(value, 15))
+                                except ValueError:
+                                    col_val = str(value)
+                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                        sheet_title, col_name, col_letter, row_num, col_type, value))
+                            else: # decimal_digits <= 15, no rounding
+                                try:
+                                    col_val = float(value)
+                                except ValueError:
+                                    col_val = str(value)
+                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                        sheet_title, col_name, col_letter, row_num, col_type, value))
+                        else:
+                            col_val = str(value)
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                   sheet_title, col_name, col_letter, row_num, col_type, value))
+
+
+
+
+
+
+ # +
+

STRING

+
+
+
                    elif col_type == 'stringValue':
+                        col_val = str(value)
+
+
+
+
+
+
+ # +
+

BOOLEAN

+
+
+
                    elif col_type == 'boolValue':
+                        if isinstance(value, bool):
+                            col_val = value
+                        elif isinstance(value, str):
+                            if value.lower() in ('true', 't', 'yes', 'y'):
+                                col_val = True
+                            elif value.lower() in ('false', 'f', 'no', 'n'):
+                                col_val = False
+                            else:
+                                col_val = str(value)
+                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
+                        elif isinstance(value, int):
+                            if value in (1, -1):
+                                col_val = True
+                            elif value == 0:
+                                col_val = False
+                            else:
+                                col_val = str(value)
+                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
+
+
+
+
+
+
+ # +
+

OTHER: Convert everything else to a string

+
+
+
                    else:
+                        col_val = str(value)
+                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                            sheet_title, col_name, col_letter, row_num, col_type, value))
+                    sheet_data_row_tf[col_name] = col_val
+                col_num = col_num + 1
+
+
+
+
+
+
+ # +
+

APPEND non-empty row

+
+
+
            sheet_data_tf.append(sheet_data_row_tf)
+        row_num = row_num + 1
+    return sheet_data_tf, row_num
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

Main Functions

+
+
+
+
+
+
+
+
+
+ # +
+
+
+
+
+
+
+
+
+
+
+ # +
+

Transform/validate batch of records w/ schema and send to target

+
+
+
def process_records(catalog, stream_name, records, time_extracted, version=None):
+
+
+
+
+
+
+ # +
+ +
+
+
    stream = catalog.get_stream(stream_name)
+    schema = stream.schema.to_dict()
+    stream_metadata = metadata.to_map(stream.metadata)
+    with metrics.record_counter(stream_name) as counter:
+        for record in records:
+            with Transformer() as transformer:
+                try:
+                    transformed_record = transformer.transform(record, schema, stream_metadata)
+                except Exception as err:
+                    LOGGER.error('{}'.format(err))
+                    raise RuntimeError(err)
+                write_record(
+                    stream_name=stream_name,
+                    record=transformed_record,
+                    time_extracted=time_extracted,
+                    version=version)
+                counter.increment()
+        return counter.value
+
+
+
+
+
+
+ # +
+

This is just a pass-through to process_records()

+
+
+
def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
+
+
+
+
+
+
+ # +
+ +
+
+
    if stream_name in selected_streams:
+        LOGGER.info('STARTED Syncing {}'.format(stream_name))
+        update_currently_syncing(state, stream_name)
+        selected_fields = get_selected_fields(catalog, stream_name)
+        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
+        write_schema(catalog, stream_name)
+        if not time_extracted:
+            time_extracted = utils.now()
+        record_count = process_records(
+            catalog=catalog,
+            stream_name=stream_name,
+            records=records,
+            time_extracted=time_extracted)
+        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
+        update_currently_syncing(state, None)
+
+
+
+
+
+
+ # +
+

See top of file for notes

+
+
+
def sync(client, config, catalog, state):
+    start_date = config.get('start_date')
+    spreadsheet_id = config.get('spreadsheet_id')
+
+    last_stream = singer.get_currently_syncing(state)
+    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
+
+    selected_streams = []
+    for stream in catalog.get_selected_streams(state):
+        selected_streams.append(stream.stream)
+    LOGGER.info('selected_streams: {}'.format(selected_streams))
+
+    if not selected_streams:
+        return
+
+
+
+
+
+
+ # +
+

FILE_METADATA

+
+
+
    file_metadata = {}
+    stream_name = 'file_metadata'
+    file_metadata_config = STREAMS.get(stream_name)
+
+
+
+
+
+
+ # +
+

GET file_metadata

+
+
+
    LOGGER.info('GET file_metadata')
+    file_metadata, time_extracted = get_data(stream_name=stream_name,
+                                             endpoint_config=file_metadata_config,
+                                             client=client,
+                                             spreadsheet_id=spreadsheet_id)
+
+
+
+
+
+
+ # +
+

Transform file_metadata

+
+
+
    LOGGER.info('Transform file_metadata')
+    file_metadata_tf = transform_file_metadata(file_metadata)
+
+
+
+
+
+
+ # +
+

Check if the file has changed; if not, exit

+
+
+
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
+    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
+    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
+    if this_datetime <= last_datetime:
+        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
+        write_bookmark(state, 'file_metadata', strftime(this_datetime))
+        return
+
+
+
+
+
+
+ # +
+

Write file_metadata records if selected

+
+
+
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
+
+
+
+
+
+
+ # +
+

SPREADSHEET_METADATA

+
+
+
    spreadsheet_metadata = {}
+    stream_name = 'spreadsheet_metadata'
+    spreadsheet_metadata_config = STREAMS.get(stream_name)
+
+
+
+
+
+
+ # +
+

GET spreadsheet_metadata

+
+
+
    LOGGER.info('GET spreadsheet_metadata')
+    spreadsheet_metadata, ss_time_extracted = get_data(
+        stream_name=stream_name,
+        endpoint_config=spreadsheet_metadata_config,
+        client=client,
+        spreadsheet_id=spreadsheet_id)
+
+
+
+
+
+
+ # +
+

Transform spreadsheet_metadata

+
+
+
    LOGGER.info('Transform spreadsheet_metadata')
+    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
+
+
+
+
+
+
+ # +
+

Write spreadsheet_metadata records if selected

+
+
+
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
+        ss_time_extracted)
+
+
+
+
+
+
+ # +
+

SHEET_METADATA and SHEET_DATA

+
+
+
    sheets = spreadsheet_metadata.get('sheets')
+    sheet_metadata = []
+    sheets_loaded = []
+    sheets_loaded_config = STREAMS['sheets_loaded']
+    if sheets:
+
+
+
+
+
+
+ # +
+

Loop thru sheets (worksheet tabs) in spreadsheet

+
+
+
        for sheet in sheets:
+            sheet_title = sheet.get('properties', {}).get('title')
+            sheet_id = sheet.get('properties', {}).get('sheetId')
+
+
+
+
+
+
+ # +
+

Sheet_Metadata

+

GET sheet_metadata and columns

+
+
+
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+
+            if not sheet_schema or not columns:
+                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+            else:
+
+
+
+
+
+
+ # +
+

Transform sheet_metadata

+
+
+
                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+                sheet_metadata.append(sheet_metadata_tf)
+
+
+
+
+
+
+ # +
+

SHEET_DATA

+
+
+
                if sheet_title in selected_streams:
+                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                    update_currently_syncing(state, sheet_title)
+                    selected_fields = get_selected_fields(catalog, sheet_title)
+                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+                    write_schema(catalog, sheet_title)
+
+
+
+
+
+
+ # +
+

Emit a Singer ACTIVATE_VERSION message before the initial sync (but not subsequent syncs) and every time after each sheet sync is complete. This forces hard deletes on the data downstream if fewer records are sent. https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137

+
+
+
                    last_integer = int(get_bookmark(state, sheet_title, 0))
+                    activate_version = int(time.time() * 1000)
+                    activate_version_message = singer.ActivateVersionMessage(
+                            stream=sheet_title,
+                            version=activate_version)
+                    if last_integer == 0:
+
+
+
+
+
+
+ # +
+

initial load, send activate_version before AND after data sync

+
+
+
                        singer.write_message(activate_version_message)
+                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+
+
+
+
+
+ # +
+

Determine max range of columns and rows for “paging” through the data

+
+
+
                    sheet_last_col_index = 1
+                    sheet_last_col_letter = 'A'
+                    for col in columns:
+                        col_index = col.get('columnIndex')
+                        col_letter = col.get('columnLetter')
+                        if col_index > sheet_last_col_index:
+                            sheet_last_col_index = col_index
+                            sheet_last_col_letter = col_letter
+                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+
+
+
+
+
+ # +
+

Initialize paging for 1st batch

+
+
+
                    is_last_row = False
+                    batch_rows = 200
+                    from_row = 2
+                    if sheet_max_row < batch_rows:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = batch_rows
+
+
+
+
+
+
+ # +
+

Loop thru batches (each having 200 rows of data)

+
+
+
                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+
+
+
+
+
+ # +
+

GET sheet_data for a worksheet tab

+
+
+
                        sheet_data, time_extracted = get_data(
+                            stream_name=sheet_title,
+                            endpoint_config=sheets_loaded_config,
+                            client=client,
+                            spreadsheet_id=spreadsheet_id,
+                            range_rows=range_rows)
+
+
+
+
+
+
+ # +
+

Data is returned as a list of arrays, an array of values for each row

+
+
+
                        sheet_data_rows = sheet_data.get('values', [])
+
+
+
+
+
+
+ # +
+

Transform batch of rows to JSON with keys for each column

+
+
+
                        sheet_data_tf, row_num = transform_sheet_data(
+                            spreadsheet_id=spreadsheet_id,
+                            sheet_id=sheet_id,
+                            sheet_title=sheet_title,
+                            from_row=from_row,
+                            columns=columns,
+                            sheet_data_rows=sheet_data_rows)
+                        if row_num < to_row:
+                            is_last_row = True
+
+
+
+
+
+
+ # +
+

Process records, send batch of records to target

+
+
+
                        record_count = process_records(
+                            catalog=catalog,
+                            stream_name=sheet_title,
+                            records=sheet_data_tf,
+                            time_extracted=ss_time_extracted,
+                            version=activate_version)
+                        LOGGER.info('Sheet: {}, records processed: {}'.format(
+                            sheet_title, record_count))
+
+
+
+
+
+
+ # +
+

Update paging from/to_row for next batch

+
+
+
                        from_row = to_row + 1
+                        if to_row + batch_rows > sheet_max_row:
+                            to_row = sheet_max_row
+                        else:
+                            to_row = to_row + batch_rows
+
+
+
+
+
+
+ # +
+

End of Stream: Send Activate Version and update State

+
+
+
                    singer.write_message(activate_version_message)
+                    write_bookmark(state, sheet_title, activate_version)
+                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                        sheet_title, row_num - 2)) # subtract 1 for header row
+                    update_currently_syncing(state, None)
+
+
+
+
+
+
+ # +
+

SHEETS_LOADED
Add sheet to sheets_loaded

+
+
+
                    sheet_loaded = {}
+                    sheet_loaded['spreadsheetId'] = spreadsheet_id
+                    sheet_loaded['sheetId'] = sheet_id
+                    sheet_loaded['title'] = sheet_title
+                    sheet_loaded['loadDate'] = strftime(utils.now())
+                    sheet_loaded['lastRowNumber'] = row_num
+                    sheets_loaded.append(sheet_loaded)
+
+    stream_name = 'sheet_metadata'
+
+
+
+
+
+
+ # +
+

Write sheet_metadata records if selected

+
+
+
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
+
+    stream_name = 'sheets_loaded'
+
+
+
+
+
+
+ # +
+

Write sheets_loaded records if selected

+
+
+
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
+
+
+
+
+
+
+ # +
+

Update file_metadata bookmark

+
+
+
    write_bookmark(state, 'file_metadata', strftime(this_datetime))
+
+    return
+
+
+
+
+
+
+ -- cgit v1.2.3