author | Jeff Huth <39202799+jeffhuth-bytecode@users.noreply.github.com> | 2020-01-09 07:30:53 -0800 |
---|---|---|
committer | Kyle Allan <KAllan357@gmail.com> | 2020-01-09 10:30:53 -0500 |
commit | 43a24cbab1dbc35b893c35b86e34adc0f2fb84e7 (patch) | |
tree | bcbaae860aad0a94bcc4d27f4804504691401438 | |
parent | 5890b89c1aa7c554235b3cef156b5a5a2c594bec (diff) | |
download | tap-google-sheets-0.0.3.tar.gz tap-google-sheets-0.0.3.tar.zst tap-google-sheets-0.0.3.zip |
v.0.0.3 Sync error handling, activate version, documentation (#2) (tag: v0.0.3)
* v.0.0.2 schema and sync changes
Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py
* v.0.0.3 Sync activate version and error handling
Update README.md documentation. Improve logging and handling of errors and warnings. Improve null handling in Discovery and Sync. Fix issues with activate version messages.
-rw-r--r-- | CHANGELOG.md | 3 |
-rw-r--r-- | README.md | 37 |
-rw-r--r-- | setup.py | 4 |
-rw-r--r-- | tap_google_sheets/schema.py | 51 |
-rw-r--r-- | tap_google_sheets/sync.py | 94 |
5 files changed, 144 insertions, 45 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md index e5d6560..3c47260 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md | |||
@@ -1,5 +1,8 @@ | |||
1 | # Changelog | 1 | # Changelog |
2 | 2 | ||
3 | ## 0.0.3 | ||
4 | * Update README.md documentation. Improve logging and handling of errors and warnings. Improve null handling in Discovery and Sync. Fix issues with activate version messages. | ||
5 | |||
3 | ## 0.0.2 | 6 | ## 0.0.2 |
4 | * Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py | 7 | * Change number json schema to anyOf with multipleOf; skip empty rows; move write_bookmark to end of sync.py |
5 | 8 | ||
@@ -11,30 +11,37 @@ This tap: | |||
11 | - [File Metadata](https://developers.google.com/drive/api/v3/reference/files/get) | 11 | - [File Metadata](https://developers.google.com/drive/api/v3/reference/files/get) |
12 | - [Spreadsheet Metadata](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/get) | 12 | - [Spreadsheet Metadata](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/get) |
13 | - [Spreadsheet Values](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get) | 13 | - [Spreadsheet Values](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get) |
14 | - Outputs the following metadata streams: | ||
15 | - File Metadata: Name, audit/change info from Google Drive | ||
16 | - Spreadsheet Metadata: Basic metadata about the Spreadsheet: Title, Locale, URL, etc. | ||
17 | - Sheet Metadata: Title, URL, Area (max column and row), and Column Metadata | ||
18 | - Column Metadata: Column Header Name, Data type, Format | ||
19 | - Sheets Loaded: Sheet title, load date, number of rows | ||
14 | - For each Sheet: | 20 | - For each Sheet: |
15 | - Outputs the schema for each resource (based on the column header and datatypes of first row of data) | 21 | - Outputs the schema for each resource (based on the column header and datatypes of row 2, the first row of data) |
16 | - Outputs a record for all columns with column headers, and for each row of data until it reaches an empty row | 22 | - Outputs a record for all columns that have column headers, and for each row of data |
23 | - Emits a Singer ACTIVATE_VERSION message after each sheet is complete. This forces hard deletes on the data downstream if fewer records are sent (see the message sketch below). | ||
24 | - Primary Key for each row in a Sheet is the Row Number: `__sdc_row` | ||
25 | - Each Row in a Sheet also includes Foreign Keys to the Spreadsheet Metadata, `__sdc_spreadsheet_id`, and Sheet Metadata, `__sdc_sheet_id`. | ||
17 | 26 | ||
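For illustration, here is a minimal sketch of the per-sheet message sequence described above, written with the `singer-python` library; the stream name, schema, and record values are invented for the example, since the tap derives them from each sheet:

```python
import time

import singer
from singer.messages import RecordMessage

# Invented example stream; the tap names streams after sheet titles.
stream_name = 'Sheet1'
schema = {
    'type': 'object',
    'properties': {
        '__sdc_spreadsheet_id': {'type': ['null', 'string']},
        '__sdc_sheet_id': {'type': ['null', 'integer']},
        '__sdc_row': {'type': ['null', 'integer']},
        'order_total': {'type': ['null', 'number', 'string']},  # invented column
    },
}
version = int(time.time() * 1000)  # the tap versions tables with epoch millis

singer.write_schema(stream_name, schema, ['__sdc_row'])

# Each data row becomes a RECORD message stamped with the version.
singer.messages.write_message(RecordMessage(
    stream=stream_name,
    record={'__sdc_spreadsheet_id': 'abc123', '__sdc_sheet_id': 1,
            '__sdc_row': 2, 'order_total': 99.5},
    version=version))

# ACTIVATE_VERSION after the sheet completes lets targets hard-delete
# rows that were not re-sent under this version.
singer.write_message(singer.ActivateVersionMessage(
    stream=stream_name, version=version))
```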
18 | ## API Endpoints | 27 | ## API Endpoints |
19 | [**file (GET)**](https://developers.google.com/drive/api/v3/reference/files/get) | 28 | [**file (GET)**](https://developers.google.com/drive/api/v3/reference/files/get) |
20 | - Endpoint: https://www.googleapis.com/drive/v3/files/${spreadsheet_id}?fields=id,name,createdTime,modifiedTime,version | 29 | - Endpoint: https://www.googleapis.com/drive/v3/files/${spreadsheet_id}?fields=id,name,createdTime,modifiedTime,version |
21 | - Primary keys: id | 30 | - Primary keys: id |
22 | - Replication strategy: Full (GET file audit data for spreadsheet_id in config) | 31 | - Replication strategy: Incremental (GET file audit data for spreadsheet_id in config) |
23 | - Process/Transformations: Replicate Data if Modified | 32 | - Process/Transformations: Replicate Data if Modified |
24 | 33 | ||
25 | [**metadata (GET)**](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/get) | 34 | [**metadata (GET)**](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/get) |
26 | - Endpoint: https://sheets.googleapis.com/v4/spreadsheets/${spreadsheet_id}?includeGridData=true&ranges=1:2 | 35 | - Endpoint: https://sheets.googleapis.com/v4/spreadsheets/${spreadsheet_id}?includeGridData=true&ranges=1:2 |
27 | - This endpoint returns spreadsheet metadata, sheet metadata, and value metadata (data type information) | 36 | - This endpoint returns spreadsheet metadata, sheet metadata, and value metadata (data type information) |
28 | - Primary keys: spreadsheetId, title, field_name | 37 | - Primary keys: Spreadsheet Id, Sheet Id, Column Index |
29 | - Foreign keys: None | 38 | - Foreign keys: None |
30 | - Replication strategy: Full (get and replace spreadsheet metadata for spreadsheet_id in config) | 39 | - Replication strategy: Full (get and replace spreadsheet metadata for spreadsheet_id in config) |
31 | - Process/Transformations: | 40 | - Process/Transformations: |
32 | - Verify Sheets: Check sheets exist (compared to catalog) and check gridProperties (available area) | 41 | - Verify Sheets: Check sheets exist (compared to catalog) and check gridProperties (available area) |
33 | - sheetId, title, index, gridProperties (rowCount, columnCount) | 42 | - sheetId, title, index, gridProperties (rowCount, columnCount) |
34 | - Verify Field Headers (1st row): Check field headers exist (compared to catalog), missing headers (columns to skip), column order/position, and column uniqueness | 43 | - Verify Field Headers (1st row): Check field headers exist (compared to catalog), missing headers (columns to skip), column order/position, and column name uniqueness |
35 | - Header's field_name, position: data.rowData[0].values[i].formattedValue | 44 | - Create/Verify Datatypes based on 2nd row value and cell metadata |
36 | - Create/Verify Datatypes (2nd row): | ||
37 | - Row 2's datatype, format: data.rowData[1].values[i] | ||
38 | - First check: | 45 | - First check: |
39 | - [effectiveValue: key](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue) | 46 | - [effectiveValue: key](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/other#ExtendedValue) |
40 | - Valid types: numberValue, stringValue, boolValue | 47 | - Valid types: numberValue, stringValue, boolValue |
@@ -42,20 +49,22 @@ This tap: | |||
42 | - Then check: | 49 | - Then check: |
43 | - [effectiveFormat.numberFormat.type](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType) | 50 | - [effectiveFormat.numberFormat.type](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType) |
44 | - Valid types: UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, SCIENTIFIC | 51 | - Valid types: UNSPECIFIED, TEXT, NUMBER, PERCENT, CURRENCY, DATE, TIME, DATE_TIME, SCIENTIFIC |
45 | - If DATE or DATE_TIME, set JSON schema datatype = string and format = date-time | 52 | - Determine JSON schema column data type based on the value and the above cell metadata settings (see the sketch below). |
46 | - [effectiveFormat.numberFormat.pattern](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormat) | 53 | - If DATE, DATE_TIME, or TIME, set JSON schema format accordingly |
47 | 54 | ||
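The two-step datatype check above condenses to roughly the following hypothetical helper; the tap's `schema.py` also handles blank, error, and formula cells, and the exact number schema differs:

```python
# Hypothetical helper condensing the two checks; not the tap's exact code.
def infer_col_properties(effective_value_type, number_format_type):
    # First check: the effectiveValue key of the row-2 cell.
    if effective_value_type == 'stringValue':
        return {'type': ['null', 'string']}
    if effective_value_type == 'boolValue':
        return {'type': ['null', 'boolean']}
    if effective_value_type == 'numberValue':
        # Then check: effectiveFormat.numberFormat.type.
        if number_format_type == 'DATE_TIME':
            return {'type': ['null', 'string'], 'format': 'date-time'}
        if number_format_type == 'DATE':
            return {'type': ['null', 'string'], 'format': 'date'}
        if number_format_type == 'TIME':
            return {'type': ['null', 'string'], 'format': 'time'}
        # v0.0.2+ expresses numbers as anyOf with multipleOf; the exact
        # multipleOf value here is illustrative.
        return {'anyOf': [{'type': ['null', 'number'], 'multipleOf': 1e-15},
                          {'type': ['null', 'string']}]}
    # Blank, error, and formula cells get special handling in the tap
    # (warnings or exceptions); this sketch just falls back to string.
    return {'type': ['null', 'string']}

print(infer_col_properties('numberValue', 'DATE'))
```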
48 | [**values (GET)**](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get) | 55 | [**values (GET)**](https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/get) |
49 | - Endpoint: https://sheets.googleapis.com/v4/spreadsheets/${spreadsheet_id}/values/'${sheet_name}'!${row_range}?dateTimeRenderOption=SERIAL_NUMBER&valueRenderOption=UNFORMATTED_VALUE&majorDimension=ROWS | 56 | - Endpoint: https://sheets.googleapis.com/v4/spreadsheets/${spreadsheet_id}/values/'${sheet_name}'!${row_range}?dateTimeRenderOption=SERIAL_NUMBER&valueRenderOption=UNFORMATTED_VALUE&majorDimension=ROWS |
50 | - This endpoint loops through sheets and row ranges to get the [unformatted values](https://developers.google.com/sheets/api/reference/rest/v4/ValueRenderOption) (effective values only), dates and datetimes as [serial numbers](https://developers.google.com/sheets/api/reference/rest/v4/DateTimeRenderOption) | 57 | - This endpoint loops through sheets and row ranges to get the [unformatted values](https://developers.google.com/sheets/api/reference/rest/v4/ValueRenderOption) (effective values only), dates and datetimes as [serial numbers](https://developers.google.com/sheets/api/reference/rest/v4/DateTimeRenderOption) |
51 | - Primary keys: row | 58 | - Primary keys: __sdc_row |
52 | - Replication strategy: Full (GET values data for spreadsheet_id in config) | 59 | - Replication strategy: Full (GET values data for spreadsheet_id in config) |
53 | - Process/Transformations: | 60 | - Process/Transformations: |
54 | - Loop through sheets (compared to catalog selection) | 61 | - Loop through sheets (compared to catalog selection) |
55 | - Send metadata for sheet | 62 | - Send metadata for sheet |
56 | - Loop through ranges of rows until reaching empty row or area max row (from sheet metadata) | 63 | - Loop through ALL columns that have a column header |
57 | - Transform values, if necessary (dates, date-times, boolean, integer, numbers) | 64 | - Loop through ranges of rows for ALL rows, up to the sheet's available-area max row (from sheet metadata) |
58 | - Process/send records | 65 | - Transform values, if necessary (dates, date-times, times, boolean). |
66 | - Date/time serial numbers are converted to date, date-time, and time strings. Google Sheets uses the Lotus 1-2-3 [serial number](https://developers.google.com/sheets/api/reference/rest/v4/DateTimeRenderOption) format for dates/times; these are converted to normal UTC date-time strings (see the conversion sketch below). | ||
67 | - Process/send records to target | ||
59 | 68 | ||
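The serial-number conversion itself is simple: Sheets serial numbers count days since the Lotus epoch of 1899-12-30, with the time of day as the fractional part. A minimal sketch, assuming UTC (the tap's `excel_to_dttm_str` also accepts a timezone string):

```python
from datetime import datetime, timedelta, timezone

LOTUS_EPOCH = datetime(1899, 12, 30, tzinfo=timezone.utc)

def serial_to_utc_string(serial_number):
    """Convert a Sheets/Lotus 1-2-3 date-time serial number to a UTC string."""
    dttm = LOTUS_EPOCH + timedelta(days=serial_number)
    return dttm.strftime('%Y-%m-%dT%H:%M:%S.%fZ')

# 43839 days after 1899-12-30 is 2020-01-09; .3125 of a day is 07:30.
print(serial_to_utc_string(43839.3125))  # 2020-01-09T07:30:00.000000Z
```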
60 | ## Authentication | 69 | ## Authentication |
61 | The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=1FojlvtLwS0-BzGS37R0jEXtwSHqSiO1Uw-7RKQQO-C4) Google Doc provides instructions showing how to configure the Google Cloud API credentials to enable Google Drive and Google Sheets APIs, configure Google Cloud to authorize/verify your domain ownership, generate an API key (client_id, client_secret), authenticate and generate a refresh_token, and prepare your tap config.json with the necessary parameters. | 70 | The [**Google Sheets Setup & Authentication**](https://drive.google.com/open?id=1FojlvtLwS0-BzGS37R0jEXtwSHqSiO1Uw-7RKQQO-C4) Google Doc provides instructions showing how to configure the Google Cloud API credentials to enable Google Drive and Google Sheets APIs, configure Google Cloud to authorize/verify your domain ownership, generate an API key (client_id, client_secret), authenticate and generate a refresh_token, and prepare your tap config.json with the necessary parameters. |
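For orientation, refreshing an access token from those credentials uses Google's standard OAuth 2.0 token endpoint; a rough sketch, not the tap's actual `client.py`:

```python
import requests

def refresh_access_token(client_id, client_secret, refresh_token):
    """Exchange a long-lived refresh_token for a short-lived access token."""
    resp = requests.post(
        'https://oauth2.googleapis.com/token',
        data={
            'grant_type': 'refresh_token',
            'client_id': client_id,
            'client_secret': client_secret,
            'refresh_token': refresh_token,
        })
    resp.raise_for_status()
    return resp.json()['access_token']
```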
@@ -3,7 +3,7 @@ | |||
3 | from setuptools import setup, find_packages | 3 | from setuptools import setup, find_packages |
4 | 4 | ||
5 | setup(name='tap-google-sheets', | 5 | setup(name='tap-google-sheets', |
6 | version='0.0.2', | 6 | version='0.0.3', |
7 | description='Singer.io tap for extracting data from the Google Sheets v4 API', | 7 | description='Singer.io tap for extracting data from the Google Sheets v4 API', |
8 | author='jeff.huth@bytecode.io', | 8 | author='jeff.huth@bytecode.io', |
9 | classifiers=['Programming Language :: Python :: 3 :: Only'], | 9 | classifiers=['Programming Language :: Python :: 3 :: Only'], |
@@ -11,7 +11,7 @@ setup(name='tap-google-sheets', | |||
11 | install_requires=[ | 11 | install_requires=[ |
12 | 'backoff==1.8.0', | 12 | 'backoff==1.8.0', |
13 | 'requests==2.22.0', | 13 | 'requests==2.22.0', |
14 | 'singer-python==5.8.1' | 14 | 'singer-python==5.9.0' |
15 | ], | 15 | ], |
16 | entry_points=''' | 16 | entry_points=''' |
17 | [console_scripts] | 17 | [console_scripts] |
diff --git a/tap_google_sheets/schema.py b/tap_google_sheets/schema.py index 243467b..e319c03 100644 --- a/tap_google_sheets/schema.py +++ b/tap_google_sheets/schema.py | |||
@@ -21,6 +21,7 @@ def colnum_string(num): | |||
21 | 21 | ||
22 | # Create sheet_metadata_json with columns from sheet | 22 | # Create sheet_metadata_json with columns from sheet |
23 | def get_sheet_schema_columns(sheet): | 23 | def get_sheet_schema_columns(sheet): |
24 | sheet_title = sheet.get('properties', {}).get('title') | ||
24 | sheet_json_schema = OrderedDict() | 25 | sheet_json_schema = OrderedDict() |
25 | data = next(iter(sheet.get('data', [])), {}) | 26 | data = next(iter(sheet.get('data', [])), {}) |
26 | row_data = data.get('rowData', []) | 27 | row_data = data.get('rowData', []) |
@@ -62,15 +63,34 @@ def get_sheet_schema_columns(sheet): | |||
62 | skipped = 0 | 63 | skipped = 0 |
63 | column_name = '{}'.format(header_value) | 64 | column_name = '{}'.format(header_value) |
64 | if column_name in header_list: | 65 | if column_name in header_list: |
65 | raise Exception('DUPLICATE HEADER ERROR: {}'.format(column_name)) | 66 | raise Exception('DUPLICATE HEADER ERROR: SHEET: {}, COL: {}, CELL: {}1'.format( |
67 | sheet_title, column_name, column_letter)) | ||
66 | header_list.append(column_name) | 68 | header_list.append(column_name) |
67 | 69 | ||
68 | first_value = first_values[i] | 70 | first_value = None |
69 | 71 | try: | |
72 | first_value = first_values[i] | ||
73 | except IndexError as err: | ||
74 | raise Exception('NO VALUE IN 2ND ROW FOR HEADER ERROR. SHEET: {}, COL: {}, CELL: {}2. {}'.format( | ||
75 | sheet_title, column_name, column_letter, err)) | ||
76 | |||
70 | column_effective_value = first_value.get('effectiveValue', {}) | 77 | column_effective_value = first_value.get('effectiveValue', {}) |
71 | for key in column_effective_value.keys(): | 78 | |
72 | if key in ('numberValue', 'stringValue', 'boolValue', 'errorType', 'formulaType'): | 79 | col_val = None |
73 | column_effective_value_type = key | 80 | if column_effective_value == {}: |
81 | column_effective_value_type = 'stringValue' | ||
82 | LOGGER.info('WARNING: NO VALUE IN 2ND ROW FOR HEADER. SHEET: {}, COL: {}, CELL: {}2.'.format( | ||
83 | sheet_title, column_name, column_letter)) | ||
84 | LOGGER.info(' Setting column datatype to STRING') | ||
85 | else: | ||
86 | for key, val in column_effective_value.items(): | ||
87 | if key in ('numberValue', 'stringValue', 'boolValue'): | ||
88 | column_effective_value_type = key | ||
89 | col_val = str(val) | ||
90 | elif key in ('errorType', 'formulaType'): | ||
91 | col_val = str(val) | ||
92 | raise Exception('DATA TYPE ERROR 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format( | ||
93 | sheet_title, column_name, column_letter, key, col_val)) | ||
74 | 94 | ||
75 | column_number_format = first_values[i].get('effectiveFormat', {}).get( | 95 | column_number_format = first_values[i].get('effectiveFormat', {}).get( |
76 | 'numberFormat', {}) | 96 | 'numberFormat', {}) |
@@ -87,7 +107,13 @@ def get_sheet_schema_columns(sheet): | |||
87 | # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType | 107 | # https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets/cells#NumberFormatType |
88 | # | 108 | # |
89 | column_format = None # Default | 109 | column_format = None # Default |
90 | if column_effective_value_type == 'stringValue': | 110 | if column_effective_value == {}: |
111 | col_properties = {'type': ['null', 'string']} | ||
112 | column_gs_type = 'stringValue' | ||
113 | LOGGER.info('WARNING: 2ND ROW VALUE IS BLANK: SHEET: {}, COL: {}, CELL: {}2'.format( | ||
114 | sheet_title, column_name, column_letter)) | ||
115 | LOGGER.info(' Setting column datatype to STRING') | ||
116 | elif column_effective_value_type == 'stringValue': | ||
91 | col_properties = {'type': ['null', 'string']} | 117 | col_properties = {'type': ['null', 'string']} |
92 | column_gs_type = 'stringValue' | 118 | column_gs_type = 'stringValue' |
93 | elif column_effective_value_type == 'boolValue': | 119 | elif column_effective_value_type == 'boolValue': |
@@ -138,8 +164,8 @@ def get_sheet_schema_columns(sheet): | |||
138 | else: | 164 | else: |
139 | col_properties = {'type': ['null', 'string']} | 165 | col_properties = {'type': ['null', 'string']} |
140 | column_gs_type = 'unsupportedValue' | 166 | column_gs_type = 'unsupportedValue' |
141 | LOGGER.info('Unsupported data type: {}, value: {}'.format(column_name, \ | 167 | LOGGER.info('WARNING: UNSUPPORTED 2ND ROW VALUE: SHEET: {}, COL: {}, CELL: {}2, TYPE: {}, VALUE: {}'.format( |
142 | column_effective_value_type)) | 168 | sheet_title, column_name, column_letter, column_effective_value_type, col_val)) |
143 | LOGGER.info('Converting to string.') | 169 | LOGGER.info('Converting to string.') |
144 | else: # skipped | 170 | else: # skipped |
145 | column_is_skipped = True | 171 | column_is_skipped = True |
@@ -148,11 +174,16 @@ def get_sheet_schema_columns(sheet): | |||
148 | column_name = '__sdc_skip_col_{}'.format(column_index_str) | 174 | column_name = '__sdc_skip_col_{}'.format(column_index_str) |
149 | col_properties = {'type': ['null', 'string']} | 175 | col_properties = {'type': ['null', 'string']} |
150 | column_gs_type = 'stringValue' | 176 | column_gs_type = 'stringValue' |
177 | LOGGER.info('WARNING: SKIPPED COLUMN; NO COLUMN HEADER. SHEET: {}, COL: {}, CELL: {}1'.format( | ||
178 | sheet_title, column_name, column_letter)) | ||
179 | LOGGER.info(' This column will be skipped during data loading.') | ||
151 | 180 | ||
152 | if skipped >= 2: | 181 | if skipped >= 2: |
153 | # skipped = 2 consecutive skipped headers | 182 | # skipped = 2 consecutive skipped headers |
154 | # Remove prior_header column_name | 183 | # Remove prior_header column_name |
155 | sheet_json_schema['properties'].pop(prior_header, None) | 184 | sheet_json_schema['properties'].pop(prior_header, None) |
185 | LOGGER.info('TWO CONSECUTIVE SKIPPED COLUMNS. STOPPING SCAN AT: SHEET: {}, COL: {}, CELL: {}1'.format( | ||
186 | sheet_title, column_name, column_letter)) | ||
156 | break | 187 | break |
157 | 188 | ||
158 | else: | 189 | else: |
@@ -245,7 +276,7 @@ def get_schemas(client, spreadsheet_id): | |||
245 | for sheet in sheets: | 276 | for sheet in sheets: |
246 | # GET sheet_json_schema for each worksheet (from function above) | 277 | # GET sheet_json_schema for each worksheet (from function above) |
247 | sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) | 278 | sheet_json_schema, columns = get_sheet_metadata(sheet, spreadsheet_id, client) |
248 | LOGGER.info('columns = {}'.format(columns)) | 279 | # LOGGER.info('columns = {}'.format(columns)) |
249 | 280 | ||
250 | sheet_title = sheet.get('properties', {}).get('title') | 281 | sheet_title = sheet.get('properties', {}).get('title') |
251 | schemas[sheet_title] = sheet_json_schema | 282 | schemas[sheet_title] = sheet_json_schema |
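The header-scan rules implemented above (duplicate headers raise an exception, headerless columns become `__sdc_skip_col_NN` placeholders, and two consecutive headerless columns stop the scan and drop the prior placeholder) condense to roughly this hypothetical helper:

```python
def scan_headers(header_row):
    """Hypothetical condensation of the header scan in get_sheet_schema_columns."""
    properties = {}
    seen = set()
    skipped = 0
    prior_header = None
    for i, header in enumerate(header_row, start=1):
        if header:
            skipped = 0
            if header in seen:
                raise Exception('DUPLICATE HEADER ERROR: {}'.format(header))
            seen.add(header)
            column_name = header
        else:
            skipped += 1
            column_name = '__sdc_skip_col_{:02d}'.format(i)  # padding assumed
            if skipped >= 2:
                # Two consecutive headerless columns: drop the prior
                # placeholder and stop scanning this sheet.
                properties.pop(prior_header, None)
                break
        properties[column_name] = {'type': ['null', 'string']}  # type inference elided
        prior_header = column_name
    return properties

# ['id', 'name', '', ''] -> {'id': ..., 'name': ...} (trailing blanks dropped)
print(list(scan_headers(['id', 'name', '', '']).keys()))
```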
diff --git a/tap_google_sheets/sync.py b/tap_google_sheets/sync.py index 76b2e59..311281c 100644 --- a/tap_google_sheets/sync.py +++ b/tap_google_sheets/sync.py | |||
@@ -6,6 +6,7 @@ import pytz | |||
6 | import singer | 6 | import singer |
7 | from singer import metrics, metadata, Transformer, utils | 7 | from singer import metrics, metadata, Transformer, utils |
8 | from singer.utils import strptime_to_utc, strftime | 8 | from singer.utils import strptime_to_utc, strftime |
9 | from singer.messages import RecordMessage | ||
9 | from tap_google_sheets.streams import STREAMS | 10 | from tap_google_sheets.streams import STREAMS |
10 | from tap_google_sheets.schema import get_sheet_metadata | 11 | from tap_google_sheets.schema import get_sheet_metadata |
11 | 12 | ||
@@ -17,14 +18,26 @@ def write_schema(catalog, stream_name): | |||
17 | schema = stream.schema.to_dict() | 18 | schema = stream.schema.to_dict() |
18 | try: | 19 | try: |
19 | singer.write_schema(stream_name, schema, stream.key_properties) | 20 | singer.write_schema(stream_name, schema, stream.key_properties) |
21 | LOGGER.info('Writing schema for: {}'.format(stream_name)) | ||
20 | except OSError as err: | 22 | except OSError as err: |
21 | LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) | 23 | LOGGER.info('OS Error writing schema for: {}'.format(stream_name)) |
22 | raise err | 24 | raise err |
23 | 25 | ||
24 | 26 | ||
25 | def write_record(stream_name, record, time_extracted): | 27 | def write_record(stream_name, record, time_extracted, version=None): |
26 | try: | 28 | try: |
27 | singer.messages.write_record(stream_name, record, time_extracted=time_extracted) | 29 | if version: |
30 | singer.messages.write_message( | ||
31 | RecordMessage( | ||
32 | stream=stream_name, | ||
33 | record=record, | ||
34 | version=version, | ||
35 | time_extracted=time_extracted)) | ||
36 | else: | ||
37 | singer.messages.write_record( | ||
38 | stream_name=stream_name, | ||
39 | record=record, | ||
40 | time_extracted=time_extracted) | ||
28 | except OSError as err: | 41 | except OSError as err: |
29 | LOGGER.info('OS Error writing record for: {}'.format(stream_name)) | 42 | LOGGER.info('OS Error writing record for: {}'.format(stream_name)) |
30 | LOGGER.info('record: {}'.format(record)) | 43 | LOGGER.info('record: {}'.format(record)) |
@@ -53,7 +66,8 @@ def write_bookmark(state, stream, value): | |||
53 | def process_records(catalog, | 66 | def process_records(catalog, |
54 | stream_name, | 67 | stream_name, |
55 | records, | 68 | records, |
56 | time_extracted): | 69 | time_extracted, |
70 | version=None): | ||
57 | stream = catalog.get_stream(stream_name) | 71 | stream = catalog.get_stream(stream_name) |
58 | schema = stream.schema.to_dict() | 72 | schema = stream.schema.to_dict() |
59 | stream_metadata = metadata.to_map(stream.metadata) | 73 | stream_metadata = metadata.to_map(stream.metadata) |
@@ -65,7 +79,11 @@ def process_records(catalog, | |||
65 | record, | 79 | record, |
66 | schema, | 80 | schema, |
67 | stream_metadata) | 81 | stream_metadata) |
68 | write_record(stream_name, transformed_record, time_extracted=time_extracted) | 82 | write_record( |
83 | stream_name=stream_name, | ||
84 | record=transformed_record, | ||
85 | time_extracted=time_extracted, | ||
86 | version=version) | ||
69 | counter.increment() | 87 | counter.increment() |
70 | return counter.value | 88 | return counter.value |
71 | 89 | ||
@@ -206,7 +224,7 @@ def excel_to_dttm_str(excel_date_sn, timezone_str=None): | |||
206 | 224 | ||
207 | # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times | 225 | # Transform sheet_data: add spreadsheet_id, sheet_id, and row, convert dates/times |
208 | # Convert from array of values to JSON with column names as keys | 226 | # Convert from array of values to JSON with column names as keys |
209 | def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data_rows): | 227 | def transform_sheet_data(spreadsheet_id, sheet_id, sheet_title, from_row, columns, sheet_data_rows): |
210 | sheet_data_tf = [] | 228 | sheet_data_tf = [] |
211 | row_num = from_row | 229 | row_num = from_row |
212 | # Create sorted list of columns based on columnIndex | 230 | # Create sorted list of columns based on columnIndex |
@@ -229,21 +247,32 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
229 | col = cols[col_num - 1] | 247 | col = cols[col_num - 1] |
230 | col_skipped = col.get('columnSkipped') | 248 | col_skipped = col.get('columnSkipped') |
231 | if not col_skipped: | 249 | if not col_skipped: |
250 | # Get column metadata | ||
232 | col_name = col.get('columnName') | 251 | col_name = col.get('columnName') |
233 | col_type = col.get('columnType') | 252 | col_type = col.get('columnType') |
253 | col_letter = col.get('columnLetter') | ||
254 | |||
255 | # NULL values | ||
256 | if value is None or value == '': | ||
257 | col_val = None | ||
258 | |||
234 | # Convert dates/times from Lotus 1-2-3 serial numbers | 259 |
235 | # DATE-TIME | 260 | # DATE-TIME |
236 | if col_type == 'numberType.DATE_TIME': | 261 | elif col_type == 'numberType.DATE_TIME': |
237 | if isinstance(value, (int, float)): | 262 | if isinstance(value, (int, float)): |
238 | col_val = excel_to_dttm_str(value) | 263 | col_val = excel_to_dttm_str(value) |
239 | else: | 264 | else: |
240 | col_val = str(value) | 265 | col_val = str(value) |
266 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
267 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
241 | # DATE | 268 | # DATE |
242 | elif col_type == 'numberType.DATE': | 269 | elif col_type == 'numberType.DATE': |
243 | if isinstance(value, (int, float)): | 270 | if isinstance(value, (int, float)): |
244 | col_val = excel_to_dttm_str(value)[:10] | 271 | col_val = excel_to_dttm_str(value)[:10] |
245 | else: | 272 | else: |
246 | col_val = str(value) | 273 | col_val = str(value) |
274 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
275 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
247 | # TIME ONLY (NO DATE) | 276 | # TIME ONLY (NO DATE) |
248 | elif col_type == 'numberType.TIME': | 277 | elif col_type == 'numberType.TIME': |
249 | if isinstance(value, (int, float)): | 278 | if isinstance(value, (int, float)): |
@@ -253,6 +282,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
253 | col_val = str(timedelta(seconds=total_secs)) | 282 | col_val = str(timedelta(seconds=total_secs)) |
254 | except ValueError: | 283 | except ValueError: |
255 | col_val = str(value) | 284 | col_val = str(value) |
285 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
286 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
256 | else: | 287 | else: |
257 | col_val = str(value) | 288 | col_val = str(value) |
258 | # NUMBER (INTEGER AND FLOAT) | 289 | # NUMBER (INTEGER AND FLOAT) |
@@ -268,13 +299,19 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
268 | col_val = float(round(value, 15)) | 299 | col_val = float(round(value, 15)) |
269 | except ValueError: | 300 | except ValueError: |
270 | col_val = str(value) | 301 | col_val = str(value) |
302 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
303 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
271 | else: # decimal_digits <= 15, no rounding | 304 | else: # decimal_digits <= 15, no rounding |
272 | try: | 305 | try: |
273 | col_val = float(value) | 306 | col_val = float(value) |
274 | except ValueError: | 307 | except ValueError: |
275 | col_val = str(value) | 308 | col_val = str(value) |
309 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
310 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
276 | else: | 311 | else: |
277 | col_val = str(value) | 312 | col_val = str(value) |
313 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR: SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
314 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
278 | # STRING | 315 | # STRING |
279 | elif col_type == 'stringValue': | 316 | elif col_type == 'stringValue': |
280 | col_val = str(value) | 317 | col_val = str(value) |
@@ -289,6 +326,8 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
289 | col_val = False | 326 | col_val = False |
290 | else: | 327 | else: |
291 | col_val = str(value) | 328 | col_val = str(value) |
329 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
330 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
292 | elif isinstance(value, int): | 331 | elif isinstance(value, int): |
293 | if value in (1, -1): | 332 | if value in (1, -1): |
294 | col_val = True | 333 | col_val = True |
@@ -296,9 +335,13 @@ def transform_sheet_data(spreadsheet_id, sheet_id, from_row, columns, sheet_data | |||
296 | col_val = False | 335 | col_val = False |
297 | else: | 336 | else: |
298 | col_val = str(value) | 337 | col_val = str(value) |
338 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
339 | sheet_title, col_name, col_letter, row, col_type, value)) | ||
299 | # OTHER: Convert everything else to a string | 340 | # OTHER: Convert everything else to a string |
300 | else: | 341 | else: |
301 | col_val = str(value) | 342 | col_val = str(value) |
343 | LOGGER.info('WARNING: POSSIBLE DATA TYPE ERROR; SHEET: {}, COL: {}, CELL: {}{}, TYPE: {}, VALUE: {}'.format( | ||
344 | sheet_title, col_name, col_letter, row_num, col_type, value)) | ||
302 | sheet_data_row_tf[col_name] = col_val | 345 | sheet_data_row_tf[col_name] = col_val |
303 | col_num = col_num + 1 | 346 | col_num = col_num + 1 |
304 | # APPEND non-empty row | 347 | # APPEND non-empty row |
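The transform branches in this hunk condense to roughly the following per-row sketch; a hypothetical helper, since the real `transform_sheet_data` also handles times, number rounding, and the data-type warnings added above:

```python
from datetime import datetime, timedelta, timezone

LOTUS_EPOCH = datetime(1899, 12, 30, tzinfo=timezone.utc)

def transform_row(spreadsheet_id, sheet_id, row_num, cols, row_values):
    """Condensed sketch of transform_sheet_data for a single row."""
    record = {
        '__sdc_spreadsheet_id': spreadsheet_id,
        '__sdc_sheet_id': sheet_id,
        '__sdc_row': row_num,
    }
    for col, value in zip(cols, row_values):
        if col.get('columnSkipped'):
            continue
        col_type = col.get('columnType')
        if value is None or value == '':
            col_val = None  # NULL values pass through as None
        elif col_type in ('numberType.DATE_TIME', 'numberType.DATE') \
                and isinstance(value, (int, float)):
            # Serial number -> UTC string; DATE keeps only the date part.
            dttm = LOTUS_EPOCH + timedelta(days=value)
            col_val = dttm.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
            if col_type == 'numberType.DATE':
                col_val = col_val[:10]
        elif col_type == 'boolValue' and isinstance(value, bool):
            col_val = value
        else:
            # Numbers, times, and type mismatches get more handling
            # (and warnings) in the real tap; string is the fallback.
            col_val = str(value)
        record[col['columnName']] = col_val
    return record
```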
@@ -400,6 +443,20 @@ def sync(client, config, catalog, state): | |||
400 | LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) | 443 | LOGGER.info('Stream: {}, selected_fields: {}'.format(sheet_title, selected_fields)) |
401 | write_schema(catalog, sheet_title) | 444 | write_schema(catalog, sheet_title) |
402 | 445 | ||
446 | # Emit a Singer ACTIVATE_VERSION message before the initial sync (but not subsequent syncs) | ||
447 | # and again after each sheet sync is complete. | ||
448 | # This forces hard deletes on the data downstream if fewer records are sent. | ||
449 | # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 | ||
450 | last_integer = int(get_bookmark(state, sheet_title, 0)) | ||
451 | activate_version = int(time.time() * 1000) | ||
452 | activate_version_message = singer.ActivateVersionMessage( | ||
453 | stream=sheet_title, | ||
454 | version=activate_version) | ||
455 | if last_integer == 0: | ||
456 | # initial load, send activate_version before AND after data sync | ||
457 | singer.write_message(activate_version_message) | ||
458 | LOGGER.info('INITIAL SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) | ||
459 | |||
403 | # Determine max range of columns and rows for "paging" through the data | 460 | # Determine max range of columns and rows for "paging" through the data |
404 | sheet_last_col_index = 1 | 461 | sheet_last_col_index = 1 |
405 | sheet_last_col_letter = 'A' | 462 | sheet_last_col_letter = 'A' |
@@ -438,6 +495,7 @@ def sync(client, config, catalog, state): | |||
438 | sheet_data_tf, row_num = transform_sheet_data( | 495 | sheet_data_tf, row_num = transform_sheet_data( |
439 | spreadsheet_id=spreadsheet_id, | 496 | spreadsheet_id=spreadsheet_id, |
440 | sheet_id=sheet_id, | 497 | sheet_id=sheet_id, |
498 | sheet_title=sheet_title, | ||
441 | from_row=from_row, | 499 | from_row=from_row, |
442 | columns=columns, | 500 | columns=columns, |
443 | sheet_data_rows=sheet_data_rows) | 501 | sheet_data_rows=sheet_data_rows) |
@@ -449,10 +507,11 @@ def sync(client, config, catalog, state): | |||
449 | catalog=catalog, | 507 | catalog=catalog, |
450 | stream_name=sheet_title, | 508 | stream_name=sheet_title, |
451 | records=sheet_data_tf, | 509 | records=sheet_data_tf, |
452 | time_extracted=ss_time_extracted) | 510 | time_extracted=ss_time_extracted, |
511 | version=activate_version) | ||
453 | LOGGER.info('Sheet: {}, records processed: {}'.format( | 512 | LOGGER.info('Sheet: {}, records processed: {}'.format( |
454 | sheet_title, record_count)) | 513 | sheet_title, record_count)) |
455 | 514 | ||
456 | # Update paging from/to_row for next batch | 515 | # Update paging from/to_row for next batch |
457 | from_row = to_row + 1 | 516 | from_row = to_row + 1 |
458 | if to_row + batch_rows > sheet_max_row: | 517 | if to_row + batch_rows > sheet_max_row: |
@@ -460,6 +519,14 @@ def sync(client, config, catalog, state): | |||
460 | else: | 519 | else: |
461 | to_row = to_row + batch_rows | 520 | to_row = to_row + batch_rows |
462 | 521 | ||
522 | # End of Stream: Send Activate Version and update State | ||
523 | singer.write_message(activate_version_message) | ||
524 | write_bookmark(state, sheet_title, activate_version) | ||
525 | LOGGER.info('COMPLETE SYNC, Stream: {}, Activate Version: {}'.format(sheet_title, activate_version)) | ||
526 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | ||
527 | sheet_title, row_num - 2)) # subtract 2: the header row, plus row_num is one past the last row | ||
528 | update_currently_syncing(state, None) | ||
529 | |||
463 | # SHEETS_LOADED | 530 | # SHEETS_LOADED |
464 | # Add sheet to sheets_loaded | 531 | # Add sheet to sheets_loaded |
465 | sheet_loaded = {} | 532 | sheet_loaded = {} |
@@ -470,17 +537,6 @@ def sync(client, config, catalog, state): | |||
470 | sheet_loaded['lastRowNumber'] = row_num | 537 | sheet_loaded['lastRowNumber'] = row_num |
471 | sheets_loaded.append(sheet_loaded) | 538 | sheets_loaded.append(sheet_loaded) |
472 | 539 | ||
473 | # Emit a Singer ACTIVATE_VERSION message after each sheet is complete. | ||
474 | # This forces hard deletes on the data downstream if fewer records are sent. | ||
475 | # https://github.com/singer-io/singer-python/blob/master/singer/messages.py#L137 | ||
476 | activate_version_message = singer.ActivateVersionMessage( | ||
477 | stream=sheet_title, | ||
478 | version=int(time.time() * 1000)) | ||
479 | singer.write_message(activate_version_message) | ||
480 | |||
481 | LOGGER.info('FINISHED Syncing Sheet {}, Total Rows: {}'.format( | ||
482 | sheet_title, row_num - 2)) # subtract 1 for header row | ||
483 | |||
484 | stream_name = 'sheet_metadata' | 540 | stream_name = 'sheet_metadata' |
485 | # Sync sheet_metadata if selected | 541 | # Sync sheet_metadata if selected |
486 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) | 542 | sync_stream(stream_name, selected_streams, catalog, state, sheet_metadata) |