]>
Commit | Line | Data |
---|---|---|
1 | import unittest | |
2 | import os | |
3 | from datetime import datetime as dt | |
4 | from datetime import timedelta | |
5 | ||
6 | from tap_tester import menagerie | |
7 | import tap_tester.runner as runner | |
8 | import tap_tester.connections as connections | |
9 | from tap_tester.scenario import SCENARIOS | |
10 | ||
11 | ||
12 | class TapCombinedTest(unittest.TestCase): | |
13 | START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" | |
14 | ||
15 | @staticmethod | |
16 | def name(): | |
17 | return "tap_google_sheets_combined_test" | |
18 | ||
19 | @staticmethod | |
20 | def tap_name(): | |
21 | return "tap-google-sheets" | |
22 | ||
23 | @staticmethod | |
24 | def get_type(): | |
25 | return "platform.google-sheets" | |
26 | ||
27 | def expected_check_streams(self): | |
28 | return set(self.expected_pks().keys()) | |
29 | ||
30 | def expected_sync_streams(self): | |
31 | return set(self.expected_pks().keys()) | |
32 | ||
33 | @staticmethod | |
34 | def expected_pks(): | |
35 | return { | |
36 | "file_metadata": {"id"}, | |
37 | "sheet_metadata": {"sheetId"}, | |
38 | "sheets_loaded": {"spreadsheetId", "sheetId", "loadDate"}, | |
39 | "spreadsheet_metadata": {"spreadsheetId"}, | |
40 | "Test-1": {"__sdc_row"}, | |
41 | "Test 2": {"__sdc_row"}, | |
42 | "SKU COGS": {"__sdc_row"}, | |
43 | "Item Master": {"__sdc_row"}, | |
44 | "Retail Price": {"__sdc_row"}, | |
45 | "Retail Price NEW": {"__sdc_row"}, | |
46 | "Forecast Scenarios": {"__sdc_row"}, | |
47 | "Promo Type": {"__sdc_row"}, | |
48 | "Shipping Method": {"__sdc_row"} | |
49 | } | |
50 | ||
51 | ||
52 | def get_properties(self): | |
53 | return_value = { | |
54 | 'start_date': os.getenv("TAP_GOOGLE_SHEETS_START_DATE"), | |
55 | 'spreadsheet_id': os.getenv("TAP_GOOGLE_SHEETS_SPREADSHEET_ID") | |
56 | } | |
57 | ||
58 | return return_value | |
59 | ||
60 | @staticmethod | |
61 | def get_credentials(): | |
62 | return { | |
63 | "client_id": os.getenv("TAP_GOOGLE_SHEETS_CLIENT_ID"), | |
64 | "client_secret": os.getenv("TAP_GOOGLE_SHEETS_CLIENT_SECRET"), | |
65 | "refresh_token": os.getenv("TAP_GOOGLE_SHEETS_REFRESH_TOKEN"), | |
66 | } | |
67 | ||
68 | def setUp(self): | |
69 | missing_envs = [x for x in [ | |
70 | "TAP_GOOGLE_SHEETS_SPREADSHEET_ID", | |
71 | "TAP_GOOGLE_SHEETS_START_DATE", | |
72 | "TAP_GOOGLE_SHEETS_CLIENT_ID", | |
73 | "TAP_GOOGLE_SHEETS_CLIENT_SECRET", | |
74 | "TAP_GOOGLE_SHEETS_REFRESH_TOKEN", | |
75 | ] if os.getenv(x) is None] | |
76 | ||
77 | if missing_envs: | |
78 | raise Exception("Missing environment variables: {}".format(missing_envs)) | |
79 | ||
80 | def test_run(self): | |
81 | ||
82 | conn_id = connections.ensure_connection(self, payload_hook=None) | |
83 | ||
84 | # Run the tap in check mode | |
85 | check_job_name = runner.run_check_mode(self, conn_id) | |
86 | ||
87 | # Verify the check's exit status | |
88 | exit_status = menagerie.get_exit_status(conn_id, check_job_name) | |
89 | menagerie.verify_check_exit_status(self, exit_status, check_job_name) | |
90 | ||
91 | # Verify that at least one catalog was discovered | |
92 | found_catalogs = menagerie.get_catalogs(conn_id) | |
93 | self.assertGreater(len(found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id)) | |
94 | ||
95 | found_catalog_names = set(map(lambda c: c['tap_stream_id'], found_catalogs)) | |
96 | subset = self.expected_check_streams().issubset(found_catalog_names) | |
97 | self.assertTrue(subset, msg="Expected check streams are not subset of discovered catalog") | |
98 ||
99 | # Select the catalogs for the streams this test expects to sync | |
100 | our_catalogs = [c for c in found_catalogs if c.get('tap_stream_id') in self.expected_sync_streams()] | |
101 | for catalog in our_catalogs: | |
102 | schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) | |
103 | connections.select_catalog_and_fields_via_metadata(conn_id, catalog, schema, [], []) | |
104 | ||
105 | # Verify that every stream syncs at least one row during the initial sync. | |
106 | # This test also exercises access-token expiration handling: if it fails with an | |
107 | # authentication error, the refresh token was not replaced after it expired. | |
108 | menagerie.set_state(conn_id, {}) | |
109 | sync_job_name = runner.run_sync_mode(self, conn_id) | |
110 | ||
111 | # Verify the tap and target exit codes | |
112 | exit_status = menagerie.get_exit_status(conn_id, sync_job_name) | |
113 | menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) | |
114 | record_count_by_stream = runner.examine_target_output_file(self, conn_id, self.expected_sync_streams(), | |
115 | self.expected_pks()) | |
116 | zero_count_streams = {k for k, v in record_count_by_stream.items() if v == 0} | |
117 | self.assertFalse(zero_count_streams, | |
118 | msg="The following streams did not sync any rows {}".format(zero_count_streams)) | |
119 | ||
120 | ||
121 | ||
122 | SCENARIOS.add(TapCombinedTest) |