updates for mysql and mssql

This commit is contained in:
Jonny Dixon 2025-06-11 09:18:42 +01:00
parent f908af9f41
commit 96f27a1f65
5 changed files with 55 additions and 138 deletions

View File

@@ -12,16 +12,18 @@ This source extracts the following:
The Fivetran connector supports two operational modes:
### Enterprise Mode (Log-based)
1. Fivetran provides the Fivetran Platform Connector, which dumps log events and metadata about connectors, destinations, users, and roles into your destination.
2. You need to set up and start the initial sync of the Fivetran Platform Connector before using this source. Refer to the [setup guide](https://fivetran.com/docs/logs/fivetran-platform/setup-guide).
3. Once the initial sync of your Fivetran Platform Connector has completed, provide the connector's destination platform and its configuration in the recipe.
### Standard Mode (API-based)
1. If you do not have access to the Fivetran Platform Connector logs, you can use standard mode, which relies on Fivetran's REST API.
2. Generate API credentials (an API key and API secret) from the Fivetran dashboard.
3. Configure the `api_config` section in your recipe with these credentials, as in the sketch below.
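A minimal standard-mode recipe sketch; the credential values are placeholders, and any other required settings are omitted:

```yml
source:
  type: fivetran
  config:
    fivetran_mode: standard
    api_config:
      api_key: YOUR_API_KEY       # placeholder
      api_secret: YOUR_API_SECRET # placeholder
```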
## Concept mapping
| Fivetran | DataHub |
| --------------- | ----------------------------------------------------------------------------------------------------- |
@@ -35,6 +37,7 @@ Source and destination are mapped to Dataset as an Input and Output of Connector
## Mode Selection
The connector supports three modes, selected via `fivetran_mode` (see the example after this list):
- `enterprise`: Uses Fivetran log tables for ingestion (requires the log tables to be set up in your destination)
- `standard`: Uses Fivetran REST API for ingestion (requires API credentials)
- `auto`: Automatically detects and selects the appropriate mode based on provided configuration (default)
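For example, a recipe that pins the connector to enterprise mode instead of relying on auto detection (a minimal sketch; other required settings omitted):

```yml
source:
  type: fivetran
  config:
    fivetran_mode: enterprise # one of: enterprise, standard, auto
```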
@@ -42,10 +45,12 @@ The connector supports three modes:
## DataJob Mode Selection
The connector supports two modes for generating DataJobs (see the sketch after this list):
- `consolidated`: Creates one DataJob per connector (default)
- `per_table`: Creates one DataJob per source-destination table pair
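A sketch of selecting per-table DataJobs. The option name `datajob_mode` is a hypothetical placeholder, as the exact configuration key is not shown in this diff:

```yml
source:
  type: fivetran
  config:
    datajob_mode: per_table # hypothetical key name; consolidated is the default
```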
## Snowflake Destination Configuration Guide (Enterprise Mode)
1. If your Fivetran Platform Connector destination is Snowflake, you need to provide user details and a role with the correct privileges in order to fetch metadata.
2. A Snowflake system admin can follow this guide to create a fivetran_datahub role, grant it the required privileges, and assign it to a user by executing the following Snowflake commands as a user with the ACCOUNTADMIN role or the MANAGE GRANTS privilege.
@@ -68,10 +73,10 @@ grant role fivetran_datahub to user snowflake_user;
## BigQuery Destination Configuration Guide (Enterprise Mode)
1. If your Fivetran Platform Connector destination is BigQuery, you need to set up a ServiceAccount as per the [BigQuery docs](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console) and select the BigQuery Data Viewer and BigQuery Job User IAM roles.
1. Create and download a service account JSON key file, and provide the BigQuery connection credentials in the BigQuery destination config.
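A sketch of an enterprise-mode recipe for a BigQuery destination. The `fivetran_log_config`, `bigquery_destination_config`, and `credential` key names are assumptions based on typical DataHub Fivetran recipes and are not confirmed by this diff; the credential values come from the downloaded JSON key file:

```yml
source:
  type: fivetran
  config:
    fivetran_mode: enterprise
    fivetran_log_config:             # assumed block name
      destination_platform: bigquery
      bigquery_destination_config:   # assumed block name
        dataset: fivetran_log        # dataset holding the Fivetran log tables
        credential:
          project_id: my-gcp-project
          client_email: fivetran-sa@my-gcp-project.iam.gserviceaccount.com
          private_key: "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n"
```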
## API Configuration Guide (Standard Mode)
@@ -108,17 +113,17 @@ destination_to_platform_instance:
```yml
# Map of connector source to platform instance
sources_to_platform_instance:
  postgres_connector_id1:
    platform: postgres # Optional override for platform detection
    platform_instance: cloud_postgres_instance
    env: PROD
    database: postgres_db # Database name for the source
  postgres_connector_id2:
    platform: postgres # Optional override for platform detection
    platform_instance: local_postgres_instance
    env: DEV
    database: postgres_db # Database name for the source
```
#### Example - Multiple Snowflake Destinations, each writing to a different Snowflake instance
@@ -126,15 +131,15 @@ sources_to_platform_instance:
```yml
# Map of destination to platform instance
destination_to_platform_instance:
  snowflake_destination_id1:
    platform: snowflake # Optional override for platform detection
    platform_instance: prod_snowflake_instance
    env: PROD
    database: PROD_SNOWFLAKE_DB # Database name for the destination
  snowflake_destination_id2:
    platform: snowflake # Optional override for platform detection
    platform_instance: dev_snowflake_instance
    env: PROD
    database: DEV_SNOWFLAKE_DB # Database name for the destination
```

View File

@@ -25,12 +25,14 @@ KNOWN_PLATFORMS = {
"postgres": "postgres",
"postgresql": "postgres",
"mysql": "mysql",
"mysql_rds": "mysql",
"snowflake": "snowflake",
"redshift": "redshift",
"bigquery": "bigquery",
"google_bigquery": "bigquery",
"databricks": "databricks",
"oracle": "oracle",
"sql_server_rds": "mssql",
"mssql": "mssql",
"sql_server": "mssql",
"synapse": "mssql",

View File

@@ -130,23 +130,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my-fivetran.calendar_elected,PROD),calendar_elected)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:fivetran",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my-fivetran.calendar_elected,PROD),calendar_elected)",
@@ -335,23 +318,6 @@
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:fivetran",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
@@ -471,23 +437,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:fivetran",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a",
@@ -615,23 +564,6 @@
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:fivetran",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d",
"changeType": "UPSERT",
"aspectName": "dataProcessInstanceInput",
"aspect": {
"json": {
@@ -843,23 +775,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my-fivetran.my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:fivetran",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,my-fivetran.my_confluent_cloud_connector_id,PROD),my_confluent_cloud_connector_id)",
@@ -987,23 +902,6 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:fivetran",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,my-fivetran)"
}
},
"systemMetadata": {
"lastObserved": 1654621200000,
"runId": "powerbi-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:08bddbc8007d76bfbbb8e231d1c65290",

View File

@@ -766,7 +766,7 @@ def test_fivetran_auto_detection():
# Verify it's using the enterprise (log) mode
assert source.fivetran_access.__class__.__name__ == "FivetranLogAPI"
    # Test auto detection with only API config (parallel processing enabled by default)
with patch("requests.Session.request", side_effect=mock_requests_get):
source = FivetranSource.create(
{
@@ -779,7 +779,24 @@
ctx=PipelineContext(run_id="fivetran-auto-api"),
)
    # Verify it's using the standard (API) mode with parallel processing
assert source.fivetran_access.__class__.__name__ == "ParallelFivetranAPI"
# Test auto detection with only API config (parallel processing disabled)
with patch("requests.Session.request", side_effect=mock_requests_get):
source = FivetranSource.create(
{
"fivetran_mode": "auto",
"use_parallel_processing": False,
"api_config": {
"api_key": "test_api_key",
"api_secret": "test_api_secret",
},
},
ctx=PipelineContext(run_id="fivetran-auto-api-no-parallel"),
)
# Verify it's using the standard (API) mode without parallel processing
assert source.fivetran_access.__class__.__name__ == "FivetranStandardAPI"
# Test auto detection with both configs (should prefer enterprise)

View File

@@ -118,28 +118,23 @@ class FivetranAPIClientTest(unittest.TestCase):
        # Mock detect_destination_platform to avoid actual API calls
        with patch.object(
            client, "detect_destination_platform", return_value="snowflake"
        ), patch.object(client, "get_destination_database", return_value="TEST_DB"):
            # Call method under test
            connector = client.extract_connector_metadata(api_connector, sync_history)

            # Verify results
            self.assertEqual(connector.connector_id, "connector1")
            self.assertEqual(connector.connector_name, "My Connector")
            self.assertEqual(connector.connector_type, "postgres")
            self.assertEqual(connector.destination_id, "group1")
            self.assertEqual(connector.user_id, "user1")
            self.assertEqual(len(connector.jobs), 1)
            self.assertEqual(
                connector.additional_properties["destination_platform"], "snowflake"
            )
            self.assertEqual(
                connector.additional_properties["destination_database"], "TEST_DB"
            )

class FivetranAccessTest(unittest.TestCase):