]> git.immae.eu Git - github/fretlink/tap-google-sheets.git/commitdiff
client.py rate limit, sync.py changes
authorJeff Huth <jeff.huth@bytecode.io>
Fri, 15 Nov 2019 08:49:39 +0000 (00:49 -0800)
committerJeff Huth <jeff.huth@bytecode.io>
Fri, 15 Nov 2019 08:49:39 +0000 (00:49 -0800)
client.py rate limit, fix json schemas, sync.py many changes

tap_google_sheets/client.py
tap_google_sheets/schemas/file_metadata.json
tap_google_sheets/schemas/sheets_loaded.json
tap_google_sheets/sync.py

index 12f081115dd2a7f907f63f0c63044cca4ed5c3a6..0a0ce5a6a09c6b128ea802dc3d790e70a0e4db61 100644 (file)
@@ -151,6 +151,7 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
     def __exit__(self, exception_type, exception_value, traceback):
         self.__session.close()
 
+
     @backoff.on_exception(backoff.expo,
                           Server5xxError,
                           max_tries=5,
@@ -187,13 +188,13 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
         LOGGER.info('Authorized, token expires = {}'.format(self.__expires))
 
 
+    # Rate Limit: https://developers.google.com/sheets/api/limits
+    #   100 request per 100 seconds per User
     @backoff.on_exception(backoff.expo,
                           (Server5xxError, ConnectionError, Server429Error),
                           max_tries=7,
                           factor=3)
-    # Rate Limit:
-    #  https://developers.google.com/webmaster-tools/search-console-api-original/v3/limits
-    @utils.ratelimit(1200, 60)
+    @utils.ratelimit(100, 100)
     def request(self, method, path=None, url=None, api=None, **kwargs):
 
         self.get_access_token()
@@ -211,6 +212,7 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
             del kwargs['endpoint']
         else:
             endpoint = None
+        LOGGER.info('{} URL = {}'.format(endpoint, url))
 
         if 'headers' not in kwargs:
             kwargs['headers'] = {}
index 25c19c4e7f85cf8a3d3bf1f28b825d28a7f49154..03fefc601cb8cc7ff75b02d1d6ab75e09a5e3dfb 100644 (file)
@@ -30,7 +30,7 @@
       "additionalProperties": false,
       "properties": {
         "kind": {
-          "type": ["null", "integer"]
+          "type": ["null", "string"]
         },
         "displayName": {
           "type": ["null", "string"]
index 12f967a190471859ff4cadd6f0c4ee26b1ae7322..f7a323d457ed0e8059cad8bf71d3da1edcfda182 100644 (file)
@@ -8,7 +8,7 @@
     "sheetId": {
       "type": ["null", "integer"]
     },
-    "sheetTitle": {
+    "title": {
       "type": ["null", "string"]
     },
     "loadDate": {
index 5b57e77ec5cf973e73472aa678bc2187d3629766..79e05f9929ce33bd038cac086dea2772f2306d4a 100644 (file)
@@ -2,6 +2,8 @@ import time
 import math
 import singer
 import json
+import pytz
+from datetime import datetime, timedelta
 from collections import OrderedDict
 from singer import metrics, metadata, Transformer, utils
 from singer.utils import strptime_to_utc, strftime
@@ -48,66 +50,40 @@ def write_bookmark(state, stream, value):
     singer.write_state(state)
 
 
-# def transform_datetime(this_dttm):
-def transform_datetime(this_dttm):
-    with Transformer() as transformer:
-        new_dttm = transformer._transform_datetime(this_dttm)
-    return new_dttm
-
-
-def process_records(catalog, #pylint: disable=too-many-branches
+# Transform/validate batch of records w/ schema and sent to target
+def process_records(catalog,
                     stream_name,
                     records,
-                    time_extracted,
-                    bookmark_field=None,
-                    bookmark_type=None,
-                    max_bookmark_value=None,
-                    last_datetime=None,
-                    last_integer=None,
-                    parent=None,
-                    parent_id=None):
+                    time_extracted):
     stream = catalog.get_stream(stream_name)
     schema = stream.schema.to_dict()
     stream_metadata = metadata.to_map(stream.metadata)
-
     with metrics.record_counter(stream_name) as counter:
         for record in records:
-            # If child object, add parent_id to record
-            if parent_id and parent:
-                record[parent + '_id'] = parent_id
-
             # Transform record for Singer.io
             with Transformer() as transformer:
                 transformed_record = transformer.transform(
                     record,
                     schema,
                     stream_metadata)
-                # Reset max_bookmark_value to new value if higher
-                if transformed_record.get(bookmark_field):
-                    if max_bookmark_value is None or \
-                        transformed_record[bookmark_field] > transform_datetime(max_bookmark_value):
-                        max_bookmark_value = transformed_record[bookmark_field]
-
-                if bookmark_field and (bookmark_field in transformed_record):
-                    if bookmark_type == 'integer':
-                        # Keep only records whose bookmark is after the last_integer
-                        if transformed_record[bookmark_field] >= last_integer:
-                            write_record(stream_name, transformed_record, \
-                                time_extracted=time_extracted)
-                            counter.increment()
-                    elif bookmark_type == 'datetime':
-                        last_dttm = transform_datetime(last_datetime)
-                        bookmark_dttm = transform_datetime(transformed_record[bookmark_field])
-                        # Keep only records whose bookmark is after the last_datetime
-                        if bookmark_dttm >= last_dttm:
-                            write_record(stream_name, transformed_record, \
-                                time_extracted=time_extracted)
-                            counter.increment()
-                else:
-                    write_record(stream_name, transformed_record, time_extracted=time_extracted)
-                    counter.increment()
-
-        return max_bookmark_value, counter.value
+                write_record(stream_name, transformed_record, time_extracted=time_extracted)
+                counter.increment()
+        return counter.value
+
+
+def sync_stream(stream_name, selected_streams, catalog, state, records):
+    # Should sheets_loaded be synced?
+    if stream_name in selected_streams:
+        LOGGER.info('STARTED Syncing {}'.format(stream_name))
+        update_currently_syncing(state, stream_name)
+        write_schema(catalog, stream_name)
+        record_count = process_records(
+            catalog=catalog,
+            stream_name=stream_name,
+            records=records,
+            time_extracted=utils.now())
+        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
+        update_currently_syncing(state, None)
 
 
 # Currently syncing sets the stream currently being delivered in the state.
@@ -154,14 +130,16 @@ def get_data(stream_name,
     querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
         '{sheet_title}', stream_name)
     data = {}
+    time_extracted = utils.now()
     data = client.get(
         path=path,
         api=api,
         params=querystring,
         endpoint=stream_name)
-    return data
+    return data, time_extracted
 
 
+# Tranform file_metadata: remove nodes from lastModifyingUser, format as array
 def transform_file_metadata(file_metadata):
     # Convert to dict
     file_metadata_tf = json.loads(json.dumps(file_metadata))
@@ -176,10 +154,11 @@ def transform_file_metadata(file_metadata):
     return file_metadata_arr
 
 
+# Tranform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
 def transform_spreadsheet_metadata(spreadsheet_metadata):
     # Convert to dict
     spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
-    # Remove keys
+    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
     if spreadsheet_metadata_tf.get('properties'):
         spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
     spreadsheet_metadata_tf.pop('sheets', None)
@@ -189,6 +168,7 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
     return spreadsheet_metadata_arr
 
 
+# Tranform spreadsheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
 def transform_sheet_metadata(spreadsheet_id, sheet, columns):
     # Convert to properties to dict
     sheet_metadata = sheet.get('properties')
@@ -202,6 +182,107 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns):
     return sheet_metadata_tf
 
 
+# Convert Excel Date Serial Number (excel_date_sn) to datetime string
+# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
+def excel_to_dttm_str(excel_date_sn, timezone_str=None):
+    if not timezone_str:
+        timezone_str = 'UTC'
+    tz = pytz.timezone(timezone_str)
+    sec_per_day = 86400
+    excel_epoch = 25569 # 1970-01-01T00:00:00Z
+    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
+    epoch_dttm = datetime(1970, 1, 1)
+    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
+    utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
+    utc_dttm_str = strftime(utc_dttm)
+    return utc_dttm_str
+
+
+# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
+#  Convert from array of values to JSON with column names as keys
+def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
+    sheet_data_tf = []
+    is_last_row = False
+    row_num = from_row
+    # Create sorted list of columns based on columnIndex
+    cols = sorted(columns, key = lambda i: i['columnIndex'])
+
+    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
+    for row in sheet_data_rows:
+        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
+        if row == []:
+            is_last_row = True
+            return sheet_data_tf, row_num - 1, is_last_row
+        sheet_data_row_tf = {}
+        # Add spreadsheet_id, sheet_id, and row
+        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
+        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
+        sheet_data_row_tf['__sdc_row'] = row_num
+        col_num = 1
+        for value in row:
+            # Select column metadata based on column index
+            col = cols[col_num - 1]
+            col_skipped = col.get('columnSkipped')
+            if not col_skipped:
+                col_name = col.get('columnName')
+                col_type = col.get('columnType')
+                # Convert dates/times from Lotus Notes Serial Numbers
+                if col_type == 'numberType.DATE_TIME':
+                    if isinstance(value, int) or isinstance(value, float):
+                        col_val = excel_to_dttm_str(value)
+                    else:
+                        col_val = str(value)
+                elif col_type == 'numberType.DATE':
+                    if isinstance(value, int) or isinstance(value, float):
+                        col_val = excel_to_dttm_str(value)[:10]
+                    else:
+                        col_val = str(value)
+                elif col_type == 'numberType.TIME':
+                    if isinstance(value, int) or isinstance(value, float):
+                        try:
+                            total_secs = value * 86400 # seconds in day
+                            col_val = str(timedelta(seconds=total_secs))
+                        except ValueError:
+                            col_val = str(value)
+                    else:
+                        col_val = str(value)
+                elif col_type == 'numberType':
+                    if isinstance(value, int):
+                        col_val = int(value)
+                    else:
+                        try:
+                            col_val = float(value)
+                        except ValueError:
+                            col_val = str(value)
+                elif col_type == 'stringValue':
+                    col_val = str(value)
+                elif col_type == 'boolValue':
+                    if isinstance(value, bool):
+                        col_val = value
+                    elif isinstance(value, str):
+                        if value.lower() in ('true', 't', 'yes', 'y'):
+                            col_val = True
+                        elif value.lower() in ('false', 'f', 'no', 'n'):
+                            col_val = False
+                        else:
+                            col_val = str(value)
+                    elif isinstance(value, int):
+                        if value == 1 or value == -1:
+                            col_val = True
+                        elif value == 0:
+                            col_val = False
+                        else:
+                            col_val = str(value)
+
+                else:
+                    col_val = value
+                sheet_data_row_tf[col_name] = col_val
+            col_num = col_num + 1
+        sheet_data_tf.append(sheet_data_row_tf)
+        row_num = row_num + 1
+    return sheet_data_tf, row_num, is_last_row
+
+
 def sync(client, config, catalog, state):
     start_date = config.get('start_date')
     spreadsheet_id = config.get('spreadsheet_id')
@@ -218,63 +299,164 @@ def sync(client, config, catalog, state):
     if not selected_streams:
         return
 
-    # Get file_metadata
+    # FILE_METADATA
     file_metadata = {}
-    file_metadata_config = STREAMS.get('file_metadata')
-    file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id)
+    stream_name = 'file_metadata'
+    file_metadata_config = STREAMS.get(stream_name)
+
+    # GET file_metadata
+    LOGGER.info('GET file_meatadata')
+    file_metadata, time_extracted = get_data(stream_name=stream_name,
+                                             endpoint_config=file_metadata_config,
+                                             client=client,
+                                             spreadsheet_id=spreadsheet_id)
+    # Transform file_metadata
+    LOGGER.info('Transform file_meatadata')
     file_metadata_tf = transform_file_metadata(file_metadata)
     # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
-    last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date))
+
+    # Check if file has changed, if not break (return to __init__)
+    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
     this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
     LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
     if this_datetime <= last_datetime:
         LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
         return 0
-    
-    # Get spreadsheet_metadata
+    else:
+        # Sync file_metadata if selected
+        sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
+        write_bookmark(state, stream_name, strftime(this_datetime))
+
+    # SPREADSHEET_METADATA
     spreadsheet_metadata = {}
-    spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata')
-    spreadsheet_metadata = get_data('spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id)
+    stream_name = 'spreadsheet_metadata'
+    spreadsheet_metadata_config = STREAMS.get(stream_name)
+
+    # GET spreadsheet_metadata
+    LOGGER.info('GET spreadsheet_meatadata')
+    spreadsheet_metadata, ss_time_extracted  = get_data(
+        stream_name=stream_name,
+        endpoint_config=spreadsheet_metadata_config,
+        client=client,
+        spreadsheet_id=spreadsheet_id)
+
+    # Transform spreadsheet_metadata
+    LOGGER.info('Transform spreadsheet_meatadata')
     spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
-    # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))
 
-    # Get sheet_metadata
+    # Sync spreadsheet_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)
+
+    # SHEET_METADATA and SHEET_DATA
     sheets = spreadsheet_metadata.get('sheets')
     sheet_metadata = []
     sheets_loaded = []
     sheets_loaded_config = STREAMS['sheets_loaded']
     if sheets:
+        # Loop thru sheets (worksheet tabs) in spreadsheet
         for sheet in sheets:
             sheet_title = sheet.get('properties', {}).get('title')
+            sheet_id = sheet.get('properties', {}).get('sheetId')
+
+            # GET sheet_metadata and columns
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+
+            # Transform sheet_metadata
             sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
             # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
             sheet_metadata.append(sheet_metadata_tf)
 
-            # Determine range of rows and columns for "paging" through batch rows of data
-            sheet_last_col_index = 1
-            sheet_last_col_letter = 'A'
-            for col in columns:
-                col_index = col.get('columnIndex')
-                col_letter = col.get('columnLetter')
-                if col_index > sheet_last_col_index:
-                    sheet_last_col_index = col_index
-                    sheet_last_col_letter = col_letter
-            sheet_max_row = sheet.get('gridProperties', {}).get('rowCount')
-            is_empty_row = False
-            batch_rows = 200
-            from_row = 2
-            if sheet_max_row < batch_rows:
-                to_row = sheet_max_row
-            else:
-                to_row = batch_rows
-
-            while not is_empty_row and to_row <= sheet_max_row:
-                range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row)
-                
-                sheet_data = get_data(
-                    stream_name=sheet_title,
-                    endpoint_config=sheets_loaded_config,
-                    client=client,
-                    spreadsheet_id=spreadsheet_id,
-                    range_rows=range_rows)
+            # SHEET_DATA
+            # Should this worksheet tab be synced?
+            if sheet_title in selected_streams:
+                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                update_currently_syncing(state, sheet_title)
+                write_schema(catalog, sheet_title)
+
+                # Determine max range of columns and rows for "paging" through the data
+                sheet_last_col_index = 1
+                sheet_last_col_letter = 'A'
+                for col in columns:
+                    col_index = col.get('columnIndex')
+                    col_letter = col.get('columnLetter')
+                    if col_index > sheet_last_col_index:
+                        sheet_last_col_index = col_index
+                        sheet_last_col_letter = col_letter
+                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+                # Initialize paging for 1st batch
+                is_last_row = False
+                batch_rows = 200
+                from_row = 2
+                if sheet_max_row < batch_rows:
+                    to_row = sheet_max_row
+                else:
+                    to_row = batch_rows
+
+                # Loop thru batches (each having 200 rows of data)
+                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+                    
+                    # GET sheet_data for a worksheet tab
+                    sheet_data, time_extracted = get_data(
+                        stream_name=sheet_title,
+                        endpoint_config=sheets_loaded_config,
+                        client=client,
+                        spreadsheet_id=spreadsheet_id,
+                        range_rows=range_rows)
+                    # Data is returned as a list of arrays, an array of values for each row
+                    sheet_data_rows = sheet_data.get('values')
+
+                    # Transform batch of rows to JSON with keys for each column
+                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
+                        spreadsheet_id=spreadsheet_id,
+                        sheet_id=sheet_id,
+                        from_row=from_row,
+                        columns=columns,
+                        sheet_data_rows=sheet_data_rows)
+                    if row_num < to_row:
+                        is_last_row = True
+
+                    # Process records, send batch of records to target
+                    record_count = process_records(
+                        catalog=catalog,
+                        stream_name=sheet_title,
+                        records=sheet_data_tf,
+                        time_extracted=ss_time_extracted)
+                    
+                    # Update paging from/to_row for next batch
+                    from_row = to_row + 1
+                    if to_row + batch_rows > sheet_max_row:
+                        to_row = sheet_max_row
+                    else:
+                        to_row = to_row + batch_rows
+
+                # SHEETS_LOADED
+                # Add sheet to sheets_loaded
+                sheet_loaded = {}
+                sheet_loaded['spreadsheetId'] = spreadsheet_id
+                sheet_loaded['sheetId'] = sheet_id
+                sheet_loaded['title'] = sheet_title
+                sheet_loaded['loadDate'] = strftime(utils.now())
+                sheet_loaded['lastRowNumber'] = row_num
+                sheets_loaded.append(sheet_loaded)
+
+                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
+                # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
+                # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
+                activate_version_message = singer.ActivateVersionMessage(
+                    stream=sheet_title,
+                    version=int(time.time() * 1000))
+                singer.write_message(activate_version_message)
+
+                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                    sheet_title, row_num - 1))
+                update_currently_syncing(state, None)
+
+    stream_name = 'sheet_metadata'
+    # Sync sheet_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
+    
+    stream_name = 'sheets_loaded'
+    # Sync sheet_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)