import datetime
from unittest import mock
from unittest.mock import MagicMock

import pytest
from freezegun import freeze_time

from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.fivetran.config import DestinationConfig
from datahub.ingestion.source.fivetran.fivetran_query import FivetranLogQuery
from tests.test_helpers import mce_helpers

FROZEN_TIME = "2022-06-07 17:00:00"


def default_query_results(query):
    # Return canned results for every query the Fivetran source issues against
    # the log schema, so the test never needs a live warehouse connection.
    if query == FivetranLogQuery.use_schema("TEST_DATABASE", "TEST_SCHEMA"):
        return []
    elif query == FivetranLogQuery.get_connectors_query():
        return [
            {
                "connector_id": "calendar_elected",
                "connecting_user_id": "reapply_phone",
                "connector_type_id": "postgres",
                "connector_name": "postgres",
                "paused": False,
                "sync_frequency": 1440,
                "destination_id": "interval_unconstitutional",
            },
        ]
    elif query == FivetranLogQuery.get_table_lineage_query("calendar_elected"):
        return [
            {
                "source_table_id": "10040",
                "source_table_name": "employee",
                "source_schema_name": "public",
                "destination_table_id": "7779",
                "destination_table_name": "employee",
                "destination_schema_name": "postgres_public",
            },
            {
                "source_table_id": "10041",
                "source_table_name": "company",
                "source_schema_name": "public",
                "destination_table_id": "7780",
                "destination_table_name": "company",
                "destination_schema_name": "postgres_public",
            },
        ]
    elif query == FivetranLogQuery.get_column_lineage_query(
        "10040", "7779"
    ) or query == FivetranLogQuery.get_column_lineage_query("10041", "7780"):
        return [
            {
                "source_column_name": "id",
                "destination_column_name": "id",
            },
            {
                "source_column_name": "name",
                "destination_column_name": "name",
            },
        ]
    elif query == FivetranLogQuery.get_user_query("reapply_phone"):
        return [
            {
                "user_id": "reapply_phone",
                "given_name": "Shubham",
                "family_name": "Jagtap",
            }
        ]
    elif query == FivetranLogQuery.get_sync_start_logs_query("calendar_elected"):
        return [
            {
                "time_stamp": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000),
                "sync_id": "4c9a03d6-eded-4422-a46a-163266e58243",
            },
            {
                "time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 30, 345000),
                "sync_id": "f773d1e9-c791-48f4-894f-8cf9b3dfc834",
            },
            {
                "time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 55, 401000),
                "sync_id": "63c2fc85-600b-455f-9ba0-f576522465be",
            },
        ]
    elif query == FivetranLogQuery.get_sync_end_logs_query("calendar_elected"):
        return [
            {
                "time_stamp": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000),
                "sync_id": "4c9a03d6-eded-4422-a46a-163266e58243",
                "message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"',
            },
            {
                "time_stamp": datetime.datetime(2023, 10, 3, 14, 35, 31, 512000),
                "sync_id": "f773d1e9-c791-48f4-894f-8cf9b3dfc834",
                "message_data": '"{\\"reason\\":\\"Sync has been cancelled because of a user action in the dashboard.Standard Config updated.\\",\\"status\\":\\"CANCELED\\"}"',
            },
            {
                "time_stamp": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000),
                "sync_id": "63c2fc85-600b-455f-9ba0-f576522465be",
                "message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"',
            },
        ]

    # Fail loudly if the source issues a query this mock does not recognize.
    raise Exception(f"Unknown query {query}")


@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_fivetran_basic(pytestconfig, tmp_path):
    test_resources_dir = pytestconfig.rootpath / "tests/integration/fivetran"

    # Run the metadata ingestion pipeline.
    output_file = tmp_path / "fivetran_test_events.json"
    golden_file = test_resources_dir / "fivetran_golden.json"

    # Patch create_engine so the source reads from the mocked query results
    # above instead of connecting to a real Snowflake log database.
    with mock.patch(
        "datahub.ingestion.source.fivetran.fivetran_log_api.create_engine"
    ) as mock_create_engine:
        connection_magic_mock = MagicMock()
        connection_magic_mock.execute.side_effect = default_query_results

        mock_create_engine.return_value = connection_magic_mock

        pipeline = Pipeline.create(
            {
                "run_id": "fivetran-test",
                "source": {
                    "type": "fivetran",
                    "config": {
                        "fivetran_log_config": {
                            "destination_platform": "snowflake",
                            "destination_config": {
                                "account_id": "TESTID",
                                "warehouse": "TEST_WH",
                                "username": "test",
                                "password": "test@123",
                                "database": "TEST_DATABASE",
                                "role": "TESTROLE",
                                "log_schema": "TEST_SCHEMA",
                            },
                        },
                        "connector_patterns": {
                            "allow": [
                                "postgres",
                            ]
                        },
                        "sources_to_database": {
                            "calendar_elected": "postgres_db",
                        },
                        "sources_to_platform_instance": {
                            "calendar_elected": {
                                "env": "DEV",
                            }
                        },
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {
                        "filename": f"{output_file}",
                    },
                },
            }
        )

        pipeline.run()
        pipeline.raise_from_status()

        # Verify the emitted metadata against the checked-in golden file.
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=f"{output_file}",
            golden_path=golden_file,
        )


@freeze_time(FROZEN_TIME)
def test_fivetran_snowflake_destination_config(pytestconfig, tmp_path):
    # The Snowflake destination config should render a SQLAlchemy URL with the
    # password URL-encoded and the role/warehouse passed as query parameters.
    snowflake_dest = DestinationConfig(
        account_id="TESTID",
        warehouse="TEST_WH",
        username="test",
        password="test@123",
        database="TEST_DATABASE",
        role="TESTROLE",
        log_schema="TEST_SCHEMA",
    )
    assert (
        snowflake_dest.get_sql_alchemy_url()
        == "snowflake://test:test%40123@TESTID?application=acryl_datahub&authenticator=SNOWFLAKE&role=TESTROLE&warehouse=TEST_WH"
    )