aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--CHANGELOG.md3
-rw-r--r--setup.py2
-rw-r--r--tap_google_sheets/schema.py365
-rw-r--r--tap_google_sheets/sync.py214
4 files changed, 298 insertions, 286 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3c47260..81057ee 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,8 @@
1# Changelog 1# Changelog
2 2
3## 0.0.4
4 * Add logic to skip empty worksheets in Discovery and Sync mode.
5
3## 0.0.3 6## 0.0.3
4 * Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages. 7 * Update README.md documentation. Improved logging and handling of errors and warnings. Better null handling in Discovery and Sync. Fix issues with activate version messages.
5 8
diff --git a/setup.py b/setup.py
index 80c2c10..dde2bad 100644
--- a/setup.py
+++ b/setup.py
@@ -3,7 +3,7 @@
3from setuptools import setup, find_packages 3from setuptools import setup, find_packages
4 4
5setup(name='tap-google-sheets', 5setup(name='tap-google-sheets',
6 version='0.0.3', 6 version='0.0.4',
7 description='Singer.io tap for extracting data from the Google Sheets v4 API', 7 description='Singer.io tap for extracting data from the Google Sheets v4 API',
8 author='jeff.huth@bytecode.io', 8 author='jeff.huth@bytecode.io',
9 classifiers=['Programming Language :: Python :: 3 :: Only'], 9 classifiers=['Programming Language :: Python :: 3 :: Only'],
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index e319c03..c229d72 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -25,184 +25,188 @@ def get_sheet_schema_columns(sheet):
25 sheet_json_schema = OrderedDict() 25 sheet_json_schema = OrderedDict()
26 data = next(iter(sheet.get('data', [])), {}) 26 data = next(iter(sheet.get('data', [])), {})
27 row_data = data.get('rowData', []) 27 row_data = data.get('rowData', [])
28 # spreadsheet is an OrderedDict, with orderd sheets and rows in the repsonse 28 if row_data == []:
29 29 # Empty sheet, SKIP
30 headers = row_data[0].get('values', []) 30 LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
31 first_values = row_data[1].get('values', []) 31 return None, None
32 # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True))) 32 else:
33 33 # spreadsheet is an OrderedDict, with orderd sheets and rows in the repsonse
34 sheet_json_schema = { 34 headers = row_data[0].get('values', [])
35 'type': 'object', 35 first_values = row_data[1].get('values', [])
36 'additionalProperties': False, 36 # LOGGER.info('first_values = {}'.format(json.dumps(first_values, indent=2, sort_keys=True)))
37 'properties': { 37
38 '__sdc_spreadsheet_id': { 38 sheet_json_schema = {
39 'type': ['null', 'string'] 39 'type': 'object',
40 }, 40 'additionalProperties': False,
41 '__sdc_sheet_id': { 41 'properties': {
42 'type': ['null', 'integer'] 42 '__sdc_spreadsheet_id': {
43 }, 43 'type': ['null', 'string']
44 '__sdc_row': { 44 },
45 'type': ['null', 'integer'] 45 '__sdc_sheet_id': {
46 'type': ['null', 'integer']
47 },
48 '__sdc_row': {
49 'type': ['null', 'integer']
50 }
46 } 51 }
47 } 52 }
48 } 53
49 54 header_list = [] # used for checking uniqueness
50 header_list = [] # used for checking uniqueness 55 columns = []
51 columns = [] 56 prior_header = None
52 prior_header = None 57 i = 0
53 i = 0 58 skipped = 0
54 skipped = 0 59 # Read column headers until end or 2 consecutive skipped headers
55 # Read column headers until end or 2 consecutive skipped headers 60 for header in headers:
56 for header in headers: 61 # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True)))
57 # LOGGER.info('header = {}'.format(json.dumps(header, indent=2, sort_keys=True))) 62 column_index = i + 1
58 column_index = i + 1 63 column_letter = colnum_string(column_index)
59 column_letter = colnum_string(column_index) 64 header_value = header.get('formattedValue')
60 header_value = header.get('formattedValue') 65 if header_value: # NOT skipped
61 if header_value: # NOT skipped 66 column_is_skipped = False
62 column_is_skipped = False 67 skipped = 0
63 skipped = 0 68 column_name = '{}'.format(header_value)
64 column_name = '{}'.format(header_value) 69 if column_name in header_list:
65 if column_name in header_list: 70 raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
66 raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format(
67 sheet_title, column_name, column_letter))
68 header_list.append(column_name)
69
70 first_value = None
71 try:
72 first_value = first_values[i]
73 except IndexError as err:
74 raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
75 sheet_title, column_name, column_letter, err))
76
77 column_effective_value = first_value.get('effectiveValue', {})
78
79 col_val = None
80 if column_effective_value == {}:
81 column_effective_value_type = 'stringValue'
82 LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
83 sheet_title, column_name, column_letter))
84 LOGGER.info(' Setting column datatype to STRING')
85 else:
86 for key, val in column_effective_value.items():
87 if key in ('numberValue', 'stringValue', 'boolValue'):
88 column_effective_value_type = key
89 col_val = str(val)
90 elif key in ('errorType', 'formulaType'):
91 col_val = str(val)
92 raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
93 sheet_title, column_name, column_letter, key, col_val))
94
95 column_number_format = first_values[i].get('effectiveFormat', {}).get(
96 'numberFormat', {})
97 column_number_format_type = column_number_format.get('type')
98
99 # Determine datatype for sheet_json_schema
100 #
101 # column_effective_value_type = numberValue, stringValue, boolValue;
102 # INVALID: errorType, formulaType
103 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
104 #
105 # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
106 # TIME, DATE_TIME, SCIENTIFIC
107 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
108 #
109 column_format = None # Default
110 if column_effective_value == {}:
111 col_properties = {'type': ['null', 'string']}
112 column_gs_type = 'stringValue'
113 LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
114 sheet_title, column_name, column_letter)) 71 sheet_title, column_name, column_letter))
115 LOGGER.info(' Setting column datatype to STRING') 72 header_list.append(column_name)
116 elif column_effective_value_type == 'stringValue': 73
117 col_properties = {'type': ['null', 'string']} 74 first_value = None
118 column_gs_type = 'stringValue' 75 try:
119 elif column_effective_value_type == 'boolValue': 76 first_value = first_values[i]
120 col_properties = {'type': ['null', 'boolean', 'string']} 77 except IndexError as err:
121 column_gs_type = 'boolValue' 78 raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format(
122 elif column_effective_value_type == 'numberValue': 79 sheet_title, column_name, column_letter, err))
123 if column_number_format_type == 'DATE_TIME': 80
124 col_properties = { 81 column_effective_value = first_value.get('effectiveValue', {})
125 'type': ['null', 'string'], 82
126 'format': 'date-time' 83 col_val = None
127 } 84 if column_effective_value == {}:
128 column_gs_type = 'numberType.DATE_TIME' 85 column_effective_value_type = 'stringValue'
129 elif column_number_format_type == 'DATE': 86 LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format(
130 col_properties = { 87 sheet_title, column_name, column_letter))
131 'type': ['null', 'string'], 88 LOGGER.info(' Setting column datatype to STRING')
132 'format': 'date' 89 else:
133 } 90 for key, val in column_effective_value.items():
134 column_gs_type = 'numberType.DATE' 91 if key in ('numberValue', 'stringValue', 'boolValue'):
135 elif column_number_format_type == 'TIME': 92 column_effective_value_type = key
136 col_properties = { 93 col_val = str(val)
137 'type': ['null', 'string'], 94 elif key in ('errorType', 'formulaType'):
138 'format': 'time' 95 col_val = str(val)
139 } 96 raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
140 column_gs_type = 'numberType.TIME' 97 sheet_title, column_name, column_letter, key, col_val))
141 elif column_number_format_type == 'TEXT': 98
99 column_number_format = first_values[i].get('effectiveFormat', {}).get(
100 'numberFormat', {})
101 column_number_format_type = column_number_format.get('type')
102
103 # Determine datatype for sheet_json_schema
104 #
105 # column_effective_value_type = numberValue, stringValue, boolValue;
106 # INVALID: errorType, formulaType
107 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
108 #
109 # column_number_format_type = UNEPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
110 # TIME, DATE_TIME, SCIENTIFIC
111 # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
112 #
113 column_format = None # Default
114 if column_effective_value == {}:
115 col_properties = {'type': ['null', 'string']}
116 column_gs_type = 'stringValue'
117 LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format(
118 sheet_title, column_name, column_letter))
119 LOGGER.info(' Setting column datatype to STRING')
120 elif column_effective_value_type == 'stringValue':
142 col_properties = {'type': ['null', 'string']} 121 col_properties = {'type': ['null', 'string']}
143 column_gs_type = 'stringValue' 122 column_gs_type = 'stringValue'
123 elif column_effective_value_type == 'boolValue':
124 col_properties = {'type': ['null', 'boolean', 'string']}
125 column_gs_type = 'boolValue'
126 elif column_effective_value_type == 'numberValue':
127 if column_number_format_type == 'DATE_TIME':
128 col_properties = {
129 'type': ['null', 'string'],
130 'format': 'date-time'
131 }
132 column_gs_type = 'numberType.DATE_TIME'
133 elif column_number_format_type == 'DATE':
134 col_properties = {
135 'type': ['null', 'string'],
136 'format': 'date'
137 }
138 column_gs_type = 'numberType.DATE'
139 elif column_number_format_type == 'TIME':
140 col_properties = {
141 'type': ['null', 'string'],
142 'format': 'time'
143 }
144 column_gs_type = 'numberType.TIME'
145 elif column_number_format_type == 'TEXT':
146 col_properties = {'type': ['null', 'string']}
147 column_gs_type = 'stringValue'
148 else:
149 # Interesting - order in the anyOf makes a difference.
150 # Number w/ multipleOf must be listed last, otherwise errors occur.
151 col_properties = {
152 'anyOf': [
153 {
154 'type': 'null'
155 },
156 {
157 'type': 'number',
158 'multipleOf': 1e-15
159 },
160 {
161 'type': 'string'
162 }
163 ]
164 }
165 column_gs_type = 'numberType'
166 # Catch-all to deal with other types and set to string
167 # column_effective_value_type: formulaValue, errorValue, or other
144 else: 168 else:
145 # Interesting - order in the anyOf makes a difference. 169 col_properties = {'type': ['null', 'string']}
146 # Number w/ multipleOf must be listed last, otherwise errors occur. 170 column_gs_type = 'unsupportedValue'
147 col_properties = { 171 LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format(
148 'anyOf': [ 172 sheet_title, column_name, column_letter, column_effective_value_type, col_val))
149 { 173 LOGGER.info('Converting to string.')
150 'type': 'string' 174 else: # skipped
151 }, 175 column_is_skipped = True
152 { 176 skipped = skipped + 1
153 'type': 'null' 177 column_index_str = str(column_index).zfill(2)
154 }, 178 column_name = '__sdc_skip_col_{}'.format(column_index_str)
155 {
156 'type': 'number',
157 'multipleOf': 1e-15
158 }
159 ]
160 }
161 column_gs_type = 'numberType'
162 # Catch-all to deal with other types and set to string
163 # column_effective_value_type: formulaValue, errorValue, or other
164 else:
165 col_properties = {'type': ['null', 'string']} 179 col_properties = {'type': ['null', 'string']}
166 column_gs_type = 'unsupportedValue' 180 column_gs_type = 'stringValue'
167 LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format( 181 LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
168 sheet_title, column_name, column_letter, column_effective_value_type, col_val)) 182 sheet_title, column_name, column_letter))
169 LOGGER.info('Converting to string.') 183 LOGGER.info(' This column will be skipped during data loading.')
170 else: # skipped
171 column_is_skipped = True
172 skipped = skipped + 1
173 column_index_str = str(column_index).zfill(2)
174 column_name = '__sdc_skip_col_{}'.format(column_index_str)
175 col_properties = {'type': ['null', 'string']}
176 column_gs_type = 'stringValue'
177 LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format(
178 sheet_title, column_name, column_letter))
179 LOGGER.info(' This column will be skipped during data loading.')
180
181 if skipped >= 2:
182 # skipped = 2 consecutive skipped headers
183 # Remove prior_header column_name
184 sheet_json_schema['properties'].pop(prior_header, None)
185 LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
186 sheet_title, column_name, column_letter))
187 break
188
189 else:
190 column = {}
191 column = {
192 'columnIndex': column_index,
193 'columnLetter': column_letter,
194 'columnName': column_name,
195 'columnType': column_gs_type,
196 'columnSkipped': column_is_skipped
197 }
198 columns.append(column)
199 184
200 sheet_json_schema['properties'][column_name] = col_properties 185 if skipped >= 2:
186 # skipped = 2 consecutive skipped headers
187 # Remove prior_header column_name
188 sheet_json_schema['properties'].pop(prior_header, None)
189 LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL {}1'.format(
190 sheet_title, column_name, column_letter))
191 break
201 192
202 prior_header = column_name 193 else:
203 i = i + 1 194 column = {}
195 column = {
196 'columnIndex': column_index,
197 'columnLetter': column_letter,
198 'columnName': column_name,
199 'columnType': column_gs_type,
200 'columnSkipped': column_is_skipped
201 }
202 columns.append(column)
204 203
205 return sheet_json_schema, columns 204 sheet_json_schema['properties'][column_name] = col_properties
205
206 prior_header = column_name
207 i = i + 1
208
209 return sheet_json_schema, columns
206 210
207 211
208# Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query 212# Get Header Row and 1st data row (Rows 1 & 2) from a Sheet on Spreadsheet w/ sheet_metadata query
@@ -276,17 +280,18 @@ def get_schemas(client, spreadsheet_id):
276 for sheet in sheets: 280 for sheet in sheets:
277 # GET sheet_json_schema for each worksheet (from function above) 281 # GET sheet_json_schema for each worksheet (from function above)
278 sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) 282 sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
279 # LOGGER.info('columns = {}'.format(columns)) 283
280 284 # SKIP empty sheets (where sheet_json_schema and columns are None)
281 sheet_title = sheet.get('properties', {}).get('title') 285 if sheet_json_schema and columns:
282 schemas[sheet_title] = sheet_json_schema 286 sheet_title = sheet.get('properties', {}).get('title')
283 sheet_mdata = metadata.new() 287 schemas[sheet_title] = sheet_json_schema
284 sheet_mdata = metadata.get_standard_metadata( 288 sheet_mdata = metadata.new()
285 schema=sheet_json_schema, 289 sheet_mdata = metadata.get_standard_metadata(
286 key_properties=['__sdc_row'], 290 schema=sheet_json_schema,
287 valid_replication_keys=None, 291 key_properties=['__sdc_row'],
288 replication_method='FULL_TABLE' 292 valid_replication_keys=None,
289 ) 293 replication_method='FULL_TABLE'
290 field_metadata[sheet_title] = sheet_mdata 294 )
295 field_metadata[sheet_title] = sheet_mdata
291 296
292 return schemas, field_metadata 297 return schemas, field_metadata
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 311281c..b77eab3 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -429,113 +429,117 @@ def sync(client, config, catalog, state):
429 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) 429 sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
430 # LOGGER.info('sheet_schema: {}'.format(sheet_schema)) 430 # LOGGER.info('sheet_schema: {}'.format(sheet_schema))
431 431
432 # Transform sheet_metadata 432 # SKIP empty sheets (where sheet_schema and columns are None)
433 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns) 433 if not sheet_schema or not columns:
434 # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf)) 434 LOGGER.info('SKIPPING Empty Sheet: {}'.format(sheet_title))
435 sheet_metadata.append(sheet_metadata_tf) 435 else:
436 436 # Transform sheet_metadata
437 # SHEET_DATA 437 sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
438 # Should this worksheet tab be synced? 438 # LOGGER.info('sheet_metadata_tf = {}'.format(sheet_metadata_tf))
439 if sheet_title in selected_streams: 439 sheet_metadata.append(sheet_metadata_tf)
440 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title)) 440
441 update_currently_syncing(state, sheet_title) 441 # SHEET_DATA
442 selected_fields = get_selected_fields(catalog, sheet_title) 442 # Should this worksheet tab be synced?
443 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) 443 if sheet_title in selected_streams:
444 write_schema(catalog, sheet_title) 444 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
445 445 update_currently_syncing(state, sheet_title)
446 # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs) 446 selected_fields = get_selected_fields(catalog, sheet_title)
447 # everytime after each sheet sync is complete. 447 LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
448 # This forces hard deletes on the data downstream if fewer records are sent. 448 write_schema(catalog, sheet_title)
449 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 449
450 last_integer = int(get_bookmark(state, sheet_title, 0)) 450 # Emit a Singer ACTIVATE_VERSION message before initial sync (but not subsequent syncs)
451 activate_version = int(time.time() * 1000) 451 # everytime after each sheet sync is complete.
452 activate_version_message = singer.ActivateVersionMessage( 452 # This forces hard deletes on the data downstream if fewer records are sent.
453 stream=sheet_title, 453 # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
454 version=activate_version) 454 last_integer = int(get_bookmark(state, sheet_title, 0))
455 if last_integer == 0: 455 activate_version = int(time.time() * 1000)
456 # initial load, send activate_version before AND after data sync 456 activate_version_message = singer.ActivateVersionMessage(
457 singer.write_message(activate_version_message) 457 stream=sheet_title,
458 LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) 458 version=activate_version)
459 459 if last_integer == 0:
460 # Determine max range of columns and rows for "paging" through the data 460 # initial load, send activate_version before AND after data sync
461 sheet_last_col_index = 1 461 singer.write_message(activate_version_message)
462 sheet_last_col_letter = 'A' 462 LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
463 for col in columns: 463
464 col_index = col.get('columnIndex') 464 # Determine max range of columns and rows for "paging" through the data
465 col_letter = col.get('columnLetter') 465 sheet_last_col_index = 1
466 if col_index > sheet_last_col_index: 466 sheet_last_col_letter = 'A'
467 sheet_last_col_index = col_index 467 for col in columns:
468 sheet_last_col_letter = col_letter 468 col_index = col.get('columnIndex')
469 sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount') 469 col_letter = col.get('columnLetter')
470 470 if col_index > sheet_last_col_index:
471 # Initialize paging for 1st batch 471 sheet_last_col_index = col_index
472 is_last_row = False 472 sheet_last_col_letter = col_letter
473 batch_rows = 200 473 sheet_max_row = sheet.get('properties').get('gridProperties', {}).get('rowCount')
474 from_row = 2 474
475 if sheet_max_row < batch_rows: 475 # Initialize paging for 1st batch
476 to_row = sheet_max_row 476 is_last_row = False
477 else: 477 batch_rows = 200
478 to_row = batch_rows 478 from_row = 2
479 479 if sheet_max_row < batch_rows:
480 # Loop thru batches (each having 200 rows of data)
481 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
482 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
483
484 # GET sheet_data for a worksheet tab
485 sheet_data, time_extracted = get_data(
486 stream_name=sheet_title,
487 endpoint_config=sheets_loaded_config,
488 client=client,
489 spreadsheet_id=spreadsheet_id,
490 range_rows=range_rows)
491 # Data is returned as a list of arrays, an array of values for each row
492 sheet_data_rows = sheet_data.get('values')
493
494 # Transform batch of rows to JSON with keys for each column
495 sheet_data_tf, row_num = transform_sheet_data(
496 spreadsheet_id=spreadsheet_id,
497 sheet_id=sheet_id,
498 sheet_title=sheet_title,
499 from_row=from_row,
500 columns=columns,
501 sheet_data_rows=sheet_data_rows)
502 if row_num < to_row:
503 is_last_row = True
504
505 # Process records, send batch of records to target
506 record_count = process_records(
507 catalog=catalog,
508 stream_name=sheet_title,
509 records=sheet_data_tf,
510 time_extracted=ss_time_extracted,
511 version=activate_version)
512 LOGGER.info('Sheet: {}, records processed: {}'.format(
513 sheet_title, record_count))
514
515 # Update paging from/to_row for next batch
516 from_row = to_row + 1
517 if to_row + batch_rows > sheet_max_row:
518 to_row = sheet_max_row 480 to_row = sheet_max_row
519 else: 481 else:
520 to_row = to_row + batch_rows 482 to_row = batch_rows
521 483
522 # End of Stream: Send Activate Version and update State 484 # Loop thru batches (each having 200 rows of data)
523 singer.write_message(activate_version_message) 485 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
524 write_bookmark(state, sheet_title, activate_version) 486 range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
525 LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) 487
526 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( 488 # GET sheet_data for a worksheet tab
527 sheet_title, row_num - 2)) # subtract 1 for header row 489 sheet_data, time_extracted = get_data(
528 update_currently_syncing(state, None) 490 stream_name=sheet_title,
529 491 endpoint_config=sheets_loaded_config,
530 # SHEETS_LOADED 492 client=client,
531 # Add sheet to sheets_loaded 493 spreadsheet_id=spreadsheet_id,
532 sheet_loaded = {} 494 range_rows=range_rows)
533 sheet_loaded['spreadsheetId'] = spreadsheet_id 495 # Data is returned as a list of arrays, an array of values for each row
534 sheet_loaded['sheetId'] = sheet_id 496 sheet_data_rows = sheet_data.get('values')
535 sheet_loaded['title'] = sheet_title 497
536 sheet_loaded['loadDate'] = strftime(utils.now()) 498 # Transform batch of rows to JSON with keys for each column
537 sheet_loaded['lastRowNumber'] = row_num 499 sheet_data_tf, row_num = transform_sheet_data(
538 sheets_loaded.append(sheet_loaded) 500 spreadsheet_id=spreadsheet_id,
501 sheet_id=sheet_id,
502 sheet_title=sheet_title,
503 from_row=from_row,
504 columns=columns,
505 sheet_data_rows=sheet_data_rows)
506 if row_num < to_row:
507 is_last_row = True
508
509 # Process records, send batch of records to target
510 record_count = process_records(
511 catalog=catalog,
512 stream_name=sheet_title,
513 records=sheet_data_tf,
514 time_extracted=ss_time_extracted,
515 version=activate_version)
516 LOGGER.info('Sheet: {}, records processed: {}'.format(
517 sheet_title, record_count))
518
519 # Update paging from/to_row for next batch
520 from_row = to_row + 1
521 if to_row + batch_rows > sheet_max_row:
522 to_row = sheet_max_row
523 else:
524 to_row = to_row + batch_rows
525
526 # End of Stream: Send Activate Version and update State
527 singer.write_message(activate_version_message)
528 write_bookmark(state, sheet_title, activate_version)
529 LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version))
530 LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
531 sheet_title, row_num - 2)) # subtract 1 for header row
532 update_currently_syncing(state, None)
533
534 # SHEETS_LOADED
535 # Add sheet to sheets_loaded
536 sheet_loaded = {}
537 sheet_loaded['spreadsheetId'] = spreadsheet_id
538 sheet_loaded['sheetId'] = sheet_id
539 sheet_loaded['title'] = sheet_title
540 sheet_loaded['loadDate'] = strftime(utils.now())
541 sheet_loaded['lastRowNumber'] = row_num
542 sheets_loaded.append(sheet_loaded)
539 543
540 stream_name = 'sheet_metadata' 544 stream_name = 'sheet_metadata'
541 # Sync sheet_metadata if selected 545 # Sync sheet_metadata if selected