From 89643ba6fa98db82efd3246805ef801a8bfb5c81 Mon Sep 17 00:00:00 2001
From: Jeff Huth
Date: Wed, 13 Nov 2019 17:03:56 -0800
Subject: Initial commit

Discovery mode works. Still working on normal sync.
---
 tap_google_sheets/sync.py | 281 ++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 281 insertions(+)
 create mode 100644 tap_google_sheets/sync.py

diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
new file mode 100644
index 0000000..a8b02d0
--- /dev/null
+++ b/tap_google_sheets/sync.py
@@ -0,0 +1,281 @@
+import time
+import math
+import singer
+import json
+from collections import OrderedDict
+from singer import metrics, metadata, Transformer, utils
+from singer.utils import strptime_to_utc, strftime
+from tap_google_sheets.transform import transform_json
+from tap_google_sheets.streams import STREAMS
+from tap_google_sheets.schema import get_sheet_metadata
+
+LOGGER = singer.get_logger()
+
+
+def write_schema(catalog, stream_name):
+    stream = catalog.get_stream(stream_name)
+    schema = stream.schema.to_dict()
+    try:
+        singer.write_schema(stream_name, schema, stream.key_properties)
+    except OSError as err:
+        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
+        raise err
+
+
+def write_record(stream_name, record, time_extracted):
+    try:
+        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
+    except OSError as err:
+        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
+        LOGGER.info('record: {}'.format(record))
+        raise err
+
+
+def get_bookmark(state, stream, default):
+    if (state is None) or ('bookmarks' not in state):
+        return default
+    return (
+        state
+        .get('bookmarks', {})
+        .get(stream, default)
+    )
+
+
+def write_bookmark(state, stream, value):
+    if 'bookmarks' not in state:
+        state['bookmarks'] = {}
+    state['bookmarks'][stream] = value
+    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
+    singer.write_state(state)
+
+
+def transform_datetime(this_dttm):
+    with Transformer() as transformer:
+        new_dttm = transformer._transform_datetime(this_dttm)
+    return new_dttm
+
+
+def process_records(catalog, #pylint: disable=too-many-branches
+                    stream_name,
+                    records,
+                    time_extracted,
+                    bookmark_field=None,
+                    bookmark_type=None,
+                    max_bookmark_value=None,
+                    last_datetime=None,
+                    last_integer=None,
+                    parent=None,
+                    parent_id=None):
+    stream = catalog.get_stream(stream_name)
+    schema = stream.schema.to_dict()
+    stream_metadata = metadata.to_map(stream.metadata)
+
+    with metrics.record_counter(stream_name) as counter:
+        for record in records:
+            # If child object, add parent_id to record
+            if parent_id and parent:
+                record[parent + '_id'] = parent_id
+
+            # Transform record for Singer.io
+            with Transformer() as transformer:
+                transformed_record = transformer.transform(
+                    record,
+                    schema,
+                    stream_metadata)
+                # Reset max_bookmark_value to new value if higher
+                if transformed_record.get(bookmark_field):
+                    if max_bookmark_value is None or \
+                        transformed_record[bookmark_field] > transform_datetime(max_bookmark_value):
+                        max_bookmark_value = transformed_record[bookmark_field]
+
+                if bookmark_field and (bookmark_field in transformed_record):
+                    if bookmark_type == 'integer':
+                        # Keep only records whose bookmark is after the last_integer
+                        if transformed_record[bookmark_field] >= last_integer:
+                            write_record(stream_name, transformed_record, \
+                                time_extracted=time_extracted)
+                            counter.increment()
+                    elif bookmark_type == 'datetime':
+                        last_dttm = transform_datetime(last_datetime)
+                        bookmark_dttm = transform_datetime(transformed_record[bookmark_field])
+                        # Keep only records whose bookmark is after the last_datetime
+                        if bookmark_dttm >= last_dttm:
+                            write_record(stream_name, transformed_record, \
+                                time_extracted=time_extracted)
+                            counter.increment()
+                else:
+                    write_record(stream_name, transformed_record, time_extracted=time_extracted)
+                    counter.increment()
+
+        return max_bookmark_value, counter.value
+
+
+# Currently syncing sets the stream currently being delivered in the state.
+# If the integration is interrupted, this state property is used to identify
+# the starting point to continue from.
+# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
+def update_currently_syncing(state, stream_name):
+    if (stream_name is None) and ('currently_syncing' in state):
+        del state['currently_syncing']
+    else:
+        singer.set_currently_syncing(state, stream_name)
+    singer.write_state(state)
+
+
+# List selected fields from stream catalog
+def get_selected_fields(catalog, stream_name):
+    stream = catalog.get_stream(stream_name)
+    mdata = metadata.to_map(stream.metadata)
+    mdata_list = singer.metadata.to_list(mdata)
+    selected_fields = []
+    for entry in mdata_list:
+        field = None
+        try:
+            field = entry['breadcrumb'][1]
+            if entry.get('metadata', {}).get('selected', False):
+                selected_fields.append(field)
+        except IndexError:
+            pass
+    return selected_fields
+
+
+def get_data(stream_name,
+             endpoint_config,
+             client,
+             spreadsheet_id,
+             range_rows=None):
+    if not range_rows:
+        range_rows = ''
+    path = endpoint_config.get('path', stream_name).replace(
+        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
+            '{range_rows}', range_rows)
+    params = endpoint_config.get('params', {})
+    api = endpoint_config.get('api', 'sheets')
+    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
+        '{sheet_title}', stream_name)
+    data = {}
+    data = client.get(
+        path=path,
+        api=api,
+        params=querystring,
+        endpoint=stream_name)
+    return data
+
+
+def transform_file_metadata(file_metadata):
+    # Convert to dict
+    file_metadata_tf = json.loads(json.dumps(file_metadata))
+    # Remove keys
+    if file_metadata_tf.get('lastModifyingUser'):
+        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
+        file_metadata_tf['lastModifyingUser'].pop('me', None)
+        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
+    # Add record to an array of 1
+    file_metadata_arr = []
+    file_metadata_arr.append(file_metadata_tf)
+    return file_metadata_arr
+
+
+def transform_spreadsheet_metadata(spreadsheet_metadata):
+    # Convert to dict
+    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
+    # Remove keys
+    if spreadsheet_metadata_tf.get('properties'):
+        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
+    spreadsheet_metadata_tf.pop('sheets', None)
+    # Add record to an array of 1
+    spreadsheet_metadata_arr = []
+    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
+    return spreadsheet_metadata_arr
+
+
+def transform_sheet_metadata(spreadsheet_id, sheet, columns):
+    # Convert properties to dict
+    sheet_metadata = sheet.get('properties')
+    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
+    sheet_id = sheet_metadata_tf.get('sheetId')
+    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
+        spreadsheet_id, sheet_id)
+    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
+    sheet_metadata_tf['sheetUrl'] = sheet_url
+    sheet_metadata_tf['columns'] = columns
+    return sheet_metadata_tf
+
+
+def sync(client, config, catalog, state):
+    start_date = config.get('start_date')
+    spreadsheet_id = config.get('spreadsheet_id')
+
+    # Get selected_streams from catalog, based on state last_stream
+    # last_stream = Previous currently synced stream, if the load was interrupted
+    last_stream = singer.get_currently_syncing(state)
+    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
+    selected_streams = []
+    for stream in catalog.get_selected_streams(state):
+        selected_streams.append(stream.stream)
+    LOGGER.info('selected_streams: {}'.format(selected_streams))
+
+    if not selected_streams:
+        return
+
+    # Get file_metadata
+    file_metadata = {}
+    file_metadata_config = STREAMS.get('file_metadata')
+    file_metadata = get_data('file_metadata', file_metadata_config, client, spreadsheet_id)
+    file_metadata_tf = transform_file_metadata(file_metadata)
+    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))
+    last_datetime = strptime_to_utc(get_bookmark(state, 'file_metadata', start_date))
+    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
+    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
+    if this_datetime <= last_datetime:
+        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
+        return 0
+
+    # Get spreadsheet_metadata
+    spreadsheet_metadata = {}
+    spreadsheet_metadata_config = STREAMS.get('spreadsheet_metadata')
+    spreadsheet_metadata = get_data('spreadsheet_metadata', spreadsheet_metadata_config, client, spreadsheet_id)
+    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
+    # LOGGER.info('spreadsheet_metadata_tf = {}'.format(spreadsheet_metadata_tf))
+
+    # Get sheet_metadata
+    sheets = spreadsheet_metadata.get('sheets')
+    sheet_metadata = []
+    sheets_loaded = []
+    sheets_loaded_config = STREAMS['sheets_loaded']
+    if sheets:
+        for sheet in sheets:
+            sheet_title = sheet.get('properties', {}).get('title')
+            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
+            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
+            sheet_metadata.append(sheet_metadata_tf)
+
+            # Determine range of rows and columns for "paging" through batch rows of data
+            sheet_last_col_index = 1
+            sheet_last_col_letter = 'A'
+            for col in columns:
+                col_index = col.get('columnIndex')
+                col_letter = col.get('columnLetter')
+                if col_index > sheet_last_col_index:
+                    sheet_last_col_index = col_index
+                    sheet_last_col_letter = col_letter
+            sheet_max_row = sheet.get('gridProperties', {}).get('rowCount')
+            is_empty_row = False
+            batch_rows = 200
+            from_row = 2
+            if sheet_max_row < batch_rows:
+                to_row = sheet_max_row
+            else:
+                to_row = batch_rows
+
+            while not is_empty_row and to_row <= sheet_max_row:
+                range_rows = 'A2:{}{}'.format(sheet_last_col_letter, to_row)
+
+                sheet_data = get_data(
+                    stream_name=sheet_title,
+                    endpoint_config=sheets_loaded_config,
+                    client=client,
+                    spreadsheet_id=spreadsheet_id,
+                    range_rows=range_rows)