git.immae.eu Git - github/fretlink/tap-google-sheets.git/commitdiff
pylint and testing v0.0.1
author Jeff Huth <jeff.huth@bytecode.io>
Fri, 15 Nov 2019 09:58:55 +0000 (01:58 -0800)
committer Jeff Huth <jeff.huth@bytecode.io>
Fri, 15 Nov 2019 09:58:55 +0000 (01:58 -0800)
pylint and testing

README.md
tap_google_sheets/client.py
tap_google_sheets/discover.py
tap_google_sheets/schema.py
tap_google_sheets/streams.py
tap_google_sheets/sync.py

index 5752b64f5e466120c135c7213e0a4511d7878ea2..8c9cc9dee6e502cecb2439823ffcdd0b97979e96 100644 (file)
--- a/README.md
+++ b/README.md
@@ -154,7 +154,7 @@ The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=
     ```
     Pylint test resulted in the following score:
     ```bash
-    TBD
+    Your code has been rated at 9.78/10
     ```
 
     To [check the tap](https://github.com/singer-io/singer-tools#singer-check-tap) and verify working:
@@ -164,7 +164,31 @@ The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=
     ```
     Check tap resulted in the following:
     ```bash
-    TBD
+    The output is valid.
+    It contained 3881 messages for 13 streams.
+
+        13 schema messages
+      3841 record messages
+        27 state messages
+
+    Details by stream:
+    +----------------------+---------+---------+
+    | stream               | records | schemas |
+    +----------------------+---------+---------+
+    | file_metadata        | 1       | 1       |
+    | spreadsheet_metadata | 1       | 1       |
+    | Test-1               | 9       | 1       |
+    | Test 2               | 2       | 1       |
+    | SKU COGS             | 218     | 1       |
+    | Item Master          | 216     | 1       |
+    | Retail Price         | 273     | 1       |
+    | Retail Price NEW     | 284     | 1       |
+    | Forecast Scenarios   | 2681    | 1       |
+    | Promo Type           | 91      | 1       |
+    | Shipping Method      | 47      | 1       |
+    | sheet_metadata       | 9       | 1       |
+    | sheets_loaded        | 9       | 1       |
+    +----------------------+---------+---------+
     ```
 ---
 
index 0a0ce5a6a09c6b128ea802dc3d790e70a0e4db61..4f38352ed91dcf08c8da73c502cb55b40b9fa919 100644 (file)
--- a/tap_google_sheets/client.py
+++ b/tap_google_sheets/client.py
@@ -1,8 +1,7 @@
 from datetime import datetime, timedelta
+from collections import OrderedDict
 import backoff
 import requests
-from collections import OrderedDict
-
 import singer
 from singer import metrics
 from singer import utils
@@ -123,8 +122,7 @@ def raise_for_error(response):
                 error_code = response.get('error', {}).get('code')
                 ex = get_exception_for_error_code(error_code)
                 raise ex(message)
-            else:
-                raise GoogleError(error)
+            raise GoogleError(error)
         except (ValueError, TypeError):
             raise GoogleError(error)
 
@@ -196,9 +194,7 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
                           factor=3)
     @utils.ratelimit(100, 100)
     def request(self, method, path=None, url=None, api=None, **kwargs):
-
         self.get_access_token()
-        
         self.base_url = 'https://sheets.googleapis.com/v4'
         if api == 'files':
             self.base_url = 'https://www.googleapis.com/drive/v3'
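
Note on the `request` changes above: the method defaults to the Sheets API and switches to the Drive API when `api == 'files'`. A minimal standalone sketch of that routing (URLs copied from the diff; token refresh, backoff, and rate limiting omitted):

```python
# Standalone sketch of the base-URL routing in GoogleClient.request (from this diff).
def pick_base_url(api=None):
    base_url = 'https://sheets.googleapis.com/v4'            # default: Sheets API v4
    if api == 'files':
        base_url = 'https://www.googleapis.com/drive/v3'     # file_metadata uses Drive API v3
    return base_url

print(pick_base_url())         # https://sheets.googleapis.com/v4
print(pick_base_url('files'))  # https://www.googleapis.com/drive/v3
```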
index 6477a5f92e99756a5e2731b6f5dc805abc0b8478..6cf0d09b3f0c1f65bd077cd825c38e5d0a2963f8 100644 (file)
--- a/tap_google_sheets/discover.py
+++ b/tap_google_sheets/discover.py
@@ -10,11 +10,11 @@ def discover(client, spreadsheet_id):
         schema = Schema.from_dict(schema_dict)
         mdata = field_metadata[stream_name]
         key_properties = None
-        for md in mdata:
-            table_key_properties = md.get('metadata', {}).get('table-key-properties')
+        for mdt in mdata:
+            table_key_properties = mdt.get('metadata', {}).get('table-key-properties')
             if table_key_properties:
                 key_properties = table_key_properties
-    
+
         catalog.streams.append(CatalogEntry(
             stream=stream_name,
             tap_stream_id=stream_name,
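
For context, `discover` scans each stream's metadata list for a `table-key-properties` entry and uses it as the catalog's key properties. A small illustration with a hypothetical metadata list (the shape follows Singer metadata conventions; the values are made up):

```python
# Hypothetical metadata list in the shape scanned by discover() above.
mdata = [
    {"breadcrumb": [], "metadata": {"table-key-properties": ["id"], "inclusion": "available"}},
    {"breadcrumb": ["properties", "id"], "metadata": {"inclusion": "automatic"}},
]

key_properties = None
for mdt in mdata:
    table_key_properties = mdt.get('metadata', {}).get('table-key-properties')
    if table_key_properties:
        key_properties = table_key_properties

print(key_properties)  # ['id']
```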
index 237ab06f9cea44deaa3225a75d286e8852615652..d4fead52271566336799f5f851e29bdca9a340c0 100644 (file)
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -11,19 +11,19 @@ LOGGER = singer.get_logger()
 # https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata
 
 # Convert column index to column letter
-def colnum_string(n):
+def colnum_string(num):
     string = ""
-    while n > 0:
-        n, remainder = divmod(n - 1, 26)
+    while num > 0:
+        num, remainder = divmod(num - 1, 26)
         string = chr(65 + remainder) + string
     return string
 
 
 # Create sheet_metadata_json with columns from sheet
-def get_sheet_schema_columns(sheet, spreadsheet_id, client):
+def get_sheet_schema_columns(sheet):
     sheet_json_schema = OrderedDict()
     data = next(iter(sheet.get('data', [])), {})
-    row_data = data.get('rowData',[])
+    row_data = data.get('rowData', [])
     # spreadsheet is an OrderedDict, with ordered sheets and rows in the response
 
     headers = row_data[0].get('values', [])
@@ -65,33 +65,32 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
             column_name = '{}'.format(header_value)
             if column_name in header_list:
                 raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name))
-            else:
-                header_list.append(column_name)
+            header_list.append(column_name)
 
             first_value = first_values[i]
-            # LOGGER.info('first_value[{}] = {}'.format(i, json.dumps(first_value, indent=2, sort_keys=True)))
 
             column_effective_value = first_value.get('effectiveValue', {})
             for key in column_effective_value.keys():
                 if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'):
                     column_effective_value_type = key
 
-            column_number_format = first_values[i].get('effectiveFormat', {}).get('numberFormat', {})
+            column_number_format = first_values[i].get('effectiveFormat', {}).get(
+                'numberFormat', {})
             column_number_format_type = column_number_format.get('type')
 
             # Determine datatype for sheet_json_schema
             #
-            # column_effective_value_type = numberValue, stringValue, boolValue; INVALID: errorType, formulaType
-            #   Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
+            # column_effective_value_type = numberValue, stringValue, boolValue;
+            #  INVALID: errorType, formulaType
+            #  https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
             #
-            # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, SCIENTIFIC
-            #   Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
+            # column_number_format_type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
+            #   TIME, DATE_TIME, SCIENTIFIC
+            #  https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
             #
             column_format = None # Default
             # column_multiple_of = None # Default
-            if column_effective_value_type in ('formulaValue', 'errorValue'):
-                raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name))
-            elif column_effective_value_type == 'stringValue':
+            if column_effective_value_type == 'stringValue':
                 column_type = ['null', 'string']
                 column_gs_type = 'stringValue'
             elif column_effective_value_type == 'boolValue':
@@ -116,7 +115,9 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
                 else:
                     column_type = ['null', 'number', 'string']
                     column_gs_type = 'numberType'
-
+            elif column_effective_value_type in ('formulaValue', 'errorValue'):
+                raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name, \
+                    column_effective_value_type))
         else: # skipped
             column_is_skipped = True
             skipped = skipped + 1
@@ -130,7 +131,6 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
             # skipped = 2 consecutive skipped headers
             # Remove prior_header column_name
             sheet_json_schema['properties'].pop(prior_header, None)
-            column_count = i - 1
             break
 
         else:
@@ -164,12 +164,14 @@ def get_sheet_metadata(sheet, spreadsheet_id, client):
     stream_metadata = STREAMS.get(stream_name)
     api = stream_metadata.get('api', 'sheets')
     params = stream_metadata.get('params', {})
-    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace('{sheet_title}', sheet_title)
-    path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring)
+    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in \
+        params.items()]).replace('{sheet_title}', sheet_title)
+    path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
+        spreadsheet_id), querystring)
 
     sheet_md_results = client.get(path=path, api=api, endpoint=stream_name)
     sheet_cols = sheet_md_results.get('sheets')[0]
-    sheet_schema, columns = get_sheet_schema_columns(sheet_cols, spreadsheet_id, client)
+    sheet_schema, columns = get_sheet_schema_columns(sheet_cols)
 
     return sheet_schema, columns
 
@@ -199,20 +201,22 @@ def get_schemas(client, spreadsheet_id):
             replication_method=stream_metadata.get('replication_method', None)
         )
         field_metadata[stream_name] = mdata
-        
+
         if stream_name == 'spreadsheet_metadata':
             api = stream_metadata.get('api', 'sheets')
             params = stream_metadata.get('params', {})
             querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
-            path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring)
+            path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
+                spreadsheet_id), querystring)
 
-            spreadsheet_md_results = client.get(path=path, params=querystring, api=api, endpoint=stream_name)
+            spreadsheet_md_results = client.get(path=path, params=querystring, api=api, \
+                endpoint=stream_name)
 
             sheets = spreadsheet_md_results.get('sheets')
             if sheets:
                 for sheet in sheets:
                     sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-                    # LOGGER.info('sheet_schema = {}'.format(json.dumps(sheet_schema, indent=2, sort_keys=True)))
+                    LOGGER.info('columns = {}'.format(columns))
 
                     sheet_title = sheet.get('properties', {}).get('title')
                     schemas[sheet_title] = sheet_schema
@@ -224,5 +228,5 @@ def get_schemas(client, spreadsheet_id):
                         replication_method='FULL_TABLE'
                     )
                     field_metadata[sheet_title] = sheet_mdata
-            
+
     return schemas, field_metadata
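
As a quick sanity check of the renamed column-letter helper, here is `colnum_string` exactly as defined above, with a few sample conversions:

```python
# Column index -> column letter, as defined in tap_google_sheets/schema.py above.
def colnum_string(num):
    string = ""
    while num > 0:
        num, remainder = divmod(num - 1, 26)
        string = chr(65 + remainder) + string
    return string

assert colnum_string(1) == 'A'
assert colnum_string(26) == 'Z'
assert colnum_string(27) == 'AA'
assert colnum_string(53) == 'BA'
```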
index 231a41d0cde60e8915136e7f79bbf44b1454201d..b8e3eff859457f7fe1061cc5c363050fe8551848 100644 (file)
--- a/tap_google_sheets/streams.py
+++ b/tap_google_sheets/streams.py
@@ -8,11 +8,10 @@ from collections import OrderedDict
 #   key_properties: Primary key fields for identifying an endpoint record.
 #   replication_method: INCREMENTAL or FULL_TABLE
 #   replication_keys: bookmark_field(s), typically a date-time, used for filtering the results
-#        and setting the state
+#       and setting the state
 #   params: Query, sort, and other endpoint specific parameters; default = {}
-#   data_key: JSON element containing the results list for the endpoint; default = root (no data_key)
-#   bookmark_query_field: From date-time field used for filtering the query
-#   bookmark_type: Data type for bookmark, integer or datetime
+#   data_key: JSON element containing the results list for the endpoint;
+#       default = root (no data_key)
 
 FILE_METADATA = {
     "api": "files",
index 79e05f9929ce33bd038cac086dea2772f2306d4a..d7d71846999bd2af4037a58d5b14ebf0a5387d52 100644 (file)
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -1,10 +1,9 @@
 import time
 import math
-import singer
 import json
-import pytz
 from datetime import datetime, timedelta
-from collections import OrderedDict
+import pytz
+import singer
 from singer import metrics, metadata, Transformer, utils
 from singer.utils import strptime_to_utc, strftime
 from tap_google_sheets.streams import STREAMS
@@ -71,17 +70,21 @@ def process_records(catalog,
         return counter.value
 
 
-def sync_stream(stream_name, selected_streams, catalog, state, records):
+def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
     # Should sheets_loaded be synced?
     if stream_name in selected_streams:
         LOGGER.info('STARTED Syncing {}'.format(stream_name))
         update_currently_syncing(state, stream_name)
+        selected_fields = get_selected_fields(catalog, stream_name)
+        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
         write_schema(catalog, stream_name)
+        if not time_extracted:
+            time_extracted = utils.now()
         record_count = process_records(
             catalog=catalog,
             stream_name=stream_name,
             records=records,
-            time_extracted=utils.now())
+            time_extracted=time_extracted)
         LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
         update_currently_syncing(state, None)
 
@@ -105,9 +108,9 @@ def get_selected_fields(catalog, stream_name):
     mdata_list = singer.metadata.to_list(mdata)
     selected_fields = []
     for entry in mdata_list:
-        field =  None
+        field = None
         try:
-            field =  entry['breadcrumb'][1]
+            field = entry['breadcrumb'][1]
             if entry.get('metadata', {}).get('selected', False):
                 selected_fields.append(field)
         except IndexError:
@@ -172,7 +175,7 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
 def transform_sheet_metadata(spreadsheet_id, sheet, columns):
     # Convert properties to dict
     sheet_metadata = sheet.get('properties')
-    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata)) 
+    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
     sheet_id = sheet_metadata_tf.get('sheetId')
     sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
         spreadsheet_id, sheet_id)
@@ -187,13 +190,13 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns):
 def excel_to_dttm_str(excel_date_sn, timezone_str=None):
     if not timezone_str:
         timezone_str = 'UTC'
-    tz = pytz.timezone(timezone_str)
+    tzn = pytz.timezone(timezone_str)
     sec_per_day = 86400
     excel_epoch = 25569 # 1970-01-01T00:00:00Z
     epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
     epoch_dttm = datetime(1970, 1, 1)
     excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
-    utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
+    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
     utc_dttm_str = strftime(utc_dttm)
     return utc_dttm_str
 
@@ -205,7 +208,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
     is_last_row = False
     row_num = from_row
     # Create sorted list of columns based on columnIndex
-    cols = sorted(columns, key = lambda i: i['columnIndex'])
+    cols = sorted(columns, key=lambda i: i['columnIndex'])
 
     # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
     for row in sheet_data_rows:
@@ -228,17 +231,17 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
                 col_type = col.get('columnType')
                 # Convert dates/times from Lotus Notes Serial Numbers
                 if col_type == 'numberType.DATE_TIME':
-                    if isinstance(value, int) or isinstance(value, float):
+                    if isinstance(value, (int, float)):
                         col_val = excel_to_dttm_str(value)
                     else:
                         col_val = str(value)
                 elif col_type == 'numberType.DATE':
-                    if isinstance(value, int) or isinstance(value, float):
+                    if isinstance(value, (int, float)):
                         col_val = excel_to_dttm_str(value)[:10]
                     else:
                         col_val = str(value)
                 elif col_type == 'numberType.TIME':
-                    if isinstance(value, int) or isinstance(value, float):
+                    if isinstance(value, (int, float)):
                         try:
                             total_secs = value * 86400 # seconds in day
                             col_val = str(timedelta(seconds=total_secs))
@@ -267,7 +270,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
                         else:
                             col_val = str(value)
                     elif isinstance(value, int):
-                        if value == 1 or value == -1:
+                        if value in (1, -1):
                             col_val = True
                         elif value == 0:
                             col_val = False
@@ -321,11 +324,10 @@ def sync(client, config, catalog, state):
     LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
     if this_datetime <= last_datetime:
         LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
-        return 0
-    else:
-        # Sync file_metadata if selected
-        sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
-        write_bookmark(state, stream_name, strftime(this_datetime))
+        return
+    # Sync file_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
+    write_bookmark(state, stream_name, strftime(this_datetime))
 
     # SPREADSHEET_METADATA
     spreadsheet_metadata = {}
@@ -334,7 +336,7 @@ def sync(client, config, catalog, state):
 
     # GET spreadsheet_metadata
     LOGGER.info('GET spreadsheet_metadata')
-    spreadsheet_metadata, ss_time_extracted  = get_data(
+    spreadsheet_metadata, ss_time_extracted = get_data(
         stream_name=stream_name,
         endpoint_config=spreadsheet_metadata_config,
         client=client,
@@ -345,7 +347,8 @@ def sync(client, config, catalog, state):
     spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
 
     # Sync spreadsheet_metadata if selected
-    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)
+    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
+        ss_time_extracted)
 
     # SHEET_METADATA and SHEET_DATA
     sheets = spreadsheet_metadata.get('sheets')
@@ -360,6 +363,7 @@ def sync(client, config, catalog, state):
 
             # GET sheet_metadata and columns
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+            LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
             # Transform sheet_metadata
             sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
@@ -371,6 +375,8 @@ def sync(client, config, catalog, state):
             if sheet_title in selected_streams:
                 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                 update_currently_syncing(state, sheet_title)
+                selected_fields = get_selected_fields(catalog, sheet_title)
+                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                 write_schema(catalog, sheet_title)
 
                 # Determine max range of columns and rows for "paging" through the data
@@ -396,7 +402,7 @@ def sync(client, config, catalog, state):
                 # Loop thru batches (each having 200 rows of data)
                 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                     range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-                    
+
                     # GET sheet_data for a worksheet tab
                     sheet_data, time_extracted = get_data(
                         stream_name=sheet_title,
@@ -423,7 +429,9 @@ def sync(client, config, catalog, state):
                         stream_name=sheet_title,
                         records=sheet_data_tf,
                         time_extracted=ss_time_extracted)
-                    
+                    LOGGER.info('Sheet: {}, records processed: {}'.format(
+                        sheet_title, record_count))
+
                     # Update paging from/to_row for next batch
                     from_row = to_row + 1
                     if to_row + batch_rows > sheet_max_row:
@@ -442,8 +450,8 @@ def sync(client, config, catalog, state):
                 sheets_loaded.append(sheet_loaded)
 
                 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
-                # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
-                # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
+                # This forces hard deletes on the data downstream if fewer records are sent.
+                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                 activate_version_message = singer.ActivateVersionMessage(
                     stream=sheet_title,
                     version=int(time.time() * 1000))
@@ -451,12 +459,13 @@ def sync(client, config, catalog, state):
 
                 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                     sheet_title, row_num - 1))
-                update_currently_syncing(state, None)
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
     sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
-    
+
     stream_name = 'sheets_loaded'
     # Sync sheets_loaded if selected
     sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
+
+    return
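
For reference, the date handling in `transform_sheet_data` relies on `excel_to_dttm_str`, which converts a Google Sheets serial number into a UTC timestamp using the epoch offset 25569 (the serial number for 1970-01-01). A minimal sketch of that conversion, with the timezone localization omitted:

```python
# Minimal sketch of the serial-number -> UTC conversion used by excel_to_dttm_str above
# (timezone localization omitted; 25569 is the sheet serial number for 1970-01-01).
import math
from datetime import datetime, timedelta

def excel_serial_to_utc(excel_date_sn):
    sec_per_day = 86400
    excel_epoch = 25569
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    return datetime(1970, 1, 1) + timedelta(seconds=epoch_sec)

print(excel_serial_to_utc(43784.5))  # 2019-11-15 12:00:00
```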