]> git.immae.eu Git - github/fretlink/tap-google-sheets.git/commitdiff
v.0.0.2 schema and sync changes (#1) v0.0.2
authorJeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com>
Wed, 4 Dec 2019 14:10:46 +0000 (06:10 -0800)
committerKyle Allan <KAllan357@gmail.com>
Wed, 4 Dec 2019 14:10:46 +0000 (09:10 -0500)
Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py

CHANGELOG.md
setup.py
tap_google_sheets/schema.py
tap_google_sheets/streams.py
tap_google_sheets/sync.py

index d58f396a668aab8f8193ceed3dfe762cc8bbec4c..e5d65608d670c0071c62b134de9767b4665eb6aa 100644 (file)
@@ -1,4 +1,7 @@
 # Changelog
 
+## 0.0.2
+  * Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py
+
 ## 0.0.1
   * Initial commit
index e3c4f3e0a1bbf62542e53cbaeb14e49266fb9bca..6fe24934bc6672596302e0e6b873898244a9d360 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
 from setuptools import setup, find_packages
 
 setup(name='tap-google-sheets',
-      version='0.0.1',
+      version='0.0.2',
       description='Singer.io tap for extracting data from the Google Sheets v4 API',
       author='jeff.huth@bytecode.io',
       classifiers=['Programming Language :: Python :: 3 :: Only'],
index d4fead52271566336799f5f851e29bdca9a340c0..243467b6c71c7e78efa224eed66c8967664dd2c8 100644 (file)
@@ -30,8 +30,6 @@ def get_sheet_schema_columns(sheet):
     first_values = row_data[1].get('values', [])
     # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
 
-    sheet_json_schema['type'] = 'object'
-    sheet_json_schema['additionalProperties'] = False
     sheet_json_schema = {
         'type': 'object',
         'additionalProperties': False,
@@ -89,42 +87,66 @@ def get_sheet_schema_columns(sheet):
             #  https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
             #
             column_format = None # Default
-            # column_multiple_of = None # Default
             if column_effective_value_type == 'stringValue':
-                column_type = ['null', 'string']
+                col_properties = {'type': ['null', 'string']}
                 column_gs_type = 'stringValue'
             elif column_effective_value_type == 'boolValue':
-                column_type = ['null', 'boolean', 'string']
+                col_properties = {'type': ['null', 'boolean', 'string']}
                 column_gs_type = 'boolValue'
             elif column_effective_value_type == 'numberValue':
                 if column_number_format_type == 'DATE_TIME':
-                    column_type = ['null', 'string']
-                    column_format = 'date-time'
+                    col_properties = {
+                        'type': ['null', 'string'],
+                        'format': 'date-time'
+                    }
                     column_gs_type = 'numberType.DATE_TIME'
                 elif column_number_format_type == 'DATE':
-                    column_type = ['null', 'string']
-                    column_format = 'date'
+                    col_properties = {
+                        'type': ['null', 'string'],
+                        'format': 'date'
+                    }
                     column_gs_type = 'numberType.DATE'
                 elif column_number_format_type == 'TIME':
-                    column_type = ['null', 'string']
-                    column_format = 'time'
+                    col_properties = {
+                        'type': ['null', 'string'],
+                        'format': 'time'
+                    }
                     column_gs_type = 'numberType.TIME'
                 elif column_number_format_type == 'TEXT':
-                    column_type = ['null', 'string']
+                    col_properties = {'type': ['null', 'string']}
                     column_gs_type = 'stringValue'
                 else:
-                    column_type = ['null', 'number', 'string']
+                    # Interesting - order in the anyOf makes a difference.
+                    # Number w/ multipleOf must be listed last, otherwise errors occur.
+                    col_properties =  {
+                        'anyOf': [
+                            {
+                                'type': 'string'
+                            },
+                            {
+                                'type': 'null'
+                            },
+                            {
+                                'type': 'number',
+                                'multipleOf': 1e-15
+                            }
+                        ]
+                    }
                     column_gs_type = 'numberType'
-            elif column_effective_value_type in ('formulaValue', 'errorValue'):
-                raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name, \
+            # Catch-all to deal with other types and set to string
+            # column_effective_value_type: formulaValue, errorValue, or other
+            else:
+                col_properties = {'type': ['null', 'string']}
+                column_gs_type = 'unsupportedValue'
+                LOGGER.info('Unsupported data type: {}, value: {}'.format(column_name, \
                     column_effective_value_type))
+                LOGGER.info('Converting to string.')
         else: # skipped
             column_is_skipped = True
             skipped = skipped + 1
             column_index_str = str(column_index).zfill(2)
             column_name = '__sdc_skip_col_{}'.format(column_index_str)
-            column_type = ['null', 'string']
-            column_format = None
+            col_properties = {'type': ['null', 'string']}
             column_gs_type = 'stringValue'
 
         if skipped >= 2:
@@ -144,10 +166,7 @@ def get_sheet_schema_columns(sheet):
             }
             columns.append(column)
 
-            sheet_json_schema['properties'][column_name] = column
-            sheet_json_schema['properties'][column_name]['type'] = column_type
-            if column_format:
-                sheet_json_schema['properties'][column_name]['format'] = column_format
+            sheet_json_schema['properties'][column_name] = col_properties
 
         prior_header = column_name
         i = i + 1
@@ -155,6 +174,10 @@ def get_sheet_schema_columns(sheet):
     return sheet_json_schema, columns
 
 
+# Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query
+#   endpoint: spreadsheets/{spreadsheet_id}
+#   params: includeGridData = true, ranges = '{sheet_title}'!1:2
+# This endpoint includes detailed metadata about each cell - incl. data type, formatting, etc.
 def get_sheet_metadata(sheet, spreadsheet_id, client):
     sheet_id = sheet.get('properties', {}).get('sheetId')
     sheet_title = sheet.get('properties', {}).get('title')
@@ -170,10 +193,13 @@ def get_sheet_metadata(sheet, spreadsheet_id, client):
         spreadsheet_id), querystring)
 
     sheet_md_results = client.get(path=path, api=api, endpoint=stream_name)
-    sheet_cols = sheet_md_results.get('sheets')[0]
-    sheet_schema, columns = get_sheet_schema_columns(sheet_cols)
+    # sheet_metadata: 1st `sheets` node in results
+    sheet_metadata = sheet_md_results.get('sheets')[0]
 
-    return sheet_schema, columns
+    # Create sheet_json_schema (for discovery/catalog) and columns (for sheet_metadata results)
+    sheet_json_schema, columns = get_sheet_schema_columns(sheet_metadata)
+
+    return sheet_json_schema, columns
 
 
 def get_abs_path(path):
@@ -209,20 +235,23 @@ def get_schemas(client, spreadsheet_id):
             path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
                 spreadsheet_id), querystring)
 
+            # GET spreadsheet_metadata, which incl. sheets (basic metadata for each worksheet)
             spreadsheet_md_results = client.get(path=path, params=querystring, api=api, \
                 endpoint=stream_name)
 
             sheets = spreadsheet_md_results.get('sheets')
             if sheets:
+                # Loop thru each worksheet in spreadsheet
                 for sheet in sheets:
-                    sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+                    # GET sheet_json_schema for each worksheet (from function above)
+                    sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
                     LOGGER.info('columns = {}'.format(columns))
 
                     sheet_title = sheet.get('properties', {}).get('title')
-                    schemas[sheet_title] = sheet_schema
+                    schemas[sheet_title] = sheet_json_schema
                     sheet_mdata = metadata.new()
                     sheet_mdata = metadata.get_standard_metadata(
-                        schema=sheet_schema,
+                        schema=sheet_json_schema,
                         key_properties=['__sdc_row'],
                         valid_replication_keys=None,
                         replication_method='FULL_TABLE'
index b8e3eff859457f7fe1061cc5c363050fe8551848..ad5529f4ac3defef3ae215cfb7c1cf701e98137c 100644 (file)
@@ -13,6 +13,8 @@ from collections import OrderedDict
 #   data_key: JSON element containing the results list for the endpoint;
 #       default = root (no data_key)
 
+# file_metadata: Queries Google Drive API to get file information and see if file has been modified
+#    Provides audit info about who and when last changed the file.
 FILE_METADATA = {
     "api": "files",
     "path": "files/{spreadsheet_id}",
@@ -24,6 +26,7 @@ FILE_METADATA = {
     }
 }
 
+# spreadsheet_metadata: Queries spreadsheet to get basic information on spreadhsheet and sheets
 SPREADSHEET_METADATA = {
     "api": "sheets",
     "path": "spreadsheets/{spreadsheet_id}",
@@ -34,6 +37,9 @@ SPREADSHEET_METADATA = {
     }
 }
 
+# sheet_metadata: Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet.
+# This endpoint includes detailed metadata about each cell in the header and first data row
+#   incl. data type, formatting, etc.
 SHEET_METADATA = {
     "api": "sheets",
     "path": "spreadsheets/{spreadsheet_id}",
@@ -45,6 +51,8 @@ SHEET_METADATA = {
     }
 }
 
+# sheets_loaded: Queries a batch of Rows for each Sheet in the Spreadsheet.
+# Each query uses the `values` endpoint, to get data-only, w/out the formatting/type metadata.
 SHEETS_LOADED = {
     "api": "sheets",
     "path": "spreadsheets/{spreadsheet_id}/values/'{sheet_title}'!{range_rows}",
@@ -58,7 +66,7 @@ SHEETS_LOADED = {
     }
 }
 
-# Ensure streams are ordered logically
+# Ensure streams are ordered sequentially, logically.
 STREAMS = OrderedDict()
 STREAMS['file_metadata'] = FILE_METADATA
 STREAMS['spreadsheet_metadata'] = SPREADSHEET_METADATA
index d7d71846999bd2af4037a58d5b14ebf0a5387d52..76b2e593c071f59e1698ee72e684021ae5626b5a 100644 (file)
@@ -125,11 +125,14 @@ def get_data(stream_name,
              range_rows=None):
     if not range_rows:
         range_rows = ''
+    # Replace {placeholder} variables in path
     path = endpoint_config.get('path', stream_name).replace(
         '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
             '{range_rows}', range_rows)
     params = endpoint_config.get('params', {})
     api = endpoint_config.get('api', 'sheets')
+    # Add in querystring parameters and replace {placeholder} variables
+    # querystring function ensures parameters are added but not encoded causing API errors
     querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
         '{sheet_title}', stream_name)
     data = {}
@@ -192,7 +195,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None):
         timezone_str = 'UTC'
     tzn = pytz.timezone(timezone_str)
     sec_per_day = 86400
-    excel_epoch = 25569 # 1970-01-01T00:00:00Z
+    excel_epoch = 25569 # 1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date
     epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
     epoch_dttm = datetime(1970, 1, 1)
     excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
@@ -205,85 +208,103 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None):
 #  Convert from array of values to JSON with column names as keys
 def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
     sheet_data_tf = []
-    is_last_row = False
     row_num = from_row
     # Create sorted list of columns based on columnIndex
     cols = sorted(columns, key=lambda i: i['columnIndex'])
 
     # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
     for row in sheet_data_rows:
-        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
+        # If empty row, SKIP
         if row == []:
-            is_last_row = True
-            return sheet_data_tf, row_num - 1, is_last_row
-        sheet_data_row_tf = {}
-        # Add spreadsheet_id, sheet_id, and row
-        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
-        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
-        sheet_data_row_tf['__sdc_row'] = row_num
-        col_num = 1
-        for value in row:
-            # Select column metadata based on column index
-            col = cols[col_num - 1]
-            col_skipped = col.get('columnSkipped')
-            if not col_skipped:
-                col_name = col.get('columnName')
-                col_type = col.get('columnType')
-                # Convert dates/times from Lotus Notes Serial Numbers
-                if col_type == 'numberType.DATE_TIME':
-                    if isinstance(value, (int, float)):
-                        col_val = excel_to_dttm_str(value)
-                    else:
-                        col_val = str(value)
-                elif col_type == 'numberType.DATE':
-                    if isinstance(value, (int, float)):
-                        col_val = excel_to_dttm_str(value)[:10]
-                    else:
-                        col_val = str(value)
-                elif col_type == 'numberType.TIME':
-                    if isinstance(value, (int, float)):
-                        try:
-                            total_secs = value * 86400 # seconds in day
-                            col_val = str(timedelta(seconds=total_secs))
-                        except ValueError:
+            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
+        else:
+            sheet_data_row_tf = {}
+            # Add spreadsheet_id, sheet_id, and row
+            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
+            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
+            sheet_data_row_tf['__sdc_row'] = row_num
+            col_num = 1
+            for value in row:
+                # Select column metadata based on column index
+                col = cols[col_num - 1]
+                col_skipped = col.get('columnSkipped')
+                if not col_skipped:
+                    col_name = col.get('columnName')
+                    col_type = col.get('columnType')
+                    # Convert dates/times from Lotus Notes Serial Numbers
+                    # DATE-TIME
+                    if col_type == 'numberType.DATE_TIME':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)
+                        else:
                             col_val = str(value)
-                    else:
-                        col_val = str(value)
-                elif col_type == 'numberType':
-                    if isinstance(value, int):
-                        col_val = int(value)
-                    else:
-                        try:
-                            col_val = float(value)
-                        except ValueError:
+                    # DATE
+                    elif col_type == 'numberType.DATE':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)[:10]
+                        else:
                             col_val = str(value)
-                elif col_type == 'stringValue':
-                    col_val = str(value)
-                elif col_type == 'boolValue':
-                    if isinstance(value, bool):
-                        col_val = value
-                    elif isinstance(value, str):
-                        if value.lower() in ('true', 't', 'yes', 'y'):
-                            col_val = True
-                        elif value.lower() in ('false', 'f', 'no', 'n'):
-                            col_val = False
+                    # TIME ONLY (NO DATE)
+                    elif col_type == 'numberType.TIME':
+                        if isinstance(value, (int, float)):
+                            try:
+                                total_secs = value * 86400 # seconds in day
+                                # Create string formatted like HH:MM:SS
+                                col_val = str(timedelta(seconds=total_secs))
+                            except ValueError:
+                                col_val = str(value)
                         else:
                             col_val = str(value)
-                    elif isinstance(value, int):
-                        if value in (1, -1):
-                            col_val = True
-                        elif value == 0:
-                            col_val = False
+                    # NUMBER (INTEGER AND FLOAT)
+                    elif col_type == 'numberType':
+                        if isinstance(value, int):
+                            col_val = int(value)
+                        elif isinstance(value, float):
+                            # Determine float decimal digits
+                            decimal_digits = str(value)[::-1].find('.')
+                            if decimal_digits > 15:
+                                try:
+                                    # ROUND to multipleOf: 1e-15
+                                    col_val = float(round(value, 15))
+                                except ValueError:
+                                    col_val = str(value)
+                            else: # decimal_digits <= 15, no rounding
+                                try:
+                                    col_val = float(value)
+                                except ValueError:
+                                    col_val = str(value)
                         else:
                             col_val = str(value)
-
-                else:
-                    col_val = value
-                sheet_data_row_tf[col_name] = col_val
-            col_num = col_num + 1
-        sheet_data_tf.append(sheet_data_row_tf)
+                    # STRING
+                    elif col_type == 'stringValue':
+                        col_val = str(value)
+                    # BOOLEAN
+                    elif col_type == 'boolValue':
+                        if isinstance(value, bool):
+                            col_val = value
+                        elif isinstance(value, str):
+                            if value.lower() in ('true', 't', 'yes', 'y'):
+                                col_val = True
+                            elif value.lower() in ('false', 'f', 'no', 'n'):
+                                col_val = False
+                            else:
+                                col_val = str(value)
+                        elif isinstance(value, int):
+                            if value in (1, -1):
+                                col_val = True
+                            elif value == 0:
+                                col_val = False
+                            else:
+                                col_val = str(value)
+                    # OTHER: Convert everything else to a string
+                    else:
+                        col_val = str(value)
+                    sheet_data_row_tf[col_name] = col_val
+                col_num = col_num + 1
+            # APPEND non-empty row
+            sheet_data_tf.append(sheet_data_row_tf)
         row_num = row_num + 1
-    return sheet_data_tf, row_num, is_last_row
+    return sheet_data_tf, row_num
 
 
 def sync(client, config, catalog, state):
@@ -327,7 +348,7 @@ def sync(client, config, catalog, state):
         return
     # Sync file_metadata if selected
     sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
-    write_bookmark(state, stream_name, strftime(this_datetime))
+    # file_metadata bookmark is updated at the end of sync
 
     # SPREADSHEET_METADATA
     spreadsheet_metadata = {}
@@ -363,7 +384,7 @@ def sync(client, config, catalog, state):
 
             # GET sheet_metadata and columns
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-            LOGGER.info('sheet_schema: {}'.format(sheet_schema))
+            LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
             # Transform sheet_metadata
             sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
@@ -414,7 +435,7 @@ def sync(client, config, catalog, state):
                     sheet_data_rows = sheet_data.get('values')
 
                     # Transform batch of rows to JSON with keys for each column
-                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
+                    sheet_data_tf, row_num = transform_sheet_data(
                         spreadsheet_id=spreadsheet_id,
                         sheet_id=sheet_id,
                         from_row=from_row,
@@ -429,7 +450,7 @@ def sync(client, config, catalog, state):
                         stream_name=sheet_title,
                         records=sheet_data_tf,
                         time_extracted=ss_time_extracted)
-                    LOGGER.info('Sheet: {}, ecords processed: {}'.format(
+                    LOGGER.info('Sheet: {}, records processed: {}'.format(
                         sheet_title, record_count))
 
                     # Update paging from/to_row for next batch
@@ -458,7 +479,7 @@ def sync(client, config, catalog, state):
                 singer.write_message(activate_version_message)
 
                 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
-                    sheet_title, row_num - 1))
+                    sheet_title, row_num - 2)) # subtract 1 for header row
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
@@ -468,4 +489,7 @@ def sync(client, config, catalog, state):
     # Sync sheet_metadata if selected
     sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
 
+    # Update file_metadata bookmark
+    write_bookmark(state, 'file_metadata', strftime(this_datetime))
+
     return