aboutsummaryrefslogtreecommitdiffhomepage
path: root/tap_google_sheets
diff options
context:
space:
mode:
authorJeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com>2020-02-24 09:53:26 -0800
committerGitHub <noreply@github.com>2020-02-24 12:53:26 -0500
commit376f1145837541d4fff2ad0e499236761f9873c3 (patch)
treecc086f18b24bda8a86c16c3ec742b89947f382ae /tap_google_sheets
parentf1d1d43c6b74a8705e91e908c582e39c68464c0c (diff)
downloadtap-google-sheets-376f1145837541d4fff2ad0e499236761f9873c3.tar.gz
tap-google-sheets-376f1145837541d4fff2ad0e499236761f9873c3.tar.zst
tap-google-sheets-376f1145837541d4fff2ad0e499236761f9873c3.zip
v.0.0.4 Logic to skip empty sheets (#4)v0.0.4
* v.0.0.2 schema and sync changes Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py * v.0.0.3 Sync activate version and error handling Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages. * v.0.0.4 Skip empty worksheets Add logic to skip empty worksheets in Discovery and Sync mode. * schema.py fix number datatype issue Number datatypes are being created as strings in targets. The JSON schema order needs to be adjusted so that order is null, number, string.
Diffstat (limited to 'tap_google_sheets')
-rw-r--r--tap_google_sheets/schema.py365
-rw-r--r--tap_google_sheets/sync.py214
2 files changed, 294 insertions, 285 deletions
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index e319c03..c229d72 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -25,184 +25,188 @@ def get_sheet_schema_columns(sheet):
25 sheet_json_schema = OrderedDict() 25 sheet_json_schema = OrderedDict()
26 data = next(iter(sheet.get('data', [])), {}) 26 data = next(iter(sheet.get('data', [])), {})
27 row_data = data.get('rowData', []) 27 row_data = data.get('rowData', [])
28 # spreadsheet is an OrderedDict, with orderd sheets and rows in the repsonse 28 if row_data == []:
29 29 # Empty sheet, SKIP
30 headers = row_data[0].get('values', []) 30 LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
31 first_values = row_data[1].get('values', []) 31 return None, None
32 # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True))) 32 else:
33 33 # spreadsheet is an OrderedDict, with orderd sheets and rows in the repsonse
34 sheet_json_schema = { 34 headers = row_data[0].get('values', [])
35 'type': 'object', 35 first_values = row_data[1].get('values', [])
36 'additionalProperties': False, 36 # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
37 'properties': { 37
38 '__sdc_spreadsheet_id': { 38 sheet_json_schema = {
39 'type': ['null', 'string'] 39 'type': 'object',
40 }, 40 'additionalProperties': False,
41 '__sdc_sheet_id': { 41 'properties': {
42 'type': ['null', 'integer'] 42 '__sdc_spreadsheet_id': {
43 }, 43 'type': ['null', 'string']
44 '__sdc_row': { 44 },
45 'type': ['null', 'integer'] 45 '__sdc_sheet_id': {
46 'type': ['null', 'integer']
47 },
48 '__sdc_row': {
49 'type': ['null', 'integer']
50 }
46 } 51 }
47 } 52 }
48 } 53
49 54 header_list = [] # used for checking uniqueness
50 header_list = [] # used for checking uniqueness 55 columns = []
51 columns = [] 56 prior_header = None
52 prior_header = None 57 i = 0
53 i = 0 58 skipped = 0
54 skipped = 0 59 # Read column headers until end or 2 consecutive skipped headers
55 # Read column headers until end or 2 consecutive skipped headers 60 for header in headers:
56 for header in headers: 61 # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True)))
57 # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True))) 62 column_index = i + 1
58 column_index = i + 1 63 column_letter = colnum_string(column_index)
59 column_letter = colnum_string(column_index) 64 header_value = header.get('formattedValue')
60 header_value = header.get('formattedValue') 65 if header_value: # NOT skipped
61 if header_value: # NOT skipped 66 column_is_skipped = False
62 column_is_skipped = False 67 skipped = 0
63 skipped = 0 68 column_name = '{}'.format(header_value)
64 column_name = '{}'.format(header_value) 69 if column_name in header_list:
65 if column_name in header_list: 70 raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
66 raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
67 sheet_title, column_name, column_letter))
68 header_list.append(column_name)
69
70 first_value = None
71 try:
72 first_value = first_values[i]
73 except IndexError as err:
74 raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
75 sheet_title, column_name, column_letter, err))
76
77 column_effective_value = first_value.get('effectiveValue', {})
78
79 col_val = None
80 if column_effective_value == {}:
81 column_effective_value_type = 'stringValue'
82 LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
83 sheet_title, column_name, column_letter))
84 LOGGER.info(' Setting column datatype to STRING')
85 else:
86 for key, val in column_effective_value.items():
87 if key in ('numberValue', 'stringValue', 'boolValue'):
88 column_effective_value_type = key
89 col_val = str(val)
90 elif key in ('errorType', 'formulaType'):
91 col_val = str(val)
92 raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
93 sheet_title, column_name, column_letter, key, col_val))
94
95 column_number_format = first_values[i].get('effectiveFormat', {}).get(
96 'numberFormat', {})
97 column_number_format_type = column_number_format.get('type')
98
99 # Determine datatype for sheet_json_schema
100 #
101 # column_effective_value_type = numberValue, stringValue, boolValue;
102 # INVALID: errorType, formulaType
103 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
104 #
105 # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
106 # TIME, DATE_TIME, SCIENTIFIC
107 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
108 #
109 column_format = None # Default
110 if column_effective_value == {}:
111 col_properties = {'type': ['null', 'string']}
112 column_gs_type = 'stringValue'
113 LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
114 sheet_title, column_name, column_letter)) 71 sheet_title, column_name, column_letter))
115 LOGGER.info(' Setting column datatype to STRING') 72 header_list.append(column_name)
116 elif column_effective_value_type == 'stringValue': 73
117 col_properties = {'type': ['null', 'string']} 74 first_value = None
118 column_gs_type = 'stringValue' 75 try:
119 elif column_effective_value_type == 'boolValue': 76 first_value = first_values[i]
120 col_properties = {'type': ['null', 'boolean', 'string']} 77 except IndexError as err:
121 column_gs_type = 'boolValue' 78 raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
122 elif column_effective_value_type == 'numberValue': 79 sheet_title, column_name, column_letter, err))
123 if column_number_format_type == 'DATE_TIME': 80
124 col_properties = { 81 column_effective_value = first_value.get('effectiveValue', {})
125 'type': ['null', 'string'], 82
126 'format': 'date-time' 83 col_val = None
127 } 84 if column_effective_value == {}:
128 column_gs_type = 'numberType.DATE_TIME' 85 column_effective_value_type = 'stringValue'
129 elif column_number_format_type == 'DATE': 86 LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
130 col_properties = { 87 sheet_title, column_name, column_letter))
131 'type': ['null', 'string'], 88 LOGGER.info(' Setting column datatype to STRING')
132 'format': 'date' 89 else:
133 } 90 for key, val in column_effective_value.items():
134 column_gs_type = 'numberType.DATE' 91 if key in ('numberValue', 'stringValue', 'boolValue'):
135 elif column_number_format_type == 'TIME': 92 column_effective_value_type = key
136 col_properties = { 93 col_val = str(val)
137 'type': ['null', 'string'], 94 elif key in ('errorType', 'formulaType'):
138 'format': 'time' 95 col_val = str(val)
139 } 96 raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
140 column_gs_type = 'numberType.TIME' 97 sheet_title, column_name, column_letter, key, col_val))
141 elif column_number_format_type == 'TEXT': 98
99 column_number_format = first_values[i].get('effectiveFormat', {}).get(
100 'numberFormat', {})
101 column_number_format_type = column_number_format.get('type')
102
103 # Determine datatype for sheet_json_schema
104 #
105 # column_effective_value_type = numberValue, stringValue, boolValue;
106 # INVALID: errorType, formulaType
107 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
108 #
109 # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
110 # TIME, DATE_TIME, SCIENTIFIC
111 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
112 #
113 column_format = None # Default
114 if column_effective_value == {}:
115 col_properties = {'type': ['null', 'string']}
116 column_gs_type = 'stringValue'
117 LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
118 sheet_title, column_name, column_letter))
119 LOGGER.info(' Setting column datatype to STRING')
120 elif column_effective_value_type == 'stringValue':
142 col_properties = {'type': ['null', 'string']} 121 col_properties = {'type': ['null', 'string']}
143 column_gs_type = 'stringValue' 122 column_gs_type = 'stringValue'
123 elif column_effective_value_type == 'boolValue':
124 col_properties = {'type': ['null', 'boolean', 'string']}
125 column_gs_type = 'boolValue'
126 elif column_effective_value_type == 'numberValue':
127 if column_number_format_type == 'DATE_TIME':
128 col_properties = {
129 'type': ['null', 'string'],
130 'format': 'date-time'
131 }
132 column_gs_type = 'numberType.DATE_TIME'
133 elif column_number_format_type == 'DATE':
134 col_properties = {
135 'type': ['null', 'string'],
136 'format': 'date'
137 }
138 column_gs_type = 'numberType.DATE'
139 elif column_number_format_type == 'TIME':
140 col_properties = {
141 'type': ['null', 'string'],
142 'format': 'time'
143 }
144 column_gs_type = 'numberType.TIME'
145 elif column_number_format_type == 'TEXT':
146 col_properties = {'type': ['null', 'string']}
147 column_gs_type = 'stringValue'
148 else:
149 # Interesting - order in the anyOf makes a difference.
150 # Number w/ multipleOf must be listed last, otherwise errors occur.
151 col_properties = {
152 'anyOf': [
153 {
154 'type': 'null'
155 },
156 {
157 'type': 'number',
158 'multipleOf': 1e-15
159 },
160 {
161 'type': 'string'
162 }
163 ]
164 }
165 column_gs_type = 'numberType'
166 # Catch-all to deal with other types and set to string
167 # column_effective_value_type: formulaValue, errorValue, or other
144 else: 168 else:
145 # Interesting - order in the anyOf makes a difference. 169 col_properties = {'type': ['null', 'string']}
146 # Number w/ multipleOf must be listed last, otherwise errors occur. 170 column_gs_type = 'unsupportedValue'
147 col_properties = { 171 LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
148 'anyOf': [ 172 sheet_title, column_name, column_letter, column_effective_value_type, col_val))
149 { 173 LOGGER.info('Converting to string.')
150 'type': 'string' 174 else: # skipped
151 }, 175 column_is_skipped = True
152 { 176 skipped = skipped + 1
153 'type': 'null' 177 column_index_str = str(column_index).zfill(2)
154 }, 178 column_name = '__sdc_skip_col_{}'.format(column_index_str)
155 {
156 'type': 'number',
157 'multipleOf': 1e-15
158 }
159 ]
160 }
161 column_gs_type = 'numberType'
162 # Catch-all to deal with other types and set to string
163 # column_effective_value_type: formulaValue, errorValue, or other
164 else:
165 col_properties = {'type': ['null', 'string']} 179 col_properties = {'type': ['null', 'string']}
166 column_gs_type = 'unsupportedValue' 180 column_gs_type = 'stringValue'
167 LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format( 181 LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
168 sheet_title, column_name, column_letter, column_effective_value_type, col_val)) 182 sheet_title, column_name, column_letter))
169 LOGGER.info('Converting to string.') 183 LOGGER.info(' This column will be skipped during data loading.')
170 else: # skipped
171 column_is_skipped = True
172 skipped = skipped + 1
173 column_index_str = str(column_index).zfill(2)
174 column_name = '__sdc_skip_col_{}'.format(column_index_str)
175 col_properties = {'type': ['null', 'string']}
176 column_gs_type = 'stringValue'
177 LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
178 sheet_title, column_name, column_letter))
179 LOGGER.info(' This column will be skipped during data loading.')
180
181 if skipped >= 2:
182 # skipped = 2 consecutive skipped headers
183 # Remove prior_header column_name
184 sheet_json_schema['properties'].pop(prior_header, None)
185 LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
186 sheet_title, column_name, column_letter))
187 break
188
189 else:
190 column = {}
191 column = {
192 'columnIndex': column_index,
193 'columnLetter': column_letter,
194 'columnName': column_name,
195 'columnType': column_gs_type,
196 'columnSkipped': column_is_skipped
197 }
198 columns.append(column)
199 184
200 sheet_json_schema['properties'][column_name] = col_properties 185 if skipped >= 2:
186 # skipped = 2 consecutive skipped headers
187 # Remove prior_header column_name
188 sheet_json_schema['properties'].pop(prior_header, None)
189 LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
190 sheet_title, column_name, column_letter))
191 break
201 192
202 prior_header = column_name 193 else:
203 i = i + 1 194 column = {}
195 column = {
196 'columnIndex': column_index,
197 'columnLetter': column_letter,
198 'columnName': column_name,
199 'columnType': column_gs_type,
200 'columnSkipped': column_is_skipped
201 }
202 columns.append(column)
204 203
205 return sheet_json_schema, columns 204 sheet_json_schema['properties'][column_name] = col_properties
205
206 prior_header = column_name
207 i = i + 1
208
209 return sheet_json_schema, columns
206 210
207 211
208# Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query 212# Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query
@@ -276,17 +280,18 @@ def get_schemas(client, spreadsheet_id):
276 for sheet in sheets: 280 for sheet in sheets:
277 # GET sheet_json_schema for each worksheet (from function above) 281 # GET sheet_json_schema for each worksheet (from function above)
278 sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) 282 sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
279 # LOGGER.info('columns = {}'.format(columns)) 283
280 284 # SKIP empty sheets (where sheet_json_schema and columns are None)
281 sheet_title = sheet.get('properties', {}).get('title') 285 if sheet_json_schema and columns:
282 schemas[sheet_title] = sheet_json_schema 286 sheet_title = sheet.get('properties', {}).get('title')
283 sheet_mdata = metadata.new() 287 schemas[sheet_title] = sheet_json_schema
284 sheet_mdata = metadata.get_standard_metadata( 288 sheet_mdata = metadata.new()
285 schema=sheet_json_schema, 289 sheet_mdata = metadata.get_standard_metadata(
286 key_properties=['__sdc_row'], 290 schema=sheet_json_schema,
287 valid_replication_keys=None, 291 key_properties=['__sdc_row'],
288 replication_method='FULL_TABLE' 292 valid_replication_keys=None,
289 ) 293 replication_method='FULL_TABLE'
290 field_metadata[sheet_title] = sheet_mdata 294 )
295 field_metadata[sheet_title] = sheet_mdata
291 296
292 return schemas, field_metadata 297 return schemas, field_metadata
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 311281c..b77eab3 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -429,113 +429,117 @@ def sync(client, config, catalog, state):
429 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) 429 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
430 # LOGGER.info('sheet_schema: {}'.format(sheet_schema)) 430 # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
431 431
432 # Transform sheet_metadata 432 # SKIP empty sheets (where sheet_schema and columns are None)
433 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) 433 if not sheet_schema or not columns:
434 # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf)) 434 LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
435 sheet_metadata.append(sheet_metadata_tf) 435 else:
436 436 # Transform sheet_metadata
437 # SHEET_DATA 437 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
438 # Should this worksheet tab be synced? 438 # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
439 if sheet_title in selected_streams: 439 sheet_metadata.append(sheet_metadata_tf)
440 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title)) 440
441 update_currently_syncing(state, sheet_title) 441 # SHEET_DATA
442 selected_fields = get_selected_fields(catalog, sheet_title) 442 # Should this worksheet tab be synced?
443 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) 443 if sheet_title in selected_streams:
444 write_schema(catalog, sheet_title) 444 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
445 445 update_currently_syncing(state, sheet_title)
446 # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs) 446 selected_fields = get_selected_fields(catalog, sheet_title)
447 # everytime after each sheet sync is complete. 447 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
448 # This forces hard deletes on the data downstream if fewer records are sent. 448 write_schema(catalog, sheet_title)
449 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 449
450 last_integer = int(get_bookmark(state, sheet_title, 0)) 450 # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
451 activate_version = int(time.time() * 1000) 451 # everytime after each sheet sync is complete.
452 activate_version_message = singer.ActivateVersionMessage( 452 # This forces hard deletes on the data downstream if fewer records are sent.
453 stream=sheet_title, 453 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
454 version=activate_version) 454 last_integer = int(get_bookmark(state, sheet_title, 0))
455 if last_integer == 0: 455 activate_version = int(time.time() * 1000)
456 # initial load, send activate_version before AND after data sync 456 activate_version_message = singer.ActivateVersionMessage(
457 singer.write_message(activate_version_message) 457 stream=sheet_title,
458 LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) 458 version=activate_version)
459 459 if last_integer == 0:
460 # Determine max range of columns and rows for "paging" through the data 460 # initial load, send activate_version before AND after data sync
461 sheet_last_col_index = 1 461 singer.write_message(activate_version_message)
462 sheet_last_col_letter = 'A' 462 LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
463 for col in columns: 463
464 col_index = col.get('columnIndex') 464 # Determine max range of columns and rows for "paging" through the data
465 col_letter = col.get('columnLetter') 465 sheet_last_col_index = 1
466 if col_index > sheet_last_col_index: 466 sheet_last_col_letter = 'A'
467 sheet_last_col_index = col_index 467 for col in columns:
468 sheet_last_col_letter = col_letter 468 col_index = col.get('columnIndex')
469 sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount') 469 col_letter = col.get('columnLetter')
470 470 if col_index > sheet_last_col_index:
471 # Initialize paging for 1st batch 471 sheet_last_col_index = col_index
472 is_last_row = False 472 sheet_last_col_letter = col_letter
473 batch_rows = 200 473 sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
474 from_row = 2 474
475 if sheet_max_row < batch_rows: 475 # Initialize paging for 1st batch
476 to_row = sheet_max_row 476 is_last_row = False
477 else: 477 batch_rows = 200
478 to_row = batch_rows 478 from_row = 2
479 479 if sheet_max_row < batch_rows:
480 # Loop thru batches (each having 200 rows of data)
481 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
482 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
483
484 # GET sheet_data for a worksheet tab
485 sheet_data, time_extracted = get_data(
486 stream_name=sheet_title,
487 endpoint_config=sheets_loaded_config,
488 client=client,
489 spreadsheet_id=spreadsheet_id,
490 range_rows=range_rows)
491 # Data is returned as a list of arrays, an array of values for each row
492 sheet_data_rows = sheet_data.get('values')
493
494 # Transform batch of rows to JSON with keys for each column
495 sheet_data_tf, row_num = transform_sheet_data(
496 spreadsheet_id=spreadsheet_id,
497 sheet_id=sheet_id,
498 sheet_title=sheet_title,
499 from_row=from_row,
500 columns=columns,
501 sheet_data_rows=sheet_data_rows)
502 if row_num < to_row:
503 is_last_row = True
504
505 # Process records, send batch of records to target
506 record_count = process_records(
507 catalog=catalog,
508 stream_name=sheet_title,
509 records=sheet_data_tf,
510 time_extracted=ss_time_extracted,
511 version=activate_version)
512 LOGGER.info('Sheet: {}, records processed: {}'.format(
513 sheet_title, record_count))
514
515 # Update paging from/to_row for next batch
516 from_row = to_row + 1
517 if to_row + batch_rows > sheet_max_row:
518 to_row = sheet_max_row 480 to_row = sheet_max_row
519 else: 481 else:
520 to_row = to_row + batch_rows 482 to_row = batch_rows
521 483
522 # End of Stream: Send Activate Version and update State 484 # Loop thru batches (each having 200 rows of data)
523 singer.write_message(activate_version_message) 485 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
524 write_bookmark(state, sheet_title, activate_version) 486 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
525 LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) 487
526 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( 488 # GET sheet_data for a worksheet tab
527 sheet_title, row_num - 2)) # subtract 1 for header row 489 sheet_data, time_extracted = get_data(
528 update_currently_syncing(state, None) 490 stream_name=sheet_title,
529 491 endpoint_config=sheets_loaded_config,
530 # SHEETS_LOADED 492 client=client,
531 # Add sheet to sheets_loaded 493 spreadsheet_id=spreadsheet_id,
532 sheet_loaded = {} 494 range_rows=range_rows)
533 sheet_loaded['spreadsheetId'] = spreadsheet_id 495 # Data is returned as a list of arrays, an array of values for each row
534 sheet_loaded['sheetId'] = sheet_id 496 sheet_data_rows = sheet_data.get('values')
535 sheet_loaded['title'] = sheet_title 497
536 sheet_loaded['loadDate'] = strftime(utils.now()) 498 # Transform batch of rows to JSON with keys for each column
537 sheet_loaded['lastRowNumber'] = row_num 499 sheet_data_tf, row_num = transform_sheet_data(
538 sheets_loaded.append(sheet_loaded) 500 spreadsheet_id=spreadsheet_id,
501 sheet_id=sheet_id,
502 sheet_title=sheet_title,
503 from_row=from_row,
504 columns=columns,
505 sheet_data_rows=sheet_data_rows)
506 if row_num < to_row:
507 is_last_row = True
508
509 # Process records, send batch of records to target
510 record_count = process_records(
511 catalog=catalog,
512 stream_name=sheet_title,
513 records=sheet_data_tf,
514 time_extracted=ss_time_extracted,
515 version=activate_version)
516 LOGGER.info('Sheet: {}, records processed: {}'.format(
517 sheet_title, record_count))
518
519 # Update paging from/to_row for next batch
520 from_row = to_row + 1
521 if to_row + batch_rows > sheet_max_row:
522 to_row = sheet_max_row
523 else:
524 to_row = to_row + batch_rows
525
526 # End of Stream: Send Activate Version and update State
527 singer.write_message(activate_version_message)
528 write_bookmark(state, sheet_title, activate_version)
529 LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
530 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
531 sheet_title, row_num - 2)) # subtract 1 for header row
532 update_currently_syncing(state, None)
533
534 # SHEETS_LOADED
535 # Add sheet to sheets_loaded
536 sheet_loaded = {}
537 sheet_loaded['spreadsheetId'] = spreadsheet_id
538 sheet_loaded['sheetId'] = sheet_id
539 sheet_loaded['title'] = sheet_title
540 sheet_loaded['loadDate'] = strftime(utils.now())
541 sheet_loaded['lastRowNumber'] = row_num
542 sheets_loaded.append(sheet_loaded)
539 543
540 stream_name = 'sheet_metadata' 544 stream_name = 'sheet_metadata'
541 # Sync sheet_metadata if selected 545 # Sync sheet_metadata if selected