From 40b51ac2da4f84dddc65fac2a538d1486b18bd02 Mon Sep 17 00:00:00 2001
From: Anush Kumar
Date: Mon, 6 Oct 2025 10:06:37 -0700
Subject: [PATCH] feat(ingestion): Added Databricks support to Fivetran source (#14897)

---
 .../docs/sources/fivetran/fivetran_pre.md | 20 ++++-
 .../docs/sources/fivetran/fivetran_recipe.yml | 11 +++
 metadata-ingestion/setup.py | 21 ++++-
 .../ingestion/source/fivetran/config.py | 37 +++++++--
 .../ingestion/source/fivetran/fivetran.py | 1 -
 .../source/fivetran/fivetran_log_api.py | 13 ++++
 .../source/fivetran/fivetran_query.py | 71 ++++++++++-------
 .../datahub/ingestion/source/unity/config.py | 78 ++++++++++++-------
 .../integration/fivetran/test_fivetran.py | 30 +++----
 .../tests/unit/test_unity_catalog_config.py | 3 +
 10 files changed, 203 insertions(+), 82 deletions(-)

diff --git a/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
index f15043d1a6..490772e621 100644
--- a/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
+++ b/metadata-ingestion/docs/sources/fivetran/fivetran_pre.md
@@ -9,9 +9,10 @@ This source extracts the following:
 
 ## Configuration Notes
 
-1. Fivetran supports the fivetran platform connector to dump the log events and connectors, destinations, users and roles metadata in your destination.
+1. Fivetran supports the [fivetran platform connector](https://fivetran.com/docs/logs/fivetran-platform) to dump the log events and connectors, destinations, users and roles metadata in your destination.
 2. You need to setup and start the initial sync of the fivetran platform connector before using this source. Refer [link](https://fivetran.com/docs/logs/fivetran-platform/setup-guide).
 3. Once initial sync up of your fivetran platform connector is done, you need to provide the fivetran platform connector's destination platform and its configuration in the recipe.
+4. We expect users to keep automatic schema updates (the default) enabled on the fivetran platform connector configured for DataHub; this ensures the latest schema changes are applied and avoids inconsistent data syncs.
 
 ## Concept mapping
 
@@ -30,6 +31,7 @@ Works only for
 
 - Snowflake destination
 - Bigquery destination
+- Databricks destination
 
 ## Snowflake destination Configuration Guide
 
@@ -58,6 +60,22 @@ grant role fivetran_datahub to user snowflake_user;
 
 1. If your fivetran platform connector destination is bigquery, you need to setup a ServiceAccount as per [BigQuery docs](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console) and select BigQuery Data Viewer and BigQuery Job User IAM roles.
 2. Create and Download a service account JSON keyfile and provide bigquery connection credential in bigquery destination config.
 
+## Databricks destination Configuration Guide
+
+1. Get your Databricks instance's [workspace url](https://docs.databricks.com/workspace/workspace-details.html#workspace-instance-names-urls-and-ids)
+2. Create a [Databricks Service Principal](https://docs.databricks.com/administration-guide/users-groups/service-principals.html#what-is-a-service-principal)
+   1. You can skip this step and use your own account to get things running quickly, but we strongly recommend creating a dedicated service principal for production use.
+3. Generate a Databricks Personal Access Token by following one of these guides:
+   1. [Service Principals](https://docs.databricks.com/administration-guide/users-groups/service-principals.html#personal-access-tokens)
+   2. 
[Personal Access Tokens](https://docs.databricks.com/dev-tools/auth.html#databricks-personal-access-tokens) +4. Provision your service account, to ingest your workspace's metadata and lineage, your service principal must have all of the following: + 1. One of: metastore admin role, ownership of, or `USE CATALOG` privilege on any catalogs you want to ingest + 2. One of: metastore admin role, ownership of, or `USE SCHEMA` privilege on any schemas you want to ingest + 3. Ownership of or `SELECT` privilege on any tables and views you want to ingest + 4. [Ownership documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/ownership.html) + 5. [Privileges documentation](https://docs.databricks.com/data-governance/unity-catalog/manage-privileges/privileges.html) +5. Check the starter recipe below and replace `workspace_url` and `token` with your information from the previous steps. + ## Advanced Configurations ### Working with Platform Instances diff --git a/metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml b/metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml index af4d5c5792..1b004a2ddf 100644 --- a/metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml +++ b/metadata-ingestion/docs/sources/fivetran/fivetran_recipe.yml @@ -26,7 +26,18 @@ source: client_id: "client_id" private_key: "private_key" dataset: "fivetran_log_dataset" + # Optional - If destination platform is 'databricks', provide databricks configuration. + databricks_destination_config: + # Credentials + credential: + token: "token" + workspace_url: "workspace_url" + warehouse_id: "warehouse_id" + # Coordinates + catalog: "fivetran_catalog" + log_schema: "fivetran_log" + # Optional - filter for certain connector names instead of ingesting everything. # connector_patterns: # allow: diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 854bd1b2e9..26edf3da95 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -365,6 +365,10 @@ slack = { "tenacity>=8.0.1", } +databricks_common = { + "databricks-sqlalchemy~=1.0", # Note: This is pinned to 1.0 for compatibility with SQLAlchemy 1.x which is default for fivetran +} + databricks = { # 0.1.11 appears to have authentication issues with azure databricks # 0.22.0 has support for `include_browse` in metadata list apis @@ -466,7 +470,14 @@ plugins: Dict[str, Set[str]] = { # https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/release-notes.html#rn-7-14-0 # https://github.com/elastic/elasticsearch-py/issues/1639#issuecomment-883587433 "elasticsearch": {"elasticsearch==7.13.4", *cachetools_lib}, - "excel": {"openpyxl>=3.1.5", "pandas", *aws_common, *abs_base, *cachetools_lib, *data_lake_profiling}, + "excel": { + "openpyxl>=3.1.5", + "pandas", + *aws_common, + *abs_base, + *cachetools_lib, + *data_lake_profiling, + }, "cassandra": { "cassandra-driver>=3.28.0", # We were seeing an error like this `numpy.dtype size changed, may indicate binary incompatibility. 
Expected 96 from C header, got 88 from PyObject` @@ -582,7 +593,11 @@ plugins: Dict[str, Set[str]] = { "unity-catalog": databricks | sql_common, # databricks is alias for unity-catalog and needs to be kept in sync "databricks": databricks | sql_common, - "fivetran": snowflake_common | bigquery_common | sqlalchemy_lib | sqlglot_lib, + "fivetran": snowflake_common + | bigquery_common + | databricks_common + | sqlalchemy_lib + | sqlglot_lib, "snaplogic": set(), "qlik-sense": sqlglot_lib | {"requests", "websocket-client"}, "sigma": sqlglot_lib | {"requests"}, @@ -737,7 +752,7 @@ base_dev_requirements = { "cassandra", "neo4j", "vertexai", - "mssql-odbc" + "mssql-odbc", ] if plugin for dependency in plugins[plugin] diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py index 0d57c01cca..8d6e242376 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py @@ -29,6 +29,9 @@ from datahub.ingestion.source.state.stale_entity_removal_handler import ( from datahub.ingestion.source.state.stateful_ingestion_base import ( StatefulIngestionConfigBase, ) +from datahub.ingestion.source.unity.config import ( + UnityCatalogConnectionConfig, +) from datahub.utilities.lossy_collections import LossyList from datahub.utilities.perf_timer import PerfTimer @@ -56,8 +59,8 @@ class Constant: STATUS = "status" USER_ID = "user_id" EMAIL = "email" - CONNECTOR_ID = "connector_id" - CONNECTOR_NAME = "connector_name" + CONNECTOR_ID = "connection_id" + CONNECTOR_NAME = "connection_name" CONNECTOR_TYPE_ID = "connector_type_id" PAUSED = "paused" SYNC_FREQUENCY = "sync_frequency" @@ -85,10 +88,23 @@ class BigQueryDestinationConfig(BigQueryConnectionConfig): dataset: str = Field(description="The fivetran connector log dataset.") +class DatabricksDestinationConfig(UnityCatalogConnectionConfig): + catalog: str = Field(description="The fivetran connector log catalog.") + log_schema: str = Field(description="The fivetran connector log schema.") + + @pydantic.validator("warehouse_id") + def warehouse_id_should_not_be_empty(cls, warehouse_id: Optional[str]) -> str: + if warehouse_id is None or (warehouse_id and warehouse_id.strip() == ""): + raise ValueError("Fivetran requires warehouse_id to be set") + return warehouse_id + + class FivetranLogConfig(ConfigModel): - destination_platform: Literal["snowflake", "bigquery"] = pydantic.Field( - default="snowflake", - description="The destination platform where fivetran connector log tables are dumped.", + destination_platform: Literal["snowflake", "bigquery", "databricks"] = ( + pydantic.Field( + default="snowflake", + description="The destination platform where fivetran connector log tables are dumped.", + ) ) snowflake_destination_config: Optional[SnowflakeDestinationConfig] = pydantic.Field( default=None, @@ -98,6 +114,12 @@ class FivetranLogConfig(ConfigModel): default=None, description="If destination platform is 'bigquery', provide bigquery configuration.", ) + databricks_destination_config: Optional[DatabricksDestinationConfig] = ( + pydantic.Field( + default=None, + description="If destination platform is 'databricks', provide databricks configuration.", + ) + ) _rename_destination_config = pydantic_renamed_field( "destination_config", "snowflake_destination_config" ) @@ -115,6 +137,11 @@ class FivetranLogConfig(ConfigModel): raise ValueError( "If destination platform is 'bigquery', user must 
provide bigquery destination configuration in the recipe." ) + elif destination_platform == "databricks": + if "databricks_destination_config" not in values: + raise ValueError( + "If destination platform is 'databricks', user must provide databricks destination configuration in the recipe." + ) else: raise ValueError( f"Destination platform '{destination_platform}' is not yet supported." diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index eb4dee3201..d39980d7ea 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -66,7 +66,6 @@ logger = logging.getLogger(__name__) class FivetranSource(StatefulIngestionSourceBase): """ This plugin extracts fivetran users, connectors, destinations and sync history. - This plugin is in beta and has only been tested on Snowflake connector. """ config: FivetranSourceConfig diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index b86274f283..c6fdcb8501 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -73,6 +73,19 @@ class FivetranLogAPI: if result is None: raise ValueError("Failed to retrieve BigQuery project ID") fivetran_log_database = result[0] + elif destination_platform == "databricks": + databricks_destination_config = ( + self.fivetran_log_config.databricks_destination_config + ) + if databricks_destination_config is not None: + engine = create_engine( + databricks_destination_config.get_sql_alchemy_url( + databricks_destination_config.catalog + ), + **databricks_destination_config.get_options(), + ) + fivetran_log_query.set_schema(databricks_destination_config.log_schema) + fivetran_log_database = databricks_destination_config.catalog else: raise ConfigurationError( f"Destination platform '{destination_platform}' is not yet supported." diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py index 57de2b1f9d..89137f7c85 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py @@ -6,6 +6,21 @@ MAX_COLUMN_LINEAGE_PER_CONNECTOR = 1000 MAX_JOBS_PER_CONNECTOR = 500 +""" +------------------------------------------------------------------------------------------------------------ +Fivetran Platform Connector Handling +------------------------------------------------------------------------------------------------------------ +Current Query Change Log: August 2025 (See: https://fivetran.com/docs/changelog/2025/august-2025) + +All queries have to be updated as per Fivetran Platform Connector release if any. We expect customers +and fivetran to keep platform connector configured for DataHub with auto sync enabled to get latest changes. + +References: +- Fivetran Release Notes: https://fivetran.com/docs/changelog (Look for "Fivetran Platform Connector") +- Latest Platform Connector Schema: https://fivetran.com/docs/logs/fivetran-platform?erdModal=open +""" + + class FivetranLogQuery: # Note: All queries are written in Snowflake SQL. 
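    # For example, a query written for Snowflake can be rewritten for a
    # Databricks destination; a hedged sketch using sqlglot (already among this
    # plugin's dependencies -- the exact transpilation call used by this source
    # may differ):
    #
    #     import sqlglot
    #
    #     databricks_sql = sqlglot.transpile(
    #         "SELECT connection_id FROM connection WHERE _fivetran_deleted = FALSE",
    #         read="snowflake",
    #         write="databricks",
    #     )[0]
    #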
# They will be transpiled to the target database's SQL dialect at runtime. @@ -30,17 +45,17 @@ class FivetranLogQuery: def get_connectors_query(self) -> str: return f"""\ SELECT - connector_id, + connection_id, connecting_user_id, connector_type_id, - connector_name, + connection_name, paused, sync_frequency, destination_id -FROM {self.schema_clause}connector +FROM {self.schema_clause}connection WHERE _fivetran_deleted = FALSE -QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY _fivetran_synced DESC) = 1 +QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY _fivetran_synced DESC) = 1 """ def get_users_query(self) -> str: @@ -63,20 +78,20 @@ FROM {self.schema_clause}user return f"""\ WITH ranked_syncs AS ( SELECT - connector_id, + connection_id, sync_id, MAX(CASE WHEN message_event = 'sync_start' THEN time_stamp END) as start_time, MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time, MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data, - ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn + ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY MAX(time_stamp) DESC) as rn FROM {self.schema_clause}log WHERE message_event in ('sync_start', 'sync_end') AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days' - AND connector_id IN ({formatted_connector_ids}) - GROUP BY connector_id, sync_id + AND connection_id IN ({formatted_connector_ids}) + GROUP BY connection_id, sync_id ) SELECT - connector_id, + connection_id, sync_id, start_time, end_time, @@ -85,7 +100,7 @@ FROM ranked_syncs WHERE rn <= {MAX_JOBS_PER_CONNECTOR} AND start_time IS NOT NULL AND end_time IS NOT NULL -ORDER BY connector_id, end_time DESC +ORDER BY connection_id, end_time DESC """ def get_table_lineage_query(self, connector_ids: List[str]) -> str: @@ -97,7 +112,7 @@ SELECT * FROM ( SELECT - stm.connector_id as connector_id, + stm.connection_id as connection_id, stm.id as source_table_id, stm.name as source_table_name, ssm.name as source_schema_name, @@ -105,18 +120,18 @@ FROM ( dtm.name as destination_table_name, dsm.name as destination_schema_name, tl.created_at as created_at, - ROW_NUMBER() OVER (PARTITION BY stm.connector_id, stm.id, dtm.id ORDER BY tl.created_at DESC) as table_combo_rn + ROW_NUMBER() OVER (PARTITION BY stm.connection_id, stm.id, dtm.id ORDER BY tl.created_at DESC) as table_combo_rn FROM {self.schema_clause}table_lineage as tl - JOIN {self.schema_clause}source_table_metadata as stm on tl.source_table_id = stm.id - JOIN {self.schema_clause}destination_table_metadata as dtm on tl.destination_table_id = dtm.id - JOIN {self.schema_clause}source_schema_metadata as ssm on stm.schema_id = ssm.id - JOIN {self.schema_clause}destination_schema_metadata as dsm on dtm.schema_id = dsm.id - WHERE stm.connector_id IN ({formatted_connector_ids}) + JOIN {self.schema_clause}source_table as stm on tl.source_table_id = stm.id -- stm: source_table_metadata + JOIN {self.schema_clause}destination_table as dtm on tl.destination_table_id = dtm.id -- dtm: destination_table_metadata + JOIN {self.schema_clause}source_schema as ssm on stm.schema_id = ssm.id -- ssm: source_schema_metadata + JOIN {self.schema_clause}destination_schema as dsm on dtm.schema_id = dsm.id -- dsm: destination_schema_metadata + WHERE stm.connection_id IN ({formatted_connector_ids}) ) -- Ensure that we only get back one entry per source and destination pair. 
WHERE table_combo_rn = 1 -QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR} -ORDER BY connector_id, created_at DESC +QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {MAX_TABLE_LINEAGE_PER_CONNECTOR} +ORDER BY connection_id, created_at DESC """ def get_column_lineage_query(self, connector_ids: List[str]) -> str: @@ -131,25 +146,25 @@ SELECT destination_column_name FROM ( SELECT - stm.connector_id as connector_id, + stm.connection_id as connection_id, scm.table_id as source_table_id, dcm.table_id as destination_table_id, scm.name as source_column_name, dcm.name as destination_column_name, cl.created_at as created_at, - ROW_NUMBER() OVER (PARTITION BY stm.connector_id, cl.source_column_id, cl.destination_column_id ORDER BY cl.created_at DESC) as column_combo_rn + ROW_NUMBER() OVER (PARTITION BY stm.connection_id, cl.source_column_id, cl.destination_column_id ORDER BY cl.created_at DESC) as column_combo_rn FROM {self.schema_clause}column_lineage as cl - JOIN {self.schema_clause}source_column_metadata as scm + JOIN {self.schema_clause}source_column as scm -- scm: source_column_metadata ON cl.source_column_id = scm.id - JOIN {self.schema_clause}destination_column_metadata as dcm + JOIN {self.schema_clause}destination_column as dcm -- dcm: destination_column_metadata ON cl.destination_column_id = dcm.id - -- Only joining source_table_metadata to get the connector_id. - JOIN {self.schema_clause}source_table_metadata as stm + -- Only joining source_table to get the connection_id. + JOIN {self.schema_clause}source_table as stm -- stm: source_table_metadata ON scm.table_id = stm.id - WHERE stm.connector_id IN ({formatted_connector_ids}) + WHERE stm.connection_id IN ({formatted_connector_ids}) ) -- Ensure that we only get back one entry per (connector, source column, destination column) pair. WHERE column_combo_rn = 1 -QUALIFY ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR} -ORDER BY connector_id, created_at DESC +QUALIFY ROW_NUMBER() OVER (PARTITION BY connection_id ORDER BY created_at DESC) <= {MAX_COLUMN_LINEAGE_PER_CONNECTOR} +ORDER BY connection_id, created_at DESC """ diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py index 6ceaddb722..640c085601 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/config.py @@ -132,14 +132,13 @@ class UnityCatalogGEProfilerConfig(UnityCatalogProfilerConfig, GEProfilingConfig ) -class UnityCatalogSourceConfig( - SQLCommonConfig, - StatefulIngestionConfigBase, - BaseUsageConfig, - DatasetSourceConfigMixin, - StatefulProfilingConfigMixin, - LowerCaseDatasetUrnConfigMixin, -): +class UnityCatalogConnectionConfig(ConfigModel): + """ + Configuration for connecting to Databricks Unity Catalog. + Contains only connection-related fields that can be reused across different sources. + """ + + scheme: str = DATABRICKS token: str = pydantic.Field(description="Databricks personal access token") workspace_url: str = pydantic.Field( description="Databricks workspace url. e.g. https://my-workspace.cloud.databricks.com" @@ -156,15 +155,41 @@ class UnityCatalogSourceConfig( "When warehouse_id is missing, these features will be automatically disabled (with warnings) to allow ingestion to continue." 
), ) - include_hive_metastore: bool = pydantic.Field( - default=INCLUDE_HIVE_METASTORE_DEFAULT, - description="Whether to ingest legacy `hive_metastore` catalog. This requires executing queries on SQL warehouse.", - ) - workspace_name: Optional[str] = pydantic.Field( - default=None, - description="Name of the workspace. Default to deployment name present in workspace_url", + + extra_client_options: Dict[str, Any] = Field( + default={}, + description="Additional options to pass to Databricks SQLAlchemy client.", ) + def __init__(self, **data: Any): + super().__init__(**data) + + def get_sql_alchemy_url(self, database: Optional[str] = None) -> str: + uri_opts = {"http_path": f"/sql/1.0/warehouses/{self.warehouse_id}"} + if database: + uri_opts["catalog"] = database + return make_sqlalchemy_uri( + scheme=self.scheme, + username="token", + password=self.token, + at=urlparse(self.workspace_url).netloc, + db=database, + uri_opts=uri_opts, + ) + + def get_options(self) -> dict: + return self.extra_client_options + + +class UnityCatalogSourceConfig( + UnityCatalogConnectionConfig, + SQLCommonConfig, + StatefulIngestionConfigBase, + BaseUsageConfig, + DatasetSourceConfigMixin, + StatefulProfilingConfigMixin, + LowerCaseDatasetUrnConfigMixin, +): include_metastore: bool = pydantic.Field( default=False, description=( @@ -344,7 +369,15 @@ class UnityCatalogSourceConfig( _forced_disable_tag_extraction: bool = pydantic.PrivateAttr(default=False) _forced_disable_hive_metastore_extraction = pydantic.PrivateAttr(default=False) - scheme: str = DATABRICKS + include_hive_metastore: bool = pydantic.Field( + default=INCLUDE_HIVE_METASTORE_DEFAULT, + description="Whether to ingest legacy `hive_metastore` catalog. This requires executing queries on SQL warehouse.", + ) + + workspace_name: Optional[str] = pydantic.Field( + default=None, + description="Name of the workspace. 
Default to deployment name present in workspace_url", + ) def __init__(self, **data): # First, let the parent handle the root validators and field processing @@ -386,19 +419,6 @@ class UnityCatalogSourceConfig( forced_disable_hive_metastore_extraction ) - def get_sql_alchemy_url(self, database: Optional[str] = None) -> str: - uri_opts = {"http_path": f"/sql/1.0/warehouses/{self.warehouse_id}"} - if database: - uri_opts["catalog"] = database - return make_sqlalchemy_uri( - scheme=self.scheme, - username="token", - password=self.token, - at=urlparse(self.workspace_url).netloc, - db=database, - uri_opts=uri_opts, - ) - def is_profiling_enabled(self) -> bool: return self.profiling.enabled and is_profiling_enabled( self.profiling.operation_config diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index a153a301ef..94b36f2e0b 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -26,19 +26,19 @@ FROZEN_TIME = "2022-06-07 17:00:00" default_connector_query_results = [ { - "connector_id": "calendar_elected", + "connection_id": "calendar_elected", "connecting_user_id": "reapply_phone", "connector_type_id": "postgres", - "connector_name": "postgres", + "connection_name": "postgres", "paused": False, "sync_frequency": 1440, "destination_id": "interval_unconstitutional", }, { - "connector_id": "my_confluent_cloud_connector_id", + "connection_id": "my_confluent_cloud_connector_id", "connecting_user_id": "reapply_phone", "connector_type_id": "confluent_cloud", - "connector_name": "confluent_cloud", + "connection_name": "confluent_cloud", "paused": False, "sync_frequency": 1440, "destination_id": "my_confluent_cloud_connector_id", @@ -60,7 +60,7 @@ def default_query_results( ): return [ { - "connector_id": "calendar_elected", + "connection_id": "calendar_elected", "source_table_id": "10040", "source_table_name": "employee", "source_schema_name": "public", @@ -69,7 +69,7 @@ def default_query_results( "destination_schema_name": "postgres_public", }, { - "connector_id": "calendar_elected", + "connection_id": "calendar_elected", "source_table_id": "10041", "source_table_name": "company", "source_schema_name": "public", @@ -78,7 +78,7 @@ def default_query_results( "destination_schema_name": "postgres_public", }, { - "connector_id": "my_confluent_cloud_connector_id", + "connection_id": "my_confluent_cloud_connector_id", "source_table_id": "10042", "source_table_name": "my-source-topic", "source_schema_name": "confluent_cloud", @@ -131,28 +131,28 @@ def default_query_results( ): return [ { - "connector_id": "calendar_elected", + "connection_id": "calendar_elected", "sync_id": "4c9a03d6-eded-4422-a46a-163266e58243", "start_time": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000), "end_time": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000), "end_message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"', }, { - "connector_id": "calendar_elected", + "connection_id": "calendar_elected", "sync_id": "f773d1e9-c791-48f4-894f-8cf9b3dfc834", "start_time": datetime.datetime(2023, 10, 3, 14, 35, 30, 345000), "end_time": datetime.datetime(2023, 10, 3, 14, 35, 31, 512000), "end_message_data": '"{\\"reason\\":\\"Sync has been cancelled because of a user action in the dashboard.Standard Config updated.\\",\\"status\\":\\"CANCELED\\"}"', }, { - "connector_id": "calendar_elected", + "connection_id": "calendar_elected", "sync_id": 
"63c2fc85-600b-455f-9ba0-f576522465be", "start_time": datetime.datetime(2023, 10, 3, 14, 35, 55, 401000), "end_time": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000), "end_message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"', }, { - "connector_id": "my_confluent_cloud_connector_id", + "connection_id": "my_confluent_cloud_connector_id", "sync_id": "d9a03d6-eded-4422-a46a-163266e58244", "start_time": datetime.datetime(2023, 9, 20, 6, 37, 32, 606000), "end_time": datetime.datetime(2023, 9, 20, 6, 38, 5, 56000), @@ -360,19 +360,19 @@ def test_fivetran_with_snowflake_dest_and_null_connector_user(pytestconfig, tmp_ connector_query_results = [ { - "connector_id": "calendar_elected", + "connection_id": "calendar_elected", "connecting_user_id": None, "connector_type_id": "postgres", - "connector_name": "postgres", + "connection_name": "postgres", "paused": False, "sync_frequency": 1440, "destination_id": "interval_unconstitutional", }, { - "connector_id": "my_confluent_cloud_connector_id", + "connection_id": "my_confluent_cloud_connector_id", "connecting_user_id": None, "connector_type_id": "confluent_cloud", - "connector_name": "confluent_cloud", + "connection_name": "confluent_cloud", "paused": False, "sync_frequency": 1440, "destination_id": "interval_unconstitutional", diff --git a/metadata-ingestion/tests/unit/test_unity_catalog_config.py b/metadata-ingestion/tests/unit/test_unity_catalog_config.py index 491896e2fa..c36dcc693e 100644 --- a/metadata-ingestion/tests/unit/test_unity_catalog_config.py +++ b/metadata-ingestion/tests/unit/test_unity_catalog_config.py @@ -134,6 +134,9 @@ def test_warehouse_id_must_be_set_if_include_hive_metastore_is_true(): assert config.warehouse_id is None +@pytest.mark.skip( + reason="This test is making actual network calls with retries taking ~5 mins, needs to be mocked" +) def test_warehouse_id_must_be_present_test_connection(): """Test that connection succeeds when hive_metastore gets auto-disabled.""" config_dict = {