aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets/sync.py
blob: d7d71846999bd2af4037a58d5b14ebf0a5387d52 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
import time
import math
import json
from datetime import datetime, timedelta
import pytz
import singer
from singer import metrics, metadata, Transformer, utils
from singer.utils import strptime_to_utc, strftime
from tap_google_sheets.streams import STREAMS
from tap_google_sheets.schema import get_sheet_metadata

LOGGER = singer.get_logger()


def write_schema(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    try:
        singer.write_schema(stream_name, schema, stream.key_properties)
    except OSError as err:
        LOGGER.info('OS Error writing schema for: {}'.format(stream_name))
        raise err


def write_record(stream_name, record, time_extracted):
    try:
        singer.messages.write_record(stream_name, record, time_extracted=time_extracted)
    except OSError as err:
        LOGGER.info('OS Error writing record for: {}'.format(stream_name))
        LOGGER.info('record: {}'.format(record))
        raise err


def get_bookmark(state, stream, default):
    if (state is None) or ('bookmarks' not in state):
        return default
    return (
        state
        .get('bookmarks', {})
        .get(stream, default)
    )


def write_bookmark(state, stream, value):
    if 'bookmarks' not in state:
        state['bookmarks'] = {}
    state['bookmarks'][stream] = value
    LOGGER.info('Write state for stream: {}, value: {}'.format(stream, value))
    singer.write_state(state)


# Transform/validate batch of records w/ schema and sent to target
def process_records(catalog,
                    stream_name,
                    records,
                    time_extracted):
    stream = catalog.get_stream(stream_name)
    schema = stream.schema.to_dict()
    stream_metadata = metadata.to_map(stream.metadata)
    with metrics.record_counter(stream_name) as counter:
        for record in records:
            # Transform record for Singer.io
            with Transformer() as transformer:
                transformed_record = transformer.transform(
                    record,
                    schema,
                    stream_metadata)
                write_record(stream_name, transformed_record, time_extracted=time_extracted)
                counter.increment()
        return counter.value


def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
    # Should sheets_loaded be synced?
    if stream_name in selected_streams:
        LOGGER.info('STARTED Syncing {}'.format(stream_name))
        update_currently_syncing(state, stream_name)
        selected_fields = get_selected_fields(catalog, stream_name)
        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
        write_schema(catalog, stream_name)
        if not time_extracted:
            time_extracted = utils.now()
        record_count = process_records(
            catalog=catalog,
            stream_name=stream_name,
            records=records,
            time_extracted=time_extracted)
        LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
        update_currently_syncing(state, None)


# Currently syncing sets the stream currently being delivered in the state.
# If the integration is interrupted, this state property is used to identify
#  the starting point to continue from.
# Reference: https://github.com/singer-io/singer-python/blob/master/singer/bookmarks.py#L41-L46
def update_currently_syncing(state, stream_name):
    if (stream_name is None) and ('currently_syncing' in state):
        del state['currently_syncing']
    else:
        singer.set_currently_syncing(state, stream_name)
    singer.write_state(state)


# List selected fields from stream catalog
def get_selected_fields(catalog, stream_name):
    stream = catalog.get_stream(stream_name)
    mdata = metadata.to_map(stream.metadata)
    mdata_list = singer.metadata.to_list(mdata)
    selected_fields = []
    for entry in mdata_list:
        field = None
        try:
            field = entry['breadcrumb'][1]
            if entry.get('metadata', {}).get('selected', False):
                selected_fields.append(field)
        except IndexError:
            pass
    return selected_fields


def get_data(stream_name,
             endpoint_config,
             client,
             spreadsheet_id,
             range_rows=None):
    if not range_rows:
        range_rows = ''
    path = endpoint_config.get('path', stream_name).replace(
        '{spreadsheet_id}', spreadsheet_id).replace('{sheet_title}', stream_name).replace(
            '{range_rows}', range_rows)
    params = endpoint_config.get('params', {})
    api = endpoint_config.get('api', 'sheets')
    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace(
        '{sheet_title}', stream_name)
    data = {}
    time_extracted = utils.now()
    data = client.get(
        path=path,
        api=api,
        params=querystring,
        endpoint=stream_name)
    return data, time_extracted


# Tranform file_metadata: remove nodes from lastModifyingUser, format as array
def transform_file_metadata(file_metadata):
    # Convert to dict
    file_metadata_tf = json.loads(json.dumps(file_metadata))
    # Remove keys
    if file_metadata_tf.get('lastModifyingUser'):
        file_metadata_tf['lastModifyingUser'].pop('photoLink', None)
        file_metadata_tf['lastModifyingUser'].pop('me', None)
        file_metadata_tf['lastModifyingUser'].pop('permissionId', None)
    # Add record to an array of 1
    file_metadata_arr = []
    file_metadata_arr.append(file_metadata_tf)
    return file_metadata_arr


# Tranform spreadsheet_metadata: remove defaultFormat and sheets nodes, format as array
def transform_spreadsheet_metadata(spreadsheet_metadata):
    # Convert to dict
    spreadsheet_metadata_tf = json.loads(json.dumps(spreadsheet_metadata))
    # Remove keys: defaultFormat and sheets (sheets will come in sheet_metadata)
    if spreadsheet_metadata_tf.get('properties'):
        spreadsheet_metadata_tf['properties'].pop('defaultFormat', None)
    spreadsheet_metadata_tf.pop('sheets', None)
    # Add record to an array of 1
    spreadsheet_metadata_arr = []
    spreadsheet_metadata_arr.append(spreadsheet_metadata_tf)
    return spreadsheet_metadata_arr


# Tranform spreadsheet_metadata: add spreadsheetId, sheetUrl, and columns metadata
def transform_sheet_metadata(spreadsheet_id, sheet, columns):
    # Convert to properties to dict
    sheet_metadata = sheet.get('properties')
    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
    sheet_id = sheet_metadata_tf.get('sheetId')
    sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
        spreadsheet_id, sheet_id)
    sheet_metadata_tf['spreadsheetId'] = spreadsheet_id
    sheet_metadata_tf['sheetUrl'] = sheet_url
    sheet_metadata_tf['columns'] = columns
    return sheet_metadata_tf


# Convert Excel Date Serial Number (excel_date_sn) to datetime string
# timezone_str: defaults to UTC (which we assume is the timezone for ALL datetimes)
def excel_to_dttm_str(excel_date_sn, timezone_str=None):
    if not timezone_str:
        timezone_str = 'UTC'
    tzn = pytz.timezone(timezone_str)
    sec_per_day = 86400
    excel_epoch = 25569 # 1970-01-01T00:00:00Z
    epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
    epoch_dttm = datetime(1970, 1, 1)
    excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
    utc_dttm_str = strftime(utc_dttm)
    return utc_dttm_str


# Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times
#  Convert from array of values to JSON with column names as keys
def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows):
    sheet_data_tf = []
    is_last_row = False
    row_num = from_row
    # Create sorted list of columns based on columnIndex
    cols = sorted(columns, key=lambda i: i['columnIndex'])

    # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
    for row in sheet_data_rows:
        # If empty row, return sheet_data_tf w/ is_last_row and row_num - 1
        if row == []:
            is_last_row = True
            return sheet_data_tf, row_num - 1, is_last_row
        sheet_data_row_tf = {}
        # Add spreadsheet_id, sheet_id, and row
        sheet_data_row_tf['__sdc_spreadsheet_id'] = spreadsheet_id
        sheet_data_row_tf['__sdc_sheet_id'] = sheet_id
        sheet_data_row_tf['__sdc_row'] = row_num
        col_num = 1
        for value in row:
            # Select column metadata based on column index
            col = cols[col_num - 1]
            col_skipped = col.get('columnSkipped')
            if not col_skipped:
                col_name = col.get('columnName')
                col_type = col.get('columnType')
                # Convert dates/times from Lotus Notes Serial Numbers
                if col_type == 'numberType.DATE_TIME':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.DATE':
                    if isinstance(value, (int, float)):
                        col_val = excel_to_dttm_str(value)[:10]
                    else:
                        col_val = str(value)
                elif col_type == 'numberType.TIME':
                    if isinstance(value, (int, float)):
                        try:
                            total_secs = value * 86400 # seconds in day
                            col_val = str(timedelta(seconds=total_secs))
                        except ValueError:
                            col_val = str(value)
                    else:
                        col_val = str(value)
                elif col_type == 'numberType':
                    if isinstance(value, int):
                        col_val = int(value)
                    else:
                        try:
                            col_val = float(value)
                        except ValueError:
                            col_val = str(value)
                elif col_type == 'stringValue':
                    col_val = str(value)
                elif col_type == 'boolValue':
                    if isinstance(value, bool):
                        col_val = value
                    elif isinstance(value, str):
                        if value.lower() in ('true', 't', 'yes', 'y'):
                            col_val = True
                        elif value.lower() in ('false', 'f', 'no', 'n'):
                            col_val = False
                        else:
                            col_val = str(value)
                    elif isinstance(value, int):
                        if value in (1, -1):
                            col_val = True
                        elif value == 0:
                            col_val = False
                        else:
                            col_val = str(value)

                else:
                    col_val = value
                sheet_data_row_tf[col_name] = col_val
            col_num = col_num + 1
        sheet_data_tf.append(sheet_data_row_tf)
        row_num = row_num + 1
    return sheet_data_tf, row_num, is_last_row


def sync(client, config, catalog, state):
    start_date = config.get('start_date')
    spreadsheet_id = config.get('spreadsheet_id')

    # Get selected_streams from catalog, based on state last_stream
    #   last_stream = Previous currently synced stream, if the load was interrupted
    last_stream = singer.get_currently_syncing(state)
    LOGGER.info('last/currently syncing stream: {}'.format(last_stream))
    selected_streams = []
    for stream in catalog.get_selected_streams(state):
        selected_streams.append(stream.stream)
    LOGGER.info('selected_streams: {}'.format(selected_streams))

    if not selected_streams:
        return

    # FILE_METADATA
    file_metadata = {}
    stream_name = 'file_metadata'
    file_metadata_config = STREAMS.get(stream_name)

    # GET file_metadata
    LOGGER.info('GET file_meatadata')
    file_metadata, time_extracted = get_data(stream_name=stream_name,
                                             endpoint_config=file_metadata_config,
                                             client=client,
                                             spreadsheet_id=spreadsheet_id)
    # Transform file_metadata
    LOGGER.info('Transform file_meatadata')
    file_metadata_tf = transform_file_metadata(file_metadata)
    # LOGGER.info('file_metadata_tf = {}'.format(file_metadata_tf))

    # Check if file has changed, if not break (return to __init__)
    last_datetime = strptime_to_utc(get_bookmark(state, stream_name, start_date))
    this_datetime = strptime_to_utc(file_metadata.get('modifiedTime'))
    LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
    if this_datetime <= last_datetime:
        LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
        return
    # Sync file_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
    write_bookmark(state, stream_name, strftime(this_datetime))

    # SPREADSHEET_METADATA
    spreadsheet_metadata = {}
    stream_name = 'spreadsheet_metadata'
    spreadsheet_metadata_config = STREAMS.get(stream_name)

    # GET spreadsheet_metadata
    LOGGER.info('GET spreadsheet_meatadata')
    spreadsheet_metadata, ss_time_extracted = get_data(
        stream_name=stream_name,
        endpoint_config=spreadsheet_metadata_config,
        client=client,
        spreadsheet_id=spreadsheet_id)

    # Transform spreadsheet_metadata
    LOGGER.info('Transform spreadsheet_meatadata')
    spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)

    # Sync spreadsheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
        ss_time_extracted)

    # SHEET_METADATA and SHEET_DATA
    sheets = spreadsheet_metadata.get('sheets')
    sheet_metadata = []
    sheets_loaded = []
    sheets_loaded_config = STREAMS['sheets_loaded']
    if sheets:
        # Loop thru sheets (worksheet tabs) in spreadsheet
        for sheet in sheets:
            sheet_title = sheet.get('properties', {}).get('title')
            sheet_id = sheet.get('properties', {}).get('sheetId')

            # GET sheet_metadata and columns
            sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
            LOGGER.info('sheet_schema: {}'.format(sheet_schema))

            # Transform sheet_metadata
            sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
            # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
            sheet_metadata.append(sheet_metadata_tf)

            # SHEET_DATA
            # Should this worksheet tab be synced?
            if sheet_title in selected_streams:
                LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                update_currently_syncing(state, sheet_title)
                selected_fields = get_selected_fields(catalog, sheet_title)
                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                write_schema(catalog, sheet_title)

                # Determine max range of columns and rows for "paging" through the data
                sheet_last_col_index = 1
                sheet_last_col_letter = 'A'
                for col in columns:
                    col_index = col.get('columnIndex')
                    col_letter = col.get('columnLetter')
                    if col_index > sheet_last_col_index:
                        sheet_last_col_index = col_index
                        sheet_last_col_letter = col_letter
                sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')

                # Initialize paging for 1st batch
                is_last_row = False
                batch_rows = 200
                from_row = 2
                if sheet_max_row < batch_rows:
                    to_row = sheet_max_row
                else:
                    to_row = batch_rows

                # Loop thru batches (each having 200 rows of data)
                while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                    range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)

                    # GET sheet_data for a worksheet tab
                    sheet_data, time_extracted = get_data(
                        stream_name=sheet_title,
                        endpoint_config=sheets_loaded_config,
                        client=client,
                        spreadsheet_id=spreadsheet_id,
                        range_rows=range_rows)
                    # Data is returned as a list of arrays, an array of values for each row
                    sheet_data_rows = sheet_data.get('values')

                    # Transform batch of rows to JSON with keys for each column
                    sheet_data_tf, row_num, is_last_row = transform_sheet_data(
                        spreadsheet_id=spreadsheet_id,
                        sheet_id=sheet_id,
                        from_row=from_row,
                        columns=columns,
                        sheet_data_rows=sheet_data_rows)
                    if row_num < to_row:
                        is_last_row = True

                    # Process records, send batch of records to target
                    record_count = process_records(
                        catalog=catalog,
                        stream_name=sheet_title,
                        records=sheet_data_tf,
                        time_extracted=ss_time_extracted)
                    LOGGER.info('Sheet: {}, ecords processed: {}'.format(
                        sheet_title, record_count))

                    # Update paging from/to_row for next batch
                    from_row = to_row + 1
                    if to_row + batch_rows > sheet_max_row:
                        to_row = sheet_max_row
                    else:
                        to_row = to_row + batch_rows

                # SHEETS_LOADED
                # Add sheet to sheets_loaded
                sheet_loaded = {}
                sheet_loaded['spreadsheetId'] = spreadsheet_id
                sheet_loaded['sheetId'] = sheet_id
                sheet_loaded['title'] = sheet_title
                sheet_loaded['loadDate'] = strftime(utils.now())
                sheet_loaded['lastRowNumber'] = row_num
                sheets_loaded.append(sheet_loaded)

                # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
                # This forces hard deletes on the data downstream if fewer records are sent.
                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                activate_version_message = singer.ActivateVersionMessage(
                    stream=sheet_title,
                    version=int(time.time() * 1000))
                singer.write_message(activate_version_message)

                LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                    sheet_title, row_num - 1))

    stream_name = 'sheet_metadata'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)

    stream_name = 'sheets_loaded'
    # Sync sheet_metadata if selected
    sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)

    return