From 96f27a1f6541a8df2eb180f2de1cf05ec2b49a4f Mon Sep 17 00:00:00 2001
From: Jonny Dixon
Date: Wed, 11 Jun 2025 09:18:42 +0100
Subject: [PATCH] updates for mysql and mssql

---
 .../docs/sources/fivetran/fivetran_pre.md     |  31 +++---
 .../source/fivetran/fivetran_api_client.py    |   2 +
 ...nowflake_empty_connection_user_golden.json | 102 ------------------
 .../integration/fivetran/test_fivetran.py     |  21 +++-
 .../unit/fivetran/test_fivetran_source.py     |  37 +++----
 5 files changed, 55 insertions(+), 138 deletions(-)

diff --git a/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
index 99a3fa40e3..d3064e3c6a 100644
--- a/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
+++ b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
@@ -12,16 +12,18 @@ This source extracts the following:
 The Fivetran connector supports two operational modes:
 
 ### Enterprise Mode (Log-based)
+
 1. Fivetran provides the Fivetran platform connector, which dumps log events and metadata about connectors, destinations, users, and roles into your destination.
 2. You need to set up and start the initial sync of the Fivetran platform connector before using this source. Refer to the [setup guide](https://fivetran.com/docs/logs/fivetran-platform/setup-guide).
 3. Once the initial sync of your Fivetran platform connector is complete, provide the connector's destination platform and its configuration in the recipe.
 
 ### Standard Mode (API-based)
+
 1. If you do not have access to the Fivetran platform connector logs, you can use standard mode, which relies on Fivetran's REST API.
 2. You'll need to generate API credentials (an API key and API secret) from the Fivetran dashboard.
 3. Configure the `api_config` section in your recipe with these credentials.
 
-## Concept mapping 
+## Concept mapping
 
 | Fivetran        | DataHub                                                                                                 |
 | --------------- | ------------------------------------------------------------------------------------------------------- |
 
 Sources and destinations are mapped to Datasets, which serve as the inputs and outputs of the Connector.
 
 ## Mode Selection
 
 The connector supports three modes (see the example recipe below):
+
 - `enterprise`: Uses Fivetran log tables for ingestion (requires the log tables to be set up in your destination)
 - `standard`: Uses the Fivetran REST API for ingestion (requires API credentials)
 - `auto`: Automatically detects and selects the appropriate mode based on the provided configuration (default)
 
 ## DataJob Mode Selection
 
 The connector supports two modes for generating DataJobs:
+
 - `consolidated`: Creates one DataJob per connector (default)
 - `per_table`: Creates one DataJob per source-destination table pair
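+
+For example, a minimal standard-mode recipe might look like the following. Treat this as a sketch: the credential values are placeholders, and the key names should be verified against the Fivetran source configuration reference.
+
+```yml
+source:
+  type: fivetran
+  config:
+    fivetran_mode: standard # or "enterprise" / "auto"
+    use_parallel_processing: true # default; set to false to use the serial API client
+    api_config:
+      api_key: "YOUR_API_KEY"
+      api_secret: "YOUR_API_SECRET"
+```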
 
 ## Snowflake destination Configuration Guide (Enterprise Mode)
+
 1. If your Fivetran platform connector destination is Snowflake, you need to provide user details and a role with the correct privileges in order to fetch metadata.
 2. A Snowflake system admin can create a fivetran_datahub role, grant it the required privileges, and assign it to a user by executing the following Snowflake commands as a user with the ACCOUNTADMIN role or the MANAGE GRANTS privilege.
 
 grant role fivetran_datahub to user snowflake_user;
 ```
 
 ## Bigquery destination Configuration Guide (Enterprise Mode)
 
-1. If your fivetran platform connector destination is bigquery, you need to setup a ServiceAccount as per [BigQuery docs](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console) and select BigQuery Data Viewer and BigQuery Job User IAM roles.
+1. If your Fivetran platform connector destination is BigQuery, set up a service account as described in the [BigQuery docs](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console) and grant it the BigQuery Data Viewer and BigQuery Job User IAM roles.
 
-2. Create and Download a service account JSON keyfile and provide bigquery connection credential in bigquery destination config.
+1. Create and download a service account JSON keyfile, and provide the BigQuery connection credentials in the BigQuery destination config.
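+
+For reference, an enterprise-mode recipe pointing at a BigQuery destination might look like the following sketch. The nested field names mirror the shape of the log-based configuration, but verify them against the current config reference; all values are placeholders.
+
+```yml
+source:
+  type: fivetran
+  config:
+    fivetran_log_config:
+      destination_platform: bigquery
+      bigquery_destination_config:
+        credential:
+          project_id: "my_gcp_project"
+          private_key_id: "my_private_key_id"
+          private_key: "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
+          client_email: "fivetran-connector@my_gcp_project.iam.gserviceaccount.com"
+          client_id: "123456789"
+        dataset: "fivetran_log_dataset"
+```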
"databricks": "databricks", "oracle": "oracle", + "sql_server_rds": "mssql", "mssql": "mssql", "sql_server": "mssql", "synapse": "mssql", diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json index 31eb00e73c..e850faedad 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json @@ -130,23 +130,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my-fivetran.calendar_elected,PROD),calendar_elected)", - "changeType": "UPSERT", - "aspectName": "dataPlatformInstance", - "aspect": { - "json": { - "platform": "urn:li:dataPlatform:fivetran", - "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)" - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my-fivetran.calendar_elected,PROD),calendar_elected)", @@ -335,23 +318,6 @@ "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", "changeType": "UPSERT", - "aspectName": "dataPlatformInstance", - "aspect": { - "json": { - "platform": "urn:li:dataPlatform:fivetran", - "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)" - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", - "changeType": "UPSERT", "aspectName": "dataProcessInstanceInput", "aspect": { "json": { @@ -471,23 +437,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", - "changeType": "UPSERT", - "aspectName": "dataPlatformInstance", - "aspect": { - "json": { - "platform": "urn:li:dataPlatform:fivetran", - "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)" - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", @@ -615,23 +564,6 @@ "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", "changeType": "UPSERT", - "aspectName": "dataPlatformInstance", - "aspect": { - "json": { - "platform": "urn:li:dataPlatform:fivetran", - "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)" - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", - "changeType": "UPSERT", "aspectName": "dataProcessInstanceInput", "aspect": { "json": { @@ -843,23 +775,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataJob", - "entityUrn": 
"urn:li:dataJob:(urn:li:dataFlow:(fivetran,my-fivetran.my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", - "changeType": "UPSERT", - "aspectName": "dataPlatformInstance", - "aspect": { - "json": { - "platform": "urn:li:dataPlatform:fivetran", - "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)" - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my-fivetran.my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)", @@ -987,23 +902,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", - "changeType": "UPSERT", - "aspectName": "dataPlatformInstance", - "aspect": { - "json": { - "platform": "urn:li:dataPlatform:fivetran", - "instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)" - } - }, - "systemMetadata": { - "lastObserved": 1654621200000, - "runId": "powerbi-test", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290", diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index eefc9829e2..d4fc038092 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -766,7 +766,7 @@ def test_fivetran_auto_detection(): # Verify it's using the enterprise (log) mode assert source.fivetran_access.__class__.__name__ == "FivetranLogAPI" - # Test auto detection with only API config + # Test auto detection with only API config (parallel processing enabled by default) with patch("requests.Session.request", side_effect=mock_requests_get): source = FivetranSource.create( { @@ -779,7 +779,24 @@ def test_fivetran_auto_detection(): ctx=PipelineContext(run_id="fivetran-auto-api"), ) - # Verify it's using the standard (API) mode + # Verify it's using the standard (API) mode with parallel processing + assert source.fivetran_access.__class__.__name__ == "ParallelFivetranAPI" + + # Test auto detection with only API config (parallel processing disabled) + with patch("requests.Session.request", side_effect=mock_requests_get): + source = FivetranSource.create( + { + "fivetran_mode": "auto", + "use_parallel_processing": False, + "api_config": { + "api_key": "test_api_key", + "api_secret": "test_api_secret", + }, + }, + ctx=PipelineContext(run_id="fivetran-auto-api-no-parallel"), + ) + + # Verify it's using the standard (API) mode without parallel processing assert source.fivetran_access.__class__.__name__ == "FivetranStandardAPI" # Test auto detection with both configs (should prefer enterprise) diff --git a/metadata-ingestion/tests/unit/fivetran/test_fivetran_source.py b/metadata-ingestion/tests/unit/fivetran/test_fivetran_source.py index a6c1415854..af16866d43 100644 --- a/metadata-ingestion/tests/unit/fivetran/test_fivetran_source.py +++ b/metadata-ingestion/tests/unit/fivetran/test_fivetran_source.py @@ -118,28 +118,23 @@ class FivetranAPIClientTest(unittest.TestCase): # Mock detect_destination_platform to avoid actual API calls with patch.object( client, "detect_destination_platform", return_value="snowflake" - ): - with patch.object( - client, "get_destination_database", 
return_value="TEST_DB" - ): - # Call method under test - connector = client.extract_connector_metadata( - api_connector, sync_history - ) + ), patch.object(client, "get_destination_database", return_value="TEST_DB"): + # Call method under test + connector = client.extract_connector_metadata(api_connector, sync_history) - # Verify results - self.assertEqual(connector.connector_id, "connector1") - self.assertEqual(connector.connector_name, "My Connector") - self.assertEqual(connector.connector_type, "postgres") - self.assertEqual(connector.destination_id, "group1") - self.assertEqual(connector.user_id, "user1") - self.assertEqual(len(connector.jobs), 1) - self.assertEqual( - connector.additional_properties["destination_platform"], "snowflake" - ) - self.assertEqual( - connector.additional_properties["destination_database"], "TEST_DB" - ) + # Verify results + self.assertEqual(connector.connector_id, "connector1") + self.assertEqual(connector.connector_name, "My Connector") + self.assertEqual(connector.connector_type, "postgres") + self.assertEqual(connector.destination_id, "group1") + self.assertEqual(connector.user_id, "user1") + self.assertEqual(len(connector.jobs), 1) + self.assertEqual( + connector.additional_properties["destination_platform"], "snowflake" + ) + self.assertEqual( + connector.additional_properties["destination_database"], "TEST_DB" + ) class FivetranAccessTest(unittest.TestCase):