Diffstat (limited to 'tap_google_sheets')

-rw-r--r--  tap_google_sheets/client.py     8
-rw-r--r--  tap_google_sheets/discover.py   6
-rw-r--r--  tap_google_sheets/schema.py    56
-rw-r--r--  tap_google_sheets/streams.py    7
-rw-r--r--  tap_google_sheets/sync.py      65

5 files changed, 75 insertions, 67 deletions
diff --git a/tap_google_sheets/client.py b/tap_google_sheets/client.py
index 0a0ce5a..4f38352 100644
--- a/tap_google_sheets/client.py
+++ b/tap_google_sheets/client.py
@@ -1,8 +1,7 @@
 from datetime import datetime, timedelta
+from collections import OrderedDict
 import backoff
 import requests
-from collections import OrderedDict
-
 import singer
 from singer import metrics
 from singer import utils
@@ -123,8 +122,7 @@ def raise_for_error(response):
                 error_code = response.get('error', {}).get('code')
                 ex = get_exception_for_error_code(error_code)
                 raise ex(message)
-            else:
-                raise GoogleError(error)
+            raise GoogleError(error)
         except (ValueError, TypeError):
             raise GoogleError(error)
 
@@ -196,9 +194,7 @@ class GoogleClient: # pylint: disable=too-many-instance-attributes
                           factor=3)
     @utils.ratelimit(100, 100)
     def request(self, method, path=None, url=None, api=None, **kwargs):
-
         self.get_access_token()
-
         self.base_url = 'https://sheets.googleapis.com/v4'
         if api == 'files':
             self.base_url = 'https://www.googleapis.com/drive/v3'
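For orientation (outside the patch): the api switch above picks the Drive vs. Sheets base URL per request, and callers reach request() through thin wrappers such as client.get(path=..., api=..., endpoint=...), a form that appears verbatim in schema.py below. A hedged sketch of a Drive call, with an illustrative endpoint name:

    # Assumes client.get delegates to request() with method='GET'.
    file_metadata = client.get(
        path='files/{}'.format(spreadsheet_id),
        api='files',  # selects https://www.googleapis.com/drive/v3
        endpoint='file_metadata')
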
diff --git a/tap_google_sheets/discover.py b/tap_google_sheets/discover.py
index 6477a5f..6cf0d09 100644
--- a/tap_google_sheets/discover.py
+++ b/tap_google_sheets/discover.py
@@ -10,11 +10,11 @@ def discover(client, spreadsheet_id):
         schema = Schema.from_dict(schema_dict)
         mdata = field_metadata[stream_name]
         key_properties = None
-        for md in mdata:
-            table_key_properties = md.get('metadata', {}).get('table-key-properties')
+        for mdt in mdata:
+            table_key_properties = mdt.get('metadata', {}).get('table-key-properties')
             if table_key_properties:
                 key_properties = table_key_properties
 
         catalog.streams.append(CatalogEntry(
             stream=stream_name,
             tap_stream_id=stream_name,
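For context, the loop above searches the Singer metadata list for a table-key-properties entry, which lives on the stream-level element (the one with an empty breadcrumb). An illustrative shape of mdata, with assumed values:

    # Hypothetical metadata list; only the empty-breadcrumb entry
    # carries 'table-key-properties'.
    mdata = [
        {'breadcrumb': [], 'metadata': {
            'table-key-properties': ['__sdc_row'],
            'inclusion': 'available'}},
        {'breadcrumb': ['properties', 'Column_A'],
         'metadata': {'inclusion': 'available'}},
    ]
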
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py
index 237ab06..d4fead5 100644
--- a/tap_google_sheets/schema.py
+++ b/tap_google_sheets/schema.py
@@ -11,19 +11,19 @@ LOGGER = singer.get_logger()
 # https://github.com/singer-io/getting-started/blob/master/docs/DISCOVERY_MODE.md#Metadata
 
 # Convert column index to column letter
-def colnum_string(n):
+def colnum_string(num):
     string = ""
-    while n > 0:
-        n, remainder = divmod(n - 1, 26)
+    while num > 0:
+        num, remainder = divmod(num - 1, 26)
         string = chr(65 + remainder) + string
     return string
 
 
 # Create sheet_metadata_json with columns from sheet
-def get_sheet_schema_columns(sheet, spreadsheet_id, client):
+def get_sheet_schema_columns(sheet):
     sheet_json_schema = OrderedDict()
     data = next(iter(sheet.get('data', [])), {})
-    row_data = data.get('rowData',[])
+    row_data = data.get('rowData', [])
     # spreadsheet is an OrderedDict, with ordered sheets and rows in the response
 
     headers = row_data[0].get('values', [])
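A quick illustration (not part of the patch) of the bijective base-26 conversion colnum_string performs, mapping a 1-based column index to its A1-notation letters:

    assert colnum_string(1) == 'A'      # first column
    assert colnum_string(26) == 'Z'     # this base-26 has no zero digit
    assert colnum_string(27) == 'AA'    # wraps to two letters
    assert colnum_string(703) == 'AAA'  # 26^2 + 26 + 1
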
@@ -65,33 +65,32 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
             column_name = '{}'.format(header_value)
             if column_name in header_list:
                 raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name))
-            else:
-                header_list.append(column_name)
+            header_list.append(column_name)
 
             first_value = first_values[i]
-            # LOGGER.info('first_value[{}] = {}'.format(i, json.dumps(first_value, indent=2, sort_keys=True)))
 
             column_effective_value = first_value.get('effectiveValue', {})
             for key in column_effective_value.keys():
                 if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'):
                     column_effective_value_type = key
 
-            column_number_format = first_values[i].get('effectiveFormat', {}).get('numberFormat', {})
+            column_number_format = first_values[i].get('effectiveFormat', {}).get(
+                'numberFormat', {})
             column_number_format_type = column_number_format.get('type')
 
             # Determine datatype for sheet_json_schema
             #
-            # column_effective_value_type = numberValue, stringValue, boolValue; INVALID: errorType, formulaType
-            # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
+            # column_effective_value_type = numberValue, stringValue, boolValue;
+            # INVALID: errorType, formulaType
+            # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue
             #
-            # column_number_format_type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, SCIENTIFIC
-            # Reference: https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
+            # column_number_format_type = UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE,
+            # TIME, DATE_TIME, SCIENTIFIC
+            # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType
             #
             column_format = None  # Default
             # column_multiple_of = None  # Default
-            if column_effective_value_type in ('formulaValue', 'errorValue'):
-                raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name))
-            elif column_effective_value_type == 'stringValue':
+            if column_effective_value_type == 'stringValue':
                 column_type = ['null', 'string']
                 column_gs_type = 'stringValue'
             elif column_effective_value_type == 'boolValue':
@@ -116,7 +115,9 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
                 else:
                     column_type = ['null', 'number', 'string']
                     column_gs_type = 'numberType'
-
+            elif column_effective_value_type in ('formulaValue', 'errorValue'):
+                raise Exception('INVALID DATA TYPE ERROR: {}, value: {}'.format(column_name, \
+                    column_effective_value_type))
             else:  # skipped
                 column_is_skipped = True
                 skipped = skipped + 1
@@ -130,7 +131,6 @@ def get_sheet_schema_columns(sheet, spreadsheet_id, client):
                 # skipped = 2 consecutive skipped headers
                 # Remove prior_header column_name
                 sheet_json_schema['properties'].pop(prior_header, None)
-                column_count = i - 1
                 break
 
             else:
@@ -164,12 +164,14 @@ def get_sheet_metadata(sheet, spreadsheet_id, client):
     stream_metadata = STREAMS.get(stream_name)
     api = stream_metadata.get('api', 'sheets')
     params = stream_metadata.get('params', {})
-    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()]).replace('{sheet_title}', sheet_title)
-    path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring)
+    querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in \
+        params.items()]).replace('{sheet_title}', sheet_title)
+    path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
+        spreadsheet_id), querystring)
 
     sheet_md_results = client.get(path=path, api=api, endpoint=stream_name)
     sheet_cols = sheet_md_results.get('sheets')[0]
-    sheet_schema, columns = get_sheet_schema_columns(sheet_cols, spreadsheet_id, client)
+    sheet_schema, columns = get_sheet_schema_columns(sheet_cols)
 
     return sheet_schema, columns
 
@@ -199,20 +201,22 @@ def get_schemas(client, spreadsheet_id):
             replication_method=stream_metadata.get('replication_method', None)
         )
         field_metadata[stream_name] = mdata
 
         if stream_name == 'spreadsheet_metadata':
             api = stream_metadata.get('api', 'sheets')
             params = stream_metadata.get('params', {})
             querystring = '&'.join(['%s=%s' % (key, value) for (key, value) in params.items()])
-            path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', spreadsheet_id), querystring)
+            path = '{}?{}'.format(stream_metadata.get('path').replace('{spreadsheet_id}', \
+                spreadsheet_id), querystring)
 
-            spreadsheet_md_results = client.get(path=path, params=querystring, api=api, endpoint=stream_name)
+            spreadsheet_md_results = client.get(path=path, params=querystring, api=api, \
+                endpoint=stream_name)
 
             sheets = spreadsheet_md_results.get('sheets')
             if sheets:
                 for sheet in sheets:
                     sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
-                    # LOGGER.info('sheet_schema = {}'.format(json.dumps(sheet_schema, indent=2, sort_keys=True)))
+                    LOGGER.info('columns = {}'.format(columns))
 
                     sheet_title = sheet.get('properties', {}).get('title')
                     schemas[sheet_title] = sheet_schema
@@ -224,5 +228,5 @@ def get_schemas(client, spreadsheet_id):
                 replication_method='FULL_TABLE'
             )
             field_metadata[sheet_title] = sheet_mdata
-
+
     return schemas, field_metadata
diff --git a/tap_google_sheets/streams.py b/tap_google_sheets/streams.py
index 231a41d..b8e3eff 100644
--- a/tap_google_sheets/streams.py
+++ b/tap_google_sheets/streams.py
@@ -8,11 +8,10 @@ from collections import OrderedDict
 # key_properties: Primary key fields for identifying an endpoint record.
 # replication_method: INCREMENTAL or FULL_TABLE
 # replication_keys: bookmark_field(s), typically a date-time, used for filtering the results
 #   and setting the state
 # params: Query, sort, and other endpoint specific parameters; default = {}
-# data_key: JSON element containing the results list for the endpoint; default = root (no data_key)
-# bookmark_query_field: From date-time field used for filtering the query
-# bookmark_type: Data type for bookmark, integer or datetime
+# data_key: JSON element containing the results list for the endpoint;
+#   default = root (no data_key)
 
 FILE_METADATA = {
     "api": "files",
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py
index 79e05f9..d7d7184 100644
--- a/tap_google_sheets/sync.py
+++ b/tap_google_sheets/sync.py
@@ -1,10 +1,9 @@
 import time
 import math
-import singer
 import json
-import pytz
 from datetime import datetime, timedelta
-from collections import OrderedDict
+import pytz
+import singer
 from singer import metrics, metadata, Transformer, utils
 from singer.utils import strptime_to_utc, strftime
 from tap_google_sheets.streams import STREAMS
@@ -71,17 +70,21 @@ def process_records(catalog,
     return counter.value
 
 
-def sync_stream(stream_name, selected_streams, catalog, state, records):
+def sync_stream(stream_name, selected_streams, catalog, state, records, time_extracted=None):
     # Should sheets_loaded be synced?
     if stream_name in selected_streams:
         LOGGER.info('STARTED Syncing {}'.format(stream_name))
         update_currently_syncing(state, stream_name)
+        selected_fields = get_selected_fields(catalog, stream_name)
+        LOGGER.info('Stream: {}, selected_fields: {}'.format(stream_name, selected_fields))
         write_schema(catalog, stream_name)
+        if not time_extracted:
+            time_extracted = utils.now()
         record_count = process_records(
             catalog=catalog,
             stream_name=stream_name,
             records=records,
-            time_extracted=utils.now())
+            time_extracted=time_extracted)
         LOGGER.info('FINISHED Syncing {}, Total Records: {}'.format(stream_name, record_count))
         update_currently_syncing(state, None)
 
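The new optional time_extracted parameter keeps older call sites working; both forms appear later in this patch, roughly:

    # Defaults to utils.now() inside sync_stream:
    sync_stream('sheet_metadata', selected_streams, catalog, state, sheet_metadata)
    # Reuses the extraction timestamp returned by get_data():
    sync_stream('spreadsheet_metadata', selected_streams, catalog, state,
                spreadsheet_metadata_tf, ss_time_extracted)
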
@@ -105,9 +108,9 @@ def get_selected_fields(catalog, stream_name):
     mdata_list = singer.metadata.to_list(mdata)
     selected_fields = []
     for entry in mdata_list:
-        field = None
-        try:
-            field = entry['breadcrumb'][1]
-            if entry.get('metadata', {}).get('selected', False):
-                selected_fields.append(field)
+        field = None
+        try:
+            field = entry['breadcrumb'][1]
+            if entry.get('metadata', {}).get('selected', False):
+                selected_fields.append(field)
         except IndexError:
@@ -172,7 +175,7 @@ def transform_spreadsheet_metadata(spreadsheet_metadata):
 def transform_sheet_metadata(spreadsheet_id, sheet, columns):
     # Convert properties to dict
     sheet_metadata = sheet.get('properties')
-    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
+    sheet_metadata_tf = json.loads(json.dumps(sheet_metadata))
     sheet_id = sheet_metadata_tf.get('sheetId')
     sheet_url = 'https://docs.google.com/spreadsheets/d/{}/edit#gid={}'.format(
         spreadsheet_id, sheet_id)
@@ -187,13 +190,13 @@ def transform_sheet_metadata(spreadsheet_id, sheet, columns):
 def excel_to_dttm_str(excel_date_sn, timezone_str=None):
     if not timezone_str:
         timezone_str = 'UTC'
-    tz = pytz.timezone(timezone_str)
+    tzn = pytz.timezone(timezone_str)
     sec_per_day = 86400
     excel_epoch = 25569  # 1970-01-01T00:00:00Z
     epoch_sec = math.floor((excel_date_sn - excel_epoch) * sec_per_day)
     epoch_dttm = datetime(1970, 1, 1)
     excel_dttm = epoch_dttm + timedelta(seconds=epoch_sec)
-    utc_dttm = tz.localize(excel_dttm).astimezone(pytz.utc)
+    utc_dttm = tzn.localize(excel_dttm).astimezone(pytz.utc)
     utc_dttm_str = strftime(utc_dttm)
     return utc_dttm_str
 
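A sanity check of the serial-date math above (illustrative, outside the patch): spreadsheet serials count days since 1899-12-30, so serial 25569 lands on the Unix epoch, and fractional parts carry the time of day.

    excel_to_dttm_str(25569)    # -> '1970-01-01T00:00:00.000000Z'
    excel_to_dttm_str(25569.5)  # -> '1970-01-01T12:00:00.000000Z'
    # (44000 - 25569) days after the epoch:
    excel_to_dttm_str(44000)    # -> '2020-06-18T00:00:00.000000Z'
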
@@ -205,7 +208,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
     is_last_row = False
     row_num = from_row
     # Create sorted list of columns based on columnIndex
-    cols = sorted(columns, key = lambda i: i['columnIndex'])
+    cols = sorted(columns, key=lambda i: i['columnIndex'])
 
     # LOGGER.info('sheet_data_rows: {}'.format(sheet_data_rows))
     for row in sheet_data_rows:
@@ -228,17 +231,17 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
                 col_type = col.get('columnType')
                 # Convert dates/times from Lotus Notes Serial Numbers
                 if col_type == 'numberType.DATE_TIME':
-                    if isinstance(value, int) or isinstance(value, float):
+                    if isinstance(value, (int, float)):
                         col_val = excel_to_dttm_str(value)
                     else:
                         col_val = str(value)
                 elif col_type == 'numberType.DATE':
-                    if isinstance(value, int) or isinstance(value, float):
+                    if isinstance(value, (int, float)):
                         col_val = excel_to_dttm_str(value)[:10]
                     else:
                         col_val = str(value)
                 elif col_type == 'numberType.TIME':
-                    if isinstance(value, int) or isinstance(value, float):
+                    if isinstance(value, (int, float)):
                         try:
                             total_secs = value * 86400  # seconds in day
                             col_val = str(timedelta(seconds=total_secs))
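TIME cells hold a fraction of a 24-hour day, so the conversion above is plain arithmetic (illustrative values; timedelta is already imported in sync.py):

    value = 0.5                          # half a day
    total_secs = value * 86400           # 43200.0 seconds
    str(timedelta(seconds=total_secs))   # -> '12:00:00'
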
@@ -267,7 +270,7 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data
                     else:
                         col_val = str(value)
                 elif isinstance(value, int):
-                    if value == 1 or value == -1:
+                    if value in (1, -1):
                         col_val = True
                     elif value == 0:
                         col_val = False
@@ -321,11 +324,10 @@ def sync(client, config, catalog, state):
     LOGGER.info('last_datetime = {}, this_datetime = {}'.format(last_datetime, this_datetime))
     if this_datetime <= last_datetime:
         LOGGER.info('this_datetime <= last_datetime, FILE NOT CHANGED. EXITING.')
-        return 0
-    else:
-        # Sync file_metadata if selected
-        sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf)
-        write_bookmark(state, stream_name, strftime(this_datetime))
+        return
+    # Sync file_metadata if selected
+    sync_stream(stream_name, selected_streams, catalog, state, file_metadata_tf, time_extracted)
+    write_bookmark(state, stream_name, strftime(this_datetime))
 
     # SPREADSHEET_METADATA
     spreadsheet_metadata = {}
@@ -334,7 +336,7 @@ def sync(client, config, catalog, state):
 
     # GET spreadsheet_metadata
     LOGGER.info('GET spreadsheet_metadata')
-    spreadsheet_metadata, ss_time_extracted = get_data(
+    spreadsheet_metadata, ss_time_extracted = get_data(
         stream_name=stream_name,
         endpoint_config=spreadsheet_metadata_config,
         client=client,
@@ -345,7 +347,8 @@ def sync(client, config, catalog, state):
     spreadsheet_metadata_tf = transform_spreadsheet_metadata(spreadsheet_metadata)
 
     # Sync spreadsheet_metadata if selected
-    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf)
+    sync_stream(stream_name, selected_streams, catalog, state, spreadsheet_metadata_tf, \
+        ss_time_extracted)
 
     # SHEET_METADATA and SHEET_DATA
     sheets = spreadsheet_metadata.get('sheets')
@@ -360,6 +363,7 @@ def sync(client, config, catalog, state):
 
             # GET sheet_metadata and columns
             sheet_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client)
+            LOGGER.info('sheet_schema: {}'.format(sheet_schema))
 
             # Transform sheet_metadata
             sheet_metadata_tf = transform_sheet_metadata(spreadsheet_id, sheet, columns)
@@ -371,6 +375,8 @@ def sync(client, config, catalog, state):
             if sheet_title in selected_streams:
                 LOGGER.info('STARTED Syncing Sheet {}'.format(sheet_title))
                 update_currently_syncing(state, sheet_title)
+                selected_fields = get_selected_fields(catalog, sheet_title)
+                LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields))
                 write_schema(catalog, sheet_title)
 
                 # Determine max range of columns and rows for "paging" through the data
@@ -396,7 +402,7 @@ def sync(client, config, catalog, state):
                 # Loop thru batches (each having 200 rows of data)
                 while not is_last_row and from_row < sheet_max_row and to_row <= sheet_max_row:
                     range_rows = 'A{}:{}{}'.format(from_row, sheet_last_col_letter, to_row)
-
+
                     # GET sheet_data for a worksheet tab
                     sheet_data, time_extracted = get_data(
                         stream_name=sheet_title,
@@ -423,7 +429,9 @@ def sync(client, config, catalog, state):
                         stream_name=sheet_title,
                         records=sheet_data_tf,
                         time_extracted=ss_time_extracted)
-
+                    LOGGER.info('Sheet: {}, Records processed: {}'.format(
+                        sheet_title, record_count))
+
                     # Update paging from/to_row for next batch
                     from_row = to_row + 1
                     if to_row + batch_rows > sheet_max_row:
@@ -442,8 +450,8 @@ def sync(client, config, catalog, state):
                 sheets_loaded.append(sheet_loaded)
 
                 # Emit a Singer ACTIVATE_VERSION message after each sheet is complete.
-                # This forces hard deletes on the data downstream if fewer records are sent for a sheet.
-                # Reference: https://github.com/singer-io/singer-python/blob/9b99c6e0efc18836e6a07f1092aed8ba253f403f/singer/messages.py#L137-L167
+                # This forces hard deletes on the data downstream if fewer records are sent.
+                # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137
                 activate_version_message = singer.ActivateVersionMessage(
                     stream=sheet_title,
                     version=int(time.time() * 1000))
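For reference, a minimal sketch of the full ACTIVATE_VERSION emission (assuming the message is written with singer-python's write_message, as Singer taps conventionally do; the sheet title is illustrative):

    activate_version_message = singer.ActivateVersionMessage(
        stream='Sheet1',                   # illustrative sheet title
        version=int(time.time() * 1000))   # epoch millis as the version
    singer.write_message(activate_version_message)
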
@@ -451,12 +459,13 @@ def sync(client, config, catalog, state):
 
             LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format(
                 sheet_title, row_num - 1))
-            update_currently_syncing(state, None)
 
     stream_name = 'sheet_metadata'
     # Sync sheet_metadata if selected
     sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata)
 
     stream_name = 'sheets_loaded'
     # Sync sheets_loaded if selected
     sync_stream(stream_name, selected_streams, catalog, state, sheets_loaded)
+
+    return