import os
import unittest
from datetime import datetime as dt
from datetime import timedelta

import tap_tester.connections as connections
import tap_tester.runner as runner
from tap_tester import menagerie
from tap_tester.scenario import SCENARIOS
12 class TapCombinedTest(unittest
.TestCase
):
13 START_DATE_FORMAT
= "%Y-%m-%dT00:00:00Z"
17 return "tap_google_sheets_combined_test"
21 return "tap-google-sheets"
25 return "platform.google-sheets"
27 def expected_check_streams(self
):
28 return set(self
.expected_pks().keys())
30 def expected_sync_streams(self
):
31 return set(self
.expected_pks().keys())
36 "file_metadata": {"id"}
,
37 "sheet_metadata": {"sheetId"}
,
38 "sheets_loaded": {"spreadsheetId", "sheetId", "loadDate"}
,
39 "spreadsheet_metadata": {"spreadsheetId"}
,
40 "Test-1": {"__sdc_row"}
,
41 "Test 2": {"__sdc_row"}
,
42 "SKU COGS": {"__sdc_row"}
,
43 "Item Master": {"__sdc_row"}
,
44 "Retail Price": {"__sdc_row"}
,
45 "Retail Price NEW": {"__sdc_row"}
,
46 "Forecast Scenarios": {"__sdc_row"}
,
47 "Promo Type": {"__sdc_row"}
,
48 "Shipping Method": {"__sdc_row"}
52 def get_properties(self
):
54 'start_date': os
.getenv("TAP_GOOGLE_SHEETS_START_DATE"),
55 'spreadsheet_id': os
.getenv("TAP_GOOGLE_SHEETS_SPREADSHEET_ID")
61 def get_credentials():
63 "client_id": os
.getenv("TAP_GOOGLE_SHEETS_CLIENT_ID"),
64 "client_secret": os
.getenv("TAP_GOOGLE_SHEETS_CLIENT_SECRET"),
65 "refresh_token": os
.getenv("TAP_GOOGLE_SHEETS_REFRESH_TOKEN"),
69 missing_envs
= [x
for x
in [
70 "TAP_GOOGLE_SHEETS_SPREADSHEET_ID",
71 "TAP_GOOGLE_SHEETS_START_DATE",
72 "TAP_GOOGLE_SHEETS_CLIENT_ID",
73 "TAP_GOOGLE_SHEETS_CLIENT_SECRET",
74 "TAP_GOOGLE_SHEETS_REFRESH_TOKEN",
75 ] if os
.getenv(x
) is None]
78 raise Exception("Missing environment variables: {}".format(missing_envs
))
82 conn_id
= connections
.ensure_connection(self
, payload_hook
=None)
84 # Run the tap in check mode
85 check_job_name
= runner
.run_check_mode(self
, conn_id
)
87 # Verify the check's exit status
88 exit_status
= menagerie
.get_exit_status(conn_id
, check_job_name
)
89 menagerie
.verify_check_exit_status(self
, exit_status
, check_job_name
)
91 # Verify that there are catalogs found
92 found_catalogs
= menagerie
.get_catalogs(conn_id
)
93 self
.assertGreater(len(found_catalogs
), 0, msg
="unable to locate schemas for connection {}".format(conn_id
))
95 found_catalog_names
= set(map(lambda c
: c
['tap_stream_id'], found_catalogs
))
96 subset
= self
.expected_check_streams().issubset(found_catalog_names
)
97 self
.assertTrue(subset
, msg
="Expected check streams are not subset of discovered catalog")
99 # # Select some catalogs
100 our_catalogs
= [c
for c
in found_catalogs
if c
.get('tap_stream_id') in self
.expected_sync_streams()]
101 for catalog
in our_catalogs
:
102 schema
= menagerie
.get_annotated_schema(conn_id
, catalog
['stream_id'])
103 connections
.select_catalog_and_fields_via_metadata(conn_id
, catalog
, schema
, [], [])
105 # # Verify that all streams sync at least one row for initial sync
106 # # This test is also verifying access token expiration handling. If test fails with
107 # # authentication error, refresh token was not replaced after expiring.
108 menagerie
.set_state(conn_id
, {})
109 sync_job_name
= runner
.run_sync_mode(self
, conn_id
)
111 # # Verify tap and target exit codes
112 exit_status
= menagerie
.get_exit_status(conn_id
, sync_job_name
)
113 menagerie
.verify_sync_exit_status(self
, exit_status
, sync_job_name
)
114 record_count_by_stream
= runner
.examine_target_output_file(self
, conn_id
, self
.expected_sync_streams(),
116 zero_count_streams
= {k for k, v in record_count_by_stream.items() if v == 0}
117 self
.assertFalse(zero_count_streams
,
118 msg
="The following streams did not sync any rows {}".format(zero_count_streams
))
# Register this test with tap-tester's scenario runner so it is picked up
# when the suite executes.
SCENARIOS.add(TapCombinedTest)