aboutsummaryrefslogblamecommitdiffhomepage
path: root/tap_google_sheets/sync.py
blob: 689be2785010e122842c33482e9efa6c044b558d (plain) (tree)
1
2
3
4
5
6
7
8
9

           
           
                                        

             

                                                        
                                         










                                                                       
                                                                 




                                                                          
                                                                    
        











                                                   























                                                                              

                                                                  

                                

                                   


                                                      

                                                        





                                                           




                                                  



                                   
                                                                                             



                                                             

                                                                                           
                                          

                                        



                                       
                                          

                                                                                               




















                                                                                               
                    
            
                                          













                                                                
                                             




                                                                                          

                                                                                          


                                                                                                
                                




                             
                               

 
                                                                              













                                                                       
                                                                                       


                                                                          
                                                                                








                                                                        
                                                                                  


                                                             
                                                              








                                                                               




                                                                                   
                                     
                       
                                                                                              


                                                                       
                                                            





                                                                                  
                                                                                                    
                      

                                                        
                                                          


                                                                
                            
                     












                                                                      
                                         

                                                    





                                                        

                                                                         
                                                            


                                                              
                                                

                                                                                                                                        




                                                                   
                                                

                                                                                                                                        








                                                                            

                                                                                                                                            

                                                












                                                                       

                                                                                                                                                




                                                                     

                                                                                                                                                

                                                

                                                                                                                                        













                                                                            

                                                                                                                                            






                                                    

                                                                                                                                            


                                                                

                                                                                                                                    



                                                         
                             
                                 

 















                                                                                   
                   
                      










                                                                                  

                                                                   


                                                                                 



                                                                                              

                                                                       


                                                                                                
                                                          

                          
                             




                                                          
                                                       






                                                    
                                                                                  
 
                                           

                                                                                         

                                   




                                                   
                                                          

                                                                  


                                                                 
                                                                                     
                                                                  
 















































                                                                                                                           

                                              




























































                                                                                                                        
 


                                                                              
 


                                                                             
 


                                                                   
          
import time
import math
import json
from datetime import datetime, timedelta
import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from singer.messages import RecordMessage
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
        LOGGER.info('Writing schema for: {}'.format(stream_name))
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted, version=None):
    try:
        if version:
            singer.messages.write_message(
                RecordMessage(
                    stream=stream_name,
                    record=record,
                    version=version,
                    time_extracted=time_extracted))
        else:
            singer.messages.write_record(
                stream_name=stream_name,
                record=record,
                time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


# Transform/validate batch of records w/ schema and sent to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted,
                    version=None):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                write_record(
                    stream_name=stream_name,
                    record=transformed_record,
                    time_extracted=time_extracted,
                    version=version)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Should sheets_loaded be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
#  the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields


def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    # Replace {placeholder} variables in path
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    # Add in querystring parameters and replace {placeholder} variables
    # querystring function ensures parameters are added but not encoded causing API errors
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    data = {}
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted


# Tranform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Convert to dict
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Tranform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Convert to dict
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Tranform spreadsheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert to properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569 # 1970-01-01T00:00:00Z, Lotus Notes Serial Number for Epoch Start Date
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
#  Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, SKIP
        if row == []:
            LOGGER.info('EMPTY ROW: {}, SKIPPING'.format(row_num))
        else:
            sheet_data_row_tf = {}
            # Add spreadsheet_id, sheet_id, and row
            sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
            sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
            sheet_data_row_tf['__sdc_row'] = row_num
            col_num = 1
            for value in row:
                # Select column metadata based on column index
                col = cols[col_num - 1]
                col_skipped = col.get('columnSkipped')
                if not col_skipped:
                    # Get column metadata
                    col_name = col.get('columnName')
                    col_type = col.get('columnType')
                    col_letter = col.get('columnLetter')

                    # NULL values
                    if value is None or value == '':
                        col_val = None

                    # Convert dates/times from Lotus Notes Serial Numbers
                    # DATE-TIME
                    elif col_type == 'numberType.DATE_TIME':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # DATE
                    elif col_type == 'numberType.DATE':
                        if isinstance(value, (int, float)):
                            col_val = excel_to_dttm_str(value)[:10]
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                sheet_title, col_name, col_letter, row_num, col_type, value))
                    # TIME ONLY (NO DATE)
                    elif col_type == 'numberType.TIME':
                        if isinstance(value, (int, float)):
                            try:
                                total_secs = value * 86400 # seconds in day
                                # Create string formatted like HH:MM:SS
                                col_val = str(timedelta(seconds=total_secs))
                            except ValueError:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                    # NUMBER (INTEGER AND FLOAT)
                    elif col_type == 'numberType':
                        if isinstance(value, int):
                            col_val = int(value)
                        elif isinstance(value, float):
                            # Determine float decimal digits
                            decimal_digits = str(value)[::-1].find('.')
                            if decimal_digits > 15:
                                try:
                                    # ROUND to multipleOf: 1e-15
                                    col_val = float(round(value, 15))
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                            else: # decimal_digits <= 15, no rounding
                                try:
                                    col_val = float(value)
                                except ValueError:
                                    col_val = str(value)
                                    LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                        sheet_title, col_name, col_letter, row_num, col_type, value))
                        else:
                            col_val = str(value)
                            LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                   sheet_title, col_name, col_letter, row_num, col_type, value))
                    # STRING
                    elif col_type == 'stringValue':
                        col_val = str(value)
                    # BOOLEAN
                    elif col_type == 'boolValue':
                        if isinstance(value, bool):
                            col_val = value
                        elif isinstance(value, str):
                            if value.lower() in ('true', 't', 'yes', 'y'):
                                col_val = True
                            elif value.lower() in ('false', 'f', 'no', 'n'):
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row, col_type, value))
                        elif isinstance(value, int):
                            if value in (1, -1):
                                col_val = True
                            elif value == 0:
                                col_val = False
                            else:
                                col_val = str(value)
                                LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                                    sheet_title, col_name, col_letter, row, col_type, value))
                    # OTHER: Convert everything else to a string
                    else:
                        col_val = str(value)
                        LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format(
                            sheet_title, col_name, col_letter, row, col_type, value))
                    sheet_data_row_tf[col_name] = col_val
                col_num = col_num + 1
            # APPEND non-empty row
            sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num


def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    #   last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_meatadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_meatadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed, if not break (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        # Update file_metadata bookmark
        write_bookmark(state, 'file_metadata', strftime(this_datetime))
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    # file_metadata bookmark is updated at the end of sync

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_meatadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_meatadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
        ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            # LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # SKIP empty sheets (where sheet_schema and columns are None)
            if not sheet_schema or not columns:
                LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
            else:
                # Transform sheet_metadata
                sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
                # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
                sheet_metadata.append(sheet_metadata_tf)

                # SHEET_DATA
                # Should this worksheet tab be synced?
                if sheet_title in selected_streams:
                    LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                    update_currently_syncing(state, sheet_title)
                    selected_fields = get_selected_fields(catalog, sheet_title)
                    LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                    write_schema(catalog, sheet_title)

                    # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
                    # everytime after each sheet sync is complete.
                    # This forces hard deletes on the data downstream if fewer records are sent.
                    # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                    last_integer = int(get_bookmark(state, sheet_title, 0))
                    activate_version = int(time.time() * 1000)
                    activate_version_message = singer.ActivateVersionMessage(
                            stream=sheet_title,
                            version=activate_version)
                    if last_integer == 0:
                        # initial load, send activate_version before AND after data sync
                        singer.write_message(activate_version_message)
                        LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))

                    # Determine max range of columns and rows for "paging" through the data
                    sheet_last_col_index = 1
                    sheet_last_col_letter = 'A'
                    for col in columns:
                        col_index = col.get('columnIndex')
                        col_letter = col.get('columnLetter')
                        if col_index > sheet_last_col_index:
                            sheet_last_col_index = col_index
                            sheet_last_col_letter = col_letter
                    sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                    # Initialize paging for 1st batch
                    is_last_row = False
                    batch_rows = 200
                    from_row = 2
                    if sheet_max_row < batch_rows:
                        to_row = sheet_max_row
                    else:
                        to_row = batch_rows

                    # Loop thru batches (each having 200 rows of data)
                    while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                        range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                        # GET sheet_data for a worksheet tab
                        sheet_data, time_extracted = get_data(
                            stream_name=sheet_title,
                            endpoint_config=sheets_loaded_config,
                            client=client,
                            spreadsheet_id=spreadsheet_id,
                            range_rows=range_rows)
                        # Data is returned as a list of arrays, an array of values for each row
                        sheet_data_rows = sheet_data.get('values')

                        # Transform batch of rows to JSON with keys for each column
                        sheet_data_tf, row_num = transform_sheet_data(
                            spreadsheet_id=spreadsheet_id,
                            sheet_id=sheet_id,
                            sheet_title=sheet_title,
                            from_row=from_row,
                            columns=columns,
                            sheet_data_rows=sheet_data_rows)
                        if row_num < to_row:
                            is_last_row = True

                        # Process records, send batch of records to target
                        record_count = process_records(
                            catalog=catalog,
                            stream_name=sheet_title,
                            records=sheet_data_tf,
                            time_extracted=ss_time_extracted,
                            version=activate_version)
                        LOGGER.info('Sheet: {}, records processed: {}'.format(
                            sheet_title, record_count))
                        
                        # Update paging from/to_row for next batch
                        from_row = to_row + 1
                        if to_row + batch_rows > sheet_max_row:
                            to_row = sheet_max_row
                        else:
                            to_row = to_row + batch_rows

                    # End of Stream: Send Activate Version and update State
                    singer.write_message(activate_version_message)
                    write_bookmark(state, sheet_title, activate_version)
                    LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
                    LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                        sheet_title, row_num - 2)) # subtract 1 for header row
                    update_currently_syncing(state, None)

                    # SHEETS_LOADED
                    # Add sheet to sheets_loaded
                    sheet_loaded = {}
                    sheet_loaded['spreadsheetId'] = spreadsheet_id
                    sheet_loaded['sheetId'] = sheet_id
                    sheet_loaded['title'] = sheet_title
                    sheet_loaded['loadDate'] = strftime(utils.now())
                    sheet_loaded['lastRowNumber'] = row_num
                    sheets_loaded.append(sheet_loaded)

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    # Update file_metadata bookmark
    write_bookmark(state, 'file_metadata', strftime(this_datetime))

    return