Bump to v1.1.0, update changelog (#26)
index d7d71846999bd2af4037a58d5b14ebf0a5387d52..26c2d19cebd1a4c174eeffa3a0be78797006d8a4 100644
@@ -1,11 +1,14 @@
 import time
 import math
 import json
+import re
+import urllib.parse
 from datetime import datetime, timedelta
 import pytz
 import singer
 from singer import metrics, metadata, Transformer, utils
 from singer.utils import strptime_to_utc, strftime
+from singer.messages import RecordMessage
 from tap_google_sheets.streams import STREAMS
 from tap_google_sheets.schema import get_sheet_metadata
 
@@ -17,14 +20,26 @@ def write_schema(catalog, stream_name):
     schema = stream.schema.to_dict()
     try:
         singer.write_schema(stream_name, schema, stream.key_properties)
+        LOGGER.info('Writing schema for: {}'.format(stream_name))
     except OSError as err:
         LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
         raise err
 
 
-def write_record(stream_name, record, time_extracted):
+def write_record(stream_name, record, time_extracted, version=None):
     try:
-        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
+        if version:
+            singer.messages.write_message(
+                RecordMessage(
+                    stream=stream_name,
+                    record=record,
+                    version=version,
+                    time_extracted=time_extracted))
+        else:
+            singer.messages.write_record(
+                stream_name=stream_name,
+                record=record,
+                time_extracted=time_extracted)
     except OSError as err:
         LOGGER.info('OS Error writing record for: {}'.format(stream_name))
         LOGGER.info('record: {}'.format(record))
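Note: the new optional version argument emits a versioned RECORD message, which pairs with the ACTIVATE_VERSION messages added in sync() below for full-table replication. A minimal sketch of the same emission path using singer-python directly (the stream name and record are illustrative):

    import time
    import singer
    from singer.messages import RecordMessage

    version = int(time.time() * 1000)  # epoch milliseconds, as computed in sync() below
    singer.messages.write_message(
        RecordMessage(
            stream='Sheet1',                            # hypothetical sheet title
            record={'__sdc_row': 2, 'name': 'example'}, # hypothetical transformed row
            version=version,
            time_extracted=singer.utils.now()))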
@@ -53,7 +68,8 @@ def write_bookmark(state, stream, value):
 def process_records(catalog,
                     stream_name,
                     records,
-                    time_extracted):
+                    time_extracted,
+                    version=None):
     stream = catalog.get_stream(stream_name)
     schema = stream.schema.to_dict()
     stream_metadata = metadata.to_map(stream.metadata)
@@ -61,11 +77,19 @@ def process_records(catalog,
         for record in records:
             # Transform record for Singer.io
             with Transformer() as transformer:
-                transformed_record = transformer.transform(
-                    record,
-                    schema,
-                    stream_metadata)
-                write_record(stream_name, transformed_record, time_extracted=time_extracted)
+                try:
+                    transformed_record = transformer.transform(
+                        record,
+                        schema,
+                        stream_metadata)
+                except Exception as err:
+                    LOGGER.error('{}'.format(err))
+                    raise RuntimeError(err)
+                write_record(
+                    stream_name=stream_name,
+                    record=transformed_record,
+                    time_extracted=time_extracted,
+                    version=version)
                 counter.increment()
         return counter.value
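Note: the transform step above now surfaces transformation failures explicitly instead of crashing mid-batch. A hedged, self-contained sketch of the underlying singer Transformer call (the schema and record are illustrative):

    from singer import Transformer, metadata

    schema = {'type': 'object',
              'properties': {'name': {'type': ['null', 'string']}}}
    mdata = metadata.to_map([])  # empty metadata: no fields are excluded
    with Transformer() as transformer:
        row = transformer.transform({'name': 'example'}, schema, mdata)
    # row == {'name': 'example'}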
 
@@ -125,20 +149,27 @@ def get_data(stream_name,
              range_rows=None):
     if not range_rows:
         range_rows = ''
+    # Replace {placeholder} variables in path
+    # Encode stream_name: fixes issues with special characters in sheet names
+    stream_name_escaped = re.escape(stream_name)
+    stream_name_encoded = urllib.parse.quote_plus(stream_name)
     path = endpoint_config.get('path', stream_name).replace(
-        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
+        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name_encoded).replace(
             '{range_rows}', range_rows)
     params = endpoint_config.get('params', {})
     api = endpoint_config.get('api', 'sheets')
+    # Add in querystring parameters and replace {placeholder} variables
+    # Building the querystring manually ensures parameters are added without re-encoding (encoding the values causes API errors)
     querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
-        '{sheet_title}', stream_name)
+        '{sheet_title}', stream_name_encoded)
+    LOGGER.info('URL: {}/{}?{}'.format(client.base_url, path, querystring))
     data = {}
     time_extracted = utils.now()
     data = client.get(
         path=path,
         api=api,
         params=querystring,
-        endpoint=stream_name)
+        endpoint=stream_name_escaped)
     return data, time_extracted
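Note: the two encodings above serve different purposes: quote_plus() makes the sheet title safe inside the URL path and querystring, while re.escape() only sanitizes the name passed as the metrics endpoint tag. For a hypothetical title:

    import re
    import urllib.parse

    sheet_title = 'Q1 Budget & Forecast'  # hypothetical sheet name
    urllib.parse.quote_plus(sheet_title)  # -> 'Q1+Budget+%26+Forecast'
    re.escape(sheet_title)                # backslash-escapes regex-special characters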
 
 
@@ -192,7 +223,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None):
         timezone_str = 'UTC'
     tzn = pytz.timezone(timezone_str)
     sec_per_day = 86400
-    excel_epoch = 25569 # 1970-01-01T00:00:00Z
+    excel_epoch = 25569 # Excel/Lotus 1-2-3 serial date for 1970-01-01T00:00:00Z (epoch start)
     epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
     epoch_dttm = datetime(1970, 1, 1)
     excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
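Note: as a worked example of the arithmetic above, Excel/Lotus serial number 44197 corresponds to 2021-01-01:

    from datetime import datetime, timedelta

    excel_epoch = 25569                        # serial number of 1970-01-01
    epoch_sec = (44197 - excel_epoch) * 86400  # 18628 days = 1609459200 seconds
    datetime(1970, 1, 1) + timedelta(seconds=epoch_sec)  # -> 2021-01-01 00:00:00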
@@ -203,87 +234,130 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None):
 
 # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
 #  Convert from array of values to JSON with column names as keys
-def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
+def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
     sheet_data_tf = []
-    is_last_row = False
     row_num = from_row
     # Create sorted list of columns based on columnIndex
     cols = sorted(columns, key=lambda i: i['columnIndex'])
 
     # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
     for row in sheet_data_rows:
-        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
+        # If empty row, SKIP
         if row == []:
-            is_last_row = True
-            return sheet_data_tf, row_num - 1, is_last_row
-        sheet_data_row_tf = {}
-        # Add spreadsheet_id, sheet_id, and row
-        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
-        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
-        sheet_data_row_tf['__sdc_row'] = row_num
-        col_num = 1
-        for value in row:
-            # Select column metadata based on column index
-            col = cols[col_num - 1]
-            col_skipped = col.get('columnSkipped')
-            if not col_skipped:
-                col_name = col.get('columnName')
-                col_type = col.get('columnType')
-                # Convert dates/times from Lotus Notes Serial Numbers
-                if col_type == 'numberType.DATE_TIME':
-                    if isinstance(value, (int, float)):
-                        col_val = excel_to_dttm_str(value)
-                    else:
-                        col_val = str(value)
-                elif col_type == 'numberType.DATE':
-                    if isinstance(value, (int, float)):
-                        col_val = excel_to_dttm_str(value)[:10]
-                    else:
-                        col_val = str(value)
-                elif col_type == 'numberType.TIME':
-                    if isinstance(value, (int, float)):
-                        try:
-                            total_secs = value * 86400 # seconds in day
-                            col_val = str(timedelta(seconds=total_secs))
-                        except ValueError:
+            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
+        else:
+            sheet_data_row_tf = {}
+            # Add spreadsheet_id, sheet_id, and row
+            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
+            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
+            sheet_data_row_tf['__sdc_row'] = row_num
+            col_num = 1
+            for value in row:
+                # Select column metadata based on column index
+                col = cols[col_num - 1]
+                col_skipped = col.get('columnSkipped')
+                if not col_skipped:
+                    # Get column metadata
+                    col_name = col.get('columnName')
+                    col_type = col.get('columnType')
+                    col_letter = col.get('columnLetter')
+
+                    # NULL values
+                    if value is None or value == '':
+                        col_val = None
+
+                    # Convert dates/times from Excel/Lotus serial numbers
+                    # DATE-TIME
+                    elif col_type == 'numberType.DATE_TIME':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)
+                        else:
                             col_val = str(value)
-                    else:
-                        col_val = str(value)
-                elif col_type == 'numberType':
-                    if isinstance(value, int):
-                        col_val = int(value)
-                    else:
-                        try:
-                            col_val = float(value)
-                        except ValueError:
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
+                    # DATE
+                    elif col_type == 'numberType.DATE':
+                        if isinstance(value, (int, float)):
+                            col_val = excel_to_dttm_str(value)[:10]
+                        else:
                             col_val = str(value)
-                elif col_type == 'stringValue':
-                    col_val = str(value)
-                elif col_type == 'boolValue':
-                    if isinstance(value, bool):
-                        col_val = value
-                    elif isinstance(value, str):
-                        if value.lower() in ('true', 't', 'yes', 'y'):
-                            col_val = True
-                        elif value.lower() in ('false', 'f', 'no', 'n'):
-                            col_val = False
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
+                    # TIME ONLY (NO DATE)
+                    elif col_type == 'numberType.TIME':
+                        if isinstance(value, (int, float)):
+                            try:
+                                total_secs = value * 86400 # seconds in day
+                                # Create string formatted like HH:MM:SS
+                                col_val = str(timedelta(seconds=total_secs))
+                            except ValueError:
+                                col_val = str(value)
+                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                         else:
                             col_val = str(value)
-                    elif isinstance(value, int):
-                        if value in (1, -1):
-                            col_val = True
-                        elif value == 0:
-                            col_val = False
+                    # NUMBER (INTEGER AND FLOAT)
+                    elif col_type == 'numberType':
+                        if isinstance(value, int):
+                            col_val = int(value)
+                        elif isinstance(value, float):
+                            # Determine float decimal digits
+                            decimal_digits = str(value)[::-1].find('.')
+                            if decimal_digits > 15:
+                                try:
+                                    # ROUND to multipleOf: 1e-15
+                                    col_val = float(round(value, 15))
+                                except ValueError:
+                                    col_val = str(value)
+                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                        sheet_title, col_name, col_letter, row_num, col_type, value))
+                            else: # decimal_digits <= 15, no rounding
+                                try:
+                                    col_val = float(value)
+                                except ValueError:
+                                    col_val = str(value)
+                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                         else:
                             col_val = str(value)
-
-                else:
-                    col_val = value
-                sheet_data_row_tf[col_name] = col_val
-            col_num = col_num + 1
-        sheet_data_tf.append(sheet_data_row_tf)
+                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                sheet_title, col_name, col_letter, row_num, col_type, value))
+                    # STRING
+                    elif col_type == 'stringValue':
+                        col_val = str(value)
+                    # BOOLEAN
+                    elif col_type == 'boolValue':
+                        if isinstance(value, bool):
+                            col_val = value
+                        elif isinstance(value, str):
+                            if value.lower() in ('true', 't', 'yes', 'y'):
+                                col_val = True
+                            elif value.lower() in ('false', 'f', 'no', 'n'):
+                                col_val = False
+                            else:
+                                col_val = str(value)
+                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
+                        elif isinstance(value, int):
+                            if value in (1, -1):
+                                col_val = True
+                            elif value == 0:
+                                col_val = False
+                            else:
+                                col_val = str(value)
+                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                                    sheet_title, col_name, col_letter, row_num, col_type, value))
+                    # OTHER: Convert everything else to a string
+                    else:
+                        col_val = str(value)
+                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
+                            sheet_title, col_name, col_letter, row_num, col_type, value))
+                    sheet_data_row_tf[col_name] = col_val
+                col_num = col_num + 1
+            # APPEND non-empty row
+            sheet_data_tf.append(sheet_data_row_tf)
         row_num = row_num + 1
-    return sheet_data_tf, row_num, is_last_row
+    return sheet_data_tf, row_num
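Note: the numberType branch above counts a float's decimal digits by reversing its string form, so that .find('.') returns the number of fractional digits; anything past 15 is rounded to satisfy the schema's multipleOf: 1e-15. For example:

    value = 0.12345678901234567  # hypothetical cell value
    str(value)[::-1].find('.')   # 17 fractional digits in the repr
    float(round(value, 15))      # ~0.123456789012346, rounded to 15 decimals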
 
 
 def sync(client, config, catalog, state):
@@ -324,10 +398,12 @@ def sync(client, config, catalog, state):
     LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
     if this_datetime <= last_datetime:
         LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
+        # Update file_metadata bookmark
+        write_bookmark(state, 'file_metadata', strftime(this_datetime))
         return
     # Sync file_metadata if selected
     sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
-    write_bookmark(state, stream_name, strftime(this_datetime))
+    # file_metadata bookmark is updated at the end of sync
 
     # SPREADSHEET_METADATA
     spreadsheet_metadata = {}
@@ -363,102 +439,119 @@ def sync(client, config, catalog, state):
 
             # GET sheet_metadata and columns
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-            LOGGER.info('sheet_schema: {}'.format(sheet_schema))
-
-            # Transform sheet_metadata
-            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
-            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
-            sheet_metadata.append(sheet_metadata_tf)
-
-            # SHEET_DATA
-            # Should this worksheet tab be synced?
-            if sheet_title in selected_streams:
-                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
-                update_currently_syncing(state, sheet_title)
-                selected_fields = get_selected_fields(catalog, sheet_title)
-                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
-                write_schema(catalog, sheet_title)
-
-                # Determine max range of columns and rows for "paging" through the data
-                sheet_last_col_index = 1
-                sheet_last_col_letter = 'A'
-                for col in columns:
-                    col_index = col.get('columnIndex')
-                    col_letter = col.get('columnLetter')
-                    if col_index > sheet_last_col_index:
-                        sheet_last_col_index = col_index
-                        sheet_last_col_letter = col_letter
-                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
-
-                # Initialize paging for 1st batch
-                is_last_row = False
-                batch_rows = 200
-                from_row = 2
-                if sheet_max_row < batch_rows:
-                    to_row = sheet_max_row
-                else:
-                    to_row = batch_rows
-
-                # Loop thru batches (each having 200 rows of data)
-                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
-                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
-                    # GET sheet_data for a worksheet tab
-                    sheet_data, time_extracted = get_data(
-                        stream_name=sheet_title,
-                        endpoint_config=sheets_loaded_config,
-                        client=client,
-                        spreadsheet_id=spreadsheet_id,
-                        range_rows=range_rows)
-                    # Data is returned as a list of arrays, an array of values for each row
-                    sheet_data_rows = sheet_data.get('values')
-
-                    # Transform batch of rows to JSON with keys for each column
-                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
-                        spreadsheet_id=spreadsheet_id,
-                        sheet_id=sheet_id,
-                        from_row=from_row,
-                        columns=columns,
-                        sheet_data_rows=sheet_data_rows)
-                    if row_num < to_row:
-                        is_last_row = True
-
-                    # Process records, send batch of records to target
-                    record_count = process_records(
-                        catalog=catalog,
-                        stream_name=sheet_title,
-                        records=sheet_data_tf,
-                        time_extracted=ss_time_extracted)
-                    LOGGER.info('Sheet: {}, ecords processed: {}'.format(
-                        sheet_title, record_count))
-
-                    # Update paging from/to_row for next batch
-                    from_row = to_row + 1
-                    if to_row + batch_rows > sheet_max_row:
+            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
+
+            # SKIP empty sheets (where sheet_schema and columns are None)
+            if not sheet_schema or not columns:
+                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
+            else:
+                # Transform sheet_metadata
+                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+                sheet_metadata.append(sheet_metadata_tf)
+
+                # SHEET_DATA
+                # Should this worksheet tab be synced?
+                if sheet_title in selected_streams:
+                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
+                    update_currently_syncing(state, sheet_title)
+                    selected_fields = get_selected_fields(catalog, sheet_title)
+                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
+                    write_schema(catalog, sheet_title)
+
+                    # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not on
+                    # subsequent syncs) and again after each sheet sync completes.
+                    # This forces hard deletes on the data downstream if fewer records are sent.
+                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
+                    last_integer = int(get_bookmark(state, sheet_title, 0))
+                    activate_version = int(time.time() * 1000)
+                    activate_version_message = singer.ActivateVersionMessage(
+                            stream=sheet_title,
+                            version=activate_version)
+                    if last_integer == 0:
+                        # initial load, send activate_version before AND after data sync
+                        singer.write_message(activate_version_message)
+                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+
+                    # Determine max range of columns and rows for "paging" through the data
+                    sheet_last_col_index = 1
+                    sheet_last_col_letter = 'A'
+                    for col in columns:
+                        col_index = col.get('columnIndex')
+                        col_letter = col.get('columnLetter')
+                        if col_index > sheet_last_col_index:
+                            sheet_last_col_index = col_index
+                            sheet_last_col_letter = col_letter
+                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
+
+                    # Initialize paging for 1st batch
+                    is_last_row = False
+                    batch_rows = 200
+                    from_row = 2
+                    if sheet_max_row < batch_rows:
                         to_row = sheet_max_row
                     else:
-                        to_row = to_row + batch_rows
-
-                # SHEETS_LOADED
-                # Add sheet to sheets_loaded
-                sheet_loaded = {}
-                sheet_loaded['spreadsheetId'] = spreadsheet_id
-                sheet_loaded['sheetId'] = sheet_id
-                sheet_loaded['title'] = sheet_title
-                sheet_loaded['loadDate'] = strftime(utils.now())
-                sheet_loaded['lastRowNumber'] = row_num
-                sheets_loaded.append(sheet_loaded)
-
-                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
-                # This forces hard deletes on the data downstream if fewer records are sent.
-                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
-                activate_version_message = singer.ActivateVersionMessage(
-                    stream=sheet_title,
-                    version=int(time.time() * 1000))
-                singer.write_message(activate_version_message)
-
-                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
-                    sheet_title, row_num - 1))
+                        to_row = batch_rows
+
+                    # Loop thru batches (each having 200 rows of data)
+                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
+                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
+
+                        # GET sheet_data for a worksheet tab
+                        sheet_data, time_extracted = get_data(
+                            stream_name=sheet_title,
+                            endpoint_config=sheets_loaded_config,
+                            client=client,
+                            spreadsheet_id=spreadsheet_id,
+                            range_rows=range_rows)
+                        # Data is returned as a list of arrays, an array of values for each row
+                        sheet_data_rows = sheet_data.get('values', [])
+
+                        # Transform batch of rows to JSON with keys for each column
+                        sheet_data_tf, row_num = transform_sheet_data(
+                            spreadsheet_id=spreadsheet_id,
+                            sheet_id=sheet_id,
+                            sheet_title=sheet_title,
+                            from_row=from_row,
+                            columns=columns,
+                            sheet_data_rows=sheet_data_rows)
+                        if row_num < to_row:
+                            is_last_row = True
+
+                        # Process records, send batch of records to target
+                        record_count = process_records(
+                            catalog=catalog,
+                            stream_name=sheet_title,
+                            records=sheet_data_tf,
+                            time_extracted=ss_time_extracted,
+                            version=activate_version)
+                        LOGGER.info('Sheet: {}, records processed: {}'.format(
+                            sheet_title, record_count))
+
+                        # Update paging from/to_row for next batch
+                        from_row = to_row + 1
+                        if to_row + batch_rows > sheet_max_row:
+                            to_row = sheet_max_row
+                        else:
+                            to_row = to_row + batch_rows
+
+                    # End of Stream: Send Activate Version and update State
+                    singer.write_message(activate_version_message)
+                    write_bookmark(state, sheet_title, activate_version)
+                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
+                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
+                        sheet_title, row_num - 2)) # subtract 2: header row and final row_num increment
+                    update_currently_syncing(state, None)
+
+                    # SHEETS_LOADED
+                    # Add sheet to sheets_loaded
+                    sheet_loaded = {}
+                    sheet_loaded['spreadsheetId'] = spreadsheet_id
+                    sheet_loaded['sheetId'] = sheet_id
+                    sheet_loaded['title'] = sheet_title
+                    sheet_loaded['loadDate'] = strftime(utils.now())
+                    sheet_loaded['lastRowNumber'] = row_num
+                    sheets_loaded.append(sheet_loaded)
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
@@ -468,4 +561,7 @@ def sync(client, config, catalog, state):
     # Sync sheet_metadata if selected
     sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
 
+    # Update file_metadata bookmark
+    write_bookmark(state, 'file_metadata', strftime(this_datetime))
+
     return
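Note: taken together, sync() now follows singer's full-table replication pattern: a version bookmark decides whether ACTIVATE_VERSION precedes the records (initial sync only), and the message is always sent after the sheet completes so targets can hard-delete rows absent from the new version. A hedged outline (stream name and state handling are illustrative):

    import time
    import singer

    last_version = 0  # hypothetical: get_bookmark(state, 'Sheet1', 0)
    version = int(time.time() * 1000)
    activate = singer.ActivateVersionMessage(stream='Sheet1', version=version)

    if last_version == 0:           # initial sync only
        singer.write_message(activate)
    # ... emit RECORD messages with version=version for every row ...
    singer.write_message(activate)  # always, once the sheet is done
    # then bookmark `version` so the next run skips the leading ACTIVATE_VERSION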