From 21c4903e55f52f7539d1fc878f7ac9f038996b69 Mon Sep 17 00:00:00 2001 From: Jonny Dixon Date: Wed, 11 Jun 2025 12:12:45 +0100 Subject: [PATCH] retrieve database --- .../ingestion/source/fivetran/fivetran.py | 90 +++++---- .../source/fivetran/fivetran_log_api.py | 181 +++++++++++++++++- ...nowflake_empty_connection_user_golden.json | 26 ++- .../fivetran/fivetran_snowflake_golden.json | 26 ++- 4 files changed, 272 insertions(+), 51 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py index dbec2ebb7f..ab28afab5e 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py @@ -92,7 +92,17 @@ class FivetranSource(StatefulIngestionSourceBase): connector.connector_id, PlatformDetail() ) - # Map connector type to known platform if needed + # Apply source platform details from additional_properties if available (from log API enhancement) + if "source_platform" in connector.additional_properties: + source_details.platform = connector.additional_properties["source_platform"] + if "source_platform_instance" in connector.additional_properties: + source_details.platform_instance = connector.additional_properties[ + "source_platform_instance" + ] + if "source_env" in connector.additional_properties: + source_details.env = connector.additional_properties["source_env"] + + # Map connector type to known platform if needed (only if not already set) if source_details.platform is None: connector_type = connector.connector_type.lower() if connector_type in FIVETRAN_PLATFORM_TO_DATAHUB_PLATFORM: @@ -104,15 +114,24 @@ class FivetranSource(StatefulIngestionSourceBase): # Auto-detect source database if not present in config if source_details.database is None: - # Try to extract source database from connector's additional properties - detected_db = self._extract_source_database_from_connector(connector) - if detected_db: - source_details.database = detected_db + # Check if source database is available in additional_properties first (from log API enhancement) + if "source_database" in connector.additional_properties: + source_details.database = connector.additional_properties[ + "source_database" + ] logger.info( - f"Auto-detected source database '{detected_db}' for connector {connector.connector_id}" + f"Using source database '{source_details.database}' from additional properties for connector {connector.connector_id}" ) else: - source_details.database = "" + # Try to extract source database from connector's additional properties + detected_db = self._extract_source_database_from_connector(connector) + if detected_db: + source_details.database = detected_db + logger.info( + f"Auto-detected source database '{detected_db}' for connector {connector.connector_id}" + ) + else: + source_details.database = "" logger.debug( f"Source details for connector {connector.connector_id}: " @@ -209,6 +228,14 @@ class FivetranSource(StatefulIngestionSourceBase): connector.destination_id, PlatformDetail() ) + # Apply destination platform details from additional_properties if available (from log API enhancement) + if "destination_platform_instance" in connector.additional_properties: + destination_details.platform_instance = connector.additional_properties[ + "destination_platform_instance" + ] + if "destination_env" in connector.additional_properties: + destination_details.env = 
connector.additional_properties["destination_env"] + # Set platform if not present if destination_details.platform is None: # First check if there's a destination platform in additional properties @@ -417,24 +444,23 @@ class FivetranSource(StatefulIngestionSourceBase): # Include database in the table name if available database = details.database if details.database else "" - # Special handling for BigQuery - if platform.lower() == "bigquery": - # For BigQuery, ensure lowercase database and table name - database = database.lower() if database else "" - # If include_schema_in_urn=False, table_name won't have the schema part - if "." in table_name: - schema, table = table_name.split(".", 1) - table_name = f"{schema.lower()}.{table.lower()}" - else: - table_name = table_name.lower() + # Normalize all components to lowercase for URN consistency + # This ensures proper lineage matching across DataHub + database = database.lower() if database else "" - # For BigQuery, the database is the project ID and should be included - full_table_name = f"{database}.{table_name}" if database else table_name - logger.debug(f"BigQuery dataset URN table name: {full_table_name}") + # Split table_name and normalize each part + if "." in table_name: + # Handle schema.table format + parts = table_name.split(".") + table_name = ".".join(part.lower() for part in parts) else: - # For other platforms, follow standard behavior - full_table_name = f"{database}.{table_name}" if database else table_name - logger.debug(f"Standard dataset URN table name: {full_table_name}") + table_name = table_name.lower() + + # Build the full table name + full_table_name = f"{database}.{table_name}" if database else table_name + logger.debug( + f"Normalized dataset URN table name: {full_table_name} (platform: {platform})" + ) # Ensure environment is set env = details.env or "PROD" @@ -488,10 +514,6 @@ class FivetranSource(StatefulIngestionSourceBase): logger.info(f"Creating column lineage from {source_urn} to {dest_urn}") - # Extract destination platform from the URN - dest_platform = str(dest_urn).split(",")[0].split(":")[-1] - is_bigquery = dest_platform.lower() == "bigquery" - if not lineage.column_lineage: logger.warning( f"No column lineage data available for {lineage.source_table} -> {lineage.destination_table}" @@ -527,18 +549,16 @@ class FivetranSource(StatefulIngestionSourceBase): # Process valid column mappings for column_lineage in valid_lineage: try: - # Create field URNs + # Normalize column names to lowercase for consistency + source_column = column_lineage.source_column.lower() + dest_column = column_lineage.destination_column.lower() + + # Create field URNs with normalized column names source_field_urn = builder.make_schema_field_urn( str(source_urn), - column_lineage.source_column, + source_column, ) - # For BigQuery, ensure proper case and format - dest_column = column_lineage.destination_column - if is_bigquery: - # Ensure it's lowercase for BigQuery - dest_column = dest_column.lower() - dest_field_urn = builder.make_schema_field_urn( str(dest_urn), dest_column, diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py index 5ac5a47cb2..db6c498c46 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py +++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py @@ -61,7 +61,7 @@ class FivetranLogAPI(FivetranAccessInterface): 
dialect=self.fivetran_log_config.destination_platform, pretty=True ) - self.engine.execute(test_query) + self.engine.execute(test_query) # type: ignore[attr-defined] logger.info("Successfully tested database connection") return True except SQLAlchemyError as e: @@ -89,7 +89,7 @@ class FivetranLogAPI(FivetranAccessInterface): snowflake_destination_config.get_sql_alchemy_url(), **snowflake_destination_config.get_options(), ) - engine.execute( + engine.execute( # type: ignore[attr-defined] fivetran_log_query.use_database( snowflake_destination_config.database, ) @@ -109,7 +109,7 @@ class FivetranLogAPI(FivetranAccessInterface): fivetran_log_query.set_schema(bigquery_destination_config.dataset) # The "database" should be the BigQuery project name. - fivetran_log_database = engine.execute( + fivetran_log_database = engine.execute( # type: ignore[attr-defined] "SELECT @@project_id" ).fetchone()[0] else: @@ -129,7 +129,7 @@ class FivetranLogAPI(FivetranAccessInterface): dialect=self.fivetran_log_config.destination_platform, pretty=True ) logger.info(f"Executing query: {query}") - resp = self.engine.execute(query) + resp = self.engine.execute(query) # type: ignore[attr-defined] return [row for row in resp] def _get_column_lineage_metadata( @@ -341,6 +341,20 @@ class FivetranLogAPI(FivetranAccessInterface): ) continue + # Prepare additional properties for database detection + additional_properties = {} + + # Set destination platform based on configuration + if self.fivetran_log_config.destination_platform: + additional_properties["destination_platform"] = ( + self.fivetran_log_config.destination_platform + ) + + # Set destination database based on configuration + destination_database = self._get_destination_database_from_config() + if destination_database: + additional_properties["destination_database"] = destination_database # type: ignore[assignment] + connectors.append( Connector( connector_id=connector_id, @@ -352,6 +366,7 @@ class FivetranLogAPI(FivetranAccessInterface): user_id=connector[Constant.CONNECTING_USER_ID], lineage=[], # filled later jobs=[], # filled later + additional_properties=additional_properties, ) ) @@ -365,7 +380,165 @@ class FivetranLogAPI(FivetranAccessInterface): with report.metadata_extraction_perf.connectors_lineage_extraction_sec: logger.info("Fetching connector lineage") self._fill_connectors_lineage(connectors) + # After lineage is filled, try to extract source database information + self._enhance_connectors_with_database_info(connectors) + with report.metadata_extraction_perf.connectors_jobs_extraction_sec: logger.info("Fetching connector job run history") self._fill_connectors_jobs(connectors, syncs_interval) return connectors + + def _get_destination_database_from_config(self) -> Optional[str]: + """Extract destination database name from log configuration.""" + if self.fivetran_log_config.destination_platform == "snowflake": + if self.fivetran_log_config.snowflake_destination_config: + return self.fivetran_log_config.snowflake_destination_config.database + elif self.fivetran_log_config.destination_platform == "bigquery": + if self.fivetran_log_config.bigquery_destination_config: + return self.fivetran_log_config.bigquery_destination_config.dataset + return None + + def _enhance_connectors_with_database_info( + self, connectors: List[Connector] + ) -> None: + """ + Enhance connectors with database and platform information from configuration mappings and lineage data. + + This method: + 1. 
Applies source platform/instance/database mappings from sources_to_platform_instance + 2. Applies destination platform/instance/database mappings from destination_to_platform_instance + 3. Extracts source database from lineage when not available from config + """ + for connector in connectors: + # Apply source platform instance mappings if configured + if ( + self.config + and hasattr(self.config, "sources_to_platform_instance") + and connector.connector_id in self.config.sources_to_platform_instance + ): + source_details = self.config.sources_to_platform_instance[ + connector.connector_id + ] + logger.info( + f"Applying source platform mapping for connector {connector.connector_id}: {source_details}" + ) + + # Apply source platform details to additional_properties + if source_details.platform: + connector.additional_properties["source_platform"] = ( + source_details.platform + ) + if source_details.platform_instance: + connector.additional_properties["source_platform_instance"] = ( + source_details.platform_instance + ) + if source_details.database: + connector.additional_properties["source_database"] = ( + source_details.database + ) + if source_details.env: + connector.additional_properties["source_env"] = source_details.env + + # Apply destination platform instance mappings if configured + if ( + self.config + and hasattr(self.config, "destination_to_platform_instance") + and connector.destination_id + in self.config.destination_to_platform_instance + ): + dest_details = self.config.destination_to_platform_instance[ + connector.destination_id + ] + logger.info( + f"Applying destination platform mapping for destination {connector.destination_id}: {dest_details}" + ) + + # Apply destination platform details to additional_properties (may override config-based values) + if dest_details.platform: + connector.additional_properties["destination_platform"] = ( + dest_details.platform + ) + if dest_details.platform_instance: + connector.additional_properties["destination_platform_instance"] = ( + dest_details.platform_instance + ) + if dest_details.database: + connector.additional_properties["destination_database"] = ( + dest_details.database + ) + if dest_details.env: + connector.additional_properties["destination_env"] = ( + dest_details.env + ) + + # Try to extract source database from lineage if not already set from mappings + if "source_database" not in connector.additional_properties: + source_database = self._extract_source_database_from_lineage(connector) + if source_database: + connector.additional_properties["source_database"] = source_database + logger.info( + f"Extracted source database '{source_database}' from lineage for connector {connector.connector_id}" + ) + + def _extract_source_database_from_lineage( + self, connector: Connector + ) -> Optional[str]: + """ + Extract source database name from connector lineage data. + + This method analyzes the source table names in lineage to infer + the source database name when it's not explicitly available. + """ + if not connector.lineage: + return None + + # Collect potential database names from source tables + database_candidates = set() + + for lineage_entry in connector.lineage[ + :10 + ]: # Check first 10 entries for consistency + if lineage_entry.source_table and "." 
in lineage_entry.source_table: + # Parse source table format: potentially database.schema.table or schema.table + parts = lineage_entry.source_table.split(".") + + if len(parts) >= 3: + # database.schema.table format - first part is database + database_candidates.add(parts[0]) + elif len(parts) == 2: + # schema.table format - might be database.table for some connectors + # Only consider as database if it doesn't look like a typical schema name + potential_db = parts[0] + excluded_schemas = { + "public", + "dbo", + "information_schema", + "sys", + "default", + "main", + "schema", + } + if ( + potential_db.lower() not in excluded_schemas + and len(potential_db) > 2 + ): + database_candidates.add(potential_db) + + # If we have exactly one consistent database name, use it + if len(database_candidates) == 1: + database_name = next(iter(database_candidates)) + logger.debug( + f"Inferred source database '{database_name}' from lineage consistency for connector {connector.connector_id}" + ) + return database_name + elif len(database_candidates) > 1: + logger.debug( + f"Multiple database candidates found for connector {connector.connector_id}: {database_candidates}. " + "Unable to determine single source database." + ) + else: + logger.debug( + f"No database information could be inferred from lineage for connector {connector.connector_id}" + ) + + return None diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json index e850faedad..e418098b50 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_empty_connection_user_golden.json @@ -7,11 +7,15 @@ "aspect": { "json": { "customProperties": { + "destination_platform": "snowflake", + "destination_database": "test_database", + "source_platform": "postgres", + "source_database": "postgres_db", + "source_env": "DEV", "connector_type": "postgres", "sync_frequency": "1440", "paused": "False", - "destination_id": "interval_unconstitutional", - "destination_platform": "snowflake" + "destination_id": "interval_unconstitutional" }, "name": "postgres", "description": "Fivetran connector for postgres to snowflake", @@ -114,7 +118,10 @@ "destination.platform": "snowflake", "destination.env": "PROD", "destination.database": "test_database", - "destination.column_naming_pattern": "auto" + "destination.column_naming_pattern": "auto", + "connector.source_platform": "postgres", + "connector.source_database": "postgres_db", + "connector.source_env": "DEV" }, "name": "postgres", "description": "Fivetran data pipeline from postgres to snowflake", @@ -652,11 +659,15 @@ "aspect": { "json": { "customProperties": { + "destination_platform": "snowflake", + "destination_database": "test_database", + "source_platform": "kafka", + "source_database": "kafka_prod", + "source_env": "PROD", "connector_type": "confluent_cloud", "sync_frequency": "1440", "paused": "False", - "destination_id": "interval_unconstitutional", - "destination_platform": "snowflake" + "destination_id": "interval_unconstitutional" }, "name": "confluent_cloud", "description": "Fivetran connector for confluent_cloud to snowflake", @@ -759,7 +770,10 @@ "destination.platform": "snowflake", "destination.env": "PROD", "destination.database": "test_database", - "destination.column_naming_pattern": "auto" + 
"destination.column_naming_pattern": "auto", + "connector.source_platform": "kafka", + "connector.source_database": "kafka_prod", + "connector.source_env": "PROD" }, "name": "confluent_cloud", "description": "Fivetran data pipeline from confluent_cloud to snowflake", diff --git a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json index f2d5933ec7..dbbd72d118 100644 --- a/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json +++ b/metadata-ingestion/tests/integration/fivetran/fivetran_snowflake_golden.json @@ -7,11 +7,14 @@ "aspect": { "json": { "customProperties": { + "destination_platform": "snowflake", + "destination_database": "test_database", + "source_database": "postgres_db", + "source_env": "DEV", "connector_type": "postgres", "sync_frequency": "1440", "paused": "False", - "destination_id": "interval_unconstitutional", - "destination_platform": "snowflake" + "destination_id": "interval_unconstitutional" }, "name": "postgres", "description": "Fivetran connector for postgres to snowflake", @@ -97,7 +100,9 @@ "destination.platform": "snowflake", "destination.env": "PROD", "destination.database": "test_database", - "destination.column_naming_pattern": "auto" + "destination.column_naming_pattern": "auto", + "connector.source_database": "postgres_db", + "connector.source_env": "DEV" }, "name": "postgres", "description": "Fivetran data pipeline from postgres to snowflake", @@ -643,11 +648,16 @@ "aspect": { "json": { "customProperties": { + "destination_platform": "snowflake", + "destination_database": "kafka_prod", + "source_platform": "kafka", + "source_database": "kafka_prod", + "source_env": "PROD", + "destination_env": "PROD", "connector_type": "confluent_cloud", "sync_frequency": "1440", "paused": "False", - "destination_id": "my_confluent_cloud_connector_id", - "destination_platform": "snowflake" + "destination_id": "my_confluent_cloud_connector_id" }, "name": "confluent_cloud", "description": "Fivetran connector for confluent_cloud to snowflake", @@ -733,7 +743,11 @@ "destination.platform": "kafka", "destination.env": "PROD", "destination.database": "kafka_prod", - "destination.column_naming_pattern": "auto" + "destination.column_naming_pattern": "auto", + "connector.source_platform": "kafka", + "connector.source_database": "kafka_prod", + "connector.source_env": "PROD", + "connector.destination_env": "PROD" }, "name": "confluent_cloud", "description": "Fivetran data pipeline from confluent_cloud to kafka",